Skip to content
This repository was archived by the owner on Jan 7, 2026. It is now read-only.

Commit bb8356f

Browse files
authored
fix: deduplicate list of clients in retrieval tasks (#513)
Recently, we discovered that there is a lot of deals using the same payload CID `bafkqaaa`. There are more than 22k deals, but only between 27 miners and 9 clients. As a result, when one of such deals is picked for testing, the list of clients is very long and full of duplicates. In this change, I am modifying the query sampling eligible deals to de-duplicate the list of clients in each task. Signed-off-by: Miroslav Bajtoš <oss@bajtos.net>
1 parent 43b13a6 commit bb8356f

File tree

2 files changed

+69
-2
lines changed

2 files changed

+69
-2
lines changed

api/lib/round-tracker.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ export async function maybeCreateSparkRound (pgClient, {
307307
}
308308
}
309309

310-
async function defineTasksForRound (pgClient, sparkRoundNumber, taskCount) {
310+
export async function defineTasksForRound (pgClient, sparkRoundNumber, taskCount) {
311311
await pgClient.query(`
312312
INSERT INTO retrieval_tasks (round_id, cid, miner_id, clients)
313313
WITH selected AS (
@@ -317,7 +317,7 @@ async function defineTasksForRound (pgClient, sparkRoundNumber, taskCount) {
317317
ORDER BY random()
318318
LIMIT $2
319319
)
320-
SELECT $1 as round_id, selected.payload_cid as cid, selected.miner_id, array_agg(client_id) as clients
320+
SELECT $1 as round_id, selected.payload_cid as cid, selected.miner_id, array_agg(DISTINCT client_id) as clients
321321
FROM selected
322322
LEFT JOIN eligible_deals
323323
ON selected.payload_cid = eligible_deals.payload_cid AND selected.miner_id = eligible_deals.miner_id

api/test/round-tracker.test.js

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
BASELINE_TASKS_PER_NODE,
66
TASKS_EXECUTED_PER_ROUND,
77
ROUND_TASKS_TO_NODE_TASKS_RATIO,
8+
defineTasksForRound,
89
getRoundStartEpoch,
910
getRoundStartEpochWithBackoff,
1011
mapCurrentMeridianRoundToSparkRound,
@@ -560,6 +561,72 @@ describe('Round Tracker', () => {
560561
assert.strictEqual(sparkRound.max_tasks_per_node, MAX_TASKS_PER_NODE_LIMIT)
561562
})
562563
})
564+
565+
describe('defineTasksForRound', () => {
566+
before(async () => {
567+
// Mark all existing deals as expired
568+
await pgClient.query(`
569+
UPDATE eligible_deals SET expires_at = NOW() - INTERVAL '1 day'
570+
`)
571+
})
572+
573+
after(async () => {
574+
// Revert the change that expired existing deals
575+
await pgClient.query(`
576+
UPDATE eligible_deals SET expires_at = NOW() + INTERVAL '1 year'
577+
`)
578+
})
579+
580+
it('merges duplicate clients', async () => {
581+
// Delete any eligible deals created by previous test runs
582+
await pgClient.query(`
583+
DELETE FROM eligible_deals WHERE client_id = 'f0050'
584+
`)
585+
586+
// Create deals from the same client. First two deals are with the same SP, the third is not.
587+
// All deals have the same payload_cid.
588+
// Only these deals will be available for sampling
589+
await pgClient.query(`
590+
INSERT INTO eligible_deals
591+
(miner_id, client_id, piece_cid, piece_size, payload_cid, expires_at, sourced_from_f05_state)
592+
VALUES
593+
('f0010', 'f0050', 'baga1', 1, 'bafkqaaa', NOW() + INTERVAL '1 year', true),
594+
('f0010', 'f0050', 'baga2', 1, 'bafkqaaa', NOW() + INTERVAL '1 year', true),
595+
('f0011', 'f0050', 'baga1', 1, 'bafkqaaa', NOW() + INTERVAL '1 year', true)
596+
`)
597+
598+
// Create a new round and define tasks for the round
599+
const roundId = 1
600+
await pgClient.query(`
601+
INSERT INTO spark_rounds
602+
(id, created_at, meridian_address, meridian_round, start_epoch, max_tasks_per_node)
603+
VALUES
604+
($1, NOW(), '0x1a', 1, 1, 15)
605+
`, [
606+
roundId
607+
])
608+
await defineTasksForRound(pgClient, roundId, 3)
609+
610+
const { rows: tasks } = await pgClient.query(
611+
'SELECT miner_id, cid, clients FROM retrieval_tasks WHERE round_id = $1',
612+
[roundId]
613+
)
614+
615+
assert.deepStrictEqual(tasks, [
616+
{
617+
cid: 'bafkqaaa',
618+
miner_id: 'f0010',
619+
// Important: clients are deduplicated
620+
clients: ['f0050']
621+
},
622+
{
623+
cid: 'bafkqaaa',
624+
miner_id: 'f0011',
625+
clients: ['f0050']
626+
}
627+
])
628+
})
629+
})
563630
})
564631

565632
describe('getRoundStartEpoch', () => {

0 commit comments

Comments
 (0)