Skip to content

Commit da32e50

Browse files
authored
feat: Migrate from IRV1 to KV-IR serializer. (#15)
1 parent b655567 commit da32e50

File tree

14 files changed

+172
-263
lines changed

14 files changed

+172
-263
lines changed

go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.24
44

55
require (
66
github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c
7-
github.com/y-scope/clp-ffi-go v0.0.3-0.20240604153926-969c1151cfcb
7+
github.com/y-scope/clp-ffi-go v0.0.9-beta.0.20250629182525-0dc22d574855
88
)
99

1010
require (
@@ -39,6 +39,8 @@ require (
3939
github.com/go-playground/universal-translator v0.18.1 // indirect
4040
github.com/jmespath/go-jmespath v0.4.0 // indirect
4141
github.com/leodido/go-urn v1.4.0 // indirect
42+
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
43+
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
4244
golang.org/x/crypto v0.19.0 // indirect
4345
golang.org/x/net v0.21.0 // indirect
4446
golang.org/x/sys v0.17.0 // indirect

go.sum

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,12 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl
6969
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
7070
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
7171
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
72-
github.com/y-scope/clp-ffi-go v0.0.3-0.20240604153926-969c1151cfcb h1:MAuKBGpfQIIl63810kEYZxUv8tfpI9y0nZlyi7tS8A8=
73-
github.com/y-scope/clp-ffi-go v0.0.3-0.20240604153926-969c1151cfcb/go.mod h1:EkeM7lP5AWNRcmBWt3MvjAkRx7RT0gzisW4sh+SJYUw=
72+
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
73+
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
74+
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
75+
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
76+
github.com/y-scope/clp-ffi-go v0.0.9-beta.0.20250629182525-0dc22d574855 h1:yrwcVsQs6qpCiRpCHgOk7g+jo1hBVwT9MhJA1hEQLso=
77+
github.com/y-scope/clp-ffi-go v0.0.9-beta.0.20250629182525-0dc22d574855/go.mod h1:EuJRZ9fcHuedhtPHsVCsB84isZkqVECbvfAfhj9JGFI=
7478
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
7579
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
7680
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=

internal/decoder/decoder.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ func New(data unsafe.Pointer, length int) *codec.Decoder {
5858
mh.RawToString = true
5959
mh.WriteExt = true
6060
mh.ErrorIfNoArrayExpand = true
61+
mh.MapType = reflect.TypeOf(map[string]interface{}{})
6162

6263
// Set up custom extension for Fluent Bit timestamp format.
6364
mh.SetBytesExt(reflect.TypeOf(FlbTime{}), 0, &FlbTime{})

internal/irzstd/disk.go

Lines changed: 59 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ import (
1616
// 2 MB threshold to buffer IR before compressing to Zstd.
1717
const irSizeThreshold = 2 << 20
1818

19+
// IR end of stream byte used to terminate IR stream.
20+
const irEndOfStreamByte = 0x0
21+
1922
// Converts log events into Zstd compressed IR using "trash compactor" design. Log events are
2023
// converted to uncompressed IR and buffered into "bins". Uncompressed IR represents uncompressed
2124
// trash in "trash compactor". Once the bin is full, the bin is "compacted" into its own separate
@@ -36,8 +39,6 @@ type diskWriter struct {
3639
irFile *os.File
3740
zstdFile *os.File
3841
irWriter *ir.Writer
39-
size int
40-
timezone string
4142
irTotalBytes int
4243
zstdWriter *zstd.Encoder
4344
}
@@ -46,33 +47,24 @@ type diskWriter struct {
4647
// is on.
4748
//
4849
// Parameters:
49-
// - timezone: Time zone of the log source
50-
// - size: Byte length
5150
// - irPath: Path to IR disk buffer file
5251
// - zstdPath: Path to Zstd disk buffer file
5352
//
5453
// Returns:
5554
// - diskWriter: Disk writer for Zstd compressed IR
5655
// - err: Error creating new buffers, error opening Zstd/IR writers
57-
func NewDiskWriter(
58-
timezone string,
59-
size int,
60-
irPath string,
61-
zstdPath string,
62-
) (*diskWriter, error) {
56+
func NewDiskWriter(irPath string, zstdPath string) (*diskWriter, error) {
6357
irFile, zstdFile, err := newFileBuffers(irPath, zstdPath)
6458
if err != nil {
6559
return nil, err
6660
}
6761

68-
irWriter, zstdWriter, err := newIrZstdWriters(zstdFile, timezone, size)
62+
irWriter, zstdWriter, err := newIrZstdWriters(zstdFile, irFile)
6963
if err != nil {
7064
return nil, err
7165
}
7266

7367
diskWriter := diskWriter{
74-
size: size,
75-
timezone: timezone,
7668
irPath: irPath,
7769
irFile: irFile,
7870
zstdPath: zstdPath,
@@ -84,43 +76,37 @@ func NewDiskWriter(
8476
return &diskWriter, nil
8577
}
8678

87-
// Recovers a [diskWriter] opening buffer files from a previous execution of output plugin.
88-
// Recovery of files necessitates that use_disk_store is on. IR preamble is removed for
89-
// recovered store. Avoid use with empty disk stores as there will be no preamble.
79+
// Recovers a [diskWriter] by opening buffer files from a previous execution of the output plugin.
80+
// Requires use_disk_store to be enabled. The recovered writer must be closed with [CloseStreams]
81+
// before it can be used for future writes, since it does not initialize an IR writer. Returns an
82+
// error if both disk buffers are empty, since the IR would not have a preamble and would be
83+
// invalid.
9084
//
9185
// Parameters:
92-
// - timezone: Time zone of the log source
93-
// - size: Byte length
9486
// - irPath: Path to IR disk buffer file
9587
// - zstdPath: Path to Zstd disk buffer file
9688
//
9789
// Returns:
9890
// - diskWriter: Disk writer for Zstd compressed IR
99-
// - err: Error opening buffers, error opening Zstd/IR writers, error getting file sizes
100-
func RecoverWriter(
101-
timezone string,
102-
size int,
103-
irPath string,
104-
zstdPath string,
105-
) (*diskWriter, error) {
91+
// - err: Error opening buffers, error opening Zstd/IR writers, error getting file sizes,
92+
// error empty buffers
93+
func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) {
10694
irFile, zstdFile, err := openBufferFiles(irPath, zstdPath)
10795
if err != nil {
10896
return nil, fmt.Errorf("error opening files: %w", err)
10997
}
11098

111-
irWriter, zstdWriter, err := newIrZstdWriters(zstdFile, timezone, size)
99+
zstdWriter, err := zstd.NewWriter(zstdFile)
112100
if err != nil {
113-
return nil, err
101+
return nil, fmt.Errorf("error opening Zstd writer: %w", err)
114102
}
115103

116104
diskWriter := diskWriter{
117-
size: size,
118-
timezone: timezone,
119105
irPath: irPath,
120106
irFile: irFile,
121107
zstdPath: zstdPath,
122108
zstdFile: zstdFile,
123-
irWriter: irWriter,
109+
irWriter: nil,
124110
zstdWriter: zstdWriter,
125111
}
126112

@@ -129,11 +115,16 @@ func RecoverWriter(
129115
return nil, fmt.Errorf("error getting size of IR file: %w", err)
130116
}
131117

132-
// During recovery, IR buffer may not be empty, so the size must be set. In addition,
133-
// the non-empty disk buffers already have existing preamble so remove it. Disk buffer
134-
// must have non-zero size or else would be deleted in recover.
118+
zstdFileSize, err := diskWriter.getZstdFileSize()
119+
if err != nil {
120+
return nil, fmt.Errorf("error getting size of Zstd file: %w", err)
121+
}
122+
123+
if (irFileSize == 0) && (zstdFileSize == 0) {
124+
return nil, fmt.Errorf("error both IR and Zstd buffers are empty")
125+
}
126+
135127
diskWriter.irTotalBytes = irFileSize
136-
irWriter.Reset()
137128

138129
return &diskWriter, nil
139130
}
@@ -149,12 +140,7 @@ func RecoverWriter(
149140
// - numEvents: Number of log events successfully written to IR writer buffer
150141
// - err: Error writing IR/Zstd, error flushing buffers
151142
func (w *diskWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) {
152-
numEvents, err := writeIr(w.irWriter, logEvents)
153-
if err != nil {
154-
return numEvents, err
155-
}
156-
157-
numBytes, err := w.irWriter.WriteTo(w.irFile)
143+
numBytes, numEvents, err := writeIr(w.irWriter, logEvents)
158144
if err != nil {
159145
return numEvents, err
160146
}
@@ -173,9 +159,12 @@ func (w *diskWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) {
173159
return numEvents, nil
174160
}
175161

176-
// Closes IR stream and Zstd frame. Add trailing byte(s) required for IR/Zstd decoding.
177-
// The IR buffer is also flushed before ending stream. After calling close,
178-
// [diskWriter] must be reset prior to calling write.
162+
// Closes IR stream and Zstd frame. Add trailing byte(s) required for IR/Zstd decoding. The IR
163+
// buffer is also flushed before ending stream. After calling close, [diskWriter] must be reset
164+
// prior to calling write. For recovered [diskWriter], [ir.Writer] will be nil so closing the
165+
// IR writer is skipped. The IR trailing byte is written directly to [zstdWriter] as an
166+
// optimization to avoid an extra flush when the IR buffer is empty. [flushIrBuffer] exits early
167+
// if the IR buffer is empty.
179168
//
180169
// Returns:
181170
// - err: Error flushing/closing buffers
@@ -186,13 +175,15 @@ func (w *diskWriter) CloseStreams() error {
186175
return fmt.Errorf("error flushing IR buffer: %w", err)
187176
}
188177

189-
_, err = w.irWriter.CloseTo(w.zstdWriter)
190-
if err != nil {
191-
return err
178+
if w.irWriter != nil {
179+
err := w.irWriter.Serializer.Close()
180+
if err != nil {
181+
return fmt.Errorf("error could not close irWriter: %w", err)
182+
}
183+
w.irWriter = nil
192184
}
193185

194-
w.irWriter = nil
195-
186+
w.zstdWriter.Write([]byte{irEndOfStreamByte})
196187
err = w.zstdWriter.Close()
197188
if err != nil {
198189
return err
@@ -213,7 +204,7 @@ func (w *diskWriter) CloseStreams() error {
213204
// - err: Error opening IR writer, error IR buffer not empty
214205
func (w *diskWriter) Reset() error {
215206
var err error
216-
w.irWriter, err = ir.NewWriterSize[ir.FourByteEncoding](w.size, w.timezone)
207+
w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.irFile)
217208
if err != nil {
218209
return err
219210
}
@@ -285,21 +276,12 @@ func (w *diskWriter) GetZstdOutput() io.Reader {
285276
return w.zstdFile
286277
}
287278

288-
// Get size of Zstd output. [zstd] does not provide the amount of bytes written with each write.
289-
// Therefore, cannot keep track of size with variable as implemented for IR with [IrTotalBytes].
290-
// Instead, must always use stat.
279+
// Get size of Zstd output.
291280
//
292281
// Returns:
293-
// - err: Error calling stat
282+
// - err: Error getting size
294283
func (w *diskWriter) GetZstdOutputSize() (int, error) {
295-
zstdFileInfo, err := w.zstdFile.Stat()
296-
if err != nil {
297-
return 0, err
298-
}
299-
300-
zstdFileSize := int(zstdFileInfo.Size())
301-
302-
return zstdFileSize, err
284+
return w.getZstdFileSize()
303285
}
304286

305287
// Compresses contents of the IR file and outputs it to the Zstd file. The IR file is then
@@ -451,3 +433,19 @@ func (w *diskWriter) getIrFileSize() (int, error) {
451433
irFileSize := int(irFileInfo.Size())
452434
return irFileSize, err
453435
}
436+
437+
// Get size of Zstd file. [zstd] does not provide the amount of bytes written with each write.
438+
// Therefore, cannot keep track of size with variable as implemented for IR with [irTotalBytes].
439+
// Instead, must always use stat.
440+
//
441+
// Returns:
442+
// - err: Error calling stat
443+
func (w *diskWriter) getZstdFileSize() (int, error) {
444+
zstdFileInfo, err := w.zstdFile.Stat()
445+
if err != nil {
446+
return 0, err
447+
}
448+
449+
zstdFileSize := int(zstdFileInfo.Size())
450+
return zstdFileSize, err
451+
}

internal/irzstd/memory.go

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,31 +17,29 @@ import (
1717
type memoryWriter struct {
1818
zstdBuffer *bytes.Buffer
1919
irWriter *ir.Writer
20-
size int
21-
timezone string
2220
zstdWriter *zstd.Encoder
2321
}
2422

2523
// Opens a new [memoryWriter] with a memory buffer for Zstd output. For use when use_disk_store is
2624
// off.
2725
//
28-
// Parameters:
29-
// - timezone: Time zone of the log source
30-
// - size: Byte length
31-
//
3226
// Returns:
3327
// - memoryWriter: Memory writer for Zstd compressed IR
3428
// - err: Error opening Zstd/IR writers
35-
func NewMemoryWriter(timezone string, size int) (*memoryWriter, error) {
29+
func NewMemoryWriter() (*memoryWriter, error) {
3630
var zstdBuffer bytes.Buffer
37-
irWriter, zstdWriter, err := newIrZstdWriters(&zstdBuffer, timezone, size)
31+
32+
zstdWriter, err := zstd.NewWriter(&zstdBuffer)
3833
if err != nil {
39-
return nil, err
34+
return nil, fmt.Errorf("error opening Zstd writer: %w", err)
35+
}
36+
37+
irWriter, err := ir.NewWriter[ir.FourByteEncoding](zstdWriter)
38+
if err != nil {
39+
return nil, fmt.Errorf("error opening IR writer: %w", err)
4040
}
4141

4242
memoryWriter := memoryWriter{
43-
size: size,
44-
timezone: timezone,
4543
irWriter: irWriter,
4644
zstdWriter: zstdWriter,
4745
zstdBuffer: &zstdBuffer,
@@ -59,12 +57,10 @@ func NewMemoryWriter(timezone string, size int) (*memoryWriter, error) {
5957
// - numEvents: Number of log events successfully written to IR writer buffer
6058
// - err: Error writing IR/Zstd
6159
func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) {
62-
numEvents, err := writeIr(w.irWriter, logEvents)
60+
_, numEvents, err := writeIr(w.irWriter, logEvents)
6361
if err != nil {
6462
return numEvents, err
6563
}
66-
67-
_, err = w.irWriter.WriteTo(w.zstdWriter)
6864
return numEvents, err
6965
}
7066

@@ -74,15 +70,12 @@ func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) {
7470
// Returns:
7571
// - err: Error closing buffers
7672
func (w *memoryWriter) CloseStreams() error {
77-
_, err := w.irWriter.CloseTo(w.zstdWriter)
78-
if err != nil {
73+
if err := w.irWriter.Close(); err != nil {
7974
return err
8075
}
81-
8276
w.irWriter = nil
8377

84-
err = w.zstdWriter.Close()
85-
return err
78+
return w.zstdWriter.Close()
8679
}
8780

8881
// Reinitialize [memoryWriter] after calling CloseStreams(). Resets individual IR and Zstd writers
@@ -92,13 +85,14 @@ func (w *memoryWriter) CloseStreams() error {
9285
// - err: Error opening IR writer
9386
func (w *memoryWriter) Reset() error {
9487
var err error
95-
w.irWriter, err = ir.NewWriterSize[ir.FourByteEncoding](w.size, w.timezone)
88+
w.zstdBuffer.Reset()
89+
w.zstdWriter.Reset(w.zstdBuffer)
90+
91+
w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.zstdWriter)
9692
if err != nil {
9793
return err
9894
}
9995

100-
w.zstdBuffer.Reset()
101-
w.zstdWriter.Reset(w.zstdBuffer)
10296
return nil
10397
}
10498

0 commit comments

Comments
 (0)