Skip to content

Commit 371be9d

Browse files
authored
refact pkg/leakybucket: extract methods from LoadBucket() part 2 (#4282)
* extract BucketFactory.buildOptionalProcessors() * extract initDataFiles()
1 parent 07c95f5 commit 371be9d

File tree

1 file changed

+61
-43
lines changed

1 file changed

+61
-43
lines changed

pkg/leakybucket/manager_load.go

Lines changed: 61 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -260,47 +260,31 @@ func (f *BucketFactory) compileExpr() error {
260260
return nil
261261
}
262262

263-
// LoadBucket validates and prepares a BucketFactory for runtime use (compile expressions, init processors, init data).
264-
func (f *BucketFactory) LoadBucket() error {
265-
var err error
266-
267-
f.logger = bucketLogger(f)
268-
269-
if err := f.parseDurations(); err != nil {
270-
return err
271-
}
272-
273-
if err := f.compileExpr(); err != nil {
274-
return err
275-
}
276-
277-
f.logger.Infof("Adding %s bucket", f.Type)
278-
// return the Holder corresponding to the type of bucket
279-
280-
impl, ok := bucketTypes[f.Type]
281-
if !ok {
282-
return fmt.Errorf("invalid type '%s' in %s", f.Type, f.Filename)
263+
func (f *BucketFactory) buildOptionalProcessors() ([]Processor, error) {
264+
// Some optional processors depend on expressions. We compile those expressions here
265+
// during loading (and discard the compiled program) so misconfigurations fail fast.
266+
check := func(bucketType, ex string, extra map[string]any) error {
267+
if _, err := compile(ex, extra); err != nil {
268+
return fmt.Errorf("invalid %s '%s' in %s: %w", bucketType, ex, f.Filename, err)
269+
}
270+
return nil
283271
}
284272

285-
f.processors = impl.BuildProcessors(f)
273+
var procs []Processor
286274

287275
if f.Distinct != "" {
288276
f.logger.Tracef("Adding a non duplicate filter")
289-
f.processors = append(f.processors, &UniqProcessor{})
290-
// we're compiling and discarding the expression to be able to detect it during loading
291-
_, err = compile(f.Distinct, nil)
292-
if err != nil {
293-
return fmt.Errorf("invalid distinct '%s' in %s: %w", f.Distinct, f.Filename, err)
277+
procs = append(procs, &UniqProcessor{})
278+
if err := check("distinct", f.Distinct, nil); err != nil {
279+
return nil, err
294280
}
295281
}
296282

297283
if f.CancelOnFilter != "" {
298284
f.logger.Tracef("Adding a cancel_on filter")
299-
f.processors = append(f.processors, &CancelProcessor{})
300-
// we're compiling and discarding the expression to be able to detect it during loading
301-
_, err = compile(f.CancelOnFilter, nil)
302-
if err != nil {
303-
return fmt.Errorf("invalid cancel_on '%s' in %s: %w", f.CancelOnFilter, f.Filename, err)
285+
procs = append(procs, &CancelProcessor{})
286+
if err := check("cancel_on", f.CancelOnFilter, nil); err != nil {
287+
return nil, err
304288
}
305289
}
306290

@@ -310,10 +294,10 @@ func (f *BucketFactory) LoadBucket() error {
310294
filovflw, err := NewOverflowProcessor(f)
311295
if err != nil {
312296
f.logger.Errorf("Error creating overflow_filter : %s", err)
313-
return fmt.Errorf("error creating overflow_filter: %w", err)
297+
return nil, fmt.Errorf("error creating overflow_filter: %w", err)
314298
}
315299

316-
f.processors = append(f.processors, filovflw)
300+
procs = append(procs, filovflw)
317301
}
318302

319303
if f.Blackhole != "" {
@@ -322,35 +306,36 @@ func (f *BucketFactory) LoadBucket() error {
322306
blackhole, err := NewBlackholeProcessor(f)
323307
if err != nil {
324308
f.logger.Errorf("Error creating blackhole : %s", err)
325-
return fmt.Errorf("error creating blackhole : %w", err)
309+
return nil, fmt.Errorf("error creating blackhole : %w", err)
326310
}
327311

328-
f.processors = append(f.processors, blackhole)
312+
procs = append(procs, blackhole)
329313
}
330314

331315
if f.ConditionalOverflow != "" {
332316
f.logger.Tracef("Adding conditional overflow")
333-
f.processors = append(f.processors, &ConditionalProcessor{})
334-
// we're compiling and discarding the expression to be able to detect it during loading
335-
_, err = compile(f.ConditionalOverflow, map[string]any{"queue": &pipeline.Queue{}, "leaky": &Leaky{}})
336-
if err != nil {
337-
return fmt.Errorf("invalid condition '%s' in %s: %w", f.ConditionalOverflow, f.Filename, err)
317+
procs = append(procs, &ConditionalProcessor{})
318+
if err := check("condition", f.ConditionalOverflow, map[string]any{"queue": &pipeline.Queue{}, "leaky": &Leaky{}}); err != nil {
319+
return nil, err
338320
}
339321
}
340322

341323
if f.BayesianThreshold != 0 {
342324
f.logger.Tracef("Adding bayesian processor")
343-
f.processors = append(f.processors, &BayesianProcessor{})
325+
procs = append(procs, &BayesianProcessor{})
344326
}
345327

328+
return procs, nil
329+
}
330+
331+
func (f *BucketFactory) initDataFiles() {
346332
for _, data := range f.Data {
347333
if data.DestPath == "" {
348334
f.logger.Errorf("no dest_file provided for '%s'", f.Name)
349335
continue
350336
}
351337

352-
err = exprhelpers.FileInit(f.DataDir, data.DestPath, data.Type)
353-
if err != nil {
338+
if err := exprhelpers.FileInit(f.DataDir, data.DestPath, data.Type); err != nil {
354339
f.logger.Errorf("unable to init data for file '%s': %s", data.DestPath, err)
355340
}
356341

@@ -360,6 +345,39 @@ func (f *BucketFactory) LoadBucket() error {
360345
}
361346
}
362347
}
348+
}
349+
350+
// LoadBucket validates and prepares a BucketFactory for runtime use (compile expressions, init processors, init data).
351+
func (f *BucketFactory) LoadBucket() error {
352+
var err error
353+
354+
f.logger = bucketLogger(f)
355+
f.logger.Infof("Adding %s bucket", f.Type)
356+
357+
if err := f.parseDurations(); err != nil {
358+
return err
359+
}
360+
361+
if err := f.compileExpr(); err != nil {
362+
return err
363+
}
364+
365+
impl, ok := bucketTypes[f.Type]
366+
if !ok {
367+
return fmt.Errorf("invalid type '%s' in %s", f.Type, f.Filename)
368+
}
369+
370+
procs := impl.BuildProcessors(f)
371+
372+
optProcs, err := f.buildOptionalProcessors()
373+
if err != nil {
374+
return err
375+
}
376+
377+
procs = append(procs, optProcs...)
378+
f.processors = procs
379+
380+
f.initDataFiles()
363381

364382
if err := f.Validate(); err != nil {
365383
return fmt.Errorf("invalid bucket from %s: %w", f.Filename, err)

0 commit comments

Comments
 (0)