Skip to content

[Feature] Add Distributed Posting Router for SPANN#448

Open
TerrenceZhangX wants to merge 40 commits into
users/qiazh/pre-merge-tikv-bugfixfrom
users/zhangt/merge-distributed-to-tikv
Open

[Feature] Add Distributed Posting Router for SPANN#448
TerrenceZhangX wants to merge 40 commits into
users/qiazh/pre-merge-tikv-bugfixfrom
users/zhangt/merge-distributed-to-tikv

Conversation

@TerrenceZhangX
Copy link
Copy Markdown

@TerrenceZhangX TerrenceZhangX commented May 7, 2026

Limitations

  1. Using buildOnly flags to mitigate no ditributed build limitation. Current flow is using buildOnly to let single node to build and distribute to other nodes. Then using disable buildOnly to let multiple nodes load same head index and run benchmarks.

Scale results

Dataset: SIFT1B bigann_base.u8bin, 128d UInt8, L2. SPANN 2-layer index,
4 search threads, 4 insert threads, top-K=5, 200 queries.

1M base + 1M insert

Metric 1node 2node Scale
Build time (s) 74.2 91.3 0.81×
Pre-insert QPS 429.3 696.3 1.62×
Pre-insert mean latency (ms) 9.26 8.63
Pre-insert p99 (ms) 27.31 29.18
Post-insert QPS 425.0 708.1 1.67×
Insert throughput (vec/s) ~900 1793
Recall@5 (pre-insert) 0.984 0.978
Recall@5 (post-insert) 0.983 0.984

Notes:

  • Build is slower on 2node at 1M scale: the cross-node coordination
    overhead (head-sync RPCs, control plane) dominates when there are only
    ~40k head vectors to build.
  • Search scales well already at 1M; recall is unchanged.

100M base + 1M insert

Metric 1node 2node Scale
Build time (s) 15292 16264 0.94×
Pre-insert QPS 183.3 360.5 1.97×
Pre-insert mean latency (ms) 21.56 21.92
Pre-insert p99 (ms) 32.81 39.55
Post-insert QPS (round 2) 183.2 337.2 1.84×
Insert throughput (vec/s) 738 1285 1.74×
Recall@5 (pre-insert) 0.912 0.904
Recall@5 (post-insert) 0.912 0.904

Notes:

  • Search scales near-linearly (1.97×) at the target scale — the per-node
    query partition + remote KV reads are well-balanced.
  • Insert scales sublinearly (1.74×), expected: every insert that
    promotes/splits a head triggers head-index sync across nodes, which is the
    current bottleneck.
  • Build is essentially flat (0.94×): build is dominated by per-node local
    graph construction; 2node has additional head-sync but no work split, so
    near-no scaling here is expected for the current single-builder design.
  • Recall is stable across configurations (within 0.01).

@TerrenceZhangX TerrenceZhangX force-pushed the users/zhangt/merge-distributed-to-tikv branch from dfc4c89 to a158014 Compare May 7, 2026 13:55
@TerrenceZhangX TerrenceZhangX marked this pull request as ready for review May 14, 2026 10:54
Comment thread AnnService/inc/Core/Common/FineGrainedLock.h Outdated
Comment thread AnnService/inc/Core/SPANN/ExtraDynamicSearcher.h Outdated
Comment thread AnnService/inc/Core/SPANN/ExtraDynamicSearcher.h Outdated
@TerrenceZhangX TerrenceZhangX requested a review from zqxjjj May 18, 2026 06:59
Branch users/zhangt/merge-onto-qiazh ports our shared remote/local pool +
per-layer routing changes from users/zhangt/merge-distributed-to-tikv on
top of qianxi's TiKV bugfix branch (lock ordering, splitAsync, version
check, etc.). Avoids the 21-block ExtraDynamicSearcher.h merge conflict
on the merged_spfresh side by replaying instead of merging.

Pragmatic approach for heavy files (ExtraDynamicSearcher.h, SPFreshTest.cpp):
take our HEAD versions wholesale (which already contain our distributed +
MultiChunk logic), and patch only the compile-breaking deltas caused by
qianxi's refactors:
  - PostingCountCache moved from ExtraDynamicSearcher.h to ExtraTiKVController.h
  - KeyValueIO grew MultiMerge + LogAsyncWaitStatsAndReset virtuals
    (qianxi version kept; our MultiPut/MultiDelete virtuals re-added on top)
  - Options/ParameterDefinitionList: kept qianxi version (adds m_globalIDPath)
  - ThreadPool: kept our add_high + added addfront alias for qianxi callers

Index.h / IExtraSearcher.h / SPANNIndex.cpp: applied small additive hooks
on top of qianxi (forward-decl WorkerNode, SetWorker/GetSharedSplitPool
accessors, BuildIndexInternalLayer + AddIndex worker loop). qianxi
bugfixes preserved in those files.

Build system:
  - CMakeLists updated for absl_cord + cordz family (kvproto 25.3 uses
    absl 2308, anaconda's grpc bundles 2111; explicit linkage avoids
    DSO-missing-from-command-line)
  - cmake invoked with gRPC_DIR/Protobuf_DIR/absl_DIR pointing at
    /usr/local so generated kvproto + libabsl 2308 versions align

Verified: SPTAGTest links cleanly.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@TerrenceZhangX TerrenceZhangX force-pushed the users/zhangt/merge-distributed-to-tikv branch from 90dbbae to 8716007 Compare May 20, 2026 06:52
TerrenceZhangX and others added 11 commits May 20, 2026 07:21
Strip the SPFRESH_SHARD_STRIDE opt-in code path (4 helpers + plumbing
through LoadAndInsertBatch/RunBenchmark/RunWorker). No active config
sets the env var; we always use the contiguous slice partition.

Test/CMakeLists.txt: explicitly link ${TiKV_LIBRARIES} into SPTAGTest
so a clean build (no .o cache) resolves gpr_/grpc_ symbols pulled in
by the kvproto generated stubs.

