Skip to content

Commit 3e7eba2

Browse files
authored
pkg/leakybucket: replace Signal chan with explicit read/done chans (#4277)
* pkg/leakybucket: replace Signal chan with explicit read/done chans * fix: remove %w of nil err
1 parent 71338aa commit 3e7eba2

File tree

3 files changed

+37
-21
lines changed

3 files changed

+37
-21
lines changed

pkg/leakybucket/bucket.go

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,10 @@ type Leaky struct {
4343
CacheSize int
4444
// the unique identifier of the bucket (a hash)
4545
Mapkey string
46-
// chan for signaling
47-
Signal chan bool `json:"-"`
46+
ready chan struct{} // closed when LeakRoutine is ready
47+
readyOnce sync.Once // use to prevent double close
48+
done chan struct{} // closed when LeakRoutine has stopped processing
49+
doneOnce sync.Once // use to prevent double close
4850
Suicide chan bool `json:"-"`
4951
Reprocess bool
5052
Simulated bool
@@ -150,6 +152,8 @@ func (l *Leaky) LeakRoutine(ctx context.Context, gate pourGate) {
150152
firstEvent = true
151153
)
152154

155+
defer l.markDone()
156+
153157
defer func() {
154158
if durationTicker != nil {
155159
durationTicker.Stop()
@@ -170,13 +174,12 @@ func (l *Leaky) LeakRoutine(ctx context.Context, gate pourGate) {
170174
// and preventing them from being destroyed
171175
processors := deepcopy.Copy(l.BucketConfig.processors).([]Processor)
172176

173-
l.Signal <- true
177+
l.markReady()
174178

175179
for _, f := range processors {
176180
err := f.OnBucketInit(l.BucketConfig)
177181
if err != nil {
178182
l.logger.Errorf("Problem at bucket initializiation. Bail out %T : %v", f, err)
179-
close(l.Signal)
180183
return
181184
}
182185
}
@@ -237,7 +240,8 @@ func (l *Leaky) LeakRoutine(ctx context.Context, gate pourGate) {
237240
return
238241
// suiciiiide
239242
case <-l.Suicide:
240-
close(l.Signal)
243+
// don't wait defer to close the channel, in case we are blocked before returning
244+
l.markDone()
241245
metrics.BucketsCanceled.With(prometheus.Labels{"name": l.Name}).Inc()
242246
l.logger.Debugf("Suicide triggered")
243247
l.AllOut <- pipeline.Event{Type: pipeline.OVFLW, Overflow: pipeline.RuntimeAlert{Mapkey: l.Mapkey}}
@@ -250,7 +254,7 @@ func (l *Leaky) LeakRoutine(ctx context.Context, gate pourGate) {
250254
err error
251255
)
252256
l.Ovflw_ts = time.Now().UTC()
253-
close(l.Signal)
257+
l.markDone()
254258
ofw := l.Queue
255259
alert = pipeline.RuntimeAlert{Mapkey: l.Mapkey}
256260

@@ -316,7 +320,7 @@ func Pour(l *Leaky, gate pourGate, msg pipeline.Event) {
316320
}
317321

318322
func (l *Leaky) overflow(ofw *pipeline.Queue) {
319-
close(l.Signal)
323+
l.markDone()
320324
alert, err := NewAlert(l, ofw)
321325
if err != nil {
322326
log.Errorf("%s", err)
@@ -339,3 +343,16 @@ func (l *Leaky) overflow(ofw *pipeline.Queue) {
339343

340344
l.AllOut <- pipeline.Event{Overflow: alert, Type: pipeline.OVFLW, MarshaledTime: string(mt)}
341345
}
346+
347+
func (l *Leaky) markReady() {
348+
l.readyOnce.Do(func() {
349+
close(l.ready)
350+
})
351+
}
352+
353+
func (l *Leaky) markDone() {
354+
l.doneOnce.Do(func() {
355+
close(l.done)
356+
})
357+
}
358+

pkg/leakybucket/manager_load.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ func (f *BucketFactory) LoadBucket() error {
262262

263263
impl, ok := bucketTypes[f.Type]
264264
if !ok {
265-
return fmt.Errorf("invalid type '%s' in %s: %w", f.Type, f.Filename, err)
265+
return fmt.Errorf("invalid type '%s' in %s", f.Type, f.Filename)
266266
}
267267

268268
f.processors = impl.BuildProcessors(f)

pkg/leakybucket/manager_run.go

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -87,18 +87,16 @@ func PourItemToBucket(
8787

8888
/* check if leak routine is up */
8989
select {
90-
case _, ok := <-bucket.Signal:
91-
if !ok {
92-
// the bucket was found and dead, get a new one and continue
93-
bucket.logger.Tracef("Bucket %s found dead, cleanup the body", buckey)
94-
bucketStore.Delete(buckey)
95-
sigclosed += 1
96-
bucket, err = LoadOrStoreBucketFromHolder(ctx, buckey, bucketStore, holder, parsed.ExpectMode)
97-
if err != nil {
98-
return err
99-
}
100-
continue
90+
case <-bucket.done:
91+
// the bucket was found and dead, get a new one and continue
92+
bucket.logger.Tracef("Bucket %s found dead, cleanup the body", buckey)
93+
bucketStore.Delete(buckey)
94+
sigclosed += 1
95+
bucket, err = LoadOrStoreBucketFromHolder(ctx, buckey, bucketStore, holder, parsed.ExpectMode)
96+
if err != nil {
97+
return err
10198
}
99+
continue
102100
// holder.logger.Tracef("Signal exists, try to pour :)")
103101
default:
104102
// nothing to read, but not closed, try to pour
@@ -177,7 +175,8 @@ func LoadOrStoreBucketFromHolder(
177175
}
178176
fresh_bucket.In = make(chan *pipeline.Event)
179177
fresh_bucket.Mapkey = partitionKey
180-
fresh_bucket.Signal = make(chan bool, 1)
178+
fresh_bucket.ready = make(chan struct{})
179+
fresh_bucket.done = make(chan struct{})
181180
actual, stored := buckets.LoadOrStore(partitionKey, fresh_bucket)
182181
if !stored {
183182
go func() {
@@ -187,7 +186,7 @@ func LoadOrStoreBucketFromHolder(
187186
}()
188187
leaky = fresh_bucket
189188
// once the created goroutine is ready to process event, we can return it
190-
<-fresh_bucket.Signal
189+
<-fresh_bucket.ready
191190
} else {
192191
holder.logger.Debugf("Unexpectedly found exisint bucket for %s", partitionKey)
193192
leaky = actual

0 commit comments

Comments
 (0)