Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/handle_append_entries.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1549,12 +1549,18 @@ ulong raft_server::get_expected_committed_log_idx() {
std::greater<ulong>() );

size_t quorum_idx = get_quorum_for_commit();
if (ctx_->get_params()->use_full_consensus_among_healthy_members_) {
ptr<raft_params> params = ctx_->get_params();
ptr<raft_params> params = ctx_->get_params();

if (ctx_->get_params()->use_full_consensus_among_healthy_members_ &&
params->custom_commit_quorum_size_ == 0) {
// In full consensus mode, a peer is considered unhealthy when
// 1) it is not responding for 3 times of heartbeat interval, or
// 2) its last log index is smaller (older) than
// the current committed log index - max batch size.
//
// WARNING: If custom quorum size is set, we should prioritize
// the custom quorum size over full consensus mode.

int32_t allowed_interval =
params->heart_beat_interval_ *
raft_server::raft_limits_.full_consensus_leader_limit_;;
Expand Down
18 changes: 17 additions & 1 deletion src/handle_commit.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,9 @@ uint64_t raft_server::find_sm_commit_idx_to_notify() {
? quick_commit_index_ - params->max_append_size_ : 0;

uint64_t min_commit_idx = sm_commit_index_;
std::vector<uint64_t> commit_idx_list;
commit_idx_list.reserve(16);
commit_idx_list.push_back(sm_commit_index_);

for (auto& pp: peers_) {
uint64_t last_resp_time_ms = pp.second->get_resp_timer_us() / 1000;
Expand All @@ -595,8 +598,21 @@ uint64_t raft_server::find_sm_commit_idx_to_notify() {
if (pp.second->get_sm_committed_idx() == 0) {
continue;
}
min_commit_idx = std::min(min_commit_idx, pp.second->get_sm_committed_idx());
commit_idx_list.push_back(pp.second->get_sm_committed_idx());
}
// Sort it (descending order).
std::sort(commit_idx_list.begin(), commit_idx_list.end(),
std::greater<uint64_t>());

// Pick last one (== minimum).
size_t idx_to_pick = commit_idx_list.size() - 1;
if (params->custom_commit_quorum_size_ &&
params->custom_commit_quorum_size_ < (int)commit_idx_list.size()) {
// If custom quorum size is set, pick the index based on it.
idx_to_pick = params->custom_commit_quorum_size_ - 1;
}
min_commit_idx = commit_idx_list[idx_to_pick];

return min_commit_idx;
}

Expand Down
143 changes: 143 additions & 0 deletions tests/asio/custom_quorum_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1317,6 +1317,146 @@ int custom_commit_condition_test() {
return 0;
}

// Regression test: full consensus mode combined with a slow snapshot install.
//
// Scenario:
//   1) A 3-server group (S1..S3) runs in full consensus mode with per-peer
//      state machine commit-index tracking and a small snapshot distance.
//   2) S3 is taken down, and enough entries are appended so that S3 falls
//      behind by more than snapshot_distance_, forcing a snapshot transfer
//      when it rejoins.
//   3) All servers restart. S3's snapshot application is artificially
//      delayed (setSnpDelay), so it stays in "installing snapshot" state.
//   4) While S3 is still installing, a new append must commit using only
//      the healthy S1+S2 quorum — i.e. a peer that is mid-snapshot-install
//      must be treated as unhealthy and excluded from full consensus.
//
// Returns 0 on success (test framework convention; CHK_* macros return
// non-zero on failure).
int full_consensus_with_snapshot_transfer_test() {
reset_log_files();

std::string s1_addr = "tcp://127.0.0.1:20010";
std::string s2_addr = "tcp://127.0.0.1:20020";
std::string s3_addr = "tcp://127.0.0.1:20030";

RaftAsioPkg s1(1, s1_addr);
RaftAsioPkg s2(2, s2_addr);
RaftAsioPkg s3(3, s3_addr);
std::vector<RaftAsioPkg*> pkgs = {&s1, &s2, &s3};

_msg("launching asio-raft servers\n");
CHK_Z( launch_servers(pkgs, false) );

_msg("organizing raft group\n");
CHK_Z( make_group(pkgs) );

// Set async, full consensus mode, track peers' SM, snapshot distance 10.
// reserved_log_items_ = 5 keeps the log short so a lagging peer must be
// caught up via snapshot rather than log replay.
for (auto& entry: pkgs) {
RaftAsioPkg* pp = entry;
raft_params param = pp->raftServer->get_current_params();
param.return_method_ = raft_params::async_handler;
param.use_full_consensus_among_healthy_members_ = true;
param.track_peers_sm_commit_idx_ = true;
param.snapshot_distance_ = 10;
param.reserved_log_items_ = 5;
pp->raftServer->update_params(param);
}

// Append messages asynchronously.
// NOTE: `handlers` keeps the cmd_result objects alive until completion;
// `idx_list` collects committed indexes via `async_handler` under
// `idx_list_lock`. Both are cleared at the start of each batch.
std::list< ptr< cmd_result< ptr<buffer> > > > handlers;
std::list<ulong> idx_list;
std::mutex idx_list_lock;
auto do_async_append = [&](RaftAsioPkg& ss, size_t num) {
handlers.clear();
idx_list.clear();
for (size_t ii = 0; ii < num; ++ii) {
std::string test_msg = "test" + std::to_string(ii);
ptr<buffer> msg = buffer::alloc(test_msg.size() + 1);
msg->put(test_msg);
ptr< cmd_result< ptr<buffer> > > ret =
ss.raftServer->append_entries( {msg} );

cmd_result< ptr<buffer> >::handler_type my_handler =
std::bind( async_handler,
&idx_list,
&idx_list_lock,
std::placeholders::_1,
std::placeholders::_2 );
ret->when_ready( my_handler );

handlers.push_back(ret);
}
};

// Append 5 messages.
do_async_append(s1, 5);
TestSuite::sleep_ms(RaftAsioPkg::HEARTBEAT_MS * 2, "wait for replication of 5 msgs");

// Bring down S3.
s3.raftServer->shutdown();
s3.stopAsio();

// Wait enough to mark down S3.
TestSuite::sleep_ms(RaftAsioPkg::HEARTBEAT_MS * 5, "wait to mark down S3");

// Append 20 more messages (S3 is down).
// 20 > snapshot_distance_ (10), so S3 is now far enough behind that it
// will need a snapshot install when it comes back.
do_async_append(s1, 20);
TestSuite::sleep_ms(RaftAsioPkg::HEARTBEAT_MS * 2, "wait for replication of 20 msgs");

uint64_t commit_before_restart = s1.raftServer->get_committed_log_idx();
_msg("commit index before restart: %zu\n", commit_before_restart);

// Bring down both S1 and S2.
s1.raftServer->shutdown();
s1.stopAsio();
s2.raftServer->shutdown();
s2.stopAsio();
TestSuite::sleep_sec(1, "all servers down");

// Restart S1 and S2 with the same params.
// NOTE(review): restart params are rebuilt from scratch here rather than
// read back from the servers — they must stay in sync with the loop above
// (full consensus + SM tracking + snapshot distance 10, reserved 5).
raft_params restart_params;
restart_params.with_hb_interval(RaftAsioPkg::HEARTBEAT_MS);
restart_params.with_election_timeout_lower(RaftAsioPkg::HEARTBEAT_MS * 2);
restart_params.with_election_timeout_upper(RaftAsioPkg::HEARTBEAT_MS * 4);
restart_params.with_reserved_log_items(5);
restart_params.with_snapshot_enabled(10);
restart_params.with_client_req_timeout(10000);
restart_params.return_method_ = raft_params::async_handler;
restart_params.use_full_consensus_among_healthy_members_ = true;
restart_params.track_peers_sm_commit_idx_ = true;

// Set snapshot delay on S3 (1000ms per snapshot chunk), so that S3 stays
// in snapshot-install state long enough for the assertion below.
s3.getTestSm()->setSnpDelay(1000);

s1.restartServer(&restart_params);
s2.restartServer(&restart_params);

// Bring up S3. Snapshot install starts since S3 is far behind.
s3.restartServer(&restart_params);

TestSuite::sleep_sec(2, "wait for leader election among S1 and S2");

// One of them should be the leader.
RaftAsioPkg* leader = nullptr;
if (s1.raftServer->is_leader()) {
leader = &s1;
} else if (s2.raftServer->is_leader()) {
leader = &s2;
}
CHK_NONNULL(leader);
_msg("leader: S%d\n", leader->myId);


TestSuite::sleep_ms(RaftAsioPkg::HEARTBEAT_MS * 2,
"wait for snapshot install to start");

// Before snapshot install is done, append one more message.
uint64_t commit_idx = leader->raftServer->get_committed_log_idx();
do_async_append(*leader, 1);
TestSuite::sleep_ms(RaftAsioPkg::HEARTBEAT_MS * 2, "wait for commit");

// It should be committed immediately (S1 + S2 form the quorum,
// S3 is still installing snapshot so it should be excluded).
// This is the core assertion: full consensus must not block on a peer
// that is busy installing a snapshot.
CHK_GT(leader->raftServer->get_committed_log_idx(), commit_idx);

// Clear snapshot delay and wait for S3 to catch up.
s3.getTestSm()->setSnpDelay(0);
TestSuite::sleep_sec(2, "wait for S3 to catch up");

s1.raftServer->shutdown();
s2.raftServer->shutdown();
s3.raftServer->shutdown();
TestSuite::sleep_sec(1, "shutting down");

SimpleLogger::shutdown();
return 0;
}

} // namespace custom_quorum_test;
using namespace custom_quorum_test;

Expand All @@ -1334,6 +1474,9 @@ int main(int argc, char** argv) {
ts.doTest( "full consensus test",
full_consensus_test );

ts.doTest( "full consensus with snapshot transfer test",
full_consensus_with_snapshot_transfer_test );

ts.doTest( "self mark down test",
self_mark_down_test );

Expand Down
Loading