ThirdParty/kvproto/.gitignore: stop tracking regenerated stubs going
forward — they are environment-specific (must match the protoc/grpc
in the build env); regenerate locally via generate_cpp.sh.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The previous if/else duplicated the thread launch+join. Restructure to
a single launch with an optional search-during-insert thread:
  - launch insertThreadCount workers
  - if benchmarking, launch one search thread in parallel
  - join all, then compute stats (only when search ran)

Also log a clear note when the bulk router path is used: the user-
supplied InsertThreadNum is unused there (driver runs one launcher
thread and parallelism comes from [BuildSSDIndex] AppendThreadNum
inside ExtraDynamicSearcher's append/split pool).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
8716007 removed the (m_layers+1) multiplier in the SPDK BlockController
queue-depth formula. The change was based on an incorrect assumption
that the distributed port collapses all per-layer SPDK pools into the
single shared layer-0 pool. In practice only layer 0 + the RPC receiver
share a pool; every inner layer (m_layer >= 1) still creates its own
SPDKThreadPool in both BuildIndex and LoadIndex.

With Layers=2 (current active configs) we therefore have ~2 independent
pools each running insert + reassign + append worker threads, so the
peak concurrent IO-submitter count remains the qianxi-original
(layers+1)*(insert+reassign+append) plus search threads. Under-sizing
the BlockController queue could stall IO submission under heavy
split/reassign + search load; over-sizing is harmless. Restore the
multiplier to match qianxi behaviour.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
All distributed runs override VersionCacheMaxChunks=0 (set by
run_distributed.sh in build/run/nocache phases), so the LRU cache is
effectively disabled. Using ReadChunkCached inside SetVersionBatch
adds bookkeeping noise (cache hit/miss path, refresh-mutex acquire)
that produces no benefit. Switch to direct ReadChunk; the dirty-byte
gating still saves the WriteChunk RPC when no version byte actually
changes.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The distributed port introduced a separate m_highJobs queue + add_high
in ThreadPool plus 'urgent' parameters on AppendAsync/ReassignAsync.
Receiver dispatch already discovered high-priority starved Split jobs
and switched to high=false. The remaining urgent=true callers were:

  - AppendAsync in CollectReAssign's non-TiKV branch (dead under
    Storage::TIKVIO which is the only storage we use)
  - ReassignAsync on head-miss in Append/BatchAppend (same starvation
    risk against Split that motivated the receiver-side revert)

Restore ThreadPool.h to the upstream deque+addfront shape (no semantic
change vs. original) and drop the urgent parameter from AppendAsync/
ReassignAsync, the high flag from JobSubmitter, and the high path from
WireJobSubmitterIfReady.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
run_distributed.sh:
- Remove wait_workers_ready() — dead since the driver-listens-on-30001
  handshake replaced log-grep readiness detection.
- Drop the stale 'Binary already pushed; nothing else to do here' comment
  that sat immediately after the actual binary-push rsync block.

README.md:
- Correct the TiKV deployment model: the cluster is SHARED (all PDs in
  one raft group, all TiKVs registered as stores, max-replicas=1) — not
  one isolated PD+TiKV per node as the old text claimed. Architecture
  diagram, port table, and pre-split helper updated accordingly (one PD
  endpoint, not a per-node loop).
- Fix Step 1 cluster-config path: configs/cluster_2node.conf (an actual
  shipped file), not the non-existent cluster.conf.example.
- Update port defaults to match cluster_2node.conf (23791/23801/20171)
  and call out that the driver's router_port must not collide with the
  dispatcher port 30001 (cluster_2node.conf uses 30011 for this reason).
- List all shipped configs (10m, 100m, insert_dominant, tikv.toml,
  cluster_*.conf) in the file table.
- Document setup-bins subcommand alongside deploy.
- Flag the Build / Distribute / Run split as a workaround for the
  missing distributed SelectHead/BuildHead implementation, so readers
  don't mistake it for the steady-state design.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The previous wording made it sound like the driver was a stateless
