Skip to content

Commit d7fe26a

Browse files
fix(worker): eliminate race condition and cleanup issues in asynq server test
Replace shared mutable strings with channels for synchronization, use select with timeout instead of a fixed time.Sleep, fix require.Equal argument order, and reorder cleanup so the asynq server shuts down before Redis is terminated.
1 parent 440f638 commit d7fe26a

File tree

1 file changed

+26
-13
lines changed

1 file changed

+26
-13
lines changed

pkg/worker/asynq/server_test.go

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,27 +24,32 @@ func TestServer(t *testing.T) {
2424
redisContainer, err := redis.Run(ctx, image)
2525
require.NoError(t, err)
2626

27-
t.Cleanup(func() {
28-
require.NoError(t, redisContainer.Terminate(ctx))
29-
})
30-
3127
redisConnStr, err := redisContainer.ConnectionString(ctx)
3228
require.NoError(t, err)
3329

3430
// Setup server and handlers
3531
srv := asynq.NewServer(redisConnStr, asynq.BatchConfig(2, 1, 1))
36-
defer srv.Shutdown()
3732

38-
assertTaskPayload := ""
33+
// Shutdown the server before terminating Redis to avoid connection refused spam.
34+
t.Cleanup(func() {
35+
srv.Shutdown()
36+
require.NoError(t, redisContainer.Terminate(ctx))
37+
})
38+
39+
taskCalled := make(chan string, 1)
3940
srv.HandleTask("queue:task", func(_ context.Context, payload []byte) error {
40-
assertTaskPayload = string(payload)
41+
taskCalled <- string(payload)
4142

4243
return nil
4344
})
4445

45-
assertCronPayload := ""
46+
cronCalled := make(chan struct{})
4647
srv.HandleCron("* * * * *", func(_ context.Context) error {
47-
assertCronPayload = "cron was called"
48+
select {
49+
case <-cronCalled:
50+
default:
51+
close(cronCalled)
52+
}
4853

4954
return nil
5055
})
@@ -58,8 +63,16 @@ func TestServer(t *testing.T) {
5863
_, err = asynqClient.Enqueue(asynqlib.NewTask("queue:task", []byte("task was called")), asynqlib.Queue("queue"))
5964
require.NoError(t, err)
6065

61-
// Assert that tasks was called. We sleep for 1 minute to wait the server process the cronjob
62-
time.Sleep(1 * time.Minute)
63-
require.Equal(t, assertTaskPayload, "task was called")
64-
require.Equal(t, assertCronPayload, "cron was called")
66+
select {
67+
case payload := <-taskCalled:
68+
require.Equal(t, "task was called", payload)
69+
case <-time.After(10 * time.Second):
70+
t.Fatal("task was not processed within 10s")
71+
}
72+
73+
select {
74+
case <-cronCalled:
75+
case <-time.After(2 * time.Minute):
76+
t.Fatal("cron did not fire within 2 minutes")
77+
}
6578
}

0 commit comments

Comments
 (0)