Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
8716007
Replay distributed work onto users/qiazh/pre-merge-tikv-bugfix
May 20, 2026
4186747
Fix unneede diff
TerrenceZhangX May 20, 2026
ee97d3f
Remove unused stride-shard experiment
TerrenceZhangX May 20, 2026
4df704f
InsertVectors: dedupe branches, log InsertThreadNum ignore in bulk path
TerrenceZhangX May 20, 2026
c27a109
Restore (layers+1) multiplier in BlockController IO queue size
TerrenceZhangX May 20, 2026
f3a9de9
SetVersionBatch: bypass LRU cache, read TiKV directly
TerrenceZhangX May 20, 2026
f35ae85
Drop high-priority job queue from SPDKThreadPool
TerrenceZhangX May 20, 2026
a49b26d
Fix space
TerrenceZhangX May 20, 2026
689e5b2
Fix distributed benchmark README + drop dead orchestrator code
TerrenceZhangX May 20, 2026
ee405d4
README: clarify driver = worker 0 + dispatcher; workers peer-to-peer
TerrenceZhangX May 20, 2026
6cf7d36
README: drop unused TiKV pre-split helper section
TerrenceZhangX May 20, 2026
07bdc03
Clean comment
TerrenceZhangX May 20, 2026
f0d8fe5
Extract IsRemoteOwnedHead predicate for owner-ring checks
TerrenceZhangX May 21, 2026
d55de54
VersionMap extend: use stride formula capacity*numWorkers
TerrenceZhangX May 21, 2026
3703866
RemotePostingOps: move RPC chunk/retry/timeout/inflight into INI options
TerrenceZhangX May 21, 2026
9619b2f
Async Split/Merge jobs: retry counter + re-enqueue on failure
TerrenceZhangX May 21, 2026
864e268
DispatchResult: carry SPTAG::ErrorCode back to driver
TerrenceZhangX May 21, 2026
1cd19f1
AppendCallback: HandleRaceCondition gate against in-flight split/merge
TerrenceZhangX May 21, 2026
dca197b
SPANN distributed: TTL-based remote lock lease
TerrenceZhangX May 21, 2026
489ff4e
SPANN distributed: watchdog for failed async append batches
TerrenceZhangX May 21, 2026
7093d40
SPANN distributed: durable HeadSync log + Split WAL scaffolding
TerrenceZhangX May 21, 2026
111d37c
SPANN distributed: full lease-fencing with monotonic fencing tokens
TerrenceZhangX May 21, 2026
74c0350
SPANN distributed: wire split path through fenced cross-owner write
TerrenceZhangX May 21, 2026
de3fa64
SPANN distributed: route inner layers, retire async-job UAF, larger R…
TerrenceZhangX May 21, 2026
06d8899
feat(distributed): receiver-side durable Batch WAL for RemoteAppend (…
TerrenceZhangX May 21, 2026
7aca9f0
fix(distributed): receiver-side admission control for Batch WAL
TerrenceZhangX May 21, 2026
2088e13
fix(distributed): stop replaying moved-out items + per-layer remote-o…
TerrenceZhangX May 22, 2026
3107dbc
feat(distributed): classify async-job errors + exponential backoff retry
TerrenceZhangX May 22, 2026
19ba298
perf(distributed): receiver-side batched BatchAppend + fix resurrecti…
TerrenceZhangX May 22, 2026
15f17c9
fix(distributed): atomic Split locking, drop async retries, drain on …
TerrenceZhangX May 24, 2026
6d5a1b8
fix(distributed): bounded fenced-append retry, rollback, simplify Spl…
TerrenceZhangX May 24, 2026
74a5a8c
refactor(distributed): explicit distributed gate + cleanup hot-path b…
TerrenceZhangX May 24, 2026
b0774db
fix(distributed): extend fenced-append retry to match local lock budget
TerrenceZhangX May 24, 2026
279100e
fix(distributed): plumb fencingToken to AppendCallback so Split can p…
TerrenceZhangX May 24, 2026
dfb77a9
fix(distributed): MergePostings skip-and-continue instead of re-enque…
TerrenceZhangX May 24, 2026
f39db6c
fix(distributed): bump SendRemoteLock RPC timeout to lease TTL (30s)
TerrenceZhangX May 24, 2026
0cb7eaf
fix(distributed): bump fenced Append RPC timeout to 4x lease TTL (120s)
TerrenceZhangX May 24, 2026
6bfe0f1
fix(distributed): align WaitForRemoteBucketUnlocked wait cap to lease…
TerrenceZhangX May 24, 2026
adaf01c
fix(distributed): receiver-side fenced Append bypasses self-bucket wait
TerrenceZhangX May 24, 2026
17e8646
Remove unused variable
TerrenceZhangX May 25, 2026
82dc35a
fix(socket): typo in SimpleSerialization static_assert messages
TerrenceZhangX May 27, 2026
8fd4c30
build(test): gate absl_* link deps behind if(TIKV)
TerrenceZhangX May 27, 2026
047ed5b
fix(socket): separate error_code per endpoint() call in Connection
TerrenceZhangX May 27, 2026
1786092
fix(distributed): explicit field-wise Encode/Decode for SplitWAL::Record
TerrenceZhangX May 27, 2026
149bdd4
fix(distributed): graceful WorkerNode shutdown drains auto-flush threads
TerrenceZhangX May 27, 2026
5d4dbd3
fix(bench): move SPTAGTest CWD to per-scale scratch dir on NVMe
TerrenceZhangX May 27, 2026
8ccbd98
Merge users/qiazh/pre-merge-tikv-bugfix: hashmap LocalVersionMap + CAS
TerrenceZhangX May 30, 2026
dabe74a
fix(versionmap): restore per-layer Initialize to seed alive heads
TerrenceZhangX May 31, 2026
e7ef65d
perf(versionmap): batch per-VID hot loops via BatchGetVersions/MultiPut
TerrenceZhangX May 31, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion AnnService/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ include_directories(${Zstd}/lib)
file(GLOB_RECURSE HDR_FILES ${AnnService}/inc/Core/*.h ${AnnService}/inc/Helper/*.h)
file(GLOB_RECURSE SRC_FILES ${AnnService}/src/Core/*.cpp ${AnnService}/src/Helper/*.cpp)

# Include Socket sources in core lib for PostingRouter
file(GLOB SOCKET_HDR_FILES ${AnnService}/inc/Socket/*.h)
file(GLOB SOCKET_SRC_FILES ${AnnService}/src/Socket/*.cpp)
list(APPEND HDR_FILES ${SOCKET_HDR_FILES})
list(APPEND SRC_FILES ${SOCKET_SRC_FILES})

set(SPDK_LIBRARIES "")
if (SPDK)
set(Spdk ${PROJECT_SOURCE_DIR}/ThirdParty/spdk/build)
Expand Down Expand Up @@ -73,7 +79,7 @@ endif()
add_library (SPTAGLib SHARED ${SRC_FILES} ${HDR_FILES} ${TiKV_PROTO_SOURCES})
target_link_libraries (SPTAGLib DistanceUtils ${RocksDB_LIBRARIES} ${uring_LIBRARIES} libzstd_shared ${NUMA_LIBRARY} ${TBB_LIBRARIES} ${SPDK_LIBRARIES} ${TiKV_LIBRARIES})
add_library (SPTAGLibStatic STATIC ${SRC_FILES} ${HDR_FILES} ${TiKV_PROTO_SOURCES})
target_link_libraries (SPTAGLibStatic DistanceUtils ${RocksDB_LIBRARIES} ${uring_LIBRARIES} libzstd_static ${NUMA_LIBRARY_STATIC} ${TBB_LIBRARIES} ${SPDK_LIBRARIES} ${TiKV_LIBRARIES})
target_link_libraries (SPTAGLibStatic DistanceUtils ${RocksDB_LIBRARIES} ${uring_LIBRARIES} libzstd_static ${NUMA_LIBRARY_STATIC} ${TBB_LIBRARIES} ${SPDK_LIBRARIES} ${TiKV_LIBRARIES} ${Boost_LIBRARIES})

if (MSVC)
# SPANNIndex.cpp can exceed COFF section limits in Debug without /bigobj.
Expand Down
21 changes: 13 additions & 8 deletions AnnService/inc/Core/Common/FineGrainedLock.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ namespace SPTAG
{
return idx;
}

// Bucket index for the internal mutex-sharded unordered_map of
// per-posting locks. Exposed for callers that need an array sized
// to BucketCount and indexed by the same granularity as the lock
// pool (e.g. ExtraDynamicSearcher::m_remoteBucketLocked).
static inline unsigned BucketIndex(SizeType idx)
{
unsigned key = static_cast<unsigned>(idx);
return ((unsigned)(key * 99991) + _rotl(key, 2) + 101) & BucketMask;
}

static const int BucketMask = 32767;
static const int BucketCount = BucketMask + 1;
private:
struct Bucket {
std::mutex mutex;
Expand All @@ -76,14 +89,6 @@ namespace SPTAG
return *iter->second;
}

static inline unsigned BucketIndex(SizeType idx)
{
unsigned key = static_cast<unsigned>(idx);
return ((unsigned)(key * 99991) + _rotl(key, 2) + 101) & BucketMask;
}

static const int BucketMask = 32767;
static const int BucketCount = BucketMask + 1;
mutable std::unique_ptr<Bucket[]> m_buckets;
};
}
Expand Down
36 changes: 36 additions & 0 deletions AnnService/inc/Core/Common/IVersionMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,30 @@ namespace SPTAG

virtual void DeleteAll() = 0;

/// One-time per-layer setup performed at the end of BuildIndex.
/// size total VID count for this layer (== m_opt->m_vectorSize)
/// blockSize/capacity hints for array-backed legacy maps; ignored
/// by hashmap / TiKV implementations
/// globalIDs (optional) set of GLOBAL VIDs that are alive on
/// this layer. Layers whose "default
/// version" semantics treat unknown VIDs as
/// DELETED (e.g. TiKV layer >0, hashmap
/// LocalVersionMap) MUST persist an
/// explicit alive byte for each globalID;
/// otherwise MergePostings'
/// Deleted()/version-mismatch filter
/// eats every base entry on the first
/// async merge and corrupts the head index.
/// Default impl: just bump the internal count via SetR.
virtual void Initialize(SizeType size, SizeType blockSize, SizeType capacity,
COMMON::Dataset<SizeType>* globalIDs = nullptr)
{
(void)blockSize;
(void)capacity;
(void)globalIDs;
SetR(size);
}

virtual SizeType Count() = 0;
virtual SizeType GetDeleteCount() = 0;
virtual std::uint64_t BufferSize() = 0;
Expand All @@ -45,6 +69,18 @@ namespace SPTAG
virtual bool TryGetDefaultVersionForNewVector(uint8_t& version) const { return false; }
virtual void SetR(SizeType num) {}
virtual void SetVersion(const SizeType& key, const uint8_t& version) = 0;

/// Batch SetVersion: apply (vids[i] -> versions[i]) for all i.
/// Default impl is a per-VID loop. TiKV-backed maps override this
/// to group writes by chunk so N records in the same chunk only
/// trigger 1 ReadChunk + 1 WriteChunk RPC pair
virtual void SetVersionBatch(const std::vector<SizeType>& vids, const std::vector<uint8_t>& versions)
{
size_t n = std::min(vids.size(), versions.size());
for (size_t i = 0; i < n; i++) {
SetVersion(vids[i], versions[i]);
}
}
/// Increment the version of a VID.
/// @param expectedOld If not 0xff, the caller asserts the current version should be this value.
/// If TiKV already holds (expectedOld+1)&0x7f, treat as success (another node did the same increment).
Expand Down
23 changes: 23 additions & 0 deletions AnnService/inc/Core/Common/LocalVersionMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,29 @@ namespace SPTAG
m_label.clear();
}

void Initialize(SizeType size, SizeType blockSize, SizeType capacity,
COMMON::Dataset<SizeType>* globalIDs = nullptr) override
{
(void)size;
(void)blockSize;
(void)capacity;
if (globalIDs == nullptr || globalIDs->R() <= 0) return;

// Hashmap LocalVersionMap treats missing keys as deleted
// (Deleted() returns true, GetVersion() returns 0xfe).
// Layer-1 build calls Initialize with the alive-head global
// IDs; we must explicitly mark them alive (0x00) so that
// MergePostings' Deleted()/version-mismatch filter does not
// strip every base head entry on the first async merge.
std::unique_lock<std::shared_timed_mutex> lock(m_updateMutex);
for (SizeType i = 0; i < globalIDs->R(); i++) {
SizeType globalID = *(globalIDs->At(i));
if (globalID >= 0) {
m_label[globalID] = 0x00;
}
}
}

SizeType Count() override {
std::shared_lock<std::shared_timed_mutex> lock(m_updateMutex);
return (SizeType)(m_label.size());
Expand Down
93 changes: 89 additions & 4 deletions AnnService/inc/Core/Common/TiKVVersionMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "IVersionMap.h"
#include "inc/Helper/KeyValueIO.h"
#include <algorithm>
#include <atomic>
#include <string>
#include <vector>
Expand Down Expand Up @@ -212,7 +213,7 @@ namespace SPTAG

std::shared_ptr<Helper::KeyValueIO> GetDB() const { return m_db; }

void Initialize(SizeType size, SizeType blockSize, SizeType capacity, COMMON::Dataset<SizeType>* globalIDs = nullptr)
void Initialize(SizeType size, SizeType blockSize, SizeType capacity, COMMON::Dataset<SizeType>* globalIDs = nullptr) override
{
(void)blockSize;
(void)capacity;
Expand All @@ -233,10 +234,44 @@ namespace SPTAG
m_deleted = size;
SaveMetadata();

// Batch the alive-marker writes via MultiPut so they
// can be grouped per TiKV region and issued in parallel.
// Serial PutByte was the build-time hotspot (~1-2ms
// per write × ~200K alive heads at 1M-vector scale).
std::vector<SizeType> aliveSorted;
aliveSorted.reserve(aliveIDs.size());
for (SizeType id : aliveIDs) aliveSorted.push_back(id);
std::sort(aliveSorted.begin(), aliveSorted.end());

SizeType written = 0;
for (SizeType globalID : aliveIDs) {
if (PutByte(VersionKey(globalID), 0x00) == ErrorCode::Success) {
written++;
constexpr size_t kBatchSize = 4096;
std::vector<std::string> keys;
std::vector<std::string> values;
keys.reserve(kBatchSize);
values.reserve(kBatchSize);
const std::string aliveByte(1, static_cast<char>(0x00));
for (size_t i = 0; i < aliveSorted.size(); i++) {
keys.push_back(VersionKey(aliveSorted[i]));
values.push_back(aliveByte);
if (keys.size() >= kBatchSize || i + 1 == aliveSorted.size()) {
auto ret = m_db->MultiPut(keys, values, MaxTimeout, nullptr);
if (ret == ErrorCode::Success) {
written += static_cast<SizeType>(keys.size());
} else if (ret == ErrorCode::Undefined) {
// Backend lacks MultiPut: fall back to serial PutByte.
for (const auto& k : keys) {
if (PutByte(k, 0x00) == ErrorCode::Success) written++;
}
} else {
SPTAGLIB_LOG(Helper::LogLevel::LL_Warning,
"TiKVVersionMap::Initialize: MultiPut batch failed layer=%d ret=%d size=%zu; falling back to serial PutByte for this batch.\n",
m_layer, static_cast<int>(ret), keys.size());
for (const auto& k : keys) {
if (PutByte(k, 0x00) == ErrorCode::Success) written++;
}
}
keys.clear();
values.clear();
}
}
m_deleted = size - written;
Expand Down Expand Up @@ -335,6 +370,56 @@ namespace SPTAG
UpdateDeleteCount(oldVal, storedVersion);
}

// Per-VID batch write: mirrors SetVersion() for each (vid, ver) pair.
// Uses TiKVIO MultiPut so the writes are grouped per TiKV region
// and issued in parallel. m_deleted accounting is approximate
// here (we do not read the old byte to compute the exact delta);
// GetDeleteCount() returns 0 for the TiKV-backed version map so
// this approximation is acceptable. Callers that need precise
// accounting can call SetVersion() per-VID instead.
void SetVersionBatch(const std::vector<SizeType>& vids, const std::vector<uint8_t>& versions) override
{
size_t n = std::min(vids.size(), versions.size());
if (n == 0) return;

SizeType count = m_count.load();
std::vector<std::string> keys;
std::vector<std::string> values;
keys.reserve(n);
values.reserve(n);
for (size_t i = 0; i < n; ++i) {
if (vids[i] < 0 || vids[i] >= count) {
SPTAGLIB_LOG(Helper::LogLevel::LL_Error,
"TiKVVersionMap::SetVersionBatch: invalid key %d (max %d)\n",
vids[i], count);
continue;
}
keys.push_back(VersionKey(vids[i]));
values.push_back(std::string(1, static_cast<char>(versions[i])));
}
if (keys.empty()) return;

auto ret = m_db->MultiPut(keys, values, MaxTimeout, nullptr);
if (ret == ErrorCode::Undefined) {
// Backend lacks MultiPut: fall back to serial SetVersion
// which preserves m_deleted accounting.
for (size_t i = 0; i < n; ++i) {
if (vids[i] >= 0 && vids[i] < count) {
SetVersion(vids[i], versions[i]);
}
}
} else if (ret != ErrorCode::Success) {
SPTAGLIB_LOG(Helper::LogLevel::LL_Warning,
"TiKVVersionMap::SetVersionBatch: MultiPut failed layer=%d ret=%d keys=%zu; falling back to per-VID SetVersion.\n",
m_layer, static_cast<int>(ret), keys.size());
for (size_t i = 0; i < n; ++i) {
if (vids[i] >= 0 && vids[i] < count) {
SetVersion(vids[i], versions[i]);
}
}
}
}

bool IncVersion(const SizeType& key, uint8_t* newVersion, uint8_t expectedOld = 0xff) override
{
if (key < 0 || key >= m_count.load()) {
Expand Down
Loading