diff --git a/benchmark/benchmark.ts b/benchmark/benchmark.ts index 1592b68..5d3317f 100644 --- a/benchmark/benchmark.ts +++ b/benchmark/benchmark.ts @@ -474,6 +474,7 @@ class BullMQAdapter extends QueueAdapter { { stdio: ['pipe', 'pipe', 'pipe'], cwd: process.cwd(), + shell: true, }, ); @@ -739,6 +740,7 @@ class GroupMQAdapter extends QueueAdapter { { stdio: ['pipe', 'pipe', 'pipe'], cwd: process.cwd(), + shell: true, }, ); diff --git a/benchmark/results/groupmq.json b/benchmark/results/groupmq.json index b9c8839..12b94d1 100644 --- a/benchmark/results/groupmq.json +++ b/benchmark/results/groupmq.json @@ -2068,11 +2068,56 @@ "192.168.158.1:27005", "62" ], - [1, 1761573744, 16708, ["INFO"], "192.168.158.1:27005", "62"], - [3, 1761573744, 15298, ["INFO"], "192.168.158.1:29300", "64"], - [2, 1761573744, 15070, ["INFO"], "192.168.158.1:18719", "63"], - [0, 1761573744, 15185, ["INFO"], "192.168.158.1:21881", "61"], - [9, 1761573744, 14477, ["INFO"], "192.168.158.1:25249", "60"] + [ + 1, + 1761573744, + 16708, + [ + "INFO" + ], + "192.168.158.1:27005", + "62" + ], + [ + 3, + 1761573744, + 15298, + [ + "INFO" + ], + "192.168.158.1:29300", + "64" + ], + [ + 2, + 1761573744, + 15070, + [ + "INFO" + ], + "192.168.158.1:18719", + "63" + ], + [ + 0, + 1761573744, + 15185, + [ + "INFO" + ], + "192.168.158.1:21881", + "61" + ], + [ + 9, + 1761573744, + 14477, + [ + "INFO" + ], + "192.168.158.1:25249", + "60" + ] ], "latency": [], "info": { @@ -5992,5 +6037,2011 @@ "jobType": "cpu", "multiProcess": true } + }, + { + "timestamp": 1774447976563, + "queueType": "groupmq", + "jobType": "cpu", + "totalJobs": 500, + "workersCount": 4, + "completedJobs": 500, + "durationMs": 180924, + "throughputJobsPerSec": 2.76, + "avgPickupMs": 122396.25, + "avgProcessingMs": 691.23, + "avgTotalMs": 123087.48, + "p95PickupMs": 167186, + "p95ProcessingMs": 1072.62, + "p95TotalMs": 167743.53, + "peakCpuPercent": 60.9, + "peakMemoryMB": 111.6, + "avgCpuPercent": 0.3, + "avgMemoryMB": 70.9, + "redisStats": { + "commandstats": { + "config|resetstat": { + "calls": 1, + "usec": 92, + "usec_per_call": 92, + "rejected_calls": 0, + "failed_calls": 0 + }, + "incr": { + "calls": 500, + "usec": 3048, + "usec_per_call": 6.1, + "rejected_calls": 0, + "failed_calls": 0 + }, + "zrangebyscore": { + "calls": 156, + "usec": 957, + "usec_per_call": 6.13, + "rejected_calls": 0, + "failed_calls": 0 + }, + "zadd": { + "calls": 2134, + "usec": 15725, + "usec_per_call": 7.37, + "rejected_calls": 0, + "failed_calls": 0 + }, + "bzpopmin": { + "calls": 93, + "usec": 2263, + "usec_per_call": 24.33, + "rejected_calls": 0, + "failed_calls": 0 + }, + "evalsha": { + "calls": 2258, + "usec": 478746, + "usec_per_call": 212.02, + "rejected_calls": 0, + "failed_calls": 0 + }, + "sadd": { + "calls": 500, + "usec": 1427, + "usec_per_call": 2.85, + "rejected_calls": 0, + "failed_calls": 0 + }, + "hset": { + "calls": 2500, + "usec": 10905, + "usec_per_call": 4.36, + "rejected_calls": 0, + "failed_calls": 0 + }, + "srem": { + "calls": 50, + "usec": 162, + "usec_per_call": 3.24, + "rejected_calls": 0, + "failed_calls": 0 + }, + "hmget": { + "calls": 500, + "usec": 53949, + "usec_per_call": 107.9, + "rejected_calls": 0, + "failed_calls": 0 + }, + "get": { + "calls": 734, + "usec": 2756, + "usec_per_call": 3.75, + "rejected_calls": 0, + "failed_calls": 0 + }, + "zpopmin": { + "calls": 550, + "usec": 3448, + "usec_per_call": 6.27, + "rejected_calls": 0, + "failed_calls": 0 + }, + "llen": { + "calls": 355, + "usec": 488, + "usec_per_call": 1.37, + "rejected_calls": 0, + "failed_calls": 0 + }, + "del": { + "calls": 1550, + "usec": 29752, + "usec_per_call": 19.19, + "rejected_calls": 0, + "failed_calls": 0 + }, + "info": { + "calls": 8, + "usec": 50598, + "usec_per_call": 6324.75, + "rejected_calls": 0, + "failed_calls": 0 + }, + "script|load": { + "calls": 28, + "usec": 13706, + "usec_per_call": 489.5, + "rejected_calls": 0, + "failed_calls": 0 + }, + "lindex": { + "calls": 500, + "usec": 4829, + "usec_per_call": 9.66, + "rejected_calls": 0, + "failed_calls": 0 + }, + "smembers": { + "calls": 276, + "usec": 6506, + "usec_per_call": 23.57, + "rejected_calls": 0, + "failed_calls": 0 + }, + "zcard": { + "calls": 11042, + "usec": 12030, + "usec_per_call": 1.09, + "rejected_calls": 0, + "failed_calls": 0 + }, + "zscore": { + "calls": 500, + "usec": 2145, + "usec_per_call": 4.29, + "rejected_calls": 0, + "failed_calls": 0 + }, + "hmset": { + "calls": 500, + "usec": 12474, + "usec_per_call": 24.95, + "rejected_calls": 0, + "failed_calls": 0 + }, + "set": { + "calls": 856, + "usec": 6859, + "usec_per_call": 8.01, + "rejected_calls": 0, + "failed_calls": 0 + }, + "time": { + "calls": 500, + "usec": 1455, + "usec_per_call": 2.91, + "rejected_calls": 0, + "failed_calls": 0 + }, + "hget": { + "calls": 550, + "usec": 4344, + "usec_per_call": 7.9, + "rejected_calls": 0, + "failed_calls": 0 + }, + "zrem": { + "calls": 600, + "usec": 2702, + "usec_per_call": 4.5, + "rejected_calls": 0, + "failed_calls": 0 + }, + "lpop": { + "calls": 500, + "usec": 2084, + "usec_per_call": 4.17, + "rejected_calls": 0, + "failed_calls": 0 + }, + "lpush": { + "calls": 500, + "usec": 4363, + "usec_per_call": 8.73, + "rejected_calls": 0, + "failed_calls": 0 + }, + "zrange": { + "calls": 1430, + "usec": 7102, + "usec_per_call": 4.97, + "rejected_calls": 0, + "failed_calls": 0 + }, + "slowlog|reset": { + "calls": 1, + "usec": 2, + "usec_per_call": 2, + "rejected_calls": 0, + "failed_calls": 0 + } + }, + "slowlog": [ + [ + 8, + 1774448131, + 12041, + [ + "evalsha", + "f5824e566e2e2dd09d7be5a88604eeb9bc4ddc3b", + "1", + "groupmq:benchmark-1774447976564-7scy33vgdxe}", + "94e522db-eb3a-4e8b-a48d-05103b78ee57", + "group-37", + "completed", + "1774448131924", + "null", + "0", + "0", + "1774448130422", + "1774448131924", + "0", + "3", + "1774448131924", + "30000" + ], + "192.168.16.1:46238", + "" + ], + [ + 7, + 1774448114, + 25922, + [ + "evalsha", + "f5824e566e2e2dd09d7be5a88604eeb9bc4ddc3b", + "1", + "groupmq:benchmark-1774447976564-7scy33vgdxe}", + "f2c16787-adcf-4ab3-bfa1-ff6778824496", + "group-28", + "completed", + "1774448114465", + "null", + "0", + "0", + "1774448113847", + "1774448114465", + "0", + "3", + "1774448114465", + "30000" + ], + "192.168.16.1:46244", + "" + ], + [ + 6, + 1774448114, + 25436, + [ + "DEL", + "groupmq:benchmark-1774447976564-7scy33vgdxe}:processing:f2c16787-adcf-4ab3-bfa1-ff6778824496" + ], + "", + "" + ], + [ + 5, + 1774448111, + 10684, + [ + "evalsha", + "f5824e566e2e2dd09d7be5a88604eeb9bc4ddc3b", + "1", + "groupmq:benchmark-1774447976564-7scy33vgdxe}", + "70852d02-d7fc-432e-812c-542796f4ef46", + "group-29", + "completed", + "1774448111518", + "null", + "0", + "0", + "1774448111040", + "1774448111518", + "0", + "3", + "1774448111518", + "30000" + ], + "192.168.16.1:46294", + "" + ], + [ + 4, + 1774448072, + 16241, + [ + "evalsha", + "f5824e566e2e2dd09d7be5a88604eeb9bc4ddc3b", + "1", + "groupmq:benchmark-1774447976564-7scy33vgdxe}", + "4d4bb834-c9fb-452b-8746-2d20a1cdefb2", + "group-8", + "completed", + "1774448072011", + "null", + "0", + "0", + "1774448071441", + "1774448072011", + "0", + "3", + "1774448072011", + "30000" + ], + "192.168.16.1:46244", + "" + ], + [ + 3, + 1774448072, + 15999, + [ + "HMGET", + "groupmq:benchmark-1774447976564-7scy33vgdxe}:job:ca8ddd70-a84d-4a0c-b7f6-1ee778ff1ea9", + "id", + "groupId", + "data", + "attempts", + "maxAttempts", + "seq", + "timestamp", + "orderMs", + "score" + ], + "", + "" + ], + [ + 2, + 1774448046, + 34033, + [ + "evalsha", + "f5824e566e2e2dd09d7be5a88604eeb9bc4ddc3b", + "1", + "groupmq:benchmark-1774447976564-7scy33vgdxe}", + "0e8ededd-a7ea-433b-a4bc-7d92b2e245b2", + "group-1", + "completed", + "1774448046358", + "null", + "0", + "0", + "1774448045841", + "1774448046358", + "0", + "3", + "1774448046358", + "30000" + ], + "192.168.16.1:46244", + "" + ], + [ + 1, + 1774448046, + 33665, + [ + "HMGET", + "groupmq:benchmark-1774447976564-7scy33vgdxe}:job:5d98200c-b400-410c-8c96-0f11b4c8fe96", + "id", + "groupId", + "data", + "attempts", + "maxAttempts", + "seq", + "timestamp", + "orderMs", + "score" + ], + "", + "" + ], + [ + 0, + 1774448044, + 49439, + [ + "info" + ], + "192.168.16.1:46238", + "" + ] + ], + "latency": [], + "info": { + "used_memory": 1885264, + "used_memory_human": "1.80M", + "used_memory_peak": 2148320, + "used_memory_peak_human": "2.05M", + "total_commands_processed": 29674, + "instantaneous_ops_per_sec": 259, + "total_net_input_bytes": 711183, + "total_net_output_bytes": 251646, + "keyspace_hits": 14246, + "keyspace_misses": 1797 + }, + "summary": { + "totalCalls": 29672, + "totalUsec": 734917, + "avgUsecPerCall": 24.768030466433, + "commandCount": 29, + "topCommands": [ + { + "command": "zcard", + "calls": 11042, + "usec": 12030, + "usec_per_call": 1.09, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "hset", + "calls": 2500, + "usec": 10905, + "usec_per_call": 4.36, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "evalsha", + "calls": 2258, + "usec": 478746, + "usec_per_call": 212.02, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "zadd", + "calls": 2134, + "usec": 15725, + "usec_per_call": 7.37, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "del", + "calls": 1550, + "usec": 29752, + "usec_per_call": 19.19, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "zrange", + "calls": 1430, + "usec": 7102, + "usec_per_call": 4.97, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "set", + "calls": 856, + "usec": 6859, + "usec_per_call": 8.01, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "get", + "calls": 734, + "usec": 2756, + "usec_per_call": 3.75, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "zrem", + "calls": 600, + "usec": 2702, + "usec_per_call": 4.5, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "zpopmin", + "calls": 550, + "usec": 3448, + "usec_per_call": 6.27, + "rejected_calls": 0, + "failed_calls": 0 + } + ] + } + }, + "settings": { + "mq": "groupmq", + "jobs": 500, + "workers": 4, + "jobType": "cpu", + "multiProcess": true + } + }, + { + "timestamp": 1774456776815, + "queueType": "groupmq", + "jobType": "cpu", + "totalJobs": 500, + "workersCount": 4, + "completedJobs": 500, + "durationMs": 119841, + "throughputJobsPerSec": 4.17, + "avgPickupMs": 55283.48, + "avgProcessingMs": 805.64, + "avgTotalMs": 56089.12, + "p95PickupMs": 95647, + "p95ProcessingMs": 1449.01, + "p95TotalMs": 96328.71, + "peakCpuPercent": 62.4, + "peakMemoryMB": 116.4, + "avgCpuPercent": 0.2, + "avgMemoryMB": 77.1, + "redisStats": { + "commandstats": { + "config|resetstat": { + "calls": 1, + "usec": 7245, + "usec_per_call": 7245, + "rejected_calls": 0, + "failed_calls": 0 + }, + "incr": { + "calls": 500, + "usec": 67743, + "usec_per_call": 135.49, + "rejected_calls": 0, + "failed_calls": 0 + }, + "zrangebyscore": { + "calls": 149, + "usec": 145467, + "usec_per_call": 976.29, + "rejected_calls": 0, + "failed_calls": 0 + }, + "zadd": { + "calls": 1957, + "usec": 122827, + "usec_per_call": 62.76, + "rejected_calls": 0, + "failed_calls": 0 + }, + "bzpopmin": { + "calls": 19, + "usec": 3004, + "usec_per_call": 158.11, + "rejected_calls": 0, + "failed_calls": 0 + }, + "evalsha": { + "calls": 1651, + "usec": 1087021, + "usec_per_call": 658.4, + "rejected_calls": 0, + "failed_calls": 0 + }, + "sadd": { + "calls": 500, + "usec": 2096, + "usec_per_call": 4.19, + "rejected_calls": 0, + "failed_calls": 0 + }, + "hset": { + "calls": 2500, + "usec": 10070, + "usec_per_call": 4.03, + "rejected_calls": 0, + "failed_calls": 0 + }, + "srem": { + "calls": 59, + "usec": 225, + "usec_per_call": 3.81, + "rejected_calls": 0, + "failed_calls": 0 + }, + "hmget": { + "calls": 500, + "usec": 4961, + "usec_per_call": 9.92, + "rejected_calls": 0, + "failed_calls": 0 + }, + "get": { + "calls": 294, + "usec": 1038, + "usec_per_call": 3.53, + "rejected_calls": 0, + "failed_calls": 0 + }, + "zpopmin": { + "calls": 559, + "usec": 3871, + "usec_per_call": 6.92, + "rejected_calls": 0, + "failed_calls": 0 + }, + "zremrangebyrank": { + "calls": 2, + "usec": 602, + "usec_per_call": 301, + "rejected_calls": 0, + "failed_calls": 0 + }, + "llen": { + "calls": 159, + "usec": 315, + "usec_per_call": 1.98, + "rejected_calls": 0, + "failed_calls": 0 + }, + "del": { + "calls": 1559, + "usec": 5460, + "usec_per_call": 3.5, + "rejected_calls": 0, + "failed_calls": 0 + }, + "info": { + "calls": 8, + "usec": 2496, + "usec_per_call": 312, + "rejected_calls": 0, + "failed_calls": 0 + }, + "script|load": { + "calls": 27, + "usec": 54994, + "usec_per_call": 2036.81, + "rejected_calls": 0, + "failed_calls": 0 + }, + "lindex": { + "calls": 600, + "usec": 2913, + "usec_per_call": 4.86, + "rejected_calls": 0, + "failed_calls": 0 + }, + "smembers": { + "calls": 146, + "usec": 4392, + "usec_per_call": 30.08, + "rejected_calls": 0, + "failed_calls": 0 + }, + "zcard": { + "calls": 5396, + "usec": 17906, + "usec_per_call": 3.32, + "rejected_calls": 0, + "failed_calls": 0 + }, + "zscore": { + "calls": 600, + "usec": 2780, + "usec_per_call": 4.63, + "rejected_calls": 0, + "failed_calls": 0 + }, + "hmset": { + "calls": 500, + "usec": 43007, + "usec_per_call": 86.01, + "rejected_calls": 0, + "failed_calls": 0 + }, + "set": { + "calls": 814, + "usec": 57533, + "usec_per_call": 70.68, + "rejected_calls": 0, + "failed_calls": 0 + }, + "time": { + "calls": 500, + "usec": 1555, + "usec_per_call": 3.11, + "rejected_calls": 0, + "failed_calls": 0 + }, + "hget": { + "calls": 659, + "usec": 5591, + "usec_per_call": 8.48, + "rejected_calls": 0, + "failed_calls": 0 + }, + "zrem": { + "calls": 616, + "usec": 2794, + "usec_per_call": 4.54, + "rejected_calls": 0, + "failed_calls": 0 + }, + "lpop": { + "calls": 500, + "usec": 1991, + "usec_per_call": 3.98, + "rejected_calls": 0, + "failed_calls": 0 + }, + "lpush": { + "calls": 500, + "usec": 2653, + "usec_per_call": 5.31, + "rejected_calls": 0, + "failed_calls": 0 + }, + "zrange": { + "calls": 1204, + "usec": 5434, + "usec_per_call": 4.51, + "rejected_calls": 0, + "failed_calls": 0 + }, + "slowlog|reset": { + "calls": 1, + "usec": 641, + "usec_per_call": 641, + "rejected_calls": 0, + "failed_calls": 0 + } + }, + "slowlog": [ + [ + 28, + 1774456894, + 18300, + [ + "script", + "load", + "-- Atomic reserve operation that checks lock and reserves in one operation\r\n-- argv: ns, nowEpochMs, vtMs, targetGroupId, allowe... (5658 more bytes)" + ], + "192.168.16.1:33890", + "" + ], + [ + 27, + 1774456875, + 10195, + [ + "evalsha", + "f5824e566e2e2dd09d7be5a88604eeb9bc4ddc3b", + "1", + "groupmq:benchmark-1774456776815-nk6558dw7yp}", + "d036dcd6-ac0f-4e03-a668-4b5c056ff0f7", + "group-41", + "completed", + "1774456875356", + "null", + "0", + "0", + "1774456874746", + "1774456875356", + "0", + "3", + "1774456875356", + "30000" + ], + "192.168.16.1:33890", + "" + ], + [ + 26, + 1774456873, + 26014, + [ + "evalsha", + "f5824e566e2e2dd09d7be5a88604eeb9bc4ddc3b", + "1", + "groupmq:benchmark-1774456776815-nk6558dw7yp}", + "823b3a4b-8593-4191-a166-d758470c9535", + "group-40", + "completed", + "1774456873658", + "null", + "0", + "0", + "1774456873171", + "1774456873658", + "0", + "3", + "1774456873658", + "30000" + ], + "192.168.16.1:33880", + "" + ], + [ + 25, + 1774456834, + 11523, + [ + "evalsha", + "f5824e566e2e2dd09d7be5a88604eeb9bc4ddc3b", + "1", + "groupmq:benchmark-1774456776815-nk6558dw7yp}", + "ccd0a46d-38c1-4711-9e00-ea72a0645aa6", + "group-20", + "completed", + "1774456834918", + "null", + "0", + "0", + "1774456834370", + "1774456834918", + "0", + "3", + "1774456834918", + "30000" + ], + "192.168.16.1:33890", + "" + ], + [ + 24, + 1774456819, + 29648, + [ + "zrangebyscore", + "groupmq:benchmark-1774456776815-nk6558dw7yp}:repeat:schedule", + "0", + "1774456814325", + "LIMIT", + "0", + "1" + ], + "192.168.16.1:33894", + "" + ], + [ + 23, + 1774456797, + 102897, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774456776815-nk6558dw7yp}", + "group-20", + "{\"id\":\"job-270\",\"enqueuedAt\":1774456797519}", + "3", + "1774456797519", + "0", + "3ae11392-b7d7-475b-aaf8-bb358656b0eb", + "1", + "1774456797519", + "0" + ], + "192.168.16.1:58278", + "" + ], + [ + 22, + 1774456797, + 102591, + [ + "ZADD", + "groupmq:benchmark-1774456776815-nk6558dw7yp}:g:group-20", + "70389597519271", + "3ae11392-b7d7-475b-aaf8-bb358656b0eb" + ], + "", + "" + ], + [ + 21, + 1774456794, + 61409, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774456776815-nk6558dw7yp}", + "group-0", + "{\"id\":\"job-200\",\"enqueuedAt\":1774456794072}", + "3", + "1774456794072", + "0", + "da3ada4e-d652-4576-975f-c881989a786d", + "1", + "1774456794072", + "0" + ], + "192.168.16.1:58278", + "" + ], + [ + 20, + 1774456794, + 60926, + [ + "INCR", + "groupmq:benchmark-1774456776815-nk6558dw7yp}:seq:20537" + ], + "", + "" + ], + [ + 19, + 1774456793, + 108335, + [ + "evalsha", + "d7ae771de7b56baf78e8edce3b3e78a7d9a2d1a2", + "1", + "groupmq:benchmark-1774456776815-nk6558dw7yp}", + "1774456793654", + "30000", + "1" + ], + "192.168.16.1:33920", + "" + ], + [ + 18, + 1774456793, + 108091, + [ + "ZRANGEBYSCORE", + "groupmq:benchmark-1774456776815-nk6558dw7yp}:processing", + "0", + "1774456793654" + ], + "", + "" + ], + [ + 17, + 1774456788, + 29518, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774456776815-nk6558dw7yp}", + "group-0", + "{\"id\":\"job-150\",\"enqueuedAt\":1774456788601}", + "3", + "1774456788601", + "0", + "f5afce1f-7047-471f-a222-566cca0862b2", + "1", + "1774456788601", + "0" + ], + "192.168.16.1:58278", + "" + ], + [ + 16, + 1774456788, + 29215, + [ + "HMSET", + "groupmq:benchmark-1774456776815-nk6558dw7yp}:job:f5afce1f-7047-471f-a222-566cca0862b2", + "id", + "f5afce1f-7047-471f-a222-566cca0862b2", + "groupId", + "group-0", + "data", + "{\"id\":\"job-150\",\"enqueuedAt\":1774456788601}", + "attempts", + "0", + "maxAttempts", + "3", + "seq", + "151", + "timestamp", + "1774456788601", + "orderMs", + "1774456788601", + "score", + "70389588601151", + "delayUntil", + "0" + ], + "", + "" + ], + [ + 15, + 1774456786, + 28541, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774456776815-nk6558dw7yp}", + "group-10", + "{\"id\":\"job-60\",\"enqueuedAt\":1774456786204}", + "3", + "1774456786204", + "0", + "b8cf5788-d7d3-44a6-8cbd-6617faa42024", + "1", + "1774456786204", + "0" + ], + "192.168.16.1:58278", + "" + ], + [ + 14, + 1774456785, + 44619, + [ + "evalsha", + "d7ae771de7b56baf78e8edce3b3e78a7d9a2d1a2", + "1", + "groupmq:benchmark-1774456776815-nk6558dw7yp}", + "1774456784589", + "30000", + "1" + ], + "192.168.16.1:33894", + "" + ], + [ + 13, + 1774456778, + 72356, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774456776815-nk6558dw7yp}", + "group-3", + "{\"id\":\"job-3\",\"enqueuedAt\":1774456778489}", + "3", + "1774456778489", + "0", + "497be6f7-079f-47f9-a2dc-9edab5109011", + "1", + "1774456778489", + "0" + ], + "192.168.16.1:58278", + "" + ], + [ + 12, + 1774456778, + 128219, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774456776815-nk6558dw7yp}", + "group-2", + "{\"id\":\"job-2\",\"enqueuedAt\":1774456778290}", + "3", + "1774456778290", + "0", + "e918dab3-6407-4c19-a9a9-75b31bdd623b", + "1", + "1774456778290", + "0" + ], + "192.168.16.1:58278", + "" + ], + [ + 11, + 1774456778, + 41781, + [ + "SET", + "groupmq:benchmark-1774456776815-nk6558dw7yp}:unique:e918dab3-6407-4c19-a9a9-75b31bdd623b", + "e918dab3-6407-4c19-a9a9-75b31bdd623b", + "NX" + ], + "", + "" + ], + [ + 10, + 1774456778, + 67314, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774456776815-nk6558dw7yp}", + "group-0", + "{\"id\":\"job-0\",\"enqueuedAt\":1774456777952}", + "3", + "1774456777952", + "0", + "dd042f74-67ba-40d1-845b-ab51e15d4659", + "1", + "1774456777953", + "0" + ], + "192.168.16.1:58278", + "" + ], + [ + 9, + 1774456778, + 30327, + [ + "script", + "load", + "-- argv: ns, groupId, dataJson, maxAttempts, orderMs, delayUntil, jobId, keepCompleted, clientTimestamp, orderingDelayMs\r\nlocal ... (5677 more bytes)" + ], + "192.168.16.1:58278", + "" + ] + ], + "latency": [], + "info": { + "used_memory": 1895640, + "used_memory_human": "1.81M", + "used_memory_peak": 2148320, + "used_memory_peak_human": "2.05M", + "total_commands_processed": 22500, + "instantaneous_ops_per_sec": 529, + "total_net_input_bytes": 590665, + "total_net_output_bytes": 242725, + "keyspace_hits": 8707, + "keyspace_misses": 1013 + }, + "summary": { + "totalCalls": 22480, + "totalUsec": 1668625, + "avgUsecPerCall": 74.22709074733096, + "commandCount": 30, + "topCommands": [ + { + "command": "zcard", + "calls": 5396, + "usec": 17906, + "usec_per_call": 3.32, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "hset", + "calls": 2500, + "usec": 10070, + "usec_per_call": 4.03, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "zadd", + "calls": 1957, + "usec": 122827, + "usec_per_call": 62.76, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "evalsha", + "calls": 1651, + "usec": 1087021, + "usec_per_call": 658.4, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "del", + "calls": 1559, + "usec": 5460, + "usec_per_call": 3.5, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "zrange", + "calls": 1204, + "usec": 5434, + "usec_per_call": 4.51, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "set", + "calls": 814, + "usec": 57533, + "usec_per_call": 70.68, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "hget", + "calls": 659, + "usec": 5591, + "usec_per_call": 8.48, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "zrem", + "calls": 616, + "usec": 2794, + "usec_per_call": 4.54, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "lindex", + "calls": 600, + "usec": 2913, + "usec_per_call": 4.86, + "rejected_calls": 0, + "failed_calls": 0 + } + ] + } + }, + "settings": { + "mq": "groupmq", + "jobs": 500, + "workers": 4, + "jobType": "cpu", + "multiProcess": true + } + }, + { + "timestamp": 1774458318858, + "queueType": "groupmq", + "jobType": "cpu", + "totalJobs": 500, + "workersCount": 4, + "completedJobs": 501, + "durationMs": 137749, + "throughputJobsPerSec": 3.64, + "avgPickupMs": 59782.75, + "avgProcessingMs": 789.23, + "avgTotalMs": 60571.99, + "p95PickupMs": 121346, + "p95ProcessingMs": 1425.02, + "p95TotalMs": 122311.9, + "peakCpuPercent": 101.5, + "peakMemoryMB": 105.3, + "avgCpuPercent": -0.3, + "avgMemoryMB": 69.2, + "redisStats": { + "commandstats": { + "config|resetstat": { + "calls": 1, + "usec": 2009, + "usec_per_call": 2009, + "rejected_calls": 0, + "failed_calls": 0 + }, + "lrange": { + "calls": 200, + "usec": 1443, + "usec_per_call": 7.22, + "rejected_calls": 0, + "failed_calls": 0 + }, + "incr": { + "calls": 500, + "usec": 43296, + "usec_per_call": 86.59, + "rejected_calls": 0, + "failed_calls": 0 + }, + "zrangebyscore": { + "calls": 359, + "usec": 2510, + "usec_per_call": 6.99, + "rejected_calls": 0, + "failed_calls": 0 + }, + "zadd": { + "calls": 2198, + "usec": 60115, + "usec_per_call": 27.35, + "rejected_calls": 0, + "failed_calls": 0 + }, + "bzpopmin": { + "calls": 219, + "usec": 4344, + "usec_per_call": 19.84, + "rejected_calls": 0, + "failed_calls": 0 + }, + "evalsha": { + "calls": 2295, + "usec": 807131, + "usec_per_call": 351.69, + "rejected_calls": 0, + "failed_calls": 0 + }, + "lrem": { + "calls": 1, + "usec": 9, + "usec_per_call": 9, + "rejected_calls": 0, + "failed_calls": 0 + }, + "sadd": { + "calls": 500, + "usec": 4907, + "usec_per_call": 9.81, + "rejected_calls": 0, + "failed_calls": 0 + }, + "hset": { + "calls": 2503, + "usec": 44688, + "usec_per_call": 17.85, + "rejected_calls": 0, + "failed_calls": 0 + }, + "srem": { + "calls": 50, + "usec": 144, + "usec_per_call": 2.88, + "rejected_calls": 0, + "failed_calls": 0 + }, + "hmget": { + "calls": 501, + "usec": 3872, + "usec_per_call": 7.73, + "rejected_calls": 0, + "failed_calls": 0 + }, + "get": { + "calls": 1225, + "usec": 30527, + "usec_per_call": 24.92, + "rejected_calls": 0, + "failed_calls": 0 + }, + "zpopmin": { + "calls": 551, + "usec": 3656, + "usec_per_call": 6.64, + "rejected_calls": 0, + "failed_calls": 0 + }, + "llen": { + "calls": 419, + "usec": 781, + "usec_per_call": 1.86, + "rejected_calls": 0, + "failed_calls": 0 + }, + "del": { + "calls": 1551, + "usec": 4795, + "usec_per_call": 3.09, + "rejected_calls": 0, + "failed_calls": 0 + }, + "info": { + "calls": 8, + "usec": 1531, + "usec_per_call": 191.38, + "rejected_calls": 0, + "failed_calls": 0 + }, + "script|load": { + "calls": 28, + "usec": 944, + "usec_per_call": 33.71, + "rejected_calls": 0, + "failed_calls": 0 + }, + "lindex": { + "calls": 868, + "usec": 2290, + "usec_per_call": 2.64, + "rejected_calls": 0, + "failed_calls": 0 + }, + "smembers": { + "calls": 195, + "usec": 3943, + "usec_per_call": 20.22, + "rejected_calls": 0, + "failed_calls": 0 + }, + "zcard": { + "calls": 5615, + "usec": 7020, + "usec_per_call": 1.25, + "rejected_calls": 0, + "failed_calls": 0 + }, + "zscore": { + "calls": 869, + "usec": 3428, + "usec_per_call": 3.94, + "rejected_calls": 0, + "failed_calls": 0 + }, + "hmset": { + "calls": 500, + "usec": 167628, + "usec_per_call": 335.26, + "rejected_calls": 0, + "failed_calls": 0 + }, + "set": { + "calls": 1445, + "usec": 96855, + "usec_per_call": 67.03, + "rejected_calls": 0, + "failed_calls": 0 + }, + "time": { + "calls": 500, + "usec": 1786, + "usec_per_call": 3.57, + "rejected_calls": 0, + "failed_calls": 0 + }, + "hget": { + "calls": 921, + "usec": 5815, + "usec_per_call": 6.31, + "rejected_calls": 0, + "failed_calls": 0 + }, + "zrem": { + "calls": 602, + "usec": 3418, + "usec_per_call": 5.68, + "rejected_calls": 0, + "failed_calls": 0 + }, + "lpop": { + "calls": 500, + "usec": 2019, + "usec_per_call": 4.04, + "rejected_calls": 0, + "failed_calls": 0 + }, + "lpush": { + "calls": 501, + "usec": 2299, + "usec_per_call": 4.59, + "rejected_calls": 0, + "failed_calls": 0 + }, + "zrange": { + "calls": 1619, + "usec": 6934, + "usec_per_call": 4.28, + "rejected_calls": 0, + "failed_calls": 0 + }, + "slowlog|reset": { + "calls": 1, + "usec": 379, + "usec_per_call": 379, + "rejected_calls": 0, + "failed_calls": 0 + } + }, + "slowlog": [ + [ + 48, + 1774458456, + 92361, + [ + "info", + "commandstats" + ], + "192.168.16.1:52890", + "" + ], + [ + 47, + 1774458335, + 26709, + [ + "get", + "groupmq:benchmark-1774456776815-nk6558dw7yp}:paused" + ], + "192.168.16.1:33890", + "" + ], + [ + 46, + 1774458326, + 55079, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774458318858-vougqhix1g}", + "group-18", + "{\"id\":\"job-468\",\"enqueuedAt\":1774458326324}", + "3", + "1774458326324", + "0", + "91188998-3100-47d9-af93-21e4a857117e", + "1", + "1774458326324", + "0" + ], + "192.168.16.1:52902", + "" + ], + [ + 45, + 1774458326, + 54866, + [ + "HMSET", + "groupmq:benchmark-1774458318858-vougqhix1g}:job:91188998-3100-47d9-af93-21e4a857117e", + "id", + "91188998-3100-47d9-af93-21e4a857117e", + "groupId", + "group-18", + "data", + "{\"id\":\"job-468\",\"enqueuedAt\":1774458326324}", + "attempts", + "0", + "maxAttempts", + "3", + "seq", + "469", + "timestamp", + "1774458326324", + "orderMs", + "1774458326324", + "score", + "70391126324469", + "delayUntil", + "0" + ], + "", + "" + ], + [ + 44, + 1774458325, + 34695, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774458318858-vougqhix1g}", + "group-0", + "{\"id\":\"job-400\",\"enqueuedAt\":1774458325430}", + "3", + "1774458325430", + "0", + "78c31cb9-4294-4d27-b606-01f3a11ad88d", + "1", + "1774458325430", + "0" + ], + "192.168.16.1:52902", + "" + ], + [ + 43, + 1774458325, + 34449, + [ + "HSET", + "groupmq:benchmark-1774458318858-vougqhix1g}:job:78c31cb9-4294-4d27-b606-01f3a11ad88d", + "status", + "waiting" + ], + "", + "" + ], + [ + 42, + 1774458324, + 41295, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774458318858-vougqhix1g}", + "group-48", + "{\"id\":\"job-298\",\"enqueuedAt\":1774458324060}", + "3", + "1774458324060", + "0", + "1258b4c8-fbb8-425f-99ee-297e9a8398bb", + "1", + "1774458324060", + "0" + ], + "192.168.16.1:52902", + "" + ], + [ + 41, + 1774458324, + 40960, + [ + "HMSET", + "groupmq:benchmark-1774458318858-vougqhix1g}:job:1258b4c8-fbb8-425f-99ee-297e9a8398bb", + "id", + "1258b4c8-fbb8-425f-99ee-297e9a8398bb", + "groupId", + "group-48", + "data", + "{\"id\":\"job-298\",\"enqueuedAt\":1774458324060}", + "attempts", + "0", + "maxAttempts", + "3", + "seq", + "299", + "timestamp", + "1774458324060", + "orderMs", + "1774458324060", + "score", + "70391124060299", + "delayUntil", + "0" + ], + "", + "" + ], + [ + 40, + 1774458323, + 28743, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774458318858-vougqhix1g}", + "group-22", + "{\"id\":\"job-272\",\"enqueuedAt\":1774458323834}", + "3", + "1774458323834", + "0", + "560f0441-99e5-4479-a327-be7ec1870b68", + "1", + "1774458323834", + "0" + ], + "192.168.16.1:52902", + "" + ], + [ + 39, + 1774458323, + 28299, + [ + "SET", + "groupmq:benchmark-1774458318858-vougqhix1g}:unique:560f0441-99e5-4479-a327-be7ec1870b68", + "560f0441-99e5-4479-a327-be7ec1870b68", + "NX" + ], + "", + "" + ], + [ + 38, + 1774458323, + 29830, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774458318858-vougqhix1g}", + "group-9", + "{\"id\":\"job-259\",\"enqueuedAt\":1774458323674}", + "3", + "1774458323674", + "0", + "c2234003-3a45-4d11-9885-15010e8a6afe", + "1", + "1774458323674", + "0" + ], + "192.168.16.1:52902", + "" + ], + [ + 37, + 1774458323, + 29407, + [ + "ZADD", + "groupmq:benchmark-1774458318858-vougqhix1g}:g:group-9", + "70391123674260", + "c2234003-3a45-4d11-9885-15010e8a6afe" + ], + "", + "" + ], + [ + 36, + 1774458322, + 40810, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774458318858-vougqhix1g}", + "group-0", + "{\"id\":\"job-200\",\"enqueuedAt\":1774458322813}", + "3", + "1774458322813", + "0", + "e7bd56ae-4261-448c-a034-d0139b7e11e2", + "1", + "1774458322813", + "0" + ], + "192.168.16.1:52902", + "" + ], + [ + 35, + 1774458322, + 40525, + [ + "INCR", + "groupmq:benchmark-1774458318858-vougqhix1g}:seq:20537" + ], + "", + "" + ], + [ + 34, + 1774458322, + 54166, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774458318858-vougqhix1g}", + "group-49", + "{\"id\":\"job-149\",\"enqueuedAt\":1774458322058}", + "3", + "1774458322058", + "0", + "b03fdb14-d600-42f1-a447-27239219fdb7", + "1", + "1774458322058", + "0" + ], + "192.168.16.1:52902", + "" + ], + [ + 33, + 1774458322, + 53923, + [ + "HMSET", + "groupmq:benchmark-1774458318858-vougqhix1g}:job:b03fdb14-d600-42f1-a447-27239219fdb7", + "id", + "b03fdb14-d600-42f1-a447-27239219fdb7", + "groupId", + "group-49", + "data", + "{\"id\":\"job-149\",\"enqueuedAt\":1774458322058}", + "attempts", + "0", + "maxAttempts", + "3", + "seq", + "150", + "timestamp", + "1774458322058", + "orderMs", + "1774458322058", + "score", + "70391122058150", + "delayUntil", + "0" + ], + "", + "" + ], + [ + 32, + 1774458321, + 14638, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774458318858-vougqhix1g}", + "group-14", + "{\"id\":\"job-64\",\"enqueuedAt\":1774458321066}", + "3", + "1774458321066", + "0", + "78c3c1ad-18d8-49af-8c54-1aa2a5bf3412", + "1", + "1774458321066", + "0" + ], + "192.168.16.1:52902", + "" + ], + [ + 31, + 1774458320, + 55177, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774458318858-vougqhix1g}", + "group-17", + "{\"id\":\"job-17\",\"enqueuedAt\":1774458320443}", + "3", + "1774458320443", + "0", + "bfe5d77a-674d-4ed7-9345-c6837d21776c", + "1", + "1774458320443", + "0" + ], + "192.168.16.1:52902", + "" + ], + [ + 30, + 1774458320, + 54890, + [ + "SET", + "groupmq:benchmark-1774458318858-vougqhix1g}:unique:bfe5d77a-674d-4ed7-9345-c6837d21776c", + "bfe5d77a-674d-4ed7-9345-c6837d21776c", + "NX" + ], + "", + "" + ], + [ + 29, + 1774458320, + 52438, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774458318858-vougqhix1g}", + "group-0", + "{\"id\":\"job-0\",\"enqueuedAt\":1774458319988}", + "3", + "1774458319989", + "0", + "756beb43-3784-40b5-a4f7-a56b99df7148", + "1", + "1774458319990", + "0" + ], + "192.168.16.1:52902", + "" + ] + ], + "latency": [], + "info": { + "used_memory": 2147888, + "used_memory_human": "2.05M", + "used_memory_peak": 2423312, + "used_memory_peak_human": "2.31M", + "total_commands_processed": 27250, + "instantaneous_ops_per_sec": 232, + "total_net_input_bytes": 843278, + "total_net_output_bytes": 265854, + "keyspace_hits": 10518, + "keyspace_misses": 2273 + }, + "summary": { + "totalCalls": 27245, + "totalUsec": 1320516, + "avgUsecPerCall": 48.46819599926592, + "commandCount": 31, + "topCommands": [ + { + "command": "zcard", + "calls": 5615, + "usec": 7020, + "usec_per_call": 1.25, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "hset", + "calls": 2503, + "usec": 44688, + "usec_per_call": 17.85, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "evalsha", + "calls": 2295, + "usec": 807131, + "usec_per_call": 351.69, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "zadd", + "calls": 2198, + "usec": 60115, + "usec_per_call": 27.35, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "zrange", + "calls": 1619, + "usec": 6934, + "usec_per_call": 4.28, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "del", + "calls": 1551, + "usec": 4795, + "usec_per_call": 3.09, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "set", + "calls": 1445, + "usec": 96855, + "usec_per_call": 67.03, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "get", + "calls": 1225, + "usec": 30527, + "usec_per_call": 24.92, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "hget", + "calls": 921, + "usec": 5815, + "usec_per_call": 6.31, + "rejected_calls": 0, + "failed_calls": 0 + }, + { + "command": "zscore", + "calls": 869, + "usec": 3428, + "usec_per_call": 3.94, + "rejected_calls": 0, + "failed_calls": 0 + } + ] + } + }, + "settings": { + "mq": "groupmq", + "jobs": 500, + "workers": 4, + "jobType": "cpu", + "multiProcess": true + } } -] +] \ No newline at end of file diff --git a/benchmark/results/groupmq_redis.json b/benchmark/results/groupmq_redis.json index b3db527..1e11fab 100644 --- a/benchmark/results/groupmq_redis.json +++ b/benchmark/results/groupmq_redis.json @@ -1,316 +1,722 @@ { "commandstats": { - "config": { + "config|resetstat": { "calls": 1, - "usec": 631, - "usec_per_call": 631, + "usec": 2009, + "usec_per_call": 2009, "rejected_calls": 0, "failed_calls": 0 }, - "del": { - "calls": 15598, - "usec": 43047, - "usec_per_call": 2.75978, + "lrange": { + "calls": 200, + "usec": 1443, + "usec_per_call": 7.22, "rejected_calls": 0, "failed_calls": 0 }, - "evalsha": { - "calls": 10870, - "usec": 1469959, - "usec_per_call": 135.231, + "incr": { + "calls": 500, + "usec": 43296, + "usec_per_call": 86.59, "rejected_calls": 0, "failed_calls": 0 }, - "get": { - "calls": 1352, - "usec": 4029, - "usec_per_call": 2.98003, + "zrangebyscore": { + "calls": 359, + "usec": 2510, + "usec_per_call": 6.99, "rejected_calls": 0, "failed_calls": 0 }, - "hget": { - "calls": 5598, - "usec": 44615, - "usec_per_call": 7.96981, + "zadd": { + "calls": 2198, + "usec": 60115, + "usec_per_call": 27.35, "rejected_calls": 0, "failed_calls": 0 }, - "hmget": { - "calls": 5000, - "usec": 42453, - "usec_per_call": 8.4906, + "bzpopmin": { + "calls": 219, + "usec": 4344, + "usec_per_call": 19.84, "rejected_calls": 0, "failed_calls": 0 }, - "hmset": { - "calls": 5000, - "usec": 26314, - "usec_per_call": 5.2628, + "evalsha": { + "calls": 2295, + "usec": 807131, + "usec_per_call": 351.69, "rejected_calls": 0, "failed_calls": 0 }, - "hset": { - "calls": 25000, - "usec": 51691, - "usec_per_call": 2.06764, + "lrem": { + "calls": 1, + "usec": 9, + "usec_per_call": 9, "rejected_calls": 0, "failed_calls": 0 }, - "incr": { - "calls": 5000, - "usec": 4613, - "usec_per_call": 0.9226, + "sadd": { + "calls": 500, + "usec": 4907, + "usec_per_call": 9.81, "rejected_calls": 0, "failed_calls": 0 }, - "info": { - "calls": 20, - "usec": 19881, - "usec_per_call": 994.05, + "hset": { + "calls": 2503, + "usec": 44688, + "usec_per_call": 17.85, "rejected_calls": 0, "failed_calls": 0 }, - "lindex": { - "calls": 5000, - "usec": 13037, - "usec_per_call": 2.6074, + "srem": { + "calls": 50, + "usec": 144, + "usec_per_call": 2.88, "rejected_calls": 0, "failed_calls": 0 }, - "llen": { - "calls": 643, - "usec": 509, - "usec_per_call": 0.791602, + "hmget": { + "calls": 501, + "usec": 3872, + "usec_per_call": 7.73, "rejected_calls": 0, "failed_calls": 0 }, - "lpop": { - "calls": 5000, - "usec": 17404, - "usec_per_call": 3.4808, + "get": { + "calls": 1225, + "usec": 30527, + "usec_per_call": 24.92, "rejected_calls": 0, "failed_calls": 0 }, - "lpush": { - "calls": 5000, - "usec": 29783, - "usec_per_call": 5.9566, + "zpopmin": { + "calls": 551, + "usec": 3656, + "usec_per_call": 6.64, "rejected_calls": 0, "failed_calls": 0 }, - "ping": { - "calls": 1, - "usec": 37, - "usec_per_call": 37, + "llen": { + "calls": 419, + "usec": 781, + "usec_per_call": 1.86, "rejected_calls": 0, "failed_calls": 0 }, - "sadd": { - "calls": 5000, - "usec": 8771, - "usec_per_call": 1.7542, + "del": { + "calls": 1551, + "usec": 4795, + "usec_per_call": 3.09, "rejected_calls": 0, "failed_calls": 0 }, - "script": { - "calls": 52, - "usec": 13176, - "usec_per_call": 253.385, + "info": { + "calls": 8, + "usec": 1531, + "usec_per_call": 191.38, "rejected_calls": 0, "failed_calls": 0 }, - "set": { - "calls": 5298, - "usec": 32845, - "usec_per_call": 6.19951, + "script|load": { + "calls": 28, + "usec": 944, + "usec_per_call": 33.71, "rejected_calls": 0, "failed_calls": 0 }, - "slowlog": { - "calls": 1, - "usec": 127, - "usec_per_call": 127, + "lindex": { + "calls": 868, + "usec": 2290, + "usec_per_call": 2.64, "rejected_calls": 0, "failed_calls": 0 }, "smembers": { - "calls": 58, - "usec": 6979, - "usec_per_call": 120.328, + "calls": 195, + "usec": 3943, + "usec_per_call": 20.22, "rejected_calls": 0, "failed_calls": 0 }, - "srem": { - "calls": 598, - "usec": 2792, - "usec_per_call": 4.6689, + "zcard": { + "calls": 5615, + "usec": 7020, + "usec_per_call": 1.25, "rejected_calls": 0, "failed_calls": 0 }, - "time": { - "calls": 5000, - "usec": 1677, - "usec_per_call": 0.3354, + "zscore": { + "calls": 869, + "usec": 3428, + "usec_per_call": 3.94, "rejected_calls": 0, "failed_calls": 0 }, - "zadd": { - "calls": 19371, - "usec": 69815, - "usec_per_call": 3.6041, + "hmset": { + "calls": 500, + "usec": 167628, + "usec_per_call": 335.26, "rejected_calls": 0, "failed_calls": 0 }, - "zcard": { - "calls": 16240, - "usec": 5767, - "usec_per_call": 0.355111, + "set": { + "calls": 1445, + "usec": 96855, + "usec_per_call": 67.03, "rejected_calls": 0, "failed_calls": 0 }, - "zpopmin": { - "calls": 5598, - "usec": 36325, - "usec_per_call": 6.48892, + "time": { + "calls": 500, + "usec": 1786, + "usec_per_call": 3.57, "rejected_calls": 0, "failed_calls": 0 }, - "zrange": { - "calls": 11293, - "usec": 46850, - "usec_per_call": 4.14859, + "hget": { + "calls": 921, + "usec": 5815, + "usec_per_call": 6.31, "rejected_calls": 0, "failed_calls": 0 }, - "zrangebyscore": { - "calls": 77, - "usec": 479, - "usec_per_call": 6.22078, + "zrem": { + "calls": 602, + "usec": 3418, + "usec_per_call": 5.68, "rejected_calls": 0, "failed_calls": 0 }, - "zrem": { - "calls": 6188, - "usec": 22278, - "usec_per_call": 3.60019, + "lpop": { + "calls": 500, + "usec": 2019, + "usec_per_call": 4.04, "rejected_calls": 0, "failed_calls": 0 }, - "zremrangebyrank": { - "calls": 8, - "usec": 15, - "usec_per_call": 1.875, + "lpush": { + "calls": 501, + "usec": 2299, + "usec_per_call": 4.59, "rejected_calls": 0, "failed_calls": 0 }, - "zscore": { - "calls": 5000, - "usec": 12537, - "usec_per_call": 2.5074, + "zrange": { + "calls": 1619, + "usec": 6934, + "usec_per_call": 4.28, + "rejected_calls": 0, + "failed_calls": 0 + }, + "slowlog|reset": { + "calls": 1, + "usec": 379, + "usec_per_call": 379, "rejected_calls": 0, "failed_calls": 0 } }, - "slowlog": [], + "slowlog": [ + [ + 48, + 1774458456, + 92361, + [ + "info", + "commandstats" + ], + "192.168.16.1:52890", + "" + ], + [ + 47, + 1774458335, + 26709, + [ + "get", + "groupmq:benchmark-1774456776815-nk6558dw7yp}:paused" + ], + "192.168.16.1:33890", + "" + ], + [ + 46, + 1774458326, + 55079, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774458318858-vougqhix1g}", + "group-18", + "{\"id\":\"job-468\",\"enqueuedAt\":1774458326324}", + "3", + "1774458326324", + "0", + "91188998-3100-47d9-af93-21e4a857117e", + "1", + "1774458326324", + "0" + ], + "192.168.16.1:52902", + "" + ], + [ + 45, + 1774458326, + 54866, + [ + "HMSET", + "groupmq:benchmark-1774458318858-vougqhix1g}:job:91188998-3100-47d9-af93-21e4a857117e", + "id", + "91188998-3100-47d9-af93-21e4a857117e", + "groupId", + "group-18", + "data", + "{\"id\":\"job-468\",\"enqueuedAt\":1774458326324}", + "attempts", + "0", + "maxAttempts", + "3", + "seq", + "469", + "timestamp", + "1774458326324", + "orderMs", + "1774458326324", + "score", + "70391126324469", + "delayUntil", + "0" + ], + "", + "" + ], + [ + 44, + 1774458325, + 34695, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774458318858-vougqhix1g}", + "group-0", + "{\"id\":\"job-400\",\"enqueuedAt\":1774458325430}", + "3", + "1774458325430", + "0", + "78c31cb9-4294-4d27-b606-01f3a11ad88d", + "1", + "1774458325430", + "0" + ], + "192.168.16.1:52902", + "" + ], + [ + 43, + 1774458325, + 34449, + [ + "HSET", + "groupmq:benchmark-1774458318858-vougqhix1g}:job:78c31cb9-4294-4d27-b606-01f3a11ad88d", + "status", + "waiting" + ], + "", + "" + ], + [ + 42, + 1774458324, + 41295, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774458318858-vougqhix1g}", + "group-48", + "{\"id\":\"job-298\",\"enqueuedAt\":1774458324060}", + "3", + "1774458324060", + "0", + "1258b4c8-fbb8-425f-99ee-297e9a8398bb", + "1", + "1774458324060", + "0" + ], + "192.168.16.1:52902", + "" + ], + [ + 41, + 1774458324, + 40960, + [ + "HMSET", + "groupmq:benchmark-1774458318858-vougqhix1g}:job:1258b4c8-fbb8-425f-99ee-297e9a8398bb", + "id", + "1258b4c8-fbb8-425f-99ee-297e9a8398bb", + "groupId", + "group-48", + "data", + "{\"id\":\"job-298\",\"enqueuedAt\":1774458324060}", + "attempts", + "0", + "maxAttempts", + "3", + "seq", + "299", + "timestamp", + "1774458324060", + "orderMs", + "1774458324060", + "score", + "70391124060299", + "delayUntil", + "0" + ], + "", + "" + ], + [ + 40, + 1774458323, + 28743, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774458318858-vougqhix1g}", + "group-22", + "{\"id\":\"job-272\",\"enqueuedAt\":1774458323834}", + "3", + "1774458323834", + "0", + "560f0441-99e5-4479-a327-be7ec1870b68", + "1", + "1774458323834", + "0" + ], + "192.168.16.1:52902", + "" + ], + [ + 39, + 1774458323, + 28299, + [ + "SET", + "groupmq:benchmark-1774458318858-vougqhix1g}:unique:560f0441-99e5-4479-a327-be7ec1870b68", + "560f0441-99e5-4479-a327-be7ec1870b68", + "NX" + ], + "", + "" + ], + [ + 38, + 1774458323, + 29830, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774458318858-vougqhix1g}", + "group-9", + "{\"id\":\"job-259\",\"enqueuedAt\":1774458323674}", + "3", + "1774458323674", + "0", + "c2234003-3a45-4d11-9885-15010e8a6afe", + "1", + "1774458323674", + "0" + ], + "192.168.16.1:52902", + "" + ], + [ + 37, + 1774458323, + 29407, + [ + "ZADD", + "groupmq:benchmark-1774458318858-vougqhix1g}:g:group-9", + "70391123674260", + "c2234003-3a45-4d11-9885-15010e8a6afe" + ], + "", + "" + ], + [ + 36, + 1774458322, + 40810, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774458318858-vougqhix1g}", + "group-0", + "{\"id\":\"job-200\",\"enqueuedAt\":1774458322813}", + "3", + "1774458322813", + "0", + "e7bd56ae-4261-448c-a034-d0139b7e11e2", + "1", + "1774458322813", + "0" + ], + "192.168.16.1:52902", + "" + ], + [ + 35, + 1774458322, + 40525, + [ + "INCR", + "groupmq:benchmark-1774458318858-vougqhix1g}:seq:20537" + ], + "", + "" + ], + [ + 34, + 1774458322, + 54166, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774458318858-vougqhix1g}", + "group-49", + "{\"id\":\"job-149\",\"enqueuedAt\":1774458322058}", + "3", + "1774458322058", + "0", + "b03fdb14-d600-42f1-a447-27239219fdb7", + "1", + "1774458322058", + "0" + ], + "192.168.16.1:52902", + "" + ], + [ + 33, + 1774458322, + 53923, + [ + "HMSET", + "groupmq:benchmark-1774458318858-vougqhix1g}:job:b03fdb14-d600-42f1-a447-27239219fdb7", + "id", + "b03fdb14-d600-42f1-a447-27239219fdb7", + "groupId", + "group-49", + "data", + "{\"id\":\"job-149\",\"enqueuedAt\":1774458322058}", + "attempts", + "0", + "maxAttempts", + "3", + "seq", + "150", + "timestamp", + "1774458322058", + "orderMs", + "1774458322058", + "score", + "70391122058150", + "delayUntil", + "0" + ], + "", + "" + ], + [ + 32, + 1774458321, + 14638, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774458318858-vougqhix1g}", + "group-14", + "{\"id\":\"job-64\",\"enqueuedAt\":1774458321066}", + "3", + "1774458321066", + "0", + "78c3c1ad-18d8-49af-8c54-1aa2a5bf3412", + "1", + "1774458321066", + "0" + ], + "192.168.16.1:52902", + "" + ], + [ + 31, + 1774458320, + 55177, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774458318858-vougqhix1g}", + "group-17", + "{\"id\":\"job-17\",\"enqueuedAt\":1774458320443}", + "3", + "1774458320443", + "0", + "bfe5d77a-674d-4ed7-9345-c6837d21776c", + "1", + "1774458320443", + "0" + ], + "192.168.16.1:52902", + "" + ], + [ + 30, + 1774458320, + 54890, + [ + "SET", + "groupmq:benchmark-1774458318858-vougqhix1g}:unique:bfe5d77a-674d-4ed7-9345-c6837d21776c", + "bfe5d77a-674d-4ed7-9345-c6837d21776c", + "NX" + ], + "", + "" + ], + [ + 29, + 1774458320, + 52438, + [ + "evalsha", + "1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425", + "1", + "groupmq:benchmark-1774458318858-vougqhix1g}", + "group-0", + "{\"id\":\"job-0\",\"enqueuedAt\":1774458319988}", + "3", + "1774458319989", + "0", + "756beb43-3784-40b5-a4f7-a56b99df7148", + "1", + "1774458319990", + "0" + ], + "192.168.16.1:52902", + "" + ] + ], "latency": [], "info": { - "used_memory": 6366688, - "used_memory_human": "6.07MiB", - "used_memory_peak": 8907232, - "used_memory_peak_human": "8.49MiB", - "total_commands_processed": 168877, - "instantaneous_ops_per_sec": 3526, - "total_net_input_bytes": 11271294, - "total_net_output_bytes": 1926951, - "keyspace_hits": 47399, - "keyspace_misses": 2872 + "used_memory": 2147888, + "used_memory_human": "2.05M", + "used_memory_peak": 2423312, + "used_memory_peak_human": "2.31M", + "total_commands_processed": 27250, + "instantaneous_ops_per_sec": 232, + "total_net_input_bytes": 843278, + "total_net_output_bytes": 265854, + "keyspace_hits": 10518, + "keyspace_misses": 2273 }, "summary": { - "totalCalls": 168865, - "totalUsec": 2028436, - "avgUsecPerCall": 12.01217540638972, - "commandCount": 30, + "totalCalls": 27245, + "totalUsec": 1320516, + "avgUsecPerCall": 48.46819599926592, + "commandCount": 31, "topCommands": [ { - "command": "hset", - "calls": 25000, - "usec": 51691, - "usec_per_call": 2.06764, + "command": "zcard", + "calls": 5615, + "usec": 7020, + "usec_per_call": 1.25, "rejected_calls": 0, "failed_calls": 0 }, { - "command": "zadd", - "calls": 19371, - "usec": 69815, - "usec_per_call": 3.6041, + "command": "hset", + "calls": 2503, + "usec": 44688, + "usec_per_call": 17.85, "rejected_calls": 0, "failed_calls": 0 }, { - "command": "zcard", - "calls": 16240, - "usec": 5767, - "usec_per_call": 0.355111, + "command": "evalsha", + "calls": 2295, + "usec": 807131, + "usec_per_call": 351.69, "rejected_calls": 0, "failed_calls": 0 }, { - "command": "del", - "calls": 15598, - "usec": 43047, - "usec_per_call": 2.75978, + "command": "zadd", + "calls": 2198, + "usec": 60115, + "usec_per_call": 27.35, "rejected_calls": 0, "failed_calls": 0 }, { "command": "zrange", - "calls": 11293, - "usec": 46850, - "usec_per_call": 4.14859, + "calls": 1619, + "usec": 6934, + "usec_per_call": 4.28, "rejected_calls": 0, "failed_calls": 0 }, { - "command": "evalsha", - "calls": 10870, - "usec": 1469959, - "usec_per_call": 135.231, + "command": "del", + "calls": 1551, + "usec": 4795, + "usec_per_call": 3.09, "rejected_calls": 0, "failed_calls": 0 }, { - "command": "zrem", - "calls": 6188, - "usec": 22278, - "usec_per_call": 3.60019, + "command": "set", + "calls": 1445, + "usec": 96855, + "usec_per_call": 67.03, "rejected_calls": 0, "failed_calls": 0 }, { - "command": "hget", - "calls": 5598, - "usec": 44615, - "usec_per_call": 7.96981, + "command": "get", + "calls": 1225, + "usec": 30527, + "usec_per_call": 24.92, "rejected_calls": 0, "failed_calls": 0 }, { - "command": "zpopmin", - "calls": 5598, - "usec": 36325, - "usec_per_call": 6.48892, + "command": "hget", + "calls": 921, + "usec": 5815, + "usec_per_call": 6.31, "rejected_calls": 0, "failed_calls": 0 }, { - "command": "set", - "calls": 5298, - "usec": 32845, - "usec_per_call": 6.19951, + "command": "zscore", + "calls": 869, + "usec": 3428, + "usec_per_call": 3.94, "rejected_calls": 0, "failed_calls": 0 } ] } -} +} \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index cc8fe01..e6fc408 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,11 +8,17 @@ services: ports: - "6385:6379" ulimits: - memlock: -1 # recommended by Dragonfly + memlock: -1 nofile: 65535 + deploy: + resources: + limits: + memory: 2g command: - "--cluster_mode=emulated" - "--lock_on_hashtags" + - "--maxmemory=1gb" + - "--proactor_threads=2" redis: image: redis:7.2.5-alpine diff --git a/src/lua/reserve-atomic.lua b/src/lua/reserve-atomic.lua index ebb43ff..babc1d4 100644 --- a/src/lua/reserve-atomic.lua +++ b/src/lua/reserve-atomic.lua @@ -18,6 +18,49 @@ end -- BullMQ-style: Check if group has active jobs local activeCount = redis.call("LLEN", groupActiveKey) +-- Self-healing: detect and clean up ghost active entries left by ungraceful shutdown +if activeCount > 0 then + local firstActive = redis.call("LINDEX", groupActiveKey, 0) + if firstActive then + local isStale = false + local procScore = redis.call("ZSCORE", ns .. ":processing", firstActive) + if not procScore then + isStale = true + else + local sStatus = redis.call("HGET", ns .. ":job:" .. firstActive, "status") + if not sStatus or (sStatus ~= "processing" and sStatus ~= "completing") then + isStale = true + else + -- Heartbeat freshness: processing score = deadlineAt = lastHeartbeat + vt + -- If (deadlineAt - now) < (vt - threshold), heartbeat stopped refreshing + local deadline = tonumber(procScore) + if deadline then + local gap = deadline - now + local hbThreshold = math.max(30000, math.min(120000, math.floor(vt / 3))) + if gap < (vt - hbThreshold) then + isStale = true + end + end + end + end + if isStale then + redis.call("DEL", groupActiveKey) + local sJobKey = ns .. ":job:" .. firstActive + local sScore = redis.call("HGET", sJobKey, "score") + if sScore then + redis.call("ZADD", gZ, tonumber(sScore), firstActive) + redis.call("HSET", sJobKey, "status", "waiting") + end + redis.call("ZREM", ns .. ":processing", firstActive) + redis.call("DEL", ns .. ":processing:" .. firstActive) + activeCount = 0 + end + else + redis.call("DEL", groupActiveKey) + activeCount = 0 + end +end + if activeCount > 0 then -- If allowedJobId is provided, check if it matches the active job (grace collection) if allowedJobId then diff --git a/src/lua/reserve-batch.lua b/src/lua/reserve-batch.lua index 88bb381..55f3892 100644 --- a/src/lua/reserve-batch.lua +++ b/src/lua/reserve-batch.lua @@ -69,6 +69,48 @@ for i = 1, #groups, 2 do -- Check if group has no active jobs (BullMQ-style gating) local activeCount = redis.call("LLEN", groupActiveKey) + + -- Self-healing: detect and clean up ghost active entries left by ungraceful shutdown + if activeCount > 0 then + local firstActive = redis.call("LINDEX", groupActiveKey, 0) + if firstActive then + local isStale = false + local procScore = redis.call("ZSCORE", processingKey, firstActive) + if not procScore then + isStale = true + else + local sStatus = redis.call("HGET", ns .. ":job:" .. firstActive, "status") + if not sStatus or (sStatus ~= "processing" and sStatus ~= "completing") then + isStale = true + else + local deadline = tonumber(procScore) + if deadline then + local gap = deadline - now + local hbThreshold = math.max(30000, math.min(120000, math.floor(vt / 3))) + if gap < (vt - hbThreshold) then + isStale = true + end + end + end + end + if isStale then + redis.call("DEL", groupActiveKey) + local sJobKey = ns .. ":job:" .. firstActive + local sScore = redis.call("HGET", sJobKey, "score") + if sScore then + redis.call("ZADD", gZ, tonumber(sScore), firstActive) + redis.call("HSET", sJobKey, "status", "waiting") + end + redis.call("ZREM", processingKey, firstActive) + redis.call("DEL", ns .. ":processing:" .. firstActive) + activeCount = 0 + end + else + redis.call("DEL", groupActiveKey) + activeCount = 0 + end + end + if activeCount == 0 then local head = redis.call("ZRANGE", gZ, 0, 0, "WITHSCORES") if head and #head >= 2 then diff --git a/src/lua/reserve.lua b/src/lua/reserve.lua index 5b46e09..38b4e3b 100644 --- a/src/lua/reserve.lua +++ b/src/lua/reserve.lua @@ -79,6 +79,48 @@ for i = 1, #groups, 2 do -- Check if group has no active jobs (BullMQ-style gating) local activeCount = redis.call("LLEN", groupActiveKey) + + -- Self-healing: detect and clean up ghost active entries left by ungraceful shutdown + if activeCount > 0 then + local firstActive = redis.call("LINDEX", groupActiveKey, 0) + if firstActive then + local isStale = false + local procScore = redis.call("ZSCORE", processingKey, firstActive) + if not procScore then + isStale = true + else + local sStatus = redis.call("HGET", ns .. ":job:" .. firstActive, "status") + if not sStatus or (sStatus ~= "processing" and sStatus ~= "completing") then + isStale = true + else + local deadline = tonumber(procScore) + if deadline then + local gap = deadline - now + local hbThreshold = math.max(30000, math.min(120000, math.floor(vt / 3))) + if gap < (vt - hbThreshold) then + isStale = true + end + end + end + end + if isStale then + redis.call("DEL", groupActiveKey) + local sJobKey = ns .. ":job:" .. firstActive + local sScore = redis.call("HGET", sJobKey, "score") + if sScore then + redis.call("ZADD", gZ, tonumber(sScore), firstActive) + redis.call("HSET", sJobKey, "status", "waiting") + end + redis.call("ZREM", processingKey, firstActive) + redis.call("DEL", ns .. ":processing:" .. firstActive) + activeCount = 0 + end + else + redis.call("DEL", groupActiveKey) + activeCount = 0 + end + end + if activeCount == 0 then -- Check if group has jobs local head = redis.call("ZRANGE", gZ, 0, 0, "WITHSCORES") diff --git a/src/queue.ts b/src/queue.ts index b66c2ca..fcdbc41 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -1796,6 +1796,53 @@ export class Queue { }; } + /** + * Scan all groups and recover jobs stuck in active lists after ungraceful shutdown. + * Call this BEFORE creating workers on startup to clean up ghost entries. + * + * For each ghost: removes from active list, removes from processing set, + * re-queues with 'waiting' status, and restores the group to the ready set. + */ + async recoverActiveJobs(): Promise { + const groupsKey = `${this.ns}:groups`; + const readyKey = `${this.ns}:ready`; + const processingKey = `${this.ns}:processing`; + const allGroups = await this.r.smembers(groupsKey); + let recovered = 0; + + for (const groupId of allGroups) { + const activeKey = `${this.ns}:g:${groupId}:active`; + const activeJobs = await this.r.lrange(activeKey, 0, -1); + if (activeJobs.length === 0) continue; + + for (const jobId of activeJobs) { + const jobKey = `${this.ns}:job:${jobId}`; + const score = await this.r.hget(jobKey, 'score'); + + await this.r.lrem(activeKey, 0, jobId); + await this.r.zrem(processingKey, jobId); + await this.r.del(`${this.ns}:processing:${jobId}`); + + if (score) { + const groupKey = `${this.ns}:g:${groupId}`; + await this.r.zadd(groupKey, Number(score), jobId); + await this.r.hset(jobKey, 'status', 'waiting'); + + const head = await this.r.zrange(groupKey, 0, 0, 'WITHSCORES'); + if (head.length >= 2) { + await this.r.zadd(readyKey, Number(head[1]), groupId); + } + + recovered++; + this.logger.info( + `Recovered stale active job ${jobId} from group ${groupId}`, + ); + } + } + } + return recovered; + } + /** * Check for stalled jobs and recover or fail them * Returns array of [jobId, groupId, action] tuples diff --git a/src/worker.ts b/src/worker.ts index cdd2980..e6a5d57 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -441,6 +441,20 @@ class _Worker extends TypedEventEmitter> { private async _runLoop(): Promise { this.logger.info(`🚀 Worker ${this.name} starting...`); + + // Auto-recover ghost active entries left by previous ungraceful shutdowns. + // This is idempotent — safe to run from every worker on startup. + try { + const recovered = await this.q.recoverActiveJobs(); + if (recovered > 0) { + this.logger.info( + `Recovered ${recovered} ghost active job(s) from previous crash`, + ); + } + } catch (err) { + this.logger.warn('Failed to recover active jobs on startup:', err); + } + // Dedicated blocking client per worker with auto-pipelining to reduce contention try { this.blockingClient = this.q.redis.duplicate({ @@ -585,10 +599,10 @@ class _Worker extends TypedEventEmitter> { const fetchedJob = allowBlocking ? this.q.reserveBlocking( - adaptiveTimeout, - undefined, // blockUntil removed (was always 0, dead code) - this.blockingClient ?? undefined, - ) + adaptiveTimeout, + undefined, // blockUntil removed (was always 0, dead code) + this.blockingClient ?? undefined, + ) : this.q.reserve(); asyncFifoQueue.add(fetchedJob); diff --git a/test/queue.ghost-active-job.test.ts b/test/queue.ghost-active-job.test.ts new file mode 100644 index 0000000..e6a5d4c --- /dev/null +++ b/test/queue.ghost-active-job.test.ts @@ -0,0 +1,238 @@ +import Redis from 'ioredis'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { Queue } from '../src/queue'; +import { Worker } from '../src/worker'; + +const REDIS_URL = process.env.REDIS_URL ?? 'redis://127.0.0.1:6379'; + +interface GhostReport { + totalGhosts: number; + totalProcessingOrphans: number; + groups: { + groupId: string; + activeJobs: string[]; + details: { + jobId: string; + status: string | null; + inProcessingSet: boolean; + deadlineDaysAway: number | null; + }[]; + }[]; +} + +async function getGhostReport(redis: Redis, ns: string): Promise { + const groups = await redis.smembers(`${ns}:groups`); + const report: GhostReport = { + totalGhosts: 0, + totalProcessingOrphans: 0, + groups: [], + }; + + for (const groupId of groups) { + const activeKey = `${ns}:g:${groupId}:active`; + const activeJobs = await redis.lrange(activeKey, 0, -1); + if (activeJobs.length === 0) continue; + + const groupReport: GhostReport['groups'][number] = { + groupId, + activeJobs, + details: [], + }; + + for (const jobId of activeJobs) { + const status = await redis.hget(`${ns}:job:${jobId}`, 'status'); + const procScore = await redis.zscore(`${ns}:processing`, jobId); + const inProcessingSet = procScore !== null; + let deadlineDaysAway: number | null = null; + + if (procScore) { + deadlineDaysAway = (Number(procScore) - Date.now()) / 1000 / 60 / 60 / 24; + } + + groupReport.details.push({ jobId, status, inProcessingSet, deadlineDaysAway }); + report.totalGhosts++; + } + + report.groups.push(groupReport); + } + + const allProcessing = await redis.zrange(`${ns}:processing`, 0, -1); + for (const jobId of allProcessing) { + const status = await redis.hget(`${ns}:job:${jobId}`, 'status'); + if (status !== 'processing' && status !== 'completing') { + report.totalProcessingOrphans++; + } + } + + return report; +} + +function printGhostReport(label: string, report: GhostReport) { + console.log(`\n--- ${label} ---`); + console.log('╔══════════════════════════════════════════════════════╗'); + console.log('║ GHOST ACTIVE JOB REPORT ║'); + console.log('╠══════════════════════════════════════════════════════╣'); + console.log(`║ Total ghost active entries: ${String(report.totalGhosts).padStart(4)} ║`); + console.log(`║ Processing set orphans: ${String(report.totalProcessingOrphans).padStart(4)} ║`); + console.log('╠══════════════════════════════════════════════════════╣'); + + for (const group of report.groups) { + console.log(`║ Group: ${group.groupId.padEnd(43)} ║`); + console.log(`║ Active list entries: ${group.activeJobs.length} ║`); + for (const d of group.details) { + const deadline = d.deadlineDaysAway !== null + ? `${d.deadlineDaysAway.toFixed(1)}d away` + : 'N/A'; + console.log(`║ - ${d.jobId.substring(0, 8)}... status=${(d.status ?? 'null').padEnd(12)} proc=${d.inProcessingSet ? 'yes' : 'no '} deadline=${deadline}`); + } + } + + if (report.groups.length === 0) { + console.log('║ (no ghost entries found) ║'); + } + + console.log('╚══════════════════════════════════════════════════════╝\n'); +} + +describe('Ghost Active Job after Ctrl+C', () => { + let redis: Redis; + let namespace: string; + + beforeEach(async () => { + namespace = `test-ghost-${Date.now()}-${Math.random().toString(36).slice(2)}`; + redis = new Redis(REDIS_URL, { maxRetriesPerRequest: null }); + + const keys = await redis.keys(`groupmq:${namespace}:*`); + if (keys.length > 0) await redis.del(...keys); + }); + + afterEach(async () => { + const keys = await redis.keys(`groupmq:${namespace}:*`); + if (keys.length > 0) await redis.del(...keys); + await redis.quit(); + }); + + function simulateCtrlC(worker: Worker) { + // @ts-ignore + if (worker['blockingClient']) { + worker['blockingClient'].disconnect(); + worker['blockingClient'] = null; + } + // @ts-ignore + worker['stopping'] = true; + // @ts-ignore + worker['closed'] = true; + // @ts-ignore + if (worker['cleanupTimer']) clearInterval(worker['cleanupTimer']); + // @ts-ignore + if (worker['schedulerTimer']) clearInterval(worker['schedulerTimer']); + // @ts-ignore + if (worker['stalledCheckTimer']) clearInterval(worker['stalledCheckTimer']); + // @ts-ignore + worker['jobsInProgress'].clear(); + } + + async function setupGhost(redis: Redis, namespace: string) { + const ns = `groupmq:${namespace}`; + const queue = new Queue({ + redis: redis.duplicate(), + namespace, + jobTimeoutMs: 7 * 24 * 60 * 60 * 1000, + orderingDelayMs: 1000, + }); + + const job1 = await queue.add({ + groupId: 'timeout', + data: { msg: 'job-1' }, + orderMs: Date.now() + 100, + }); + + await new Promise((r) => setTimeout(r, 1500)); + await queue.runSchedulerOnce(); + + await queue.add({ + groupId: 'timeout', + data: { msg: 'job-2' }, + orderMs: Date.now() + 200, + }); + + await new Promise((r) => setTimeout(r, 1500)); + await queue.runSchedulerOnce(); + + let pickedUpResolve: () => void; + const pickedUp = new Promise((r) => { pickedUpResolve = r; }); + + const workerRef = new Worker({ + queue, + handler: async (_job) => { + pickedUpResolve(); + await new Promise(() => {}); + }, + concurrency: 1, + }); + + await pickedUp; + simulateCtrlC(workerRef); + + return { queue, ns, job1 }; + } + + it('should detect ghost active entries after ungraceful shutdown', async () => { + const { queue, ns, job1 } = await setupGhost(redis, namespace); + + const report = await getGhostReport(redis, ns); + printGhostReport('AFTER Ctrl+C', report); + + // Prove the ghost exists + expect(report.totalGhosts).toBe(1); + expect(report.groups[0].groupId).toBe('timeout'); + expect(report.groups[0].details[0].jobId).toBe(job1.id); + expect(report.groups[0].details[0].status).toBe('processing'); + expect(report.groups[0].details[0].inProcessingSet).toBe(true); + expect(report.groups[0].details[0].deadlineDaysAway).toBeGreaterThan(6); + + await queue.close(); + }, 10000); + + it('should auto-recover ghosts when a new worker starts (no manual call needed)', async () => { + const { queue, ns } = await setupGhost(redis, namespace); + + const reportBefore = await getGhostReport(redis, ns); + printGhostReport('BEFORE new worker starts', reportBefore); + expect(reportBefore.totalGhosts).toBe(1); + + // "Restart" — just create a new queue + worker, like a normal app startup. + // The worker's run() auto-calls recoverActiveJobs() internally. + const queue2 = new Queue({ + redis: redis.duplicate(), + namespace, + jobTimeoutMs: 7 * 24 * 60 * 60 * 1000, + orderingDelayMs: 1000, + }); + + const processed: string[] = []; + const worker2 = new Worker({ + queue: queue2, + handler: async (job) => { + processed.push(job.id); + }, + concurrency: 1, + blockingTimeoutSec: 1, + }); + + // Give the worker time to auto-recover and process jobs + await new Promise((r) => setTimeout(r, 3000)); + await worker2.close(1000); + + const reportAfter = await getGhostReport(redis, ns); + printGhostReport('AFTER new worker started', reportAfter); + + // Ghosts are gone, both jobs processed — all automatic + expect(reportAfter.totalGhosts).toBe(0); + expect(processed.length).toBe(2); + console.log(`Auto-recovered and processed ${processed.length} jobs: ${processed.join(', ')}`); + + await queue.close(); + await queue2.close(); + }, 15000); +});