Skip to content

Commit daa76f9

Browse files
authored
feat: Apply upload criteria to memory buffering; Upload the memory buffer to S3 on exit. (#17)
1 parent da32e50 commit daa76f9

File tree

8 files changed

+122
-71
lines changed

8 files changed

+122
-71
lines changed

internal/irzstd/disk.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -260,14 +260,6 @@ func (w *diskWriter) Close() error {
260260
return nil
261261
}
262262

263-
// Getter for useDiskBuffer.
264-
//
265-
// Returns:
266-
// - useDiskBuffer: On/off for disk buffering
267-
func (w *diskWriter) GetUseDiskBuffer() bool {
268-
return true
269-
}
270-
271263
// Getter for Zstd Output.
272264
//
273265
// Returns:
@@ -284,6 +276,21 @@ func (w *diskWriter) GetZstdOutputSize() (int, error) {
284276
return w.getZstdFileSize()
285277
}
286278

279+
// Checks if writer is empty. True if no events are buffered.
280+
//
281+
// Returns:
282+
// - empty: Boolean value that is true if buffer is empty
283+
// - err: Error calling stat
284+
func (w *diskWriter) Empty() (bool, error) {
285+
zstdFileInfo, err := w.zstdFile.Stat()
286+
if err != nil {
287+
return false, err
288+
}
289+
290+
empty := (zstdFileInfo.Size() == 0) && (w.irTotalBytes == 0)
291+
return empty, nil
292+
}
293+
287294
// Compresses contents of the IR file and outputs it to the Zstd file. The IR file is then
288295
// truncated.
289296
//

internal/irzstd/memory.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,13 @@ import (
1111
"github.com/y-scope/clp-ffi-go/ir"
1212
)
1313

14-
// Converts log events into Zstd compressed IR. Log events provided to writer are immediately
15-
// converted to Zstd compressed IR and stored in [memoryWriter.ZstdBuffer]. After the Zstd buffer
16-
// receives logs, they are immediately sent to s3.
14+
// Converts log events into Zstd compressed IR. Log events are immediately converted to Zstd
15+
// compressed IR and stored in [memoryWriter.zstdBuffer].
1716
type memoryWriter struct {
18-
zstdBuffer *bytes.Buffer
19-
irWriter *ir.Writer
20-
zstdWriter *zstd.Encoder
17+
zstdBuffer *bytes.Buffer
18+
irWriter *ir.Writer
19+
zstdWriter *zstd.Encoder
20+
irTotalBytes int
2121
}
2222

2323
// Opens a new [memoryWriter] with a memory buffer for Zstd output. For use when use_disk_store is
@@ -57,7 +57,8 @@ func NewMemoryWriter() (*memoryWriter, error) {
5757
// - numEvents: Number of log events successfully written to IR writer buffer
5858
// - err: Error writing IR/Zstd
5959
func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) {
60-
_, numEvents, err := writeIr(w.irWriter, logEvents)
60+
numBytes, numEvents, err := writeIr(w.irWriter, logEvents)
61+
w.irTotalBytes += numBytes
6162
if err != nil {
6263
return numEvents, err
6364
}
@@ -87,6 +88,7 @@ func (w *memoryWriter) Reset() error {
8788
var err error
8889
w.zstdBuffer.Reset()
8990
w.zstdWriter.Reset(w.zstdBuffer)
91+
w.irTotalBytes = 0
9092

9193
w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.zstdWriter)
9294
if err != nil {
@@ -96,14 +98,6 @@ func (w *memoryWriter) Reset() error {
9698
return nil
9799
}
98100

99-
// Getter for useDiskBuffer.
100-
//
101-
// Returns:
102-
// - useDiskBuffer: On/off for disk buffering
103-
func (w *memoryWriter) GetUseDiskBuffer() bool {
104-
return false
105-
}
106-
107101
// Getter for Zstd Output.
108102
//
109103
// Returns:
@@ -113,7 +107,8 @@ func (w *memoryWriter) GetZstdOutput() io.Reader {
113107
}
114108

115109
// Get size of Zstd output. [zstd] does not provide the amount of bytes written with each write.
116-
// Instead, calling Len() on buffer.
110+
// Instead, calling Len() on buffer. Size may slightly lag the real size since some data in the
111+
// current block will be in the [zstd] encoder's internal buffer.
117112
//
118113
// Returns:
119114
// - size: Bytes written
@@ -139,3 +134,12 @@ func (w *memoryWriter) Close() error {
139134
}
140135
return nil
141136
}
137+
138+
// Checks if writer is empty. True if no events are buffered.
139+
//
140+
// Returns:
141+
// - empty: Boolean value that is true if buffer is empty
142+
// - err: nil error to comply with interface
143+
func (w *memoryWriter) Empty() (bool, error) {
144+
return w.irTotalBytes == 0, nil
145+
}

internal/irzstd/writer.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,6 @@ type Writer interface {
4343
// - err
4444
Reset() error
4545

46-
// Getter for useDiskBuffer.
47-
//
48-
// Returns:
49-
// - useDiskBuffer: On/off for disk buffering
50-
GetUseDiskBuffer() bool
51-
5246
// Getter for Zstd Output.
5347
//
5448
// Returns:
@@ -61,6 +55,13 @@ type Writer interface {
6155
// - size: Bytes written
6256
// - err
6357
GetZstdOutputSize() (int, error)
58+
59+
// Checks if writer is empty. True if no events are buffered.
60+
//
61+
// Returns:
62+
// - empty: Boolean value that is true if buffer is empty
63+
// - err
64+
Empty() (bool, error)
6465
}
6566

6667
// Writes log events to an IR Writer.

plugins/out_clp_s3/README.md

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -97,21 +97,23 @@ More detailed information for specifying credentials from AWS can be found [here
9797
| `s3_bucket_prefix` | Bucket prefix path | `logs/` |
9898
| `role_arn` | ARN of an IAM role to assume | `None` |
9999
| `id` | Name of output plugin | Random UUID |
100-
| `use_disk_buffer` | Buffer logs on disk prior to sending to S3. See [Disk Buffering](#disk-buffering) for more info. | `TRUE` |
101-
| `disk_buffer_path` | Directory for disk buffer | `tmp/out_clp_s3/` |
102-
| `upload_size_mb` | Set upload size in MB when disk store is enabled. Size refers to the compressed size. | `16` |
100+
| `use_disk_buffer` | Buffer logs on disk. See [Disk Buffering](#disk-buffering) for more info. | `TRUE` |
101+
| `disk_buffer_path` | Directory for disk buffer | `tmp/out_clp_s3/` |
102+
| `upload_size_mb` | Set upload size in MB. Size refers to the compressed size. | `16` |
103103

104104
#### Disk Buffering
105105

106-
The output plugin recieves raw logs from Fluent Bit in small chunks. With `use_disk_buffer` set, the
107-
output plugin will accumulate logs on disk until the upload size is reached. Buffering logs will
108-
reduce the amount of S3 API requests and improve the compression ratio. However, the plugin will use
109-
disk space and have higher memory requirements. The amount of system resources will be proportional
110-
to the amount of Fluent Bit tags. With `use_disk_buffer` off, the plugin will immediately process
111-
each chunk and send it to S3.
106+
The output plugin receives raw logs from Fluent Bit in small chunks and accumulates them in a compressed
107+
buffer until the upload size is reached before sending to S3.
112108

113-
Logs are stored on the disk as IR and Zstd compressed IR. If the plugin were to crash, stored logs
114-
will be sent to S3 when Fluent Bit restarts. The upload index restarts on recovery.
109+
With `use_disk_buffer` set, logs are stored on disk as IR and Zstd compressed IR. On a graceful shutdown
110+
or abrupt crash, stored logs will be sent to S3 when Fluent Bit restarts. For an abrupt crash, there is
111+
a very small chance of data corruption if the plugin crashes mid-write. The upload index restarts on
112+
recovery.
113+
114+
With `use_disk_buffer` off, logs are stored in memory as Zstd compressed IR. On a graceful shutdown, the
115+
plugin will attempt to upload any buffered data to S3 before Fluent Bit terminates it. On an abrupt
116+
crash, in-memory data is lost.
115117

116118
### S3 Objects
117119

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Package exit provides functions for gracefully shutting down the plugin. Exit functions are only
2+
// called when Fluent Bit receives a kill signal, not during an abrupt crash. The plugin is given
3+
// limited time to clean up resources before Fluent Bit terminates it.
4+
5+
package exit
6+
7+
import (
8+
"github.com/y-scope/fluent-bit-clp/internal/outctx"
9+
)
10+
11+
// NoUpload gracefully exits the plugin by closing writers without uploading.
12+
//
13+
// Parameters:
14+
// - ctx: Plugin context
15+
//
16+
// Returns:
17+
// - err: Error closing file
18+
func NoUpload(ctx *outctx.S3Context) error {
19+
for _, eventManager := range ctx.EventManagers {
20+
err := eventManager.Writer.Close()
21+
if err != nil {
22+
return err
23+
}
24+
eventManager.Writer = nil
25+
}
26+
27+
return nil
28+
}
29+
30+
// S3 gracefully exits the plugin by flushing buffered data to S3. Makes a best-effort attempt,
31+
// however Fluent Bit may kill the plugin before the upload completes.
32+
//
33+
// Parameters:
34+
// - ctx: Plugin context
35+
//
36+
// Returns:
37+
// - err: Error closing file
38+
func S3(ctx *outctx.S3Context) error {
39+
for _, eventManager := range ctx.EventManagers {
40+
empty, err := eventManager.Writer.Empty()
41+
if err != nil {
42+
return err
43+
}
44+
if empty {
45+
continue
46+
}
47+
err = eventManager.ToS3(ctx.Config, ctx.Uploader)
48+
if err != nil {
49+
return err
50+
}
51+
err = eventManager.Writer.Close()
52+
if err != nil {
53+
return err
54+
}
55+
eventManager.Writer = nil
56+
}
57+
58+
return nil
59+
}

plugins/out_clp_s3/internal/flush/flush.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,7 @@ func decodeMsgpack(dec *codec.Decoder, config outctx.S3Config) ([]ffi.LogEvent,
111111
}
112112
}
113113

114-
// Checks if criteria are met to upload to s3. If useDiskBuffer is false, then the chunk is always
115-
// uploaded so always returns true. If useDiskBuffer is true, check if Zstd buffer size is greater
116-
// than upload size.
114+
// Checks whether Zstd buffer size is greater than or equal to upload size.
117115
//
118116
// Parameters:
119117
// - eventManager: Manager for Fluent Bit events with the same tag
@@ -123,10 +121,6 @@ func decodeMsgpack(dec *codec.Decoder, config outctx.S3Config) ([]ffi.LogEvent,
123121
// - readyToUpload: Boolean if upload criteria met or not
124122
// - err: Error getting Zstd buffer size
125123
func checkUploadCriteriaMet(eventManager *outctx.EventManager, uploadSizeMb int) (bool, error) {
126-
if !eventManager.Writer.GetUseDiskBuffer() {
127-
return true, nil
128-
}
129-
130124
bufferSize, err := eventManager.Writer.GetZstdOutputSize()
131125
if err != nil {
132126
return false, fmt.Errorf("error could not get size of buffer: %w", err)

plugins/out_clp_s3/internal/recovery/recovery.go

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,28 +13,6 @@ import (
1313
"github.com/y-scope/fluent-bit-clp/internal/outctx"
1414
)
1515

16-
// If useDiskBuffer is set, close all files prior to exit. Graceful exit will only be called
17-
// if Fluent Bit receives a kill signal and not during an abrupt crash. Plugin is only
18-
// given a limited time to clean up resources, so output is not sent to s3. Instead
19-
// they are sent during startup.
20-
//
21-
// Parameters:
22-
// - ctx: Plugin context
23-
//
24-
// Returns:
25-
// - err: Error closing file
26-
func GracefulExit(ctx *outctx.S3Context) error {
27-
for _, eventManager := range ctx.EventManagers {
28-
err := eventManager.Writer.Close()
29-
if err != nil {
30-
return err
31-
}
32-
eventManager.Writer = nil
33-
}
34-
35-
return nil
36-
}
37-
3816
// Sends existing disk buffers to S3.
3917
//
4018
// Parameters:

plugins/out_clp_s3/out_clp_s3.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/fluent/fluent-bit-go/output"
1717

1818
"github.com/y-scope/fluent-bit-clp/internal/outctx"
19+
"github.com/y-scope/fluent-bit-clp/plugins/out_clp_s3/internal/exit"
1920
"github.com/y-scope/fluent-bit-clp/plugins/out_clp_s3/internal/flush"
2021
"github.com/y-scope/fluent-bit-clp/plugins/out_clp_s3/internal/recovery"
2122
)
@@ -132,7 +133,12 @@ func FLBPluginExitCtx(ctx unsafe.Pointer) int {
132133

133134
log.Printf("[%s] Exit called for id: %s", s3PluginName, outCtx.Config.Id)
134135

135-
err := recovery.GracefulExit(outCtx)
136+
var err error
137+
if outCtx.Config.UseDiskBuffer {
138+
err = exit.NoUpload(outCtx)
139+
} else {
140+
err = exit.S3(outCtx)
141+
}
136142
if err != nil {
137143
log.Printf("Failed to exit gracefully")
138144
}

0 commit comments

Comments
 (0)