@@ -465,7 +465,6 @@ public Read<K, V> withConfiguration(Configuration configuration) {
465465 if (getValueTranslationFunction () == null ) {
466466 builder .setValueTypeDescriptor ((TypeDescriptor <V >) inputFormatValueClass );
467467 }
468-
469468 return builder .build ();
470469 }
471470
@@ -728,11 +727,8 @@ public List<BoundedSource<KV<K, V>>> split(long desiredBundleSizeBytes, Pipeline
728727 LOG .info ("Not splitting source {} because source is already split." , this );
729728 return ImmutableList .of (this );
730729 }
731-
732730 computeSplitsIfNecessary ();
733-
734731 reportSourceLineage (inputSplits );
735-
736732 LOG .info (
737733 "Generated {} splits. Size of first split is {} " ,
738734 inputSplits .size (),
@@ -754,17 +750,13 @@ public List<BoundedSource<KV<K, V>>> split(long desiredBundleSizeBytes, Pipeline
754750
755751 /** Report only file-based sources */
756752 private void reportSourceLineage (final List <SerializableSplit > inputSplits ) {
757- List <ResourceId > fileResources = new ArrayList <>();
758-
759- for (SerializableSplit split : inputSplits ) {
760- InputSplit inputSplit = split .getSplit ();
761-
762- if (inputSplit instanceof FileSplit ) {
763- String pathString = ((FileSplit ) inputSplit ).getPath ().toString ();
764- ResourceId resourceId = FileSystems .matchNewResource (pathString , false );
765- fileResources .add (resourceId );
766- }
767- }
753+ List <ResourceId > fileResources =
754+ inputSplits .stream ()
755+ .map (SerializableSplit ::getSplit )
756+ .filter (FileSplit .class ::isInstance )
757+ .map (FileSplit .class ::cast )
758+ .map (fileSplit -> FileSystems .matchNewResource (fileSplit .getPath ().toString (), false ))
759+ .collect (Collectors .toList ());
768760
769761 FileSystems .reportSourceLineage (fileResources );
770762 }
0 commit comments