Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.2.12"
version = "2.2.13"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
Expand Down
6 changes: 4 additions & 2 deletions src/include/homeobject/blob_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
namespace homeobject {

ENUM(BlobErrorCode, uint16_t, UNKNOWN = 1, TIMEOUT, INVALID_ARG, UNSUPPORTED_OP, NOT_LEADER, REPLICATION_ERROR,
UNKNOWN_SHARD, UNKNOWN_BLOB, CHECKSUM_MISMATCH, READ_FAILED, INDEX_ERROR, SEALED_SHARD, RETRY_REQUEST);
UNKNOWN_SHARD, UNKNOWN_BLOB, UNKNOWN_PG, CHECKSUM_MISMATCH, READ_FAILED, INDEX_ERROR, SEALED_SHARD, RETRY_REQUEST,
SHUTTING_DOWN);
struct BlobError {
BlobErrorCode code;
// set when we are not the current leader of the PG.
Expand All @@ -27,7 +28,8 @@ struct BlobError {
struct Blob {
Blob() = default;
Blob(sisl::io_blob_safe b, std::string const& u, uint64_t o) : body(std::move(b)), user_key(u), object_off(o) {}
Blob(sisl::io_blob_safe b, std::string const& u, uint64_t o, peer_id_t l) : body(std::move(b)), user_key(u), object_off(o), current_leader(l) {}
Blob(sisl::io_blob_safe b, std::string const& u, uint64_t o, peer_id_t l) :
body(std::move(b)), user_key(u), object_off(o), current_leader(l) {}

Blob clone() const;

Expand Down
2 changes: 1 addition & 1 deletion src/include/homeobject/pg_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
namespace homeobject {

ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, NOT_LEADER, UNKNOWN_PEER, UNSUPPORTED_OP,
CRC_MISMATCH, NO_SPACE_LEFT, DRIVE_WRITE_ERROR, RETRY_REQUEST);
CRC_MISMATCH, NO_SPACE_LEFT, DRIVE_WRITE_ERROR, RETRY_REQUEST, SHUTTING_DOWN);

struct PGMember {
// Max length is based on homestore::replica_member_info::max_name_len - 1. Last byte is null terminated.
Expand Down
4 changes: 2 additions & 2 deletions src/include/homeobject/shard_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
namespace homeobject {

ENUM(ShardError, uint16_t, UNKNOWN = 1, TIMEOUT, INVALID_ARG, NOT_LEADER, UNSUPPORTED_OP, UNKNOWN_PG, UNKNOWN_SHARD,
PG_NOT_READY, CRC_MISMATCH, NO_SPACE_LEFT, RETRY_REQUEST);
PG_NOT_READY, CRC_MISMATCH, NO_SPACE_LEFT, RETRY_REQUEST, SHUTTING_DOWN);

struct ShardInfo {
enum class State : uint8_t {
Expand All @@ -22,7 +22,7 @@ struct ShardInfo {
shard_id_t id;
pg_id_t placement_group;
State state;
uint64_t lsn; // created_lsn
uint64_t lsn; // created_lsn
uint64_t created_time;
uint64_t last_modified_time;
uint64_t available_capacity_bytes;
Expand Down
40 changes: 38 additions & 2 deletions src/lib/homestore_backend/hs_blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ struct put_blob_req_ctx : public repl_result_ctx< BlobManager::Result< HSHomeObj
};

BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& shard, Blob&& blob) {
if (is_shutting_down()) {
LOGI("service is being shut down");
return folly::makeUnexpected(BlobErrorCode::SHUTTING_DOWN);
}
incr_pending_request_num();

auto& pg_id = shard.placement_group;
shared< homestore::ReplDev > repl_dev;
blob_id_t new_blob_id;
Expand All @@ -101,11 +107,13 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s

if (!repl_dev->is_leader()) {
LOGW("failed to put blob for pg [{}], shard [{}], not leader", pg_id, shard.id);
decr_pending_request_num();
return folly::makeUnexpected(BlobError(BlobErrorCode::NOT_LEADER, repl_dev->get_leader_id()));
}

if (!repl_dev->is_ready_for_traffic()) {
LOGW("failed to put blob for pg [{}], shard [{}], not ready for traffic", pg_id, shard.id);
decr_pending_request_num();
return folly::makeUnexpected(BlobError(BlobErrorCode::RETRY_REQUEST));
}

Expand Down Expand Up @@ -176,10 +184,12 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
if (result.hasError()) {
auto err = result.error();
if (err.getCode() == BlobErrorCode::NOT_LEADER) { err.current_leader = repl_dev->get_leader_id(); }
decr_pending_request_num();
return folly::makeUnexpected(err);
}
auto blob_info = result.value();
BLOGT(blob_info.shard_id, blob_info.blob_id, "Put blob success blkid=[{}]", blob_info.pbas.to_string());
decr_pending_request_num();
return blob_info.blob_id;
});
}
Expand Down Expand Up @@ -258,6 +268,12 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis

BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard, blob_id_t blob_id, uint64_t req_offset,
uint64_t req_len) const {
if (is_shutting_down()) {
LOGI("service is being shut down");
return folly::makeUnexpected(BlobErrorCode::SHUTTING_DOWN);
}
incr_pending_request_num();

auto& pg_id = shard.placement_group;
auto hs_pg = get_hs_pg(pg_id);
RELEASE_ASSERT(hs_pg, "PG not found");
Expand All @@ -269,12 +285,14 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard,

if (!repl_dev->is_ready_for_traffic()) {
LOGW("failed to get blob for pg [{}], shard [{}], not ready for traffic", pg_id, shard.id);
decr_pending_request_num();
return folly::makeUnexpected(BlobError(BlobErrorCode::RETRY_REQUEST));
}

auto r = get_blob_from_index_table(index_table, shard.id, blob_id);
if (!r) {
BLOGE(shard.id, blob_id, "Blob not found in index during get blob");
decr_pending_request_num();
return folly::makeUnexpected(r.error());
}

Expand All @@ -298,17 +316,20 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob_data(const shared< home
read_buf = std::move(read_buf)](auto&& result) mutable -> BlobManager::AsyncResult< Blob > {
if (result) {
BLOGE(shard_id, blob_id, "Failed to get blob: err={}", blob_id, shard_id, result.value());
decr_pending_request_num();
return folly::makeUnexpected(BlobError(BlobErrorCode::READ_FAILED));
}

BlobHeader const* header = r_cast< BlobHeader const* >(read_buf.cbytes());
if (!header->valid()) {
BLOGE(shard_id, blob_id, "Invalid header found: [header={}]", header->to_string());
decr_pending_request_num();
return folly::makeUnexpected(BlobError(BlobErrorCode::READ_FAILED));
}

if (header->shard_id != shard_id) {
BLOGE(shard_id, blob_id, "Invalid shard_id in header: [header={}]", header->to_string());
decr_pending_request_num();
return folly::makeUnexpected(BlobError(BlobErrorCode::READ_FAILED));
}

Expand All @@ -325,12 +346,14 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob_data(const shared< home
if (std::memcmp(computed_hash, header->hash, BlobHeader::blob_max_hash_len) != 0) {
BLOGE(shard_id, blob_id, "Hash mismatch header [{}] [computed={:np}]", header->to_string(),
spdlog::to_hex(computed_hash, computed_hash + BlobHeader::blob_max_hash_len));
decr_pending_request_num();
return folly::makeUnexpected(BlobError(BlobErrorCode::CHECKSUM_MISMATCH));
}

if (req_offset + req_len > header->blob_size) {
BLOGE(shard_id, blob_id, "Invalid offset length requested in get blob offset={} len={} size={}",
req_offset, req_len, header->blob_size);
decr_pending_request_num();
return folly::makeUnexpected(BlobError(BlobErrorCode::INVALID_ARG));
}

Expand All @@ -341,6 +364,7 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob_data(const shared< home
std::memcpy(body.bytes(), blob_bytes + req_offset, res_len);

BLOGT(blob_id, shard_id, "Blob get success: blkid={}", blkid.to_string());
decr_pending_request_num();
return Blob(std::move(body), std::move(user_key), header->object_offset, repl_dev->get_leader_id());
});
}
Expand All @@ -362,6 +386,7 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
auto hs_pg = get_hs_pg(msg_header->pg_id);
if (hs_pg == nullptr) {
LOGW("Received a blob_put on an unknown pg:{}, underlying engine will retry this later", msg_header->pg_id);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::UNKNOWN_PG))); }
return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
}

