@@ -82,6 +82,12 @@ struct put_blob_req_ctx : public repl_result_ctx< BlobManager::Result< HSHomeObj
8282};
8383
8484BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob (ShardInfo const & shard, Blob&& blob) {
85+ if (is_shutting_down ()) {
86+ LOGI (" service is being shut down" );
87+ return folly::makeUnexpected (BlobErrorCode::SHUTTING_DOWN);
88+ }
89+ incr_pending_request_num ();
90+
8591 auto & pg_id = shard.placement_group ;
8692 shared< homestore::ReplDev > repl_dev;
8793 blob_id_t new_blob_id;
@@ -101,11 +107,13 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
101107
102108 if (!repl_dev->is_leader ()) {
103109 LOGW (" failed to put blob for pg [{}], shard [{}], not leader" , pg_id, shard.id );
110+ decr_pending_request_num ();
104111 return folly::makeUnexpected (BlobError (BlobErrorCode::NOT_LEADER, repl_dev->get_leader_id ()));
105112 }
106113
107114 if (!repl_dev->is_ready_for_traffic ()) {
108115 LOGW (" failed to put blob for pg [{}], shard [{}], not ready for traffic" , pg_id, shard.id );
116+ decr_pending_request_num ();
109117 return folly::makeUnexpected (BlobError (BlobErrorCode::RETRY_REQUEST));
110118 }
111119
@@ -176,10 +184,12 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
176184 if (result.hasError ()) {
177185 auto err = result.error ();
178186 if (err.getCode () == BlobErrorCode::NOT_LEADER) { err.current_leader = repl_dev->get_leader_id (); }
187+ decr_pending_request_num ();
179188 return folly::makeUnexpected (err);
180189 }
181190 auto blob_info = result.value ();
182191 BLOGT (blob_info.shard_id , blob_info.blob_id , " Put blob success blkid=[{}]" , blob_info.pbas .to_string ());
192+ decr_pending_request_num ();
183193 return blob_info.blob_id ;
184194 });
185195}
@@ -258,6 +268,12 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis
258268
259269BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob (ShardInfo const & shard, blob_id_t blob_id, uint64_t req_offset,
260270 uint64_t req_len) const {
271+ if (is_shutting_down ()) {
272+ LOGI (" service is being shut down" );
273+ return folly::makeUnexpected (BlobErrorCode::SHUTTING_DOWN);
274+ }
275+ incr_pending_request_num ();
276+
261277 auto & pg_id = shard.placement_group ;
262278 auto hs_pg = get_hs_pg (pg_id);
263279 RELEASE_ASSERT (hs_pg, " PG not found" );
@@ -269,12 +285,14 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard,
269285
270286 if (!repl_dev->is_ready_for_traffic ()) {
271287 LOGW (" failed to get blob for pg [{}], shard [{}], not ready for traffic" , pg_id, shard.id );
288+ decr_pending_request_num ();
272289 return folly::makeUnexpected (BlobError (BlobErrorCode::RETRY_REQUEST));
273290 }
274291
275292 auto r = get_blob_from_index_table (index_table, shard.id , blob_id);
276293 if (!r) {
277294 BLOGE (shard.id , blob_id, " Blob not found in index during get blob" );
295+ decr_pending_request_num ();
278296 return folly::makeUnexpected (r.error ());
279297 }
280298
@@ -298,17 +316,20 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob_data(const shared< home
298316 read_buf = std::move (read_buf)](auto && result) mutable -> BlobManager::AsyncResult< Blob > {
299317 if (result) {
300318 BLOGE (shard_id, blob_id, " Failed to get blob: err={}" , blob_id, shard_id, result.value ());
319+ decr_pending_request_num ();
301320 return folly::makeUnexpected (BlobError (BlobErrorCode::READ_FAILED));
302321 }
303322
304323 BlobHeader const * header = r_cast< BlobHeader const * >(read_buf.cbytes ());
305324 if (!header->valid ()) {
306325 BLOGE (shard_id, blob_id, " Invalid header found: [header={}]" , header->to_string ());
326+ decr_pending_request_num ();
307327 return folly::makeUnexpected (BlobError (BlobErrorCode::READ_FAILED));
308328 }
309329
310330 if (header->shard_id != shard_id) {
311331 BLOGE (shard_id, blob_id, " Invalid shard_id in header: [header={}]" , header->to_string ());
332+ decr_pending_request_num ();
312333 return folly::makeUnexpected (BlobError (BlobErrorCode::READ_FAILED));
313334 }
314335
@@ -325,12 +346,14 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob_data(const shared< home
325346 if (std::memcmp (computed_hash, header->hash , BlobHeader::blob_max_hash_len) != 0 ) {
326347 BLOGE (shard_id, blob_id, " Hash mismatch header [{}] [computed={:np}]" , header->to_string (),
327348 spdlog::to_hex (computed_hash, computed_hash + BlobHeader::blob_max_hash_len));
349+ decr_pending_request_num ();
328350 return folly::makeUnexpected (BlobError (BlobErrorCode::CHECKSUM_MISMATCH));
329351 }
330352
331353 if (req_offset + req_len > header->blob_size ) {
332354 BLOGE (shard_id, blob_id, " Invalid offset length requested in get blob offset={} len={} size={}" ,
333355 req_offset, req_len, header->blob_size );
356+ decr_pending_request_num ();
334357 return folly::makeUnexpected (BlobError (BlobErrorCode::INVALID_ARG));
335358 }
336359
@@ -341,6 +364,7 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob_data(const shared< home
341364 std::memcpy (body.bytes (), blob_bytes + req_offset, res_len);
342365
343366 BLOGT (blob_id, shard_id, " Blob get success: blkid={}" , blkid.to_string ());
367+ decr_pending_request_num ();
344368 return Blob (std::move (body), std::move (user_key), header->object_offset , repl_dev->get_leader_id ());
345369 });
346370}
@@ -362,6 +386,7 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
362386 auto hs_pg = get_hs_pg (msg_header->pg_id );
363387 if (hs_pg == nullptr ) {
364388 LOGW (" Received a blob_put on an unknown pg:{}, underlying engine will retry this later" , msg_header->pg_id );
389+ if (ctx) { ctx->promise_ .setValue (folly::makeUnexpected (BlobError (BlobErrorCode::UNKNOWN_PG))); }
365390 return folly::makeUnexpected (homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
366391 }
367392
@@ -370,6 +395,7 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
370395 if (shard_iter == _shard_map.end ()) {
371396 LOGW (" Received a blob_put on an unknown shard:{}, underlying engine will retry this later" ,
372397 msg_header->shard_id );
398+ if (ctx) { ctx->promise_ .setValue (folly::makeUnexpected (BlobError (BlobErrorCode::UNKNOWN_SHARD))); }
373399 return folly::makeUnexpected (homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
374400 }
375401
@@ -392,6 +418,12 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
392418}
393419
394420BlobManager::NullAsyncResult HSHomeObject::_del_blob (ShardInfo const & shard, blob_id_t blob_id) {
421+ if (is_shutting_down ()) {
422+ LOGI (" service is being shut down" );
423+ return folly::makeUnexpected (BlobErrorCode::SHUTTING_DOWN);
424+ }
425+ incr_pending_request_num ();
426+
395427 BLOGT (shard.id , blob_id, " deleting blob" );
396428 auto & pg_id = shard.placement_group ;
397429 auto hs_pg = get_hs_pg (pg_id);
@@ -402,11 +434,13 @@ BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blo
402434
403435 if (!repl_dev->is_leader ()) {
404436 LOGW (" failed to del blob for pg [{}], shard [{}], blob_id [{}], not leader" , pg_id, shard.id , blob_id);
437+ decr_pending_request_num ();
405438 return folly::makeUnexpected (BlobError (BlobErrorCode::NOT_LEADER, repl_dev->get_leader_id ()));
406439 }
407440
408441 if (!repl_dev->is_ready_for_traffic ()) {
409442 LOGW (" failed to del blob for pg [{}], shard [{}], not ready for traffic" , pg_id, shard.id );
443+ decr_pending_request_num ();
410444 return folly::makeUnexpected (BlobError (BlobErrorCode::RETRY_REQUEST));
411445 }
412446
@@ -424,14 +458,16 @@ BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blo
424458 std::memcpy (req->key_buf ().bytes (), &blob_id, sizeof (blob_id_t ));
425459
426460 repl_dev->async_alloc_write (req->cheader_buf (), req->ckey_buf (), sisl::sg_list{}, req);
427- return req->result ().deferValue ([repl_dev](const auto & result) -> folly::Expected< folly::Unit, BlobError > {
461+ return req->result ().deferValue ([this , repl_dev](const auto & result) -> folly::Expected< folly::Unit, BlobError > {
428462 if (result.hasError ()) {
429463 auto err = result.error ();
430464 if (err.getCode () == BlobErrorCode::NOT_LEADER) { err.current_leader = repl_dev->get_leader_id (); }
465+ decr_pending_request_num ();
431466 return folly::makeUnexpected (err);
432467 }
433468 auto blob_info = result.value ();
434469 BLOGT (blob_info.shard_id , blob_info.blob_id , " Delete blob successful" );
470+ decr_pending_request_num ();
435471 return folly::Unit ();
436472 });
437473}
@@ -470,7 +506,7 @@ void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sis
470506 if (ctx) ctx->promise_ .setValue (folly::makeUnexpected (r.error ()));
471507 return ;
472508 } else {
473- if (ctx) { ctx->promise_ .setValue (BlobManager::Result< BlobInfo >(blob_info)); }
509+ if (ctx) ctx->promise_ .setValue (BlobManager::Result< BlobInfo >(blob_info));
474510 return ;
475511 }
476512 }
0 commit comments