File tree Expand file tree Collapse file tree 2 files changed +14
-2
lines changed
sdks/python/apache_beam/runners
portability/fn_api_runner Expand file tree Collapse file tree 2 files changed +14
-2
lines changed Original file line number Diff line number Diff line change @@ -1561,7 +1561,8 @@ def handle_process_outputs(
15611561 A value wrapped in a TaggedOutput object will be unwrapped and
15621562 then dispatched to the appropriate indexed output.
15631563 """
1564- results = results or []
1564+ if results is None :
1565+ results = []
15651566
15661567 # TODO(https://github.com/apache/beam/issues/20404): Verify that the
15671568 # results object is a valid iterable type if
@@ -1614,7 +1615,9 @@ def handle_process_batch_outputs(
16141615 A value wrapped in a TaggedOutput object will be unwrapped and
16151616 then dispatched to the appropriate indexed output.
16161617 """
1617- results = results or []
1618+ if results is None :
1619+ results = []
1620+
16181621 output_element_count = 0
16191622 for result in results :
16201623 tag , result = self ._handle_tagged_output (result )
Original file line number Diff line number Diff line change @@ -454,6 +454,15 @@ def test_pardo_side_input_dependencies(self):
454454 ExpectingSideInputsFn (f'Do{ k } ' ),
455455 * [beam .pvalue .AsList (inputs [s ]) for s in range (1 , k )]))
456456
457+ def test_flatmap_numpy_array (self ):
458+ with self .create_pipeline () as p :
459+ pc = (
460+ p
461+ | beam .Create ([np .array (range (10 ))])
462+ | beam .FlatMap (lambda arr : arr ))
463+
464+ assert_that (pc , equal_to ([np .int64 (i ) for i in range (10 )]))
465+
457466 @unittest .skip ('https://github.com/apache/beam/issues/21228' )
458467 def test_pardo_side_input_sparse_dependencies (self ):
459468 with self .create_pipeline () as p :
You can’t perform that action at this time.
0 commit comments