coordinator and workers only talked back to it. Reality: node 0 runs as
worker 0 (owns its hash shard like every other worker) and additionally
hosts the dispatcher; workers talk to each other directly through
PostingRouter for remote append, head sync, and merge hints — no
driver-mediated forwarding. Diagram and 'What run does' steps updated.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
We never actually ran the pre-split/scatter helper in our benchmark
runs. Keeping it in the doc gives the false impression that it's part
of the recommended setup. Remove the whole section.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
// If headID is owned by a remote node, queue the append for that
// node and return true; otherwise return false (caller continues
// with local write logic).
bool TryRouteRemoteAppend(SizeType headID,
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we find somewhere to just identify once?

Comment thread AnnService/inc/Core/SPANN/ExtraDynamicSearcher.h Outdated
Comment thread AnnService/inc/Core/SPANN/ExtraDynamicSearcher.h
Comment thread AnnService/inc/Core/SPANN/ExtraDynamicSearcher.h Outdated
if (!m_headIndex->ContainSample(queryResult->VID, m_layer + 1)) continue;

if ((ret=db->Get(DBKey(queryResult->VID), &nextPostingList, MaxTimeout, &(p_exWorkSpace->m_diskRequests))) != ErrorCode::Success) {
if (isRemoteCandidate) {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this correct?

Comment thread AnnService/inc/Core/SPANN/ExtraDynamicSearcher.h Outdated
m_splitThreadPool = std::make_shared<SPDKThreadPool>();
m_splitThreadPool->initSPDK(m_opt->m_appendThreadNum, this);
SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "SPFresh: finish initialization, zeroReplicaCount:%zu\n", zeroReplicaCount);
if (m_layer == 0 && m_headIndex) m_headIndex->SetSharedSplitPool(m_splitThreadPool);
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

every layer has its split pool.

TerrenceZhangX and others added 4 commits May 21, 2026 09:11
Three Split/Merge/Append code paths duplicated the same check:
  m_worker && m_worker->IsEnabled() &&
each with their own (or missing) m_layer != 0 gate.  Split() at L878
and MergePostings() at L1336 were missing the layer gate entirely, so
on a hypothetical multi-layer cluster they would have skipped local
inner-layer ops (which never use owner-ring routing).

Unify on a single predicate IsRemoteOwnedHead(headID, &nodeIndex) and
gate every callsite on it:
  - TryRouteRemoteAppend  (routing — populates nodeIndex)
  - Split                 (drop remote splits early)
  - MergePostings         (defense-in-depth net)
  - SplitAsync / MergeAsync (don't burn a pool slot for jobs we'll drop)

Addresses PR #448 L553 review comment 'Can we find somewhere to just
identify once'.  Also folds the L1336 'if refine is not there, do we
still need the filter' question — the filter at MergePostings is now
only a safety net behind the MergeAsync enqueue-time gate, so future
RefineIndex removal won't change anything.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Design specifies that when the local VersionMap lags behind a posting
written by a remote peer, the lagging node catches up via
  AddBatch(capacity * numWorkers)

This works because the global VID space is striped across worker
nodes (VID % numWorkers == nodeID), so the peer's maxVID can be at
most ~ localCount * numWorkers ahead of us.  Extending in this large
chunk amortizes many remote inserts into one capacity bump and keeps
growth conflict-free.

The previous EnsureVersionMapCoversPosting did AddBatch(maxVid+1-localCount),
which is correct but causes thrashing — every remote append where
maxVid happens to be slightly past localCount triggers a small extend.

Floor at the exact-gap need so single-node builds (numWorkers <= 1)
behave identically to before.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The four magic constants buried in the RPC layer
  kChunkSize         = 3000   (RemotePostingOps.h)
  attempt < 3        = retry  (RemotePostingOps.h)
  wait_for(180s)     = timeout (RemotePostingOps.h)
  kMaxInflightPerNode = 4     (WorkerNode.h)
are now exposed as SPANN INI parameters under [SSDIndex]:
  RemoteAppendChunkSize       (default 3000)
  RemoteAppendRetry           (default 3)
  RemoteAppendTimeoutSec      (default 180)
  RemoteAppendMaxInflight     (default 4)

Defaults preserve current behavior.  Plumbing:
- Options.h / ParameterDefinitionList.h: declare/register parameters
- RemotePostingOps: hold values in atomics, expose Set/Get* setters
- WorkerNode: forward setters; m_maxInflightPerNode is now atomic
- ExtraDynamicSearcher::SetWorker: push m_opt->m_remoteAppend* once

This unblocks per-deployment RPC tuning (e.g. larger chunks on low-
latency clusters, shorter timeouts in CI) without recompiling, and
removes the long historical comments documenting why the chunk size
was changed 5 times during benchmarking.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Design says async Split/MergeAsync jobs must be safe-to-retry from
any compute node (Section: Async Job Fault Tolerance).  Previous code
recorded a non-Success ret into m_asyncStatus and silently dropped
the job — a transient failure (TiKV blip, remote-lock timeout, etc.)
permanently lost the split/merge.

Both MergeAsyncJob and SplitAsyncJob now carry an attempts counter.
On non-Success, if attempts+1 < m_asyncJobMaxRetry (new SPANN option,
default 3), the job re-adds itself to m_splitThreadPool without
touching the in-flight counter, so the outer drain loop still
accounts for it. After MaxRetry exhaustion the failure surfaces via
m_asyncStatus as before, plus a clear LL_Error log identifying the
head and attempt count.

Idempotency requirements for safe retry are already in place:
- Owner check (IsRemoteOwnedHead) drops remote heads immediately
- ContainSample liveness gate inside Split/MergePostings
- Re-locking the per-head RWLock on each entry
- Read-deduplicate during the next split attempt for partial writes

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
TerrenceZhangX and others added 23 commits May 21, 2026 09:36
Previously the dispatch result only signalled Success/Failed via a 1-byte
enum, so any worker-side failure (TiKV unavailable, KeyNotFound during
search, append rejection, etc.) collapsed into a generic 'Failed' that
the driver couldn't distinguish or react to differently.

Bump DispatchResult MirrorVersion 1 -> 2 and add m_errorCode (int32).
Read/Write gated on mirror >= 2 so older peers stay compatible (they
leave the field at 0).  Driver-side HandleDispatchResult now logs the
errorCode at LL_Error on failed paths, and the existing log line for
every result echoes the code so post-mortem traces show exactly what
each worker reported.

Sample wiring: SPFreshTest's worker dispatch callback sets m_errorCode
on its Unknown-command fallback.  Other code paths (Search/Insert)
already only fail through exceptions in the helpers, which the driver
treats as crash-class events; the field is ready for future failure
propagation work in those paths.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Design's receive-side flow specifies a HandleRaceCondition step before
the local Append callback runs: 'check whether the target HeadID is
currently being split or merged on this node; if so, the append waits
for the structural operation to commit before proceeding.'  Without
this, the existing wasMissing branch (which re-materializes a missing
head from the sender's headVec) can resurrect a head that local Merge
just deleted.  The race is real but small — the per-head RWLock used
by Append/Split/Merge already serializes RMW, but the head-index
ContainSample check + AddHeadIndex resurrection happens outside that
lock.

Add ExtraDynamicSearcher::HandleRaceCondition(headID) that:
  1. Peeks m_splitList / m_mergeList for the head.
  2. If present, briefly acquires the per-head RWLock to wait for the
     structural op to commit.
  3. Returns; the callback continues with a stable view, and the
     normal Append re-acquires the RWLock for the actual RMW.

When the head is genuinely gone after the wait, the sender's later
retry will see the updated head index (via HeadSync) and re-route to
the new owner — exactly the path the design's Append-vs-Merge race
section describes.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Replace the per-bucket atomic<bool> remote-lock cache with a dedicated
RemoteLeaseTable that tracks per-bucket expiry timestamps.  This lets
the owner auto-reclaim a remote lock when the holder crashes or stalls
beyond RemoteLockTtlMs (default 30s) instead of blocking Split/Merge
forever.

New file: AnnService/inc/Core/SPANN/Distributed/RemoteLeaseTable.h.

Fencing tokens deferred — they require a protocol-mirror bump on
RemoteLock{Request,Response} and a callback signature change; will be
added when the watchdog/resend path lands.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
QueueRemoteAppend's auto-flush is fire-and-forget: when the receiver
is briefly unreachable the batch was previously dropped after a single
log line.  This breaks the distributed design's at-least-once async
job contract.

Add AsyncJobWatchdog (new file under Distributed/) that owns timeout-
driven, bounded exponential-backoff resends in a single background
thread.  Wire WorkerNode's auto-flush failure path to hand the batch
to the watchdog instead of dropping it.  RemoteAppend is idempotent on
the receive side (per-posting RMW), so at-least-once is safe.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Adds two TiKV-backed durability primitives matching the distributed
design's HeadSync Job Fault Tolerance and Split Path WAL sections:

  * HeadSyncLog (new file Distributed/HeadSyncLog.h)
      Per-shard monotonically-versioned log of HeadSyncEntry, keyed by
      'hs/e/<shard>/<verBE>', with 'hs/v/<shard>' as the published
      tip and 'hs/c/<node>/<shard>' as each node's applied cursor.
      Exposes Append/ReadSince/LoadCursor/StoreCursor and an optional
      background reconciler thread.  Raw KV (no txn) per design
      guidance; producer-side per-shard mutex serializes version
      bumps and the next reader catches up via cursor replay.

  * SplitWAL (new file Distributed/SplitWAL.h)
      Stage-tracked record under 'wal/split/<headID>/<jobID>' so that
      a cross-owner split can be GC'd after partial failure (one side
      written, the other not).

Wire-in: ExtraDynamicSearcher's BroadcastHeadSync now persists entries
to HeadSyncLog before issuing the in-memory broadcast.  Broadcast
remains the latency path; TiKV is the source of truth so lost or
duplicated broadcasts no longer threaten correctness.

SplitWAL Begin/Commit hooks at the split site, and reconciler thread
activation, are scaffolded behind the new members but not yet wired
into the split flow; they are sequential follow-ups that require
distributed integration testing.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Per the design's Async Job Fault Tolerance section, lease-based locks
need an accompanying fencing token so that a zombie holder which
resumes after its lease expired cannot mutate state now protected by
a newer holder.

Protocol bumps (backwards compatible via mirror-version gates):
  * RemoteAppendRequest  mirror 1 -> 2: m_fencingToken (uint64).
                         Token 0 = unfenced (normal owner-ring route).
  * RemoteLockRequest    mirror 1 -> 2: m_token (uint64).
                         Lock sends 0; Unlock sends issued token.
  * RemoteLockResponse   mirror 0 -> 1: m_token (uint64).
                         Owner returns issued fencing token on Lock.

API changes:
  * RemoteLeaseTable: TryAcquire returns uint64_t token (0=denied);
    Release(bucket, token) only succeeds if token matches; Validate
    used by receiver-side fence check.
  * RemoteLockCallback: bool -> uint64_t signature carrying the token.
  * SendRemoteLock returns uint64_t (issued token on Lock).
  * New FenceValidator callback + RemotePostingOps fence-check on
    inbound RemoteAppend; rejected if token stale.
  * New WorkerNode::SendFencedRemoteAppend synchronous helper for
    split's cross-owner write path (unblocks split-atomicity).

The ExtraDynamicSearcher lock callback now plumbs tokens end-to-end
through RemoteLeaseTable.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Replace the two TryRouteRemoteAppend call sites in Split (existing-head
merge path and new-head create path) with the synchronous
TryWriteRemoteSplitChildFenced helper when the new head is remote-owned.
The helper performs try-lock-both, writes a SplitWAL Begin record,
sends a fenced RemoteAppend with the lock's monotonic token, then
releases the lock and clears the WAL on success.

On fenced-write failure (lock contention or RPC error), fall back to
the legacy async TryRouteRemoteAppend so the posting is not stranded;
the WAL + watchdog converge eventually.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…PC chunks, remote stats

* Drop the m_layer != 0 short-circuit in IsRemoteOwnedHead.  Both layers
  store postings in the same TiKV cluster (DBKey = m_maxID*m_layer+postingID)
  and need owner-ring routing, fencing, and SplitWAL just like layer 0.
  HeadSync broadcast stays layer-0-only since layer-1 centroids are derived
  from layer-0 splits and reach peers via that broadcast.
  SplitWAL keys now carry the layer to avoid collisions across layers.

* Fix MergeAsyncJob / SplitAsyncJob retry use-after-free: the SPDKThreadPool
  worker unconditionally deletes the Job after exec() returns, so the prior
  'add this; return;' retry path freed the Job while it was still queued.
  Enqueue a fresh Job carrying the bumped attempt count instead.

* Bump RemoteAppendChunkSize 3000->10000 and RemoteAppendMaxInflight 4->8.
  Per-chunk grpc framing was dominating, and with replica fan-out =8 the
  outbound queue at 1M+1M scale ships ~8M items; larger chunks amortize
  send overhead ~3x.

* Add remote queue stats to layer progress + ALL DONE logs and gate the
  ALL DONE boundary on the outbound queue draining.  Previously ALL DONE
  fired as soon as local SPDK pool was empty, even though the network
  pump was still shipping millions of fan-out items, making runs look
  stuck for tens of minutes.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…Option A)

When a worker receives a BatchRemoteAppendRequest from a peer, instead of
holding the connection open until every item has been applied (which made
big chunks block long enough to trigger sender timeouts and full-chunk
retries), it now:

  1. Serializes the batch and Put()s it to TiKV under
       wal/rappend/<receiverNode>/<batchID>
  2. Immediately ACKs the sender as 'Accepted'.
  3. Submits the per-item Append jobs onto the per-layer searcher pool.
  4. On last-item completion, deletes the WAL key (best-effort).

On startup, layer-0's SetWorker scans the WAL prefix and re-submits any
batches durably accepted before a previous crash. The Append callback is
already idempotent (versionMap dedup), so duplicate replays are safe.

Implementation:
- New BatchAppendWAL helper (mirrors SplitWAL's style).
- New KeyValueIO::ScanPrefix(prefix, out, max) virtual; TiKVIO implements
  it via paged RawScan with logical-key stripping. Default is no-op so
  non-TiKV backends keep compiling.
- RemotePostingOps::HandleBatchAppendRequest now WAL-then-ACK-then-submit,
  with a graceful fallback to the legacy synchronous-ACK path if the WAL
  Put fails. Shared item-dispatch logic is factored out into
  SubmitBatchItems for reuse by RecoverPendingBatches.
- BatchAppendItemJob takes sendResponse/batchID flags so the same job
  serves both the WAL-backed path (delete WAL on last completion) and the
  legacy path (ACK on last completion).
- ExtraDynamicSearcher::SetWorker constructs the WAL once (layer 0 only,
  scoped by receiver node) and triggers recovery after callbacks are
  wired.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
After enabling the WAL-then-ACK fast path, an aggressive sender could
ACK 1M items in seconds while the receiver's apply pool was still
working through the first 10k — pending queue grew unbounded (1M+),
splits starved because pool workers were all blocked on appends.

Add admission control: RemotePostingOps counts items currently queued
for async apply via m_walPendingItems. When admitting a new batch would
push that above m_walPendingItemsCap (default 50000) we DELIBERATELY
fall back to the synchronous-ACK path, which re-engages the sender's
MaxInflight gate as a natural backpressure mechanism.

Also surface m_walPendingItems in the per-layer progress log
('walPendingItems:N') so operators can see when admission control is
actively engaged.

Verified 2-node insert_dominant 1M+1M: insert throughput 710→770/s
(+8.5%), recall@5 0.976→0.984, post-insert qps 401→438. Pending queue
stays bounded at ~80-130k under load; splits make steady progress.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…rigin pending gauge

Bug fix
-------
SendBatchRemoteAppend moves items[i] into per-chunk std::vector and
calls SendBatchRemoteAppendChunk. If a chunk failed (e.g. timeout) the
function returned without restoring the moved-out items, so the
caller's vector ended up with the leading chunks moved-from (headID +
appendNum scalars still valid, but m_headVec / m_appendPosting empty).
WorkerNode::QueueRemoteAppend's auto-flush path then copied the whole
vector into a watchdog retry queue, which re-sent valid headIDs with
empty postings. The receiver hit the TiKVIO::Merge empty-value gate,
logged 'TiKVIO::Merge: empty append posting!' and 'Merge failed for
HEAD! Posting Size:0' for every such phantom item — in a 2-node
insert_dominant run we observed 390k+ such errors on the driver and
60k on the worker.

Fix:
- SendBatchRemoteAppend now (a) restores moved-out items from the
  still-populated chunk on failure, then erases the already-sent
  prefix so the caller-side retry only sees unsent payload, and
  (b) clears the input vector on full success so any spurious retry
  becomes a no-op instead of resurrecting phantom items.
- Append() drops empty/zero-count payloads with a single warning
  rather than letting them reach the storage layer (defensive guard;
  receiver should never see these once the sender bug above is
  fixed).

Observability
-------------
Added a per-layer counter m_remoteOriginPending in RemotePostingOps,
incremented in SubmitBatchItems and decremented in BatchAppendItemJob.
Exposed via WorkerNode::GetRemoteOriginPendingItems(layer) and a
whole-node aggregate. The progress log in ExtraDynamicSearcher now
prints 'pending queue:N (local:X remote:Y)' so operators can tell
whether the local pool is bottlenecked on its own RMWs/splits or on
serving peer BatchAppend items. Both progress log call sites
(AllFinished's periodic line and GetDBStats's on-demand line) updated
to the same format with the remote out queueDepth / inflightChunks /
walPendingItems context.

Verified on 2-node insert_dominant: 0 empty-posting / merge-failed
errors (was 450k), throughput 758-797/s (within noise of 710 baseline
and 770 WAL run), recall 0.984-0.990.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Background
----------
MergeAsyncJob and SplitAsyncJob previously retried every non-Success
ErrorCode in a tight loop (re-enqueueing immediately on the same pool
worker), capped at AsyncJobMaxRetry=3. This wasted pool slots on
permanent failures (e.g. logical data inconsistencies that no retry
can repair) and gave transient TiKV failures only ~no time to recover
before exhausting the retry budget.

Changes
-------
- Add Distributed/DelayedJobScheduler.h: a single-threaded helper that
  re-enqueues ThreadPool::Job pointers to a target pool after a
  per-call delay. Owns pending jobs between Schedule() and dispatch;
  destructor joins the worker and deletes any undispatched jobs to
  avoid leaks on shutdown. Holds the target pool via shared_ptr so the
  scheduler can survive teardown ordering.

- Add IsTransientAsyncJobError(ret) classifier. Transient: Fail,
  DiskIOFail, EmptyDiskIO, Socket_*. Permanent: everything else
  (Key_NotFound, Posting_*, Block_IDError, etc.). ErrorCode::Fail is
  intentionally transient because every TiKV failure path returns it;
  the rare logical-Fail callers (e.g. headVec-missing in MergePostings)
  pay a bounded number of wasted retries which is acceptable until
  a more specific code is introduced.

- Add AsyncJobRetryBackoffMs(attempt): exponential backoff (200ms
  doubling, capped at 30s).

- MergeAsyncJob and SplitAsyncJob now:
  * On transient + retry budget remaining → re-enqueue via the
    DelayedJobScheduler with exponential backoff (off the pool
    worker so we do not block a job slot during the wait).
  * On permanent → log Warning once, drop, do NOT poison
    m_asyncStatus (these are typically per-head local inconsistencies
    that the next caller-driven recovery handles, and surfacing them
    as process-wide failure was hiding real transient issues).
  * On transient with budget exhausted → keep the existing behaviour
    of setting m_asyncStatus + LL_Error, so a persistent outage still
    bubbles up.

- Bump AsyncJobMaxRetry default 3 -> 8. With the new backoff schedule
  this gives ~25s total retry budget per job (200+400+800+1600+3200+
  6400+12800ms), enough to ride out a short TiKV region rebalance or
  network blip without the operator needing to override the config.

The scheduler is lazily constructed on first retry, so single-node /
build-only paths that never exercise async retries do not pay for an
extra background thread.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…on bug

Five tightly related changes that together raised insert_dominant 2-node
throughput from 758/s to 1039/s (+37%) while preserving recall=0.98.

1) Receiver-side BatchAppend fast path

   Previously, an incoming N-item BatchAppend RPC was unpacked into N
   separate per-item Jobs, each calling Append() -> db->Merge once.
   At ChunkSize=10k that meant 10k pool jobs and 10k Merge round-trips
   per RPC, which saturated the receiver pool and made it the dominant
   bottleneck (peerOrig pending queue routinely sat above 100k).

   The new BatchAppendCallback (registered alongside the existing
   AppendCallback) takes a vector<RemoteAppendRequest*> covering an
   entire layer's worth of items in one RPC. The receiver groups item
   indices by layer and dispatches ONE BatchAppendLayerJob per layer;
   that job runs Phase 1 (per-item HandleRaceCondition + resurrection
   refusal + versionMap mirror) then Phase 2 (group surviving items by
   headID and call BatchAppend()/db->MultiMerge() ONCE). This matches
   the local AddIndex fast path's I/O profile.

   Falls back to the legacy per-item path if a layer has no batch
   callback registered (early bring-up, partial reload).

