Skip to content

Commit 43f7a1e

Browse files
authored
refact pkg/leakybucket: trim down redundant Leaky struct fields (#4290)
* rename Leaky.BucketConfig -> Leaky.Factory * remove Leaky.Name * remove Leaky.CacheSize * remove Leaky.Capacity * remove Leaky.Leakspeed * remove Leaky.Reprocess * remove Leaky.scopeType * remove Leaky.scenarioVersion * remove Leaky.orderEvent * rename BucketFactory.hash -> BucketFactory.scenarioHash * remove Leaky.hash * comments
1 parent 4624f98 commit 43f7a1e

File tree

6 files changed

+65
-87
lines changed

6 files changed

+65
-87
lines changed

pkg/leakybucket/bayesian.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func (e *BayesianEvent) bayesianUpdate(p *BayesianProcessor, msg pipeline.Event,
100100
}
101101

102102
l.logger.Debugf("running condition expression: %s", e.rawCondition.ConditionalFilterName)
103-
ret, err := exprhelpers.Run(e.conditionalFilterRuntime, map[string]any{"evt": &msg, "queue": l.Queue, "leaky": l}, l.logger, l.BucketConfig.Spec.Debug)
103+
ret, err := exprhelpers.Run(e.conditionalFilterRuntime, map[string]any{"evt": &msg, "queue": l.Queue, "leaky": l}, l.logger, l.Factory.Spec.Debug)
104104
if err != nil {
105105
return fmt.Errorf("unable to run conditional filter: %w", err)
106106
}

pkg/leakybucket/bucket.go

Lines changed: 23 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ type pourGate interface {
2424

2525
// Leaky represents one instance of a bucket
2626
type Leaky struct {
27-
Name string
2827
Mode int // LIVE or TIMEMACHINE
2928
// the limiter is what holds the proper "leaky aspect", it determines when/if we can pour objects
3029
Limiter rate.RateLimiter `json:"-"`
@@ -37,35 +36,25 @@ type Leaky struct {
3736
Out chan *pipeline.Queue `json:"-"`
3837
// shared for all buckets (the idea is to kill this afterward)
3938
AllOut chan pipeline.Event `json:"-"`
40-
// max capacity (for burst)
41-
Capacity int
42-
// CacheRatio is the number of elements that should be kept in memory (compared to capacity)
43-
CacheSize int
4439
// the unique identifier of the bucket (a hash)
4540
Mapkey string
4641
ready chan struct{} // closed when LeakRoutine is ready
4742
readyOnce sync.Once // use to prevent double close
4843
done chan struct{} // closed when LeakRoutine has stopped processing
4944
doneOnce sync.Once // use to prevent double close
5045
Suicide chan bool `json:"-"`
51-
Reprocess bool
5246
Uuid string
5347
First_ts time.Time
5448
Last_ts time.Time
5549
Ovflw_ts time.Time
5650
Total_count int
57-
Leakspeed time.Duration
58-
BucketConfig *BucketFactory
51+
Factory *BucketFactory
5952
Duration time.Duration
6053
Pour func(*Leaky, pourGate, pipeline.Event) `json:"-"`
6154
timedOverflow bool
6255
conditionalOverflow bool
6356
logger *log.Entry
64-
scopeType ScopeType
65-
hash string
66-
scenarioVersion string
6757
mutex *sync.Mutex // used only for TIMEMACHINE mode to allow garbage collection without races
68-
orderEvent bool
6958
cancel context.CancelFunc
7059
}
7160

@@ -99,25 +88,16 @@ func NewLeakyFromFactory(f *BucketFactory) *Leaky {
9988

10089
// create the leaky bucket per se
10190
l := &Leaky{
102-
Name: f.Spec.Name,
10391
Limiter: limiter,
10492
Uuid: seed.Generate(),
10593
Queue: pipeline.NewQueue(Qsize),
106-
CacheSize: f.Spec.CacheSize,
10794
Out: make(chan *pipeline.Queue, 1),
10895
Suicide: make(chan bool, 1),
10996
AllOut: f.ret,
110-
Capacity: f.Spec.Capacity,
111-
Leakspeed: f.leakspeed,
112-
BucketConfig: f,
97+
Factory: f,
11398
Pour: Pour,
114-
Reprocess: f.Spec.Reprocess,
11599
Mode: pipeline.LIVE,
116-
scopeType: f.Spec.ScopeType,
117-
scenarioVersion: f.Spec.ScenarioVersion,
118-
hash: f.hash,
119100
mutex: &sync.Mutex{},
120-
orderEvent: f.orderEvent,
121101
}
122102
if f.Spec.Capacity > 0 && f.leakspeed != time.Duration(0) {
123103
l.Duration = time.Duration(f.Spec.Capacity+1) * f.leakspeed
@@ -155,24 +135,24 @@ func (l *Leaky) LeakRoutine(ctx context.Context, gate pourGate) {
155135
}
156136
}()
157137

158-
defer trace.CatchPanic(fmt.Sprintf("crowdsec/LeakRoutine/%s", l.Name))
138+
defer trace.CatchPanic(fmt.Sprintf("crowdsec/LeakRoutine/%s", l.Factory.Spec.Name))
159139

160-
metrics.BucketsCurrentCount.With(prometheus.Labels{"name": l.Name}).Inc()
161-
defer metrics.BucketsCurrentCount.With(prometheus.Labels{"name": l.Name}).Dec()
140+
metrics.BucketsCurrentCount.With(prometheus.Labels{"name": l.Factory.Spec.Name}).Inc()
141+
defer metrics.BucketsCurrentCount.With(prometheus.Labels{"name": l.Factory.Spec.Name}).Dec()
162142

163143
// TODO: we create a logger at runtime while we want leakroutine to be up asap, might not be a good idea
164-
l.logger = l.BucketConfig.logger.WithFields(log.Fields{"partition": l.Mapkey, "bucket_id": l.Uuid})
144+
l.logger = l.Factory.logger.WithFields(log.Fields{"partition": l.Mapkey, "bucket_id": l.Uuid})
165145

166146
// We copy the processors, as they are coming from the BucketFactory, and thus are shared between buckets
167147
// If we don't copy, processors using local cache (such as Uniq) are subject to race conditions
168148
// This can lead to creating buckets that will discard their first events, preventing the underflow ticker from being initialized
169149
// and preventing them from being destroyed
170-
processors := deepcopy.Copy(l.BucketConfig.processors).([]Processor)
150+
processors := deepcopy.Copy(l.Factory.processors).([]Processor)
171151

172152
l.markReady()
173153

174154
for _, f := range processors {
175-
err := f.OnBucketInit(l.BucketConfig)
155+
err := f.OnBucketInit(l.Factory)
176156
if err != nil {
177157
l.logger.Errorf("Problem at bucket initializiation. Bail out %T : %v", f, err)
178158
return
@@ -186,10 +166,10 @@ func (l *Leaky) LeakRoutine(ctx context.Context, gate pourGate) {
186166
case msg := <-l.In:
187167
// the msg var use is confusing and is redeclared in a different type :/
188168
for _, processor := range processors {
189-
msg = processor.OnBucketPour(l.BucketConfig, *msg, l)
169+
msg = processor.OnBucketPour(l.Factory, *msg, l)
190170
// if &msg == nil we stop processing
191171
if msg == nil {
192-
if l.orderEvent {
172+
if l.Factory.orderEvent {
193173
orderEvent[l.Mapkey].Done()
194174
}
195175
goto End
@@ -198,14 +178,14 @@ func (l *Leaky) LeakRoutine(ctx context.Context, gate pourGate) {
198178
if l.logger.Level >= log.TraceLevel {
199179
l.logger.Tracef("Pour event: %s", spew.Sdump(msg))
200180
}
201-
metrics.BucketsPour.With(prometheus.Labels{"name": l.Name, "source": msg.Line.Src, "type": msg.Line.Module}).Inc()
181+
metrics.BucketsPour.With(prometheus.Labels{"name": l.Factory.Spec.Name, "source": msg.Line.Src, "type": msg.Line.Module}).Inc()
202182

203183
l.Pour(l, gate, *msg) // glue for now
204184

205185
for _, processor := range processors {
206-
msg = processor.AfterBucketPour(l.BucketConfig, *msg, l)
186+
msg = processor.AfterBucketPour(l.Factory, *msg, l)
207187
if msg == nil {
208-
if l.orderEvent {
188+
if l.Factory.orderEvent {
209189
orderEvent[l.Mapkey].Done()
210190
}
211191
goto End
@@ -227,7 +207,7 @@ func (l *Leaky) LeakRoutine(ctx context.Context, gate pourGate) {
227207
}
228208
firstEvent = false
229209
// we overflowed
230-
if l.orderEvent {
210+
if l.Factory.orderEvent {
231211
orderEvent[l.Mapkey].Done()
232212
}
233213
case ofw := <-l.Out:
@@ -237,7 +217,7 @@ func (l *Leaky) LeakRoutine(ctx context.Context, gate pourGate) {
237217
case <-l.Suicide:
238218
// don't wait defer to close the channel, in case we are blocked before returning
239219
l.markDone()
240-
metrics.BucketsCanceled.With(prometheus.Labels{"name": l.Name}).Inc()
220+
metrics.BucketsCanceled.With(prometheus.Labels{"name": l.Factory.Spec.Name}).Inc()
241221
l.logger.Debugf("Suicide triggered")
242222
l.AllOut <- pipeline.Event{Type: pipeline.OVFLW, Overflow: pipeline.RuntimeAlert{Mapkey: l.Mapkey}}
243223
l.logger.Tracef("Returning from leaky routine.")
@@ -254,14 +234,14 @@ func (l *Leaky) LeakRoutine(ctx context.Context, gate pourGate) {
254234
alert = pipeline.RuntimeAlert{Mapkey: l.Mapkey}
255235

256236
if l.timedOverflow {
257-
metrics.BucketsOverflow.With(prometheus.Labels{"name": l.Name}).Inc()
237+
metrics.BucketsOverflow.With(prometheus.Labels{"name": l.Factory.Spec.Name}).Inc()
258238

259239
alert, err = NewAlert(l, ofw)
260240
if err != nil {
261241
log.Error(err)
262242
}
263-
for _, f := range l.BucketConfig.processors {
264-
alert, ofw = f.OnBucketOverflow(l.BucketConfig, l, alert, ofw)
243+
for _, f := range l.Factory.processors {
244+
alert, ofw = f.OnBucketOverflow(l.Factory, l, alert, ofw)
265245
if ofw == nil {
266246
l.logger.Debugf("Overflow has been discarded (%T)", f)
267247
break
@@ -270,7 +250,7 @@ func (l *Leaky) LeakRoutine(ctx context.Context, gate pourGate) {
270250
l.logger.Infof("Timed Overflow")
271251
} else {
272252
l.logger.Debugf("bucket underflow, destroy")
273-
metrics.BucketsUnderflow.With(prometheus.Labels{"name": l.Name}).Inc()
253+
metrics.BucketsUnderflow.With(prometheus.Labels{"name": l.Factory.Spec.Name}).Inc()
274254
}
275255
if l.logger.Level >= log.TraceLevel {
276256
// don't sdump if it's not going to be printed, it's expensive
@@ -320,9 +300,9 @@ func (l *Leaky) overflow(ofw *pipeline.Queue) {
320300
if err != nil {
321301
log.Errorf("%s", err)
322302
}
323-
l.logger.Tracef("Overflow hooks time : %v", l.BucketConfig.processors)
324-
for _, f := range l.BucketConfig.processors {
325-
alert, ofw = f.OnBucketOverflow(l.BucketConfig, l, alert, ofw)
303+
l.logger.Tracef("Overflow hooks time : %v", l.Factory.processors)
304+
for _, f := range l.Factory.processors {
305+
alert, ofw = f.OnBucketOverflow(l.Factory, l, alert, ofw)
326306
if ofw == nil {
327307
l.logger.Debugf("Overflow has been discarded (%T)", f)
328308
break
@@ -334,7 +314,7 @@ func (l *Leaky) overflow(ofw *pipeline.Queue) {
334314
mt, _ := l.Ovflw_ts.MarshalText()
335315
l.logger.Tracef("overflow time : %s", mt)
336316

337-
metrics.BucketsOverflow.With(prometheus.Labels{"name": l.Name}).Inc()
317+
metrics.BucketsOverflow.With(prometheus.Labels{"name": l.Factory.Spec.Name}).Inc()
338318

339319
l.AllOut <- pipeline.Event{Overflow: alert, Type: pipeline.OVFLW, MarshaledTime: string(mt)}
340320
}

pkg/leakybucket/manager_load.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
2626
)
2727

28+
// BucketSpec is the declarative YAML config for a scenario bucket. It holds all the possible user-provided fields.
2829
type BucketSpec struct {
2930
FormatVersion string `yaml:"format"`
3031
Description string `yaml:"description"`
@@ -53,8 +54,7 @@ type BucketSpec struct {
5354
ScenarioVersion string `yaml:"version,omitempty"`
5455
}
5556

56-
// BucketFactory struct holds all fields for any bucket configuration. This is to have a
57-
// generic struct for buckets. This can be seen as a bucket factory.
57+
// BucketFactory is the compiled/validated, reusable template produced from a BucketSpec.
5858
type BucketFactory struct {
5959
Spec BucketSpec
6060

@@ -68,7 +68,7 @@ type BucketFactory struct {
6868
duration time.Duration // internal representation of `Duration`
6969
ret chan pipeline.Event // the bucket-specific output chan for overflows
7070
processors []Processor // processors is the list of hooks for pour/overflow/create (cf. uniq, blackhole etc.)
71-
hash string
71+
scenarioHash string
7272
Simulated bool // Set to true if the scenario instantiating the bucket was in the exclusion list
7373
orderEvent bool
7474
}
@@ -166,7 +166,7 @@ func loadBucketFactoriesFromFile(
166166
f.Simulated = simcheck.IsSimulated(f.Spec.Name)
167167

168168
f.Spec.ScenarioVersion = item.State.LocalVersion
169-
f.hash = item.State.LocalHash
169+
f.scenarioHash = item.State.LocalHash
170170

171171
err = f.LoadBucket()
172172
if err != nil {
@@ -353,8 +353,6 @@ func (f *BucketFactory) initDataFiles() {
353353

354354
// LoadBucket validates and prepares a BucketFactory for runtime use (compile expressions, init processors, init data).
355355
func (f *BucketFactory) LoadBucket() error {
356-
var err error
357-
358356
f.logger = bucketLogger(f)
359357
f.logger.Infof("Adding %s bucket", f.Spec.Type)
360358

pkg/leakybucket/manager_run.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,11 @@ func GarbageCollectBuckets(deadline time.Time, bucketStore *BucketStore) {
4141
const eps = 1e-9
4242

4343
tokat := val.Limiter.GetTokensCountAt(deadline)
44-
tokcapa := float64(val.Capacity)
44+
tokcapa := float64(val.Factory.Spec.Capacity)
4545

4646
// bucket actually underflowed based on log time, but not in real time
4747
if tokat+eps >= tokcapa {
48-
metrics.BucketsUnderflow.With(prometheus.Labels{"name": val.Name}).Inc()
48+
metrics.BucketsUnderflow.With(prometheus.Labels{"name": val.Factory.Spec.Name}).Inc()
4949
val.logger.Debugf("UNDERFLOW : first_ts:%s tokens_at:%f capcity:%f", val.First_ts, tokat, tokcapa)
5050
toflush = append(toflush, key)
5151
val.cancel()
@@ -135,7 +135,7 @@ func PourItemToBucket(
135135
// holder.logger.Tracef("Successfully sent !")
136136
if collector != nil {
137137
evt := deepcopy.Copy(*parsed).(pipeline.Event)
138-
collector.Add(bucket.Name, evt)
138+
collector.Add(bucket.Factory.Spec.Name, evt)
139139
}
140140
holder.logger.Debugf("bucket '%s' is poured", holder.Spec.Name)
141141
return nil
@@ -258,7 +258,7 @@ func PourItemToHolders(
258258
}
259259
// finally, pour the event into the bucket
260260

261-
if bucket.orderEvent {
261+
if bucket.Factory.orderEvent {
262262
if orderEvent == nil {
263263
orderEvent = make(map[string]*sync.WaitGroup)
264264
}
@@ -273,7 +273,7 @@ func PourItemToHolders(
273273

274274
err = PourItemToBucket(ctx, bucket, &holders[idx], buckets, &parsed, collector)
275275

276-
if bucket.orderEvent {
276+
if bucket.Factory.orderEvent {
277277
orderEvent[buckey].Wait()
278278
}
279279

0 commit comments

Comments
 (0)