Skip to content

Commit 7ae5a63

Browse files
authored
fix: better asset canister upload chunking (#4442)
1 parent 375b7ed commit 7ae5a63

File tree

3 files changed

+159
-50
lines changed

3 files changed

+159
-50
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
# UNRELEASED
44

5+
### Improve frontend canister sync logic
6+
7+
Previously, committing frontend canister changes happened in multiple batches whose sizes were chosen by simple heuristics intended to keep each batch below the ingress message size limit.
8+
Now, the ingress message size limit is respected explicitly, and a limit on the total content size per batch is also enforced, because all content in a batch is newly hashed in the canister when the batch is committed.
9+
510
## Dependencies
611

712
### Motoko

src/canisters/frontend/ic-asset/src/batch_upload/plumbing.rs

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ pub enum Mode {
5959

6060
type IdMapping = BTreeMap<usize, Nat>;
6161
type UploadQueue = Vec<(usize, Vec<u8>)>;
62+
type CanisterChunkSizeMap = HashMap<Nat, usize>;
6263
pub(crate) struct ChunkUploader<'agent> {
6364
canister: Canister<'agent>,
6465
batch_id: Nat,
@@ -68,6 +69,8 @@ pub(crate) struct ChunkUploader<'agent> {
6869
// maps uploader_chunk_id to canister_chunk_id
6970
id_mapping: Arc<Mutex<IdMapping>>,
7071
upload_queue: Arc<Mutex<UploadQueue>>,
72+
// maps canister_chunk_id to chunk size
73+
canister_chunk_sizes: Arc<Mutex<CanisterChunkSizeMap>>,
7174
}
7275

7376
impl<'agent> ChunkUploader<'agent> {
@@ -80,6 +83,7 @@ impl<'agent> ChunkUploader<'agent> {
8083
bytes: Arc::new(AtomicUsize::new(0)),
8184
id_mapping: Arc::new(Mutex::new(BTreeMap::new())),
8285
upload_queue: Arc::new(Mutex::new(vec![])),
86+
canister_chunk_sizes: Arc::new(Mutex::new(HashMap::new())),
8387
}
8488
}
8589

@@ -93,8 +97,10 @@ impl<'agent> ChunkUploader<'agent> {
9397
progress: Option<&dyn AssetSyncProgressRenderer>,
9498
) -> Result<usize, CreateChunkError> {
9599
let uploader_chunk_id = self.chunks.fetch_add(1, Ordering::SeqCst);
96-
self.bytes.fetch_add(contents.len(), Ordering::SeqCst);
97-
if contents.len() == MAX_CHUNK_SIZE || self.api_version < 2 {
100+
let chunk_size = contents.len();
101+
self.bytes.fetch_add(chunk_size, Ordering::SeqCst);
102+
103+
if chunk_size == MAX_CHUNK_SIZE || self.api_version < 2 {
98104
let canister_chunk_id = create_chunk(
99105
&self.canister,
100106
&self.batch_id,
@@ -103,8 +109,15 @@ impl<'agent> ChunkUploader<'agent> {
103109
progress,
104110
)
105111
.await?;
106-
let mut map = self.id_mapping.lock().await;
107-
map.insert(uploader_chunk_id, canister_chunk_id);
112+
113+
self.id_mapping
114+
.lock()
115+
.await
116+
.insert(uploader_chunk_id, canister_chunk_id.clone());
117+
self.canister_chunk_sizes
118+
.lock()
119+
.await
120+
.insert(canister_chunk_id, chunk_size);
108121

109122
Ok(uploader_chunk_id)
110123
} else {
@@ -146,6 +159,15 @@ impl<'agent> ChunkUploader<'agent> {
146159
self.chunks.load(Ordering::SeqCst)
147160
}
148161

162+
/// Get total size of chunks by their canister chunk IDs
163+
pub(crate) async fn get_canister_chunk_total_size(&self, canister_chunk_ids: &[Nat]) -> usize {
164+
let sizes = self.canister_chunk_sizes.lock().await;
165+
canister_chunk_ids
166+
.iter()
167+
.filter_map(|id| sizes.get(id))
168+
.sum()
169+
}
170+
149171
/// Call only after `finalize_upload` has completed.
150172
pub(crate) async fn uploader_ids_to_canister_chunk_ids(
151173
&self,
@@ -212,15 +234,27 @@ impl<'agent> ChunkUploader<'agent> {
212234
}
213235

214236
try_join_all(batches.into_iter().map(|chunks| async move {
215-
let (uploader_chunk_ids, chunks): (Vec<_>, Vec<_>) = chunks.into_iter().unzip();
216-
let canister_chunk_ids =
217-
create_chunks(&self.canister, &self.batch_id, chunks, semaphores, progress).await?;
218-
let mut map = self.id_mapping.lock().await;
219-
for (uploader_id, canister_id) in uploader_chunk_ids
237+
let (uploader_chunk_ids, chunk_contents): (Vec<_>, Vec<_>) = chunks.into_iter().unzip();
238+
let chunk_sizes: Vec<usize> = chunk_contents.iter().map(|c| c.len()).collect();
239+
240+
let canister_chunk_ids = create_chunks(
241+
&self.canister,
242+
&self.batch_id,
243+
chunk_contents,
244+
semaphores,
245+
progress,
246+
)
247+
.await?;
248+
249+
let mut id_map = self.id_mapping.lock().await;
250+
let mut canister_sizes = self.canister_chunk_sizes.lock().await;
251+
for ((uploader_id, canister_id), chunk_size) in uploader_chunk_ids
220252
.into_iter()
221253
.zip(canister_chunk_ids.into_iter())
254+
.zip(chunk_sizes.into_iter())
222255
{
223-
map.insert(uploader_id, canister_id);
256+
id_map.insert(uploader_id, canister_id.clone());
257+
canister_sizes.insert(canister_id, chunk_size);
224258
}
225259
Ok(())
226260
}))

src/canisters/frontend/ic-asset/src/sync.rs

Lines changed: 110 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,15 @@ use walkdir::WalkDir;
4545
const KNOWN_DIRECTORIES: [&str; 1] = [".well-known"];
4646

4747
/// Sets the contents of the asset canister to the contents of a directory, including deleting old assets.
48-
pub async fn upload_content_and_assemble_sync_operations(
49-
canister: &Canister<'_>,
48+
pub async fn upload_content_and_assemble_sync_operations<'a>(
49+
canister: &Canister<'a>,
5050
canister_api_version: u16,
5151
dirs: &[&Path],
5252
no_delete: bool,
5353
mode: batch_upload::plumbing::Mode,
5454
logger: &Logger,
5555
progress: Option<&dyn AssetSyncProgressRenderer>,
56-
) -> Result<CommitBatchArguments, UploadContentError> {
56+
) -> Result<(CommitBatchArguments, ChunkUploader<'a>), UploadContentError> {
5757
if let Some(progress) = progress {
5858
progress.set_state(AssetSyncState::GatherAssetDescriptors);
5959
}
@@ -144,7 +144,7 @@ pub async fn upload_content_and_assemble_sync_operations(
144144
// -vv
145145
trace!(logger, "Value of CommitBatch: {:?}", commit_batch_args);
146146

147-
Ok(commit_batch_args)
147+
Ok((commit_batch_args, chunk_uploader))
148148
}
149149

150150
/// Sets the contents of the asset canister to the contents of a directory, including deleting old assets.
@@ -156,7 +156,7 @@ pub async fn sync(
156156
progress: Option<&dyn AssetSyncProgressRenderer>,
157157
) -> Result<(), SyncError> {
158158
let canister_api_version = api_version(canister).await;
159-
let commit_batch_args = upload_content_and_assemble_sync_operations(
159+
let (commit_batch_args, chunk_uploader) = upload_content_and_assemble_sync_operations(
160160
canister,
161161
canister_api_version,
162162
dirs,
@@ -180,64 +180,134 @@ pub async fn sync(
180180
warn!(logger, "The asset canister is running an old version of the API. It will not be able to set assets properties.");
181181
commit_batch(canister, commit_batch_args_v0).await
182182
}
183-
BATCH_UPLOAD_API_VERSION.. => commit_in_stages(canister, commit_batch_args, logger, progress).await,
183+
BATCH_UPLOAD_API_VERSION.. => commit_in_stages(canister, commit_batch_args, &chunk_uploader, logger, progress).await,
184184
}.map_err(CommitBatchFailed)?;
185185
if let Some(progress) = progress {
186186
progress.set_state(AssetSyncState::Done);
187187
}
188188
Ok(())
189189
}
190190

191-
async fn commit_in_stages(
191+
/// Creates batches of operations for committing.
192+
///
193+
/// Batches are created based on three conditions (any of which triggers a new batch):
194+
/// 1. 500 operations reached - generally respected limit to avoid too much cert tree work
195+
/// 2. 1.5MB of header map data reached - headers are the largest part of ingress message size
196+
/// 3. 100MB of total chunk size reached - the full asset content gets hashed in the commit message
197+
async fn create_commit_batches<'a>(
198+
operations: Vec<BatchOperationKind>,
199+
chunk_uploader: &ChunkUploader<'a>,
200+
) -> Vec<Vec<BatchOperationKind>> {
201+
const MAX_OPERATIONS_PER_BATCH: usize = 500; // empirically this works good enough
202+
const MAX_HEADER_MAP_SIZE: usize = 1_500_000; // 1.5 MB leaves plenty of room for other data that we do not calculate precisely
203+
const MAX_ASSET_CONTENT_SIZE: usize = 100_000_000; // 100 MB is ~20% of how much data we can hash in a single message: 40b instructions per update call, measured best case of 80 instructions per byte hashed -> ~500MB limit
204+
205+
let mut batches = Vec::new();
206+
let mut current_batch = Vec::new();
207+
let mut operation_count = 0;
208+
let mut header_map_size = 0;
209+
let mut content_size = 0;
210+
211+
for operation in operations {
212+
let operation_header_size = calculate_header_size(&operation);
213+
let operation_chunk_size = calculate_content_size(&operation, chunk_uploader).await;
214+
215+
// Check if adding this operation would exceed any limits
216+
let would_exceed_operation_limit = operation_count >= MAX_OPERATIONS_PER_BATCH;
217+
let would_exceed_header_limit =
218+
header_map_size + operation_header_size >= MAX_HEADER_MAP_SIZE;
219+
let would_exceed_chunk_limit =
220+
content_size + operation_chunk_size >= MAX_ASSET_CONTENT_SIZE;
221+
222+
if (would_exceed_operation_limit || would_exceed_header_limit || would_exceed_chunk_limit)
223+
&& !current_batch.is_empty()
224+
{
225+
// Start a new batch
226+
batches.push(current_batch);
227+
current_batch = Vec::new();
228+
operation_count = 0;
229+
header_map_size = 0;
230+
content_size = 0;
231+
}
232+
233+
// Add operation to current batch
234+
current_batch.push(operation);
235+
operation_count += 1;
236+
header_map_size += operation_header_size;
237+
content_size += operation_chunk_size;
238+
}
239+
240+
// Add the last batch if it has any operations
241+
if !current_batch.is_empty() {
242+
batches.push(current_batch);
243+
}
244+
245+
batches
246+
}
247+
248+
/// Calculate the size in bytes of header data for an operation.
249+
fn calculate_header_size(operation: &BatchOperationKind) -> usize {
250+
match operation {
251+
BatchOperationKind::CreateAsset(args) => args.headers.as_ref().map_or(0, |headers| {
252+
headers.iter().map(|(k, v)| k.len() + v.len()).sum()
253+
}),
254+
BatchOperationKind::SetAssetProperties(args) => args
255+
.headers
256+
.as_ref()
257+
.and_then(|h| h.as_ref())
258+
.map_or(0, |headers| {
259+
headers.iter().map(|(k, v)| k.len() + v.len()).sum()
260+
}),
261+
_ => 0,
262+
}
263+
}
264+
265+
/// Calculate the size in bytes of chunk data for an operation.
266+
/// This includes both:
267+
/// - Chunks referenced by `chunk_ids` (looked up from ChunkUploader)
268+
/// - The `last_chunk` field which is included directly in the commit message
269+
async fn calculate_content_size<'a>(
270+
operation: &BatchOperationKind,
271+
chunk_uploader: &ChunkUploader<'a>,
272+
) -> usize {
273+
match operation {
274+
BatchOperationKind::SetAssetContent(args) => {
275+
let chunk_ids_size = chunk_uploader
276+
.get_canister_chunk_total_size(&args.chunk_ids)
277+
.await;
278+
let last_chunk_size = args.last_chunk.as_ref().map_or(0, |chunk| chunk.len());
279+
chunk_ids_size + last_chunk_size
280+
}
281+
_ => 0,
282+
}
283+
}
284+
285+
async fn commit_in_stages<'a>(
192286
canister: &Canister<'_>,
193287
commit_batch_args: CommitBatchArguments,
288+
chunk_uploader: &ChunkUploader<'a>,
194289
logger: &Logger,
195290
progress: Option<&dyn AssetSyncProgressRenderer>,
196291
) -> Result<(), AgentError> {
197292
if let Some(progress) = progress {
198293
progress.set_total_batch_operations(commit_batch_args.operations.len());
199294
}
200-
// Note that SetAssetProperties operations are only generated for assets that
201-
// already exist, since CreateAsset operations set all properties.
202-
let (set_properties_operations, other_operations): (Vec<_>, Vec<_>) = commit_batch_args
203-
.operations
204-
.into_iter()
205-
.partition(|op| matches!(op, BatchOperationKind::SetAssetProperties(_)));
206-
207-
// This part seems reasonable in general as a separate batch
208-
for operations in set_properties_operations.chunks(500) {
209-
debug!(logger, "Setting properties of {} assets.", operations.len());
210-
commit_batch(
211-
canister,
212-
CommitBatchArguments {
213-
batch_id: Nat::from(0_u8),
214-
operations: operations.into(),
215-
},
216-
)
217-
.await?;
218-
if let Some(progress) = progress {
219-
progress.add_committed_batch_operations(operations.len());
220-
}
221-
}
222295

223-
// Seen to work at 800 ({"SetAssetContent": 932, "Delete": 47, "CreateAsset": 58})
224-
// so 500 shouldn't exceed per-message instruction limit
225-
for operations in other_operations.chunks(500) {
226-
debug!(
227-
logger,
228-
"Committing batch with {} operations.",
229-
operations.len()
230-
);
296+
let batches = create_commit_batches(commit_batch_args.operations, chunk_uploader).await;
297+
298+
for operations in batches {
299+
let op_amount = operations.len();
300+
debug!(logger, "Committing batch with {op_amount} operations.");
231301
commit_batch(
232302
canister,
233303
CommitBatchArguments {
234304
batch_id: Nat::from(0_u8),
235-
operations: operations.into(),
305+
operations,
236306
},
237307
)
238308
.await?;
239309
if let Some(progress) = progress {
240-
progress.add_committed_batch_operations(operations.len());
310+
progress.add_committed_batch_operations(op_amount);
241311
}
242312
}
243313

@@ -260,7 +330,7 @@ pub async fn prepare_sync_for_proposal(
260330
progress: Option<&dyn AssetSyncProgressRenderer>,
261331
) -> Result<(Nat, ByteBuf), PrepareSyncForProposalError> {
262332
let canister_api_version = api_version(canister).await;
263-
let arg = upload_content_and_assemble_sync_operations(
333+
let (arg, _chunk_uploader) = upload_content_and_assemble_sync_operations(
264334
canister,
265335
canister_api_version,
266336
dirs,

0 commit comments

Comments
 (0)