@@ -234,42 +234,44 @@ impl FileGroupPartitioner {
234234 . scan (
235235 ( current_partition_index, current_partition_size) ,
236236 |state, source_file| {
237- let mut produced_files = vec ! [ ] ;
238- let mut range_start = 0 ;
239- while range_start < source_file. object_meta . size {
240- // Skip splitting files smaller than repartition_file_min_size
241- // This may result in a number of partitions slightly smaller than requested
242- if source_file. object_meta . size < repartition_file_min_size {
243- state. 1 += source_file. object_meta . size ;
244- if state. 1 > target_partition_size {
237+ // Skip splitting files smaller than repartition_file_min_size
238+ // This may result in a number of partitions slightly smaller than requested
239+ if source_file. object_meta . size < repartition_file_min_size {
240+ state. 1 += source_file. object_meta . size ;
241+ if state. 1 > target_partition_size {
242+ state. 0 += 1 ;
243+ state. 1 = 0 ;
244+ }
245+ let small_file = ( state. 0 , source_file. clone ( ) ) ;
246+ Some ( vec ! [ small_file] )
247+ } else {
248+ let mut produced_files = vec ! [ ] ;
249+ let mut range_start = 0 ;
250+ while range_start < source_file. object_meta . size {
251+ let range_end = min (
252+ range_start + ( target_partition_size - state. 1 ) ,
253+ source_file. object_meta . size ,
254+ ) ;
255+
256+ let mut produced_file = source_file. clone ( ) ;
257+ produced_file. range = Some ( FileRange {
258+ start : range_start as i64 ,
259+ end : range_end as i64 ,
260+ } ) ;
261+ produced_files. push ( ( state. 0 , produced_file) ) ;
262+
263+ if state. 1 + ( range_end - range_start)
264+ >= target_partition_size
265+ {
245266 state. 0 += 1 ;
246267 state. 1 = 0 ;
268+ } else {
269+ state. 1 += range_end - range_start;
247270 }
248- produced_files. push ( ( state. 0 , source_file. clone ( ) ) ) ;
249- break ;
250- }
251-
252- let range_end = min (
253- range_start + ( target_partition_size - state. 1 ) ,
254- source_file. object_meta . size ,
255- ) ;
256-
257- let mut produced_file = source_file. clone ( ) ;
258- produced_file. range = Some ( FileRange {
259- start : range_start as i64 ,
260- end : range_end as i64 ,
261- } ) ;
262- produced_files. push ( ( state. 0 , produced_file) ) ;
263-
264- if state. 1 + ( range_end - range_start) >= target_partition_size {
265- state. 0 += 1 ;
266- state. 1 = 0 ;
267- } else {
268- state. 1 += range_end - range_start;
271+ range_start = range_end;
269272 }
270- range_start = range_end ;
273+ Some ( produced_files )
271274 }
272- Some ( produced_files)
273275 } ,
274276 )
275277 . flatten ( )
0 commit comments