Skip to content

Commit 5df6af3

Browse files
committed
feat: Add asJavaStream to Sink
1 parent c708891 commit 5df6af3

File tree

8 files changed

+105
-0
lines changed

8 files changed

+105
-0
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# Sink.asJavaStream
2+
3+
Create a sink which materializes into Java 8 `Stream` that can be run to trigger demand through the sink.
4+
5+
@ref[Sink operators](../index.md#sink-operators)
6+
7+
## Signature
8+
9+
@apidoc[Sink.asJavaStream](Sink$) { scala="#asJavaStream[T]():org.apache.pekko.stream.scaladsl.Sink[T,java.util.stream.Stream[T]]" java="#asJavaStream()" }
10+
11+
## Description
12+
13+
Create a sink which materializes into Java 8 `Stream` that can be run to trigger demand through the sink.
14+
Elements emitted through the stream will be available for reading through the Java 8 `Stream`.
15+
16+
The Java 8 `Stream` will be ended when the stream flowing into this `Sink` completes, and closing the Java
17+
`Stream` will cancel the inflow of this `Sink`. If the Java `Stream` throws an exception, the Pekko stream is cancelled.
18+
19+
Be aware that Java `Stream` blocks current thread while waiting on next element from downstream.
20+
21+
## Example
22+
23+
Here is an example of a @apidoc[Sink] that materializes into a @javadoc[java.util.stream.Stream](java.util.stream.Stream).
24+
25+
Scala
26+
: @@snip [StreamConvertersToJava.scala](/docs/src/test/scala/docs/stream/operators/converters/StreamConvertersToJava.scala) { #import #asJavaStreamOnSink }
27+
28+
Java
29+
: @@snip [StreamConvertersToJava.java](/docs/src/test/java/jdocs/stream/operators/converters/StreamConvertersToJava.java) { #import #asJavaStreamOnSink }
30+
31+
32+
## Reactive Streams semantics
33+
34+
@@@div { .callout }
35+
**cancels** when the Java Stream is closed
36+
37+
**backpressures** when no read is pending on the Java Stream
38+
@@@

docs/src/main/paradox/stream/operators/index.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl
5050

5151
| |Operator|Description|
5252
|--|--|--|
53+
|Sink|<a name="asjavastream"></a>@ref[asJavaStream](Sink/asJavaStream.md)|Create a sink which materializes into Java 8 `Stream` that can be run to trigger demand through the sink.|
5354
|Sink|<a name="aspublisher"></a>@ref[asPublisher](Sink/asPublisher.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Publisher`.|
5455
|Sink|<a name="cancelled"></a>@ref[cancelled](Sink/cancelled.md)|Immediately cancel the stream|
5556
|Sink|<a name="collect"></a>@ref[collect](Sink/collect.md)|Collect all input elements using a Java @javadoc[Collector](java.util.stream.Collector).|
@@ -397,6 +398,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
397398
* [alsoToAll](Source-or-Flow/alsoToAll.md)
398399
* [asFlowWithContext](Flow/asFlowWithContext.md)
399400
* [asInputStream](StreamConverters/asInputStream.md)
401+
* [asJavaStream](Sink/asJavaStream.md)
400402
* [asJavaStream](StreamConverters/asJavaStream.md)
401403
* [ask](Source-or-Flow/ask.md)
402404
* [ask](ActorFlow/ask.md)

docs/src/test/java/jdocs/stream/operators/converters/StreamConvertersToJava.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,17 @@ public void demonstrateConverterToJava8Stream() {
6666
assertEquals(5, jStream.count());
6767
}
6868

69+
@Test
70+
public void demonstrateConverterToJava8StreamOnSink() {
71+
// #asJavaStreamOnSink
72+
73+
Source<Integer, NotUsed> source = Source.range(0, 9).filter(i -> i % 2 == 0);
74+
Stream<Integer> jStream = source.runWith(Sink.asJavaStream(), system);
75+
76+
// #asJavaStreamOnSink
77+
assertEquals(5, jStream.count());
78+
}
79+
6980
@Test
7081
public void demonstrateCreatingASourceFromJava8Stream()
7182
throws InterruptedException, ExecutionException, TimeoutException {

docs/src/test/scala/docs/stream/operators/converters/StreamConvertersToJava.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,15 @@ class StreamConvertersToJava extends PekkoSpec with Futures {
4343
jStream.count should be(5)
4444
}
4545

46+
"demonstrate materialization to Java8 streams with methods on Sink" in {
47+
// #asJavaStreamOnSink
48+
val source: Source[Int, NotUsed] = Source(0 to 9).filter(_ % 2 == 0)
49+
50+
val jStream: java.util.stream.Stream[Int] = source.runWith(Sink.asJavaStream[Int]())
51+
// #asJavaStreamOnSink
52+
jStream.count should be(5)
53+
}
54+
4655
"demonstrate conversion from Java8 streams" in {
4756
// #fromJavaStream
4857
def factory(): IntStream = IntStream.rangeClosed(0, 9)

stream-tests/src/test/java/org/apache/pekko/stream/io/SinkAsJavaSourceTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,12 @@ public void mustBeAbleToUseAsJavaStream() throws Exception {
4444
java.util.stream.Stream<Integer> javaStream = Source.from(list).runWith(streamSink, system);
4545
assertEquals(list, javaStream.collect(Collectors.toList()));
4646
}
47+
48+
@Test
49+
public void mustBeAbleToUseAsJavaStreamOnSink() throws Exception {
50+
final List<Integer> list = Arrays.asList(1, 2, 3);
51+
java.util.stream.Stream<Integer> javaStream =
52+
Source.from(list).runWith(Sink.asJavaStream(), system);
53+
assertEquals(list, javaStream.collect(Collectors.toList()));
54+
}
4755
}

stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkAsJavaStreamSpec.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ class SinkAsJavaStreamSpec extends StreamSpec(UnboundedMailboxConfig) {
3232
"work in happy case" in {
3333
val javaSource = Source(1 to 100).runWith(StreamConverters.asJavaStream())
3434
javaSource.count() should ===(100L)
35+
//
36+
Source(1 to 100).runWith(Sink.asJavaStream())
37+
.count() should ===(100L)
3538
}
3639

3740
"fail if parent stream is failed" in {

stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,23 @@ object Sink {
212212
def asPublisher[T](fanout: AsPublisher): Sink[T, Publisher[T]] =
213213
new Sink(scaladsl.Sink.asPublisher(fanout == AsPublisher.WITH_FANOUT))
214214

215+
/**
216+
* Creates a sink which materializes into Java 8 ``Stream`` that can be run to trigger demand through the sink.
217+
* Elements emitted through the stream will be available for reading through the Java 8 ``Stream``.
218+
*
219+
* The Java 8 ``Stream`` will be ended when the stream flowing into this ``Sink`` completes, and closing the Java
220+
* ``Stream`` will cancel the inflow of this ``Sink``.
221+
*
222+
* Java 8 ``Stream`` throws exception in case reactive stream failed.
223+
*
224+
* Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream.
225+
* As it is interacting wit blocking API the implementation runs on a separate dispatcher
226+
* configured through the ``pekko.stream.blocking-io-dispatcher``.
227+
*
228+
* @since 2.0.0
229+
*/
230+
def asJavaStream[T](): Sink[T, java.util.stream.Stream[T]] = new Sink(scaladsl.StreamConverters.asJavaStream())
231+
215232
/**
216233
* A `Sink` that will invoke the given procedure for each received element. The sink is materialized
217234
* into a [[java.util.concurrent.CompletionStage]] which will be completed with `Success` when reaching the

stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,23 @@ object Sink {
292292
if (fanout) new FanoutPublisherSink[T](DefaultAttributes.fanoutPublisherSink, shape("FanoutPublisherSink"))
293293
else new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink")))
294294

295+
/**
296+
* Creates a sink which materializes into Java 8 ``Stream`` that can be run to trigger demand through the sink.
297+
* Elements emitted through the stream will be available for reading through the Java 8 ``Stream``.
298+
*
299+
* The Java 8 ``Stream`` will be ended when the stream flowing into this ``Sink`` completes, and closing the Java
300+
* ``Stream`` will cancel the inflow of this ``Sink``.
301+
*
302+
* If the Java 8 ``Stream`` throws exception the Pekko stream is cancelled.
303+
*
304+
* Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream.
305+
* As it is interacting wit blocking API the implementation runs on a separate dispatcher
306+
* configured through the ``pekko.stream.blocking-io-dispatcher``.
307+
*
308+
* @since 2.0.0
309+
*/
310+
def asJavaStream[T](): Sink[T, java.util.stream.Stream[T]] = StreamConverters.asJavaStream()
311+
295312
/**
296313
* A `Sink` that will consume the stream and discard the elements.
297314
*/

0 commit comments

Comments
 (0)