Skip to content

Commit 8f9e2bc

Browse files
fix: improve duration executor order and use timeout handler for checks (#479)
* fix: improve duration executor order and use timeout handler for checks * fix: improve duration executor and add ci-load to config
1 parent 82675f1 commit 8f9e2bc

File tree

3 files changed

+93
-70
lines changed

3 files changed

+93
-70
lines changed

cmd/beekeeper/cmd/check.go

Lines changed: 63 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -29,71 +29,70 @@ func (c *command) initCheckCmd() error {
2929
Short: "runs integration tests on a Bee cluster",
3030
Long: `runs integration tests on a Bee cluster.`,
3131
RunE: func(cmd *cobra.Command, args []string) error {
32-
ctx, cancel := context.WithTimeout(cmd.Context(), c.globalConfig.GetDuration(optionNameTimeout))
33-
defer cancel()
34-
35-
checks := c.globalConfig.GetStringSlice(optionNameChecks)
36-
if len(checks) == 0 {
37-
return fmt.Errorf("no checks provided")
38-
}
39-
40-
clusterName := c.globalConfig.GetString(optionNameClusterName)
41-
if clusterName == "" {
42-
return errMissingClusterName
43-
}
44-
45-
// set cluster config
46-
cfgCluster, ok := c.config.Clusters[clusterName]
47-
if !ok {
48-
return fmt.Errorf("cluster %s not defined", clusterName)
49-
}
50-
51-
cluster, err := c.setupCluster(ctx, clusterName, c.globalConfig.GetBool(optionNameCreateCluster))
52-
if err != nil {
53-
return fmt.Errorf("cluster setup: %w", err)
54-
}
55-
56-
var (
57-
metricsPusher *push.Pusher
58-
metricsEnabled = c.globalConfig.GetBool(optionNameMetricsEnabled)
59-
cleanup func()
60-
)
61-
62-
if metricsEnabled {
63-
metricsPusher, cleanup = newMetricsPusher(c.globalConfig.GetString(optionNameMetricsPusherAddress), cfgCluster.GetNamespace(), c.log)
64-
// cleanup executes when the calling context terminates
65-
defer cleanup()
66-
}
67-
68-
// logger metrics
69-
if l, ok := c.log.(metrics.Reporter); ok && metricsEnabled {
70-
metrics.RegisterCollectors(metricsPusher, l.Report()...)
71-
}
72-
73-
// tracing
74-
tracingEndpoint := c.globalConfig.GetString(optionNameTracingEndpoint)
75-
if c.globalConfig.IsSet(optionNameTracingHost) && c.globalConfig.IsSet(optionNameTracingPort) {
76-
tracingEndpoint = strings.Join([]string{c.globalConfig.GetString(optionNameTracingHost), c.globalConfig.GetString(optionNameTracingPort)}, ":")
77-
}
78-
tracer, tracerCloser, err := tracing.NewTracer(&tracing.Options{
79-
Enabled: c.globalConfig.GetBool(optionNameTracingEnabled),
80-
Endpoint: tracingEndpoint,
81-
ServiceName: c.globalConfig.GetString(optionNameTracingServiceName),
32+
return c.withTimeoutHandler(cmd, func(ctx context.Context) error {
33+
checks := c.globalConfig.GetStringSlice(optionNameChecks)
34+
if len(checks) == 0 {
35+
return fmt.Errorf("no checks provided")
36+
}
37+
38+
clusterName := c.globalConfig.GetString(optionNameClusterName)
39+
if clusterName == "" {
40+
return errMissingClusterName
41+
}
42+
43+
// set cluster config
44+
cfgCluster, ok := c.config.Clusters[clusterName]
45+
if !ok {
46+
return fmt.Errorf("cluster %s not defined", clusterName)
47+
}
48+
49+
cluster, err := c.setupCluster(ctx, clusterName, c.globalConfig.GetBool(optionNameCreateCluster))
50+
if err != nil {
51+
return fmt.Errorf("cluster setup: %w", err)
52+
}
53+
54+
var (
55+
metricsPusher *push.Pusher
56+
metricsEnabled = c.globalConfig.GetBool(optionNameMetricsEnabled)
57+
cleanup func()
58+
)
59+
60+
if metricsEnabled {
61+
metricsPusher, cleanup = newMetricsPusher(c.globalConfig.GetString(optionNameMetricsPusherAddress), cfgCluster.GetNamespace(), c.log)
62+
// cleanup executes when the calling context terminates
63+
defer cleanup()
64+
}
65+
66+
// logger metrics
67+
if l, ok := c.log.(metrics.Reporter); ok && metricsEnabled {
68+
metrics.RegisterCollectors(metricsPusher, l.Report()...)
69+
}
70+
71+
// tracing
72+
tracingEndpoint := c.globalConfig.GetString(optionNameTracingEndpoint)
73+
if c.globalConfig.IsSet(optionNameTracingHost) && c.globalConfig.IsSet(optionNameTracingPort) {
74+
tracingEndpoint = strings.Join([]string{c.globalConfig.GetString(optionNameTracingHost), c.globalConfig.GetString(optionNameTracingPort)}, ":")
75+
}
76+
tracer, tracerCloser, err := tracing.NewTracer(&tracing.Options{
77+
Enabled: c.globalConfig.GetBool(optionNameTracingEnabled),
78+
Endpoint: tracingEndpoint,
79+
ServiceName: c.globalConfig.GetString(optionNameTracingServiceName),
80+
})
81+
if err != nil {
82+
return fmt.Errorf("tracer: %w", err)
83+
}
84+
defer tracerCloser.Close()
85+
86+
// set global config
87+
checkGlobalConfig := config.CheckGlobalConfig{
88+
Seed: c.globalConfig.GetInt64(optionNameSeed),
89+
GethURL: c.globalConfig.GetString(optionNameGethURL),
90+
}
91+
92+
checkRunner := check.NewCheckRunner(checkGlobalConfig, c.config.Checks, cluster, metricsPusher, tracer, c.log)
93+
94+
return checkRunner.Run(ctx, checks)
8295
})
83-
if err != nil {
84-
return fmt.Errorf("tracer: %w", err)
85-
}
86-
defer tracerCloser.Close()
87-
88-
// set global config
89-
checkGlobalConfig := config.CheckGlobalConfig{
90-
Seed: c.globalConfig.GetInt64(optionNameSeed),
91-
GethURL: c.globalConfig.GetString(optionNameGethURL),
92-
}
93-
94-
checkRunner := check.NewCheckRunner(checkGlobalConfig, c.config.Checks, cluster, metricsPusher, tracer, c.log)
95-
96-
return checkRunner.Run(ctx, checks)
9796
},
9897
PreRunE: c.preRunE,
9998
}

config/local.yaml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,23 @@ checks:
293293
duration: 10m
294294
timeout: 11m
295295
type: smoke
296+
ci-load:
297+
options:
298+
content-size: 50000
299+
postage-ttl: 24h
300+
postage-depth: 21
301+
postage-label: test-label
302+
duration: 10m
303+
uploader-count: 2
304+
downloader-count: 0
305+
max-committed-depth: 2
306+
committed-depth-check-wait: 5m
307+
upload-groups:
308+
- bee
309+
download-groups:
310+
- light
311+
timeout: 30m
312+
type: load
296313
ci-soc:
297314
options:
298315
postage-ttl: 24h

pkg/scheduler/duration.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package scheduler
22

33
import (
44
"context"
5+
"errors"
56
"time"
67

78
"github.com/ethersphere/beekeeper/pkg/logging"
@@ -23,23 +24,29 @@ func NewDurationExecutor(duration time.Duration, log logging.Logger) *DurationEx
2324

2425
// Run executes the given task and waits for the specified duration before stopping.
2526
func (de *DurationExecutor) Run(ctx context.Context, task func(ctx context.Context) error) error {
27+
if task == nil {
28+
return errors.New("task cannot be nil")
29+
}
30+
2631
ctx, cancel := context.WithCancel(ctx)
2732
defer cancel()
2833

2934
doneCh := make(chan error, 1)
30-
defer close(doneCh)
3135

3236
go func() {
33-
doneCh <- task(ctx)
37+
select {
38+
case <-ctx.Done():
39+
case doneCh <- task(ctx):
40+
}
3441
}()
3542

3643
select {
37-
case err := <-doneCh:
38-
return err
44+
case <-ctx.Done():
45+
return ctx.Err()
3946
case <-time.After(de.duration):
4047
de.log.Infof("Duration of %s expired, stopping executor", de.duration)
4148
return nil
42-
case <-ctx.Done():
43-
return ctx.Err()
49+
case err := <-doneCh:
50+
return err
4451
}
4552
}

0 commit comments

Comments
 (0)