Skip to content

Commit 114d91d

Browse files
committed
log_event_encoder: add direct-emit path for FLUENT_BIT_V2 format
For FLUENT_BIT_V2 log format, emit_record can append the record as raw msgpack array bytes [ [timestamp, metadata], body ] directly to the main buffer instead of building it in the root buffer and then copying it. This avoids root buffer construction and one full-record copy per committed log record. The record is emitted without a msgpack string wrapper so the on-wire format matches the existing path and decoders that expect a root array continue to work. Estimated gain is on the order of 1–3% in the encoder hot path per record; Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
1 parent ab34adb commit 114d91d

File tree

1 file changed

+109
-40
lines changed

1 file changed

+109
-40
lines changed

src/flb_log_event_encoder.c

Lines changed: 109 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,27 @@
1919

2020
#include <fluent-bit/flb_log_event_encoder.h>
2121
#include <fluent-bit/flb_log_event_encoder_primitives.h>
22+
#include <fluent-bit/flb_byteswap.h>
2223
#include <stdarg.h>
24+
#include <string.h>
25+
26+
/*
27+
* Encoder hot-path optimization notes (emit_record / commit_record / metadata):
28+
*
29+
* 1. emit_record: For FLUENT_BIT_V2 we use a direct-emit path that writes
30+
* the record string [ [timestamp, metadata], body ] directly into the
31+
* main buffer (no root buffer build/copy), matching the original format.
32+
*
33+
* 2. Possible future improvements:
34+
* - Batch metadata: add append_metadata_kv_batch() or allow multiple
35+
* key-value pairs in one call to cut va_list and call overhead.
36+
* - Inline / fast path: move begin_record/commit_record hot branches
37+
* into a header so the compiler can inline for the common case.
38+
* - Buffer growth: reserve space for the next record (e.g. typical
39+
* metadata + body size) to reduce reallocs in msgpack_sbuffer.
40+
* - dynamic_field_flush: when there is only one scope per field, avoid
41+
* the scope loop (single commit then set data/size).
42+
*/
2343

2444
void static inline flb_log_event_encoder_update_internal_state(
2545
struct flb_log_event_encoder *context)
@@ -144,71 +164,120 @@ int flb_log_event_encoder_emit_raw_record(struct flb_log_event_encoder *context,
144164
return result;
145165
}
146166

