Skip to content

Commit ee157d3

Browse files
committed
MB-64256: Add dynamic config for fusionSyncRateLimit
Change-Id: I01c895bca0260b3a8bfffe306c3199d95202f416 Reviewed-on: https://review.couchbase.org/c/kv_engine/+/224401 Reviewed-by: Paolo Cocchi <paolo.cocchi@couchbase.com> Reviewed-by: Trond Norbye <trond.norbye@couchbase.com> Tested-by: Build Bot <build@couchbase.com>
1 parent 3504851 commit ee157d3

File tree

12 files changed

+110
-2
lines changed

12 files changed

+110
-2
lines changed

engines/ep/configuration.json

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1675,6 +1675,17 @@
16751675
}
16761676
}
16771677
},
1678+
"magma_fusion_sync_rate_limit": {
1679+
"default": "(1024 * 1024 * 75)",
1680+
"descr": "The rate limit for Fusion sync uploads, in bytes per second.",
1681+
"dynamic": true,
1682+
"type": "size_t",
1683+
"validator": {
1684+
"range": {
1685+
"min": 1
1686+
}
1687+
}
1688+
},
16781689
"continuous_backup_enabled": {
16791690
"default": "false",
16801691
"descr": "True if continouous backup is enabled.",

engines/ep/management/cbepctl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ Available params for "set":
299299
history_retention_bytes - Max bytes of history a bucket should aim to retain on disk.
300300
fusion_metadata_auth_token - Authorization token for the Chronical service
301301
magma_fusion_migration_rate_limit - The rate limit for Fusion extent migration, in bytes per second.
302+
magma_fusion_sync_rate_limit - The rate limit for Fusion sync uploads, in bytes per second.
302303
303304
Available params for "set replication_param":
304305
<empty>

engines/ep/src/ep_engine.cc

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -966,10 +966,10 @@ cb::engine_errc EventuallyPersistentEngine::setFlushParam(
966966
configuration.setContinuousBackupEnabled(cb_stob(val));
967967
} else if (key == "continuous_backup_interval") {
968968
configuration.setContinuousBackupInterval(std::stoull(val));
969-
} else if (key == "magma_fusion_migration_rate_limit") {
970-
configuration.setMagmaFusionMigrationRateLimit(std::stoull(val));
971969
} else if (key == "fusion_metadata_auth_token") {
972970
setFusionMetadataAuthToken(val);
971+
} else if (key.starts_with("magma_fusion")) {
972+
return setFusionFlushParam(key, val, msg);
973973
} else if (key == "workload_monitor_enabled") {
974974
configuration.setWorkloadMonitorEnabled(cb_stob(val));
975975
} else {
@@ -1006,6 +1006,23 @@ cb::engine_errc EventuallyPersistentEngine::setFlushParam(
10061006
return rv;
10071007
}
10081008

1009+
cb::engine_errc EventuallyPersistentEngine::setFusionFlushParam(
1010+
std::string_view key, const std::string& val, std::string& msg) {
1011+
auto rv = cb::engine_errc::success;
1012+
if (key == "magma_fusion_migration_rate_limit") {
1013+
configuration.setMagmaFusionMigrationRateLimit(std::stoull(val));
1014+
} else if (key == "magma_fusion_sync_rate_limit") {
1015+
configuration.setMagmaFusionSyncRateLimit(std::stoull(val));
1016+
} else {
1017+
EP_LOG_WARN_CTX("Rejecting setFusionFlushParam request",
1018+
{"key", key},
1019+
{"value", val});
1020+
msg = "Unknown config param " + std::string{key};
1021+
rv = cb::engine_errc::invalid_arguments;
1022+
}
1023+
return rv;
1024+
}
1025+
10091026
cb::engine_errc EventuallyPersistentEngine::setDcpParam(std::string_view key,
10101027
const std::string& val,
10111028
std::string& msg) {

engines/ep/src/ep_engine.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -725,6 +725,9 @@ class EventuallyPersistentEngine : public EngineIface, public DcpIface {
725725
cb::engine_errc setFlushParam(std::string_view key,
726726
const std::string& val,
727727
std::string& msg);
728+
cb::engine_errc setFusionFlushParam(std::string_view key,
729+
const std::string& val,
730+
std::string& msg);
728731

729732
cb::engine_errc setReplicationParam(std::string_view key,
730733
const std::string& val,

engines/ep/src/kvstore/magma-kvstore/magma-kvstore.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -900,6 +900,7 @@ MagmaKVStore::MagmaKVStore(MagmaKVStoreConfig& configuration,
900900
configuration.getFusionLogCheckpointInterval());
901901
magma->SetFusionMigrationRateLimit(
902902
configuration.getFusionMigrationRateLimit());
903+
magma->SetFusionSyncRateLimit(configuration.getFusionSyncRateLimit());
903904
}
904905
}
905906

@@ -4650,6 +4651,14 @@ size_t MagmaKVStore::getMagmaFusionMigrationRateLimit() const {
46504651
return magma->GetFusionMigrationRateLimit();
46514652
}
46524653

4654+
void MagmaKVStore::setMagmaFusionSyncRateLimit(size_t value) {
4655+
magma->SetFusionSyncRateLimit(value);
4656+
}
4657+
4658+
size_t MagmaKVStore::getMagmaFusionSyncRateLimit() const {
4659+
return magma->GetFusionSyncRateLimit();
4660+
}
4661+
46534662
cb::engine_errc MagmaKVStore::startFusionUploader(Vbid vbid, uint64_t term) {
46544663
const auto status =
46554664
magma->StartFusionUploader(Magma::KVStoreID(vbid.get()), term);

engines/ep/src/kvstore/magma-kvstore/magma-kvstore.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,9 @@ class MagmaKVStore : public KVStore {
680680
size_t getMagmaFusionMigrationRateLimit() const;
681681
void setMagmaFusionMigrationRateLimit(size_t value);
682682

683+
size_t getMagmaFusionSyncRateLimit() const;
684+
void setMagmaFusionSyncRateLimit(size_t value);
685+
683686
// Magma uses a unique logger with a prefix of magma so that all logging
684687
// calls from the wrapper thru magma will be prefixed with magma.
685688
std::shared_ptr<BucketLogger> logger;

engines/ep/src/kvstore/magma-kvstore/magma-kvstore_config.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ class MagmaKVStoreConfig::ConfigChangeListener : public ValueChangedListener {
4848
config.setContinousBackupInterval(std::chrono::seconds(value));
4949
} else if (key == "magma_fusion_migration_rate_limit") {
5050
config.setFusionMigrationRateLimit(value);
51+
} else if (key == "magma_fusion_sync_rate_limit") {
52+
config.setFusionSyncRateLimit(value);
5153
}
5254
}
5355

@@ -231,6 +233,10 @@ MagmaKVStoreConfig::MagmaKVStoreConfig(Configuration& config,
231233
config.addValueChangedListener(
232234
"magma_fusion_migration_rate_limit",
233235
std::make_unique<ConfigChangeListener>(*this));
236+
fusionSyncRateLimit = config.getMagmaFusionSyncRateLimit();
237+
config.addValueChangedListener(
238+
"magma_fusion_sync_rate_limit",
239+
std::make_unique<ConfigChangeListener>(*this));
234240
}
235241

236242
void MagmaKVStoreConfig::setStore(MagmaKVStore* store) {
@@ -244,6 +250,13 @@ void MagmaKVStoreConfig::setFusionMigrationRateLimit(size_t value) {
244250
}
245251
}
246252

253+
void MagmaKVStoreConfig::setFusionSyncRateLimit(size_t value) {
254+
fusionSyncRateLimit.store(value);
255+
if (store) {
256+
store->setMagmaFusionSyncRateLimit(value);
257+
}
258+
}
259+
247260
void MagmaKVStoreConfig::setMagmaFragmentationPercentage(size_t value) {
248261
magmaFragmentationPercentage.store(value);
249262
if (store) {

engines/ep/src/kvstore/magma-kvstore/magma-kvstore_config.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,12 @@ class MagmaKVStoreConfig : public KVStoreConfig {
305305

306306
void setFusionMigrationRateLimit(size_t value);
307307

308+
size_t getFusionSyncRateLimit() const {
309+
return fusionSyncRateLimit;
310+
}
311+
312+
void setFusionSyncRateLimit(size_t value);
313+
308314
magma::Magma::Config magmaCfg;
309315

310316
/**
@@ -587,4 +593,6 @@ class MagmaKVStoreConfig : public KVStoreConfig {
587593
std::chrono::seconds fusionLogCheckpointInterval;
588594
// The rate limit for Fusion extent migration, in bytes per second
589595
std::atomic<size_t> fusionMigrationRateLimit;
596+
// The rate limit for Fusion sync uploads, in bytes per second
597+
std::atomic<size_t> fusionSyncRateLimit;
590598
};

engines/ep/src/kvstore/magma-kvstore/magma-memory-tracking-proxy.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,16 @@ size_t MagmaMemoryTrackingProxy::GetFusionMigrationRateLimit() const {
774774
return magma->GetFusionMigrationRateLimit();
775775
}
776776

777+
void MagmaMemoryTrackingProxy::SetFusionSyncRateLimit(size_t limit) {
778+
cb::UseArenaMallocSecondaryDomain domainGuard;
779+
magma->SetFusionSyncRateLimit(limit);
780+
}
781+
782+
size_t MagmaMemoryTrackingProxy::GetFusionSyncRateLimit() const {
783+
cb::UseArenaMallocSecondaryDomain sGuard;
784+
return magma->GetFusionSyncRateLimit();
785+
}
786+
777787
magma::Status MagmaMemoryTrackingProxy::StartFusionUploader(
778788
magma::Magma::KVStoreID kvId, uint64_t term) {
779789
cb::UseArenaMallocSecondaryDomain d;

engines/ep/src/kvstore/magma-kvstore/magma-memory-tracking-proxy.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,10 @@ class MagmaMemoryTrackingProxy {
305305
void SetFusionMigrationRateLimit(size_t limit);
306306
size_t GetFusionMigrationRateLimit() const;
307307

308+
// The rate limit for Fusion sync uploads, in bytes per second.
309+
void SetFusionSyncRateLimit(size_t limit);
310+
size_t GetFusionSyncRateLimit() const;
311+
308312
/**
309313
* Start uploading data to FusionLogStore for the latest revision of the
310314
* given kvstore.

0 commit comments

Comments
 (0)