Expand All @@ -370,6 +395,7 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
if (shard_iter == _shard_map.end()) {
LOGW("Received a blob_put on an unknown shard:{}, underlying engine will retry this later",
msg_header->shard_id);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::UNKNOWN_SHARD))); }
return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
}

Expand All @@ -392,6 +418,12 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
}

BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blob_id_t blob_id) {
if (is_shutting_down()) {
LOGI("service is being shut down");
return folly::makeUnexpected(BlobErrorCode::SHUTTING_DOWN);
}
incr_pending_request_num();

BLOGT(shard.id, blob_id, "deleting blob");
auto& pg_id = shard.placement_group;
auto hs_pg = get_hs_pg(pg_id);
Expand All @@ -402,11 +434,13 @@ BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blo

if (!repl_dev->is_leader()) {
LOGW("failed to del blob for pg [{}], shard [{}], blob_id [{}], not leader", pg_id, shard.id, blob_id);
decr_pending_request_num();
return folly::makeUnexpected(BlobError(BlobErrorCode::NOT_LEADER, repl_dev->get_leader_id()));
}

if (!repl_dev->is_ready_for_traffic()) {
LOGW("failed to del blob for pg [{}], shard [{}], not ready for traffic", pg_id, shard.id);
decr_pending_request_num();
return folly::makeUnexpected(BlobError(BlobErrorCode::RETRY_REQUEST));
}

Expand All @@ -424,14 +458,16 @@ BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blo
std::memcpy(req->key_buf().bytes(), &blob_id, sizeof(blob_id_t));

repl_dev->async_alloc_write(req->cheader_buf(), req->ckey_buf(), sisl::sg_list{}, req);
return req->result().deferValue([repl_dev](const auto& result) -> folly::Expected< folly::Unit, BlobError > {
return req->result().deferValue([this, repl_dev](const auto& result) -> folly::Expected< folly::Unit, BlobError > {
if (result.hasError()) {
auto err = result.error();
if (err.getCode() == BlobErrorCode::NOT_LEADER) { err.current_leader = repl_dev->get_leader_id(); }
decr_pending_request_num();
return folly::makeUnexpected(err);
}
auto blob_info = result.value();
BLOGT(blob_info.shard_id, blob_info.blob_id, "Delete blob successful");
decr_pending_request_num();
return folly::Unit();
});
}
Expand Down Expand Up @@ -470,7 +506,7 @@ void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sis
if (ctx) ctx->promise_.setValue(folly::makeUnexpected(r.error()));
return;
} else {
if (ctx) { ctx->promise_.setValue(BlobManager::Result< BlobInfo >(blob_info)); }
if (ctx) ctx->promise_.setValue(BlobManager::Result< BlobInfo >(blob_info));
return;
}
}
Expand Down
19 changes: 15 additions & 4 deletions src/lib/homestore_backend/hs_homeobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ class HSReplApplication : public homestore::ReplApplication {
return it->second;
}

