From 75965cc8b3014d1edd528d7198abc5a540106a4b Mon Sep 17 00:00:00 2001 From: Thanatat Tamtan Date: Mon, 25 May 2026 11:14:17 +0700 Subject: [PATCH] fix: call OnDiscard exactly once per item on encode failure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously flush() called invokeOnDiscard immediately when jsonEnc.Encode failed, but left the item in the buffer. On every HTTP retry the same item was re-encoded (failing again) and the callback fired a second (third, …) time. Two related issues fixed together: - Collect encode failures in a local slice; call invokeOnDiscard only after a successful HTTP response so retries cannot double-fire it. - If every item in a batch is unencodable, the encoded buffer is empty. Guard against sending a zero-byte body (which the server may reject, causing an infinite retry loop): return true immediately and discard all failures once, without making an HTTP request. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- quickwit.go | 17 ++++++++++- quickwit_test.go | 77 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+), 1 deletion(-) diff --git a/quickwit.go b/quickwit.go index c86ffb9..7e95dc7 100644 --- a/quickwit.go +++ b/quickwit.go @@ -202,14 +202,24 @@ func (c *Client) loop() { buf.Reset() + var encodeFailures []any for _, x := range buffer { if err := jsonEnc.Encode(x); err != nil { slog.Error("quickwit: failed to encode record, discarding", "error", err) - c.invokeOnDiscard(x) + encodeFailures = append(encodeFailures, x) continue } } + // All items were unencodable — discard them and report success so the + // caller clears the buffer and does not retry with the same items. 
+ if buf.Len() == 0 { + for _, x := range encodeFailures { + c.invokeOnDiscard(x) + } + return true + } + ctx := context.Background() if t := c.getIngestTimeout(); t != 0 { var cancel context.CancelFunc @@ -255,6 +265,11 @@ func (c *Client) loop() { return false } + // HTTP succeeded — now safe to discard items that failed encoding. + for _, x := range encodeFailures { + c.invokeOnDiscard(x) + } + if !resetBatchSizeAfter.IsZero() && time.Now().After(resetBatchSizeAfter) { beforeSize := batchSize batchSize = c.getBatchSize() diff --git a/quickwit_test.go b/quickwit_test.go index 99428ff..ec1e885 100644 --- a/quickwit_test.go +++ b/quickwit_test.go @@ -124,6 +124,83 @@ func TestIngest_JSONEncodeError_CallsOnDiscard(t *testing.T) { } } +// Regression #11: unencodable items caused OnDiscard to fire on every retry when the HTTP +// request also failed — the callback must fire exactly once per item. +func TestIngest_EncodeError_DiscardedExactlyOnce_OnHTTPFailure(t *testing.T) { + attempts := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + attempts++ + if attempts < 3 { + w.WriteHeader(http.StatusInternalServerError) + return + } + io.Copy(io.Discard, r.Body) + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + var mu sync.Mutex + var discarded []any + + c := quickwit.NewClient(server.URL + "/api/v1/test") + c.SetConcurrent(1) + c.OnDiscard(func(data any) { + mu.Lock() + discarded = append(discarded, data) + mu.Unlock() + }) + + ch := make(chan int) // not JSON-encodable + c.Ingest(map[string]any{"ok": true}, ch) + c.Close() + + mu.Lock() + n := len(discarded) + mu.Unlock() + + if n != 1 { + t.Errorf("OnDiscard called %d times, want exactly 1 (got %d HTTP attempts)", n, attempts) + } +} + +// Regression #12: when all items in a batch fail to encode, flush sent an empty HTTP body. +// If the server rejects an empty body, retryFlush looped indefinitely re-discarding items. 
+func TestIngest_AllEncodeErrors_NoHTTPRequest(t *testing.T) { + requestCount := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requestCount++ + w.WriteHeader(http.StatusBadRequest) // server rejects empty body + })) + defer server.Close() + + var mu sync.Mutex + var discarded []any + + c := quickwit.NewClient(server.URL + "/api/v1/test") + c.SetConcurrent(1) + c.OnDiscard(func(data any) { + mu.Lock() + discarded = append(discarded, data) + mu.Unlock() + }) + + ch1 := make(chan int) + ch2 := make(chan int) + c.Ingest(ch1, ch2) + c.Close() + + mu.Lock() + n := len(discarded) + mu.Unlock() + + if requestCount != 0 { + t.Errorf("expected no HTTP requests when all items are unencodable, got %d", requestCount) + } + if n != 2 { + t.Errorf("OnDiscard called %d times, want 2", n) + } +} + // Regression #10: when a 413 triggered batch-size reduction, re-flushing the oversized // buffer in smaller chunks silently reversed or lost the tail of the record sequence. func TestIngest_OversizeBatchPreservesOrder(t *testing.T) {