Add async retry logic to GRPCWriter for OTel Collector outages#722
Conversation
When the OTel Collector is temporarily unavailable, GRPCWriter previously dropped export batches silently. This adds a background retry mechanism that queues failed batches and retries them with exponential backoff, preventing data loss during collector restarts of up to ~91 seconds. Design: - Retry is asynchronous: withRetry enqueues a failed batch to a per-signal buffered channel and returns immediately, releasing the SignalBatcher mutex so the DiodeWriter ring buffer continues to drain without stalling. - Three independent retry workers (metrics, logs, traces) each maintain a pool of pending batches and replay the entire pool on each backoff tick, flushing the full backlog in one sweep once the collector recovers. - Backoff sequence: 1s,2s,4s,8s,16s,30s,30s (7 attempts, ~91s total). - Per-signal channel capacity 1024, sized to absorb the full retry window (91s*10 batches/s = 910 batches) without overflow.
There was a problem hiding this comment.
Hi @weili-broadcom, this is a great catch and great improvement towords more robust otel telemetry transmission.
I wonder one thing: Why did you use some custom logic to implement exponential backoff when gRPC has built in retry mechanism? Here is an example from the grpc-go repo.
Are there any benefits of having a custom logic?
jorbaum
left a comment
There was a problem hiding this comment.
LGTM in general. I like it. I just have some smaller remarks.
withRetry enqueues a failed batch to a per-signal buffered channel and returns immediately, releasing the SignalBatcher mutex so the DiodeWriter ring buffer continues to drain without stalling.
I think it makes sense to add this as a comment in the code as future coders will likely wonder why this retryer differs from others already present in the code base.
Besides this I have a proposal on how to improve logging. It would be great to also introduce metrics for retrying as well, but I would consider this optional as other retryers also do not expose metrics right now.
| pool = drainRetryQueue(pool, queue) | ||
| if prevLen == 0 && len(pool) > 0 { | ||
| delay = w.initialRetryDelay | ||
| w.l.Println("New item added to empty pool, delay set to", delay) |
There was a problem hiding this comment.
I think it would be nice to align this more with how syslog's https_batch.go is doing this and make it a little less chatty.
Some rough code idea:
func (w *GRPCWriter) runRetryWorker(name string, queue <-chan retryItem) {
var pool []retryItem
delay := w.initialRetryDelay
for {
// ... same drain/wait logic, no logging here ...
var remaining []retryItem
var failures int
var lastErr error
for _, item := range pool {
if isContextError(w.ctx.Err()) {
return
}
err := item.exportFn()
if err == nil {
continue
}
if isContextError(err) {
return
}
item.attempts++
if !isRetryable(err) || item.attempts >= w.maxRetries {
w.l.Printf("dropping %s batch after %d attempts: %v", name, item.attempts, err)
continue
}
failures++
lastErr = err
remaining = append(remaining, item)
}
if failures > 0 {
w.l.Printf("retrying %d %s batches in %s, last err: %v", failures, name, delay, lastErr)
}
if len(pool) > 0 && len(remaining) == 0 {
w.l.Printf("%s retry pool drained after recovery", name)
}
pool = remaining
if len(pool) == 0 {
delay = w.initialRetryDelay
} else {
delay = min(delay*2, w.maxRetryDelay)
}
}
}The ideas are:
- name parameter ("metrics"/"logs"/"traces") so log lines are grepable per signal type. The PR currently can't tell you which worker is struggling.
- One aggregated log line per cycle (retrying N batches in 8s, last err: ...) instead of one per item. During a 91s outage you get ~7 lines per signal, not hundreds.
- One line on full recovery (helps operators correlate "collector came back at HH:MM:SS").
- One line per dropped batch on exhaustion, including attempt count.
- Removed the noisy "delay set to" / "pool is empty" status lines entirely - they describe internal state, not anything an operator can act on.
| delay = w.initialRetryDelay | ||
| w.l.Println("Pool is empty, delay set to", delay) | ||
| } else { | ||
| delay = min(delay*2, w.maxRetryDelay) |
There was a problem hiding this comment.
OPTIONAL: Think about aligning this delay with ExponentialDuration (retry_writer.go:72).
I am not sure if it makes sense though as both retryers have a distinct logic and your's is async while the other is synchronous.
There was a problem hiding this comment.
ldelay = min(delay*2, w.maxRetryDelay) and ExponentialDuration are similar; they have the same conceptual pattern: exponential doubling with a cap.
|
@jorbaum thanks for you review. Chattiness might become a problem if the Otel collector doesn't start. The code shouldn't be that chatty as we already have a metric for the failed write requests egress_expired_total I agree that it would be nice if we have unified or at least similar implementation of retries. In this case, you are comparing the "externaly facing" writers with internal things and HTTP with gRPC. We could do our best to keep things similar, but it's not a must. @weili-broadcom The Otel Collector's restart time depends hardly on the configured pipelines. If someone configures an exporter with Persistent Queue for reliability the restart time is even longer (we don't have the File Storage Extension at the moment, but we might need to to this to improve the reliability of the exporters). I assume that you've measured the restart time of the Otel Collector for your current pipeline configuration and set reasonable defaults for the retry parameters for your case. I would suggest that we move the configuration parameters from constants to the configuration as envvars and set the current values as defaults. New config parameters in the bosh job spec should be added as well. Feel free to check #708 PR as an example ;) |
Introduce OTEL_RETRY_MAX_RETRIES and OTEL_RETRY_QUEUE_SIZE env vars so operators can tune the async retry behaviour without recompiling. The previous hard-coded values (7 retries, queue size 1024) are kept as defaults. Backoff delays (1 s initial, 30 s max) remain fixed constants as they do not need to be operator-facing. Updated retry log output to be less chatty.
I thought I replied to this comment but couldn't find my response anywhere. Try it again: gRPC retry is synchronous from the caller's perspective. The Export() call blocks until all retry attempts are exhausted or one succeeds. While Export() is blocking:
So we choose to handle retries asynchronously. |
chombium
left a comment
There was a problem hiding this comment.
Hi @weili-broadcom,
thanks for the quick changes.
LGTM apart from the go.sum merge conflicts
go mod tidy and go mod vendor and we can merge this.
Introduce OTEL_RETRY_MAX_RETRIES and OTEL_RETRY_QUEUE_SIZE env vars so operators can tune the async retry behaviour without recompiling. The previous hard-coded values (7 retries, queue size 1024) are kept as defaults. Backoff delays (1 s initial, 30 s max) remain fixed constants as they do not need to be operator-facing. Updated retry log output to be less chatty.
|
@chombium Thanks for reviewing! I've updated go.sum. |
There was a problem hiding this comment.
LGTM! Thanks for the quick changes @weili-broadcom
but it seems there are still merge conflicts :-/
|
Hi @chombium hopefully the conflict has been resolved. |
|
Thanks for the fix @weili-broadcom |
When the OTel Collector is temporarily unavailable, GRPCWriter previously dropped export batches silently. This adds a background retry mechanism that queues failed batches and retries them with exponential backoff, preventing data loss during collector restarts of up to ~91 seconds.
Design:
Description
Please include a summary of the change.
Type of change
Testing performed?
Checklist:
mainbranch, or relevant version branchIf you have any questions, or want to get attention for a PR or issue please reach out on the #logging-and-metrics channel in the cloudfoundry slack