void on_repl_devs_init_completed() override {
_home_object->on_replica_restart();
}
void on_repl_devs_init_completed() override { _home_object->on_replica_restart(); }

std::pair< std::string, uint16_t > lookup_peer(homestore::replica_id_t uuid) const override {
std::string endpoint;
Expand Down Expand Up @@ -320,9 +318,22 @@ HSHomeObject::~HSHomeObject() {
}
trigger_timed_events();
#endif

start_shutting_down();
// Wait for all pending requests to complete
while (true) {
auto pending_reqs = get_pending_request_num();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should baseline resync be considered as HS or HO?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be considered as HS.

HomeObject-side graceful shutdown only ensures that, when HomeObject is shutting down, there are no pending external (user API) calls from the upper layer, and that later API calls are rejected.

Baseline resync is a mechanism of the state machine (Raft), and all the baseline-resync-related code in HomeObject is called by the state machine. HomeObject cannot do anything about this, so we need to handle it at the Raft level (HomeStore).

I will try to submit another PR for HomeStore graceful shutdown.

if (0 == pending_reqs) break;
LOGI("waiting for {} pending requests to complete", pending_reqs);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
LOGI("start shutting down HomeStore");

homestore::HomeStore::instance()->shutdown();
homestore::HomeStore::reset_instance();
iomanager.stop();

LOGI("complete shutting down HomeStore");
}

HomeObjectStats HSHomeObject::_get_stats() const {
Expand All @@ -340,7 +351,7 @@ HomeObjectStats HSHomeObject::_get_stats() const {

stats.num_open_shards = num_open_shards;
stats.avail_open_shards = chunk_selector()->total_chunks() - num_open_shards;
stats.num_disks = chunk_selector()->total_disks();
stats.num_disks = chunk_selector()->total_disks();
return stats;
}

Expand Down
24 changes: 20 additions & 4 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,8 @@ class HSHomeObject : public HomeObjectImpl {
bool update_cursor(objId id);
objId expected_next_obj_id();
bool generate_shard_blob_list();
BlobManager::AsyncResult< sisl::io_blob_safe > load_blob_data(const BlobInfo& blob_info, ResyncBlobState& state);
BlobManager::AsyncResult< sisl::io_blob_safe > load_blob_data(const BlobInfo& blob_info,
ResyncBlobState& state);
bool create_pg_snapshot_data(sisl::io_blob_safe& meta_blob);
bool create_shard_snapshot_data(sisl::io_blob_safe& meta_blob);
bool create_blobs_snapshot_data(sisl::io_blob_safe& data_blob);
Expand All @@ -409,7 +410,7 @@ class HSHomeObject : public HomeObjectImpl {

objId cur_obj_id_{0, 0};
int64_t cur_shard_idx_{-1};
std::vector<BlobInfo> cur_blob_list_{0};
std::vector< BlobInfo > cur_blob_list_{0};
uint64_t cur_start_blob_idx_{0};
uint64_t cur_batch_blob_count_{0};
flatbuffers::FlatBufferBuilder builder_;
Expand Down Expand Up @@ -677,7 +678,7 @@ class HSHomeObject : public HomeObjectImpl {
const homestore::MultiBlkId& pbas, cintrusive< homestore::repl_req_ctx >& hs_ctx);
void on_blob_del_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx);
bool local_add_blob_info(pg_id_t pg_id, BlobInfo const &blob_info);
bool local_add_blob_info(pg_id_t pg_id, BlobInfo const& blob_info);
homestore::ReplResult< homestore::blk_alloc_hints >
blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive< homestore::repl_req_ctx >& ctx);
void compute_blob_payload_hash(BlobHeader::HashAlgorithm algorithm, const uint8_t* blob_bytes, size_t blob_size,
Expand Down Expand Up @@ -746,13 +747,28 @@ class HSHomeObject : public HomeObjectImpl {
*/
void destroy_pg_index_table(pg_id_t pg_id);

/**
/**
* @brief Destroy the superblock for the PG identified by pg_id.
* Ensures all operations are persisted by triggering a cp flush before destruction.
*
* @param pg_id The ID of the PG to be destroyed.
*/

void destroy_pg_superblk(pg_id_t pg_id);

// Graceful-shutdown bookkeeping: a "shutting down" flag plus a counter of requests in flight.
private:
    // Set once by start_shutting_down(); request entry points test it via is_shutting_down().
    std::atomic_bool shutting_down{false};
    // Number of requests accepted but not yet finished. Declared mutable so that the
    // const-qualified helpers below can update it.
    mutable std::atomic_uint64_t pending_request_num{0};

    // True once start_shutting_down() has been called.
    bool is_shutting_down() const { return shutting_down; }
    // Raise the shutdown flag; nothing in this block ever clears it again.
    void start_shutting_down() { shutting_down.store(true); }

    // Current count of in-flight requests.
    uint64_t get_pending_request_num() const { return pending_request_num; }

    // Increment / decrement the in-flight counter by one. The counter is unsigned, so an
    // unmatched decrement wraps around instead of going negative.
    // NOTE(review): the previous comment said only the leader calls these, but _get_blob
    // appears to call them with no leader check visible -- confirm against the full file.
    void incr_pending_request_num() const { pending_request_num.fetch_add(1); }
    void decr_pending_request_num() const { pending_request_num.fetch_sub(1); }
};

class BlobIndexServiceCallbacks : public homestore::IndexServiceCallbacks {
Expand Down
Loading
Loading