2) Fix HandleRaceCondition resurrection bug

   HandleRaceCondition() previously acquired-and-released the head's
   RWLock without telling the caller whether a structural op had
   actually occurred. AppendCallback then unconditionally resurrected
   missing heads via AddHeadIndex, which could bring back a head a
   concurrent merge had just deleted.

   Fix:
     - HandleRaceCondition() now returns bool observedStructural.
     - AppendCallback refuses to resurrect when wasMissing &&
       observedStructural, returning ErrorCode::Fail (transient).
       The sender's retry will re-resolve the owner after HeadSync
       Delete propagates.

3) Broadcast HeadSync Delete on Merge

   Split already broadcast HeadSync Delete for losers; MergePostings
   did not. Without the broadcast, peer compute nodes' head indices
   kept routing BatchAppend to the deleted head, triggering the
   resurrection bug. MergePostings now tracks deletedHeadVID in both
   loser branches and broadcasts after lock release (skipped when the
   layer is disk-backed, since TiKV is the source of truth there).

4) Auto-size WAL admission cap from ChunkSize x MaxInflight

   The receiver's WAL pending-items cap was hardcoded at 50k. When
   ChunkSize was raised to test 50k, a single chunk immediately
   tripped the cap and forced every chunk down the slow synchronous-
   ACK path (chunks timing out at the 180s RPC deadline).

   ExtraDynamicSearcher::SetWorker now derives the cap as
   ChunkSize * MaxInflight * 2 from the SPANN options, so the cap
   scales with the configured in-flight window.

   Default ChunkSize bumped 10000 -> 20000 (the receiver-side batched
   path makes the per-Merge fixed cost much cheaper, so larger chunks
   amortize the network roundtrip better without inflating the
   receiver pool depth).

