Skip to content

Commit 46cc3e2

Browse files
authored
fix: Lazily instantiate the KV-IR writer when using disk buffering to prevent uploading files that only contain IR preamble. (#18)
1 parent daa76f9 commit 46cc3e2

File tree

2 files changed

+19
-44
lines changed

2 files changed

+19
-44
lines changed

internal/irzstd/disk.go

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -52,24 +52,23 @@ type diskWriter struct {
5252
//
5353
// Returns:
5454
// - diskWriter: Disk writer for Zstd compressed IR
55-
// - err: Error creating new buffers, error opening Zstd/IR writers
55+
// - err: Error creating new buffers, error opening Zstd writer
5656
func NewDiskWriter(irPath string, zstdPath string) (*diskWriter, error) {
5757
irFile, zstdFile, err := newFileBuffers(irPath, zstdPath)
5858
if err != nil {
5959
return nil, err
6060
}
6161

62-
irWriter, zstdWriter, err := newIrZstdWriters(zstdFile, irFile)
62+
zstdWriter, err := zstd.NewWriter(zstdFile)
6363
if err != nil {
64-
return nil, err
64+
return nil, fmt.Errorf("error opening Zstd writer: %w", err)
6565
}
6666

6767
diskWriter := diskWriter{
6868
irPath: irPath,
6969
irFile: irFile,
7070
zstdPath: zstdPath,
7171
zstdFile: zstdFile,
72-
irWriter: irWriter,
7372
zstdWriter: zstdWriter,
7473
}
7574

@@ -106,7 +105,6 @@ func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) {
106105
irFile: irFile,
107106
zstdPath: zstdPath,
108107
zstdFile: zstdFile,
109-
irWriter: nil,
110108
zstdWriter: zstdWriter,
111109
}
112110

@@ -131,7 +129,10 @@ func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) {
131129

132130
// Converts log events to Zstd compressed IR and outputs to the Zstd file. IR is temporarily
133131
// stored in the IR file until it surpasses [irSizeThreshold] with compression to Zstd pushed out
134-
// to a later call. See [diskWriter] for more specific details on behaviour.
132+
// to a later call. See [diskWriter] for more specific details on behaviour. The IR writer is lazily
133+
// initialized on the first write. If initialized in [Reset], the preamble would make the IR file
134+
// non-empty even though there are no logs. Non-empty IR files persist across recovery and could
135+
// lead to empty files being uploaded to S3.
135136
//
136137
// Parameters:
137138
// - logEvents: A slice of log events to be encoded
@@ -140,6 +141,14 @@ func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) {
140141
// - numEvents: Number of log events successfully written to IR writer buffer
141142
// - err: Error writing IR/Zstd, error flushing buffers
142143
func (w *diskWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) {
144+
if w.irWriter == nil {
145+
var err error
146+
w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.irFile)
147+
if err != nil {
148+
return 0, fmt.Errorf("error creating IR writer: %w", err)
149+
}
150+
}
151+
143152
numBytes, numEvents, err := writeIr(w.irWriter, logEvents)
144153
if err != nil {
145154
return numEvents, err
@@ -197,26 +206,20 @@ func (w *diskWriter) CloseStreams() error {
197206
return nil
198207
}
199208

200-
// Reinitialize [diskWriter] after calling CloseStreams(). Resets individual IR and Zstd writers and
201-
// associated buffers.
209+
// Reinitialize [diskWriter] after calling CloseStreams(). Resets Zstd writer and associated
210+
// buffer.
202211
//
203212
// Returns:
204-
// - err: Error opening IR writer, error IR buffer not empty
213+
// - err: Error IR buffer not empty
205214
func (w *diskWriter) Reset() error {
206-
var err error
207-
w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.irFile)
208-
if err != nil {
209-
return err
210-
}
211-
212215
// Flush should be called prior to reset, so buffer should be empty. There may be a future
213216
// use case to truncate a non-empty IR buffer; however, there is currently no use case
214217
// so safer to throw an error.
215218
if w.irTotalBytes != 0 {
216219
return fmt.Errorf("error IR buffer is not empty")
217220
}
218221

219-
_, err = w.zstdFile.Seek(0, io.SeekStart)
222+
_, err := w.zstdFile.Seek(0, io.SeekStart)
220223
if err != nil {
221224
return err
222225
}

internal/irzstd/writer.go

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ import (
77
"fmt"
88
"io"
99

10-
"github.com/klauspost/compress/zstd"
11-
1210
"github.com/y-scope/clp-ffi-go/ffi"
1311
"github.com/y-scope/clp-ffi-go/ir"
1412
)
@@ -88,29 +86,3 @@ func writeIr(irWriter *ir.Writer, logEvents []ffi.LogEvent) (int, int, error) {
8886
}
8987
return numBytes, numEvents, nil
9088
}
91-
92-
// Opens a new [ir.Writer] and [zstd.Encoder].
93-
//
94-
// Parameters:
95-
// - zstdOutput: Output destination for Zstd
96-
// - irOutput: Output destination for IR
97-
//
98-
// Returns:
99-
// - irWriter: Writer for CLP IR
100-
// - zstdWriter: Writer for Zstd
101-
// - err: Error opening IR/Zstd writer
102-
func newIrZstdWriters(
103-
zstdOutput io.Writer,
104-
irOutput io.Writer,
105-
) (*ir.Writer, *zstd.Encoder, error) {
106-
zstdWriter, err := zstd.NewWriter(zstdOutput)
107-
if err != nil {
108-
return nil, nil, fmt.Errorf("error opening Zstd writer: %w", err)
109-
}
110-
111-
irWriter, err := ir.NewWriter[ir.FourByteEncoding](irOutput)
112-
if err != nil {
113-
return nil, nil, fmt.Errorf("error opening IR writer: %w", err)
114-
}
115-
return irWriter, zstdWriter, err
116-
}

0 commit comments

Comments
 (0)