File tree Expand file tree Collapse file tree 1 file changed +19
-1
lines changed
datafusion/datasource/src Expand file tree Collapse file tree 1 file changed +19
-1
lines changed Original file line number Diff line number Diff line change @@ -213,7 +213,13 @@ impl FileGroupPartitioner {
213213 . iter ( )
214214 . map ( |f| f. object_meta . size as i64 )
215215 . sum :: < i64 > ( ) ;
216- if total_size < ( repartition_file_min_size as i64 ) || total_size == 0 {
216+
217+ // bail if we are asked to *split* a set of files that are already too small
218+ // if we are being asked to consolidate, we proceed
219+ if ( total_size < ( repartition_file_min_size as i64 )
220+ && target_partitions >= file_groups. len ( ) )
221+ || total_size == 0
222+ {
217223 return None ;
218224 }
219225
@@ -231,6 +237,18 @@ impl FileGroupPartitioner {
231237 let mut produced_files = vec ! [ ] ;
232238 let mut range_start = 0 ;
233239 while range_start < source_file. object_meta . size {
240+ // Skip splitting files smaller than repartition_file_min_size
241+ // This may result in a number of partitions slightly smaller than requested
242+ if source_file. object_meta . size < repartition_file_min_size {
243+ state. 1 += source_file. object_meta . size ;
244+ if state. 1 > target_partition_size {
245+ state. 0 += 1 ;
246+ state. 1 = 0 ;
247+ }
248+ produced_files. push ( ( state. 0 , source_file. clone ( ) ) ) ;
249+ break ;
250+ }
251+
234252 let range_end = min (
235253 range_start + ( target_partition_size - state. 1 ) ,
236254 source_file. object_meta . size ,
You can’t perform that action at this time.
0 commit comments