5) Simplify ownership filtering

   Remove duplicate IsRemoteOwnedHead() body-side checks in Split()
   and MergePostings(). The single authoritative gate lives in
   SplitAsync()/MergeAsync(); the hash ring is static after init and
   only layer 0 routes anyway, so the body re-check was dead code.
   Saves one GetOwner() per executed Split/Merge job.

Diagnostics:
- progress log split: 'pending queue local/remote' relabeled to
  selfOrig/peerOrig, with clarifying comment that selfOrig=0 is
  expected (local-owned items bypass the pool via synchronous
  MultiMerge) and peerOrig is what the receiver-side work counts.
- new addIndex route counters track heads(local:X remote:Y)
  items(local:I remote:J) in BatchAppend's TryRouteRemoteAppend
  decision, surfacing ownership skew in the progress log.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…shutdown

Three related fixes for distributed Split/Merge robustness.

1. Atomic Split lock acquisition (Phase A/B/C/D)
   Refactor Split() into precompute-plan / build-payloads / acquire-all-locks
   / execute-writes phases.  Closes the strand window where k=0 wrote and
   k=1 then failed to lock, leaving cluster-1 vectors orphaned.  All
   per-VID local locks (sorted ascending) and per-(owner,bucket) remote
   fencing-token leases (sorted ascending) are acquired before any DB
   write; failure cleanly releases and re-enqueues via SplitAsync.  The
   deterministic ordering prevents deadlock between concurrent Splits on
   overlapping heads.