167+
/*
168+
* Optimized emit_record: for FLUENT_BIT_V2 appends the record as raw
169+
* msgpack array bytes [ [timestamp, metadata], body ] to the main buffer
170+
* (no root buffer build/copy), matching the slow path format.
171+
*/
147172
int flb_log_event_encoder_emit_record(struct flb_log_event_encoder *context)
148173
{
149-
int result;
174+
int result;
175+
char ts_buf[8];
176+
char rec_header[5];
177+
uint32_t sec;
178+
uint32_t nsec;
150179

151180
if (context == NULL) {
152181
return FLB_EVENT_ENCODER_ERROR_INVALID_CONTEXT;
153182
}
154183

155184
result = FLB_EVENT_ENCODER_SUCCESS;
156185

157-
/* This function needs to be improved and optimized to avoid excessive
158-
* memory copying operations.
159-
*/
160-
161-
/* This conditional accounts for external raw record emission as
162-
* performed by some filters using either
163-
* flb_log_event_encoder_set_root_from_raw_msgpack
164-
* or
165-
* flb_log_event_encoder_set_root_from_msgpack_object
166-
*/
167186
if (context->root.size == 0) {
168-
result = flb_log_event_encoder_root_begin_array(context);
187+
if (context->format == FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2 &&
188+
context->metadata.data != NULL && context->body.data != NULL) {
189+
/*
190+
* Direct-emit: append raw array bytes [ [timestamp, metadata], body ]
191+
* (fixarray 2, fixarray 2, fixext 8 type 0, 8b ts, metadata, body).
192+
* Do not wrap in a msgpack string; match slow path which uses
193+
* msgpack_pack_str_body(root.data, root.size) with no str header.
194+
*/
195+
rec_header[0] = (char) 0x92;
196+
rec_header[1] = (char) 0x92;
197+
rec_header[2] = (char) 0xd7;
198+
rec_header[3] = (char) 0x00;
199+
200+
result = msgpack_pack_str_body(&context->packer,
201+
rec_header, (size_t) 4);
202+
if (result == 0) {
203+
sec = FLB_UINT32_TO_NETWORK_BYTE_ORDER((uint32_t) context->timestamp.tm.tv_sec);
204+
nsec = FLB_UINT32_TO_NETWORK_BYTE_ORDER((uint32_t) context->timestamp.tm.tv_nsec);
205+
memcpy(&ts_buf[0], &sec, 4);
206+
memcpy(&ts_buf[4], &nsec, 4);
207+
result = msgpack_pack_str_body(&context->packer, ts_buf, 8);
208+
}
209+
if (result == 0) {
210+
result = msgpack_pack_str_body(&context->packer,
211+
context->metadata.data,
212+
context->metadata.size);
213+
}
214+
if (result == 0) {
215+
result = msgpack_pack_str_body(&context->packer,
216+
context->body.data,
217+
context->body.size);
218+
}
219+
if (result != 0) {
220+
result = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE;
221+
}
222+
else {
223+
result = FLB_EVENT_ENCODER_SUCCESS;
224+
}
225+
}
226+
else {
227+
result = flb_log_event_encoder_root_begin_array(context);
228+
229+
if (context->format == FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2) {
230+
if (result == FLB_EVENT_ENCODER_SUCCESS) {
231+
result = flb_log_event_encoder_root_begin_array(context);
232+
}
233+
}
169234

170-
if (context->format == FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2) {
171235
if (result == FLB_EVENT_ENCODER_SUCCESS) {
172-
result = flb_log_event_encoder_root_begin_array(context);
236+
result = flb_log_event_encoder_append_root_timestamp(
237+
context, &context->timestamp);
173238
}
174-
}
175239

176-
if (result == FLB_EVENT_ENCODER_SUCCESS) {
177-
result = flb_log_event_encoder_append_root_timestamp(
178-
context, &context->timestamp);
179-
}
240+
if (context->format == FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2) {
241+
if (result == FLB_EVENT_ENCODER_SUCCESS) {
242+
result = flb_log_event_encoder_append_root_raw_msgpack(
243+
context,
244+
context->metadata.data,
245+
context->metadata.size);
246+
}
247+
248+
if (result == FLB_EVENT_ENCODER_SUCCESS) {
249+
result = flb_log_event_encoder_root_commit_array(context);
250+
}
251+
}
180252

181-
if (context->format == FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2) {
182253
if (result == FLB_EVENT_ENCODER_SUCCESS) {
183254
result = flb_log_event_encoder_append_root_raw_msgpack(
184255
context,
185-
context->metadata.data,
186-
context->metadata.size);
256+
context->body.data,
257+
context->body.size);
187258
}
188259

189-
/* We need to explicitly commit the current array (which
190-
* holds the timestamp and metadata elements so we leave
191-
* that scope and go back to the root scope where we can
192-
* append the body element.
193-
*/
194260
if (result == FLB_EVENT_ENCODER_SUCCESS) {
195-
result = flb_log_event_encoder_root_commit_array(context);
261+
result = flb_log_event_encoder_dynamic_field_flush(&context->root);
196262
}
197-
}
198-
199-
if (result == FLB_EVENT_ENCODER_SUCCESS) {
200-
result = flb_log_event_encoder_append_root_raw_msgpack(
201-
context,
202-
context->body.data,
203-
context->body.size);
204-
}
205263

206-
if (result == FLB_EVENT_ENCODER_SUCCESS) {
207-
result = flb_log_event_encoder_dynamic_field_flush(&context->root);
264+
if (result == FLB_EVENT_ENCODER_SUCCESS &&
265+
context->root.data != NULL && context->root.size > 0) {
266+
result = msgpack_pack_str_body(&context->packer,
267+
context->root.data,
268+
context->root.size);
269+
270+
if (result != 0) {
271+
result = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE;
272+
}
273+
else {
274+
result = FLB_EVENT_ENCODER_SUCCESS;
275+
}
276+
}
208277
}
209278
}
210-
211-
if (result == FLB_EVENT_ENCODER_SUCCESS) {
279+
else if (result == FLB_EVENT_ENCODER_SUCCESS &&
280+
context->root.data != NULL && context->root.size > 0) {
212281
result = msgpack_pack_str_body(&context->packer,
213282
context->root.data,
214283
context->root.size);

0 commit comments

Comments
 (0)