@@ -82,7 +82,7 @@ func startParserRoutines(ctx context.Context, g *errgroup.Group, cConfig *csconf
8282 }
8383}
8484
85- func startBucketRoutines (ctx context.Context , g * errgroup.Group , cConfig * csconfig.Config , pourCollector * leakybucket.PourCollector ) {
85+ func startBucketRoutines (ctx context.Context , g * errgroup.Group , cConfig * csconfig.Config , pourCollector * leakybucket.PourCollector , bucketStore * leakybucket. BucketStore ) {
8686 for idx := range cConfig .Crowdsec .BucketsRoutinesCount {
8787 log .WithField ("idx" , idx ).Info ("Starting bucket routine" )
8888 g .Go (func () error {
@@ -98,7 +98,7 @@ func startHeartBeat(ctx context.Context, _ *csconfig.Config, apiClient *apiclien
9898 apiClient .HeartBeat .StartHeartBeat (ctx )
9999}
100100
101- func startOutputRoutines (ctx context.Context , cConfig * csconfig.Config , parsers * parser.Parsers , apiClient * apiclient.ApiClient , sd * StateDumper ) {
101+ func startOutputRoutines (ctx context.Context , cConfig * csconfig.Config , parsers * parser.Parsers , apiClient * apiclient.ApiClient , sd * StateDumper , bucketStore * leakybucket. BucketStore ) {
102102 for idx := range cConfig .Crowdsec .OutputRoutinesCount {
103103 log .WithField ("idx" , idx ).Info ("Starting output routine" )
104104 outputsTomb .Go (func () error {
@@ -144,12 +144,13 @@ func runCrowdsec(
144144 hub * cwhub.Hub ,
145145 datasources []acquisitionTypes.DataSource ,
146146 sd * StateDumper ,
147+ bucketStore * leakybucket.BucketStore ,
147148) error {
148149 inEvents = make (chan pipeline.Event )
149150 logLines = make (chan pipeline.Event )
150151
151152 startParserRoutines (ctx , g , cConfig , parsers , sd .StageParse )
152- startBucketRoutines (ctx , g , cConfig , sd .Pour )
153+ startBucketRoutines (ctx , g , cConfig , sd .Pour , bucketStore )
153154
154155 apiClient , err := apiclient .GetLAPIClient ()
155156 if err != nil {
@@ -158,7 +159,7 @@ func runCrowdsec(
158159
159160 startHeartBeat (ctx , cConfig , apiClient )
160161
161- startOutputRoutines (ctx , cConfig , parsers , apiClient , sd )
162+ startOutputRoutines (ctx , cConfig , parsers , apiClient , sd , bucketStore )
162163
163164 if err := startLPMetrics (ctx , cConfig , apiClient , hub , datasources ); err != nil {
164165 return err
@@ -187,6 +188,8 @@ func serveCrowdsec(
187188
188189 var g errgroup.Group
189190
191+ bucketStore := leakybucket .NewBucketStore ()
192+
190193 crowdsecTomb .Go (func () error {
191194 defer trace .CatchPanic ("crowdsec/serveCrowdsec" )
192195
@@ -197,7 +200,7 @@ func serveCrowdsec(
197200
198201 agentReady <- true
199202
200- if err := runCrowdsec (cctx , & g , cConfig , parsers , hub , datasources , sd ); err != nil {
203+ if err := runCrowdsec (cctx , & g , cConfig , parsers , hub , datasources , sd , bucketStore ); err != nil {
201204 log .Fatalf ("unable to start crowdsec routines: %s" , err )
202205 }
203206 }()
0 commit comments