2. Drop SplitAsync/MergeAsync retries
   Structural ops are best-effort self-healing: a failed Split leaves the
   head oversized so the next Append re-triggers SplitAsync; a failed
   Merge leaves postings undersize so the next Search-driven
   AsyncMergeInSearch / RefineIndex re-triggers MergeAsync.  The previous
   retry loop burned pool slots and racy-spawned jobs into a torn-down
   WorkerNode at shutdown, which is what was producing the segfault.

3. Drain async jobs in ~ExtraDynamicSearcher
   The dtor used to set m_worker=nullptr immediately; in-flight Split/Merge
   jobs joined later by the ThreadPool dtor then null-deref m_worker via
   QueueRemoteAppend.  Now poll per-layer in-flight counters until zero
   (30 s timeout) before clearing callbacks, and leave m_worker alone -
   it is externally owned by the SPFreshTest router.

Plus support cleanup:
- RemoteLeaseGuard: reusable RAII type with fencing-token validation,
  replacing the inline RemoteLockGuard helper in MergePostings.
- HandleRaceCondition removed: the single-gate refactor at
  SplitAsync/MergeAsync plus atomic locking above closes the race it
  was working around; the AppendCallback/BatchAppendCallback wasMissing
  branch now refuses unconditionally.
- MergePostings distinguishes Key_NotFound (skip stale candidate) from
  other IO failures (propagate) instead of silent-skipping all errors.

Measured (2-node insert_dominant, 1M vectors):
  Insert throughput: 1141.6 /s (baseline 758 /s, +50%)
  Recall@5:          0.984
  Segfaults:         0 (was: shutdown crash every run)
  Retry log lines:   0
  Drain timeouts:    0

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…it lock acquisition

Phase D: wrap SendFencedRemoteAppend in a bounded 3-attempt retry loop
(10/20/40ms backoff with stale-token release + re-acquire on each retry).
On exhaustion clear the SplitWAL record, walk the per-Split committed-child
log in reverse to roll back partial progress (SameHead db->Put restore,
LocalNew DeleteIndex+Delete, Remote local-BKT DeleteIndex, LocalExisting
best-effort), then return ErrorCode::Fail so the caller (BatchAppend /
AddIndex) sees the failure and can retry the entire op.  srcHead is
preserved: the trailing 'if (!theSameHead) DeleteIndex(headID)' is gated
behind a Success return, so failures never strand the source cluster.

Phase C: collapse the previous two-pass lock acquisition (local + remote
with sort-by-VID / sort-by-(owner,bucket)) into a single pass over plans[].
The ascending-VID sort never actually prevented deadlock because srcHead
is already held; deadlock-freedom comes from try_lock + reenqueueAndExit,
which re-queues the Split via SplitAsync on contention.  Both branches
retain 20-attempt try-lock + 3*N ms backoff before bailing out.  The post-C
'invariant' assertion block duplicating the same filter logic is dropped:
the single-pass plans[] iteration makes it self-evidently correct.

Phase D control flow: restructure the per-k loop with explicit if/else
the local path matches the pre-distributed PR-target structure with the
remote dispatch as a sibling branch.

Verified: 2-node insert_dominant 1M+1M sustained 1263.7 vec/s (vs 1141.6/s
in 15f17c9) with recall\@5 = 0.986 (vs 0.984), zero segfaults, zero
fencing-token rejections in the run.  The 72 observed retry-exhaustion
events were all TiKV gRPC Deadline Exceeded propagating through; caller-
level retry handled them transparently.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…ranching

ExtraDynamicSearcher.h:
- MergePostings candidate branch: single GetOwner call, explicit
  if/else on isRemoteCandidate (lease-acquire vs try_lock + ContainSample)
- Unified re-queue: local lock-busy AND remote lease-busy both
  re-queue via reenqueueMerge() lambda (counter discipline preserved)
- 4 ec=%d log sites switched to ec=%s via Helper::Convert::ConvertToString
- MergePostings loser-delete: collapse local/remote into single
  db->Delete + BKT::DeleteIndex with location=local|nodeN log
- Restore bool urgent=false parameter on AppendAsync/ReassignAsync
  (3 callsites pass true: CollectReAssign batch fallback, Append
  HeadMiss, BatchAppend HeadMiss); restore addfront dispatch
- Append() prologue: keep separate empty-posting drop + appendNum==0
  log (do not collapse into single early-return)
- Add FindSelfEntryVectorBytes helper for posting self-entry scan
- Delete TryRouteRemoteAppend wrapper; Append/BatchAppend/Reassign
  use explicit 'if (m_worker && IsEnabled()) { if IsRemoteOwnedHead
  { Enqueue+return } else { WaitForRemoteBucketUnlocked } }' pattern
- BatchAppend now calls WaitForRemoteBucketUnlocked for parity with
  Append on the local-owned branch
- BatchAppend routing counters only increment in distributed mode
- Reassign loop: flat 'isRemote = (m_worker && IsEnabled &&
  IsRemoteOwnedHead)' + if/else for clean two-way branch
- BuildIndex zero-replica refill: move WireJobSubmitterIfReady
  inside the pool-init if, consistent with LoadIndex pattern

Index.h:
- Remove unused m_sharedSplitPool slot mechanism (m_sharedSplitPool,
  m_sharedSplitPoolMutex, Get/SetSharedSplitPool). WorkerNode receiver
  shares the layer-0 pool via the SetJobSubmitter lambda closure;
  the per-Index slot was dead code in all observed flows.

Verified: 1M+1M insert_dominant 2-node — insert throughput 1226.7 vec/s,
recall@5 0.984/0.980 pre/post-insert; within run-to-run variance of
the 6d5a1b8 baseline (1263.7 vec/s, 0.986).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The fenced cross-owner Split append used 3 retries with exponential
backoff (10/20/40 ms, ~70 ms total).  This was too tight when the
receiver was momentarily slow on TiKV — every Deadline-Exceeded burst
forced a Split rollback.  In the 1M+1M 2-node benchmark we observed
66 rollbacks per run.

Bump to 20 attempts with linear 3*N ms backoff (~570 ms worst-case),
matching the local lock-acquire retry budget used for sibling-Split
contention elsewhere in the prologue.  Splits that genuinely cannot
publish still propagate Fail to AddIndex so the caller can retry
from the user level.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…ublish new remote heads

The receiver-side AppendCallback unconditionally returned Fail when the
target head was missing on the local index, on the theory that a
concurrent Merge/Split had just deleted it and resurrecting would
race the HeadSync Delete broadcast.  The follow-up AddHeadIndex
call after the return was dead code.

But Split's legitimate "publish a brand-new child head on a remote
owner" path also goes through AppendCallback with wasMissing == true
(the child does not yet exist on the owner).  These appends already
carry a valid fencing token earned by an authoritative bucket lease
on the new head's VID, so they are safe to materialize.

Plumb the fencingToken parameter from HandleAppendRequest through
the AppendCallback typedef (and the two BatchAppendItemJob invocation
sites) into the lambda.  In the wasMissing branch, if fencingToken
is non-zero and a headVec was supplied, resurrect via AddHeadIndex
(the original intent).  Otherwise (unfenced append on a missing
head) keep refusing — that path is the racy structural-op case.

Eliminates the ~66 silent rollbacks per 1M+1M insert run that were
costing ~0.6% recall.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…ue on lock busy

When a merge candidate's lock (local m_rwLocks or remote bucket
lease) was busy, MergePostings re-enqueued itself as a fresh
MergeAsyncJob with zero backoff.

This is a livelock trap whenever two adjacent heads pick each other
as the top merge candidate: each holds its own m_rwLocks entry
inside MergePostings, each fails to lock the other, both re-enqueue,
and the new copies race back through the same path immediately.
The benchmark log shows 348 such re-enqueues for the same head pair
(622604 / 622608) in one window — a tight CPU-burning ping-pong
that starves the rest of the merge queue.

Replace the re-enqueue with a plain 'continue' to skip the current
candidate and try the next neighbor in queryResults.  Worst case
this round produces no merge for the current head, which is benign:
the head remains in m_postingSizes and becomes merge-eligible again
in the next round once its posting size still falls under the
merge threshold.  Delete the now-unused reenqueueMerge lambda.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
When the local future waited only 5 s, an in-flight SendRemoteLock could
time out while the owner had already issued a lease for that bucket --
the Grant response then arrived after we'd given up, leaving the owner
holding an orphaned lease that blocked every subsequent acquire attempt
on the same bucket for the full lease TTL.

Wait up to the receiver-side lease TTL (RemoteLeaseTable default 30000
ms): any lease the owner issues for this request auto-expires by the
time we return, so a late-arriving Grant on a timed-out RPC cannot
leave an orphaned lease. The receiver sends responses synchronously
after processing, so the remaining paths to a real timeout (dead peer,
network partition lasting >= TTL) wouldn't have benefited from a
shorter wait anyway.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The fence-retry path in Split deadlocks when the Append RPC timeout
equals the receiver-side lease TTL: under TiKV pressure the receiver
can take ~30 s on a single append, so the sender's 30 s timeout fires
at exactly the moment the receiver's lease auto-expires.  The retry
calls SendRemoteLock again, but by the time the lock request lands a
sibling Split has already claimed the bucket; the entire 20-attempt
retry budget then burns failing to re-acquire and Split rolls back.

Pick 4 x TTL so a real Append timeout unambiguously means the lease
has been recoverable for long enough that any concurrent acquisition
has had a chance to release.  The remaining cause is a hung or crashed
peer, which is the actual condition this guard should fire on.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
… TTL

The previous 5 s cap made the local writer barge in while a remote
Split that held an advisory lease on the bucket was still mid-flight.
Worst case: Split then broadcasts HeadSync Delete on srcHead and the
items we just appended disappear with the head -- recall drops
silently with no error.

Tie the cap to RemoteLeaseTable::GetTtlMs() (default 30 s): after TTL
the entry is auto-reclaimed by IsLocked() so this loop exits naturally
on its own.  The 'stuck for ... ms, proceeding' log path is now truly
anomalous and worth surfacing in the regression-detector queries.

In the 2-node insert_dominant benchmark this dropped 'stuck for' from
74 events per run to 0.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
When node A holds lease T on bucket(headX) at node B and sends a
FencedRemoteAppend(T) for headX, B's RPC handler validates the fence
(passes), then enters AppendCallback -> Append(headX, ...).  Since
headX is locally-owned on B, Append falls into
WaitForRemoteBucketUnlocked(headX) -- but the lease blocking that
bucket is A's, our own caller's.  B waits up to TTL (~30 s) for A's
lease to expire while A is blocked in SendFencedRemoteAppend waiting
for B's response.

Throughout that 30 s self-block every sibling Split that hashes into
the same bucket sees 'lease busy', burns its 20-attempt retry budget,
and rolls back.  This was the dominant cause of 'lease busy' cascades
on adjacent splits in the 2-node insert_dominant benchmark
(~40-80 events per run; recall dropped to 0.984 with periodic Split
rollbacks).

Add a p_skipRemoteBucketWait bool to Append and BatchAppend; the
receiver-side single-item callback passes
fencingToken != 0, and the BatchAppend callback passes
anyFenced computed across surviving items.  Safety: fence validation
upstream already proved the sender owns the lease covering all
in-flight modifications to this bucket, and per-head serialization via
m_rwLocks[headID] inside Append's body is unchanged.

Local writers (Append called from AddIndex / Split / Reassign / Merge)
keep the default false: they still honour any remote initiator's
advisory lease.

Result after stacking with the SendRemoteLock, fenced-Append, and
WaitForRemoteBucketUnlocked TTL alignments: 0 stuck, 0 lock timeouts,
0 rollbacks, 0 cannot-re-acquire, lease-busy events drop to ~2 per
run (down from 40-80).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Comment thread AnnService/inc/Core/SPANN/ExtraDynamicSearcher.h Outdated
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants