Skip to content

Commit ec6e2d5

Browse files
committed
Support Supervision.restart for SubFlow's
1 parent 5376692 commit ec6e2d5

File tree

1 file changed

+22
-12
lines changed

1 file changed

+22
-12
lines changed

stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -511,13 +511,6 @@ import pekko.util.ccompat.JavaConverters._
511511
private var substreamWaitingToBePushed = false
512512
private var substreamCancelled = false
513513

514-
def propagateSubstreamCancel(ex: Throwable): Boolean =
515-
decider(ex) match {
516-
case Supervision.Stop => true
517-
case Supervision.Resume => false
518-
case Supervision.Restart => false
519-
}
520-
521514
setHandler(
522515
out,
523516
new OutHandler {
@@ -614,14 +607,31 @@ import pekko.util.ccompat.JavaConverters._
614607

615608
override def onDownstreamFinish(cause: Throwable): Unit = {
616609
substreamCancelled = true
617-
if (isClosed(in) || propagateSubstreamCancel(cause)) {
618-
cancelStage(cause)
619-
} else {
620-
// Start draining
621-
if (!hasBeenPulled(in)) pull(in)
610+
decider(cause) match {
611+
case Supervision.Stop =>
612+
cancelStage(cause)
613+
case Supervision.Resume =>
614+
if (isClosed(in)) cancelStage(cause)
615+
else {
616+
// Start draining
617+
if (!hasBeenPulled(in)) pull(in)
618+
}
619+
case Supervision.Restart =>
620+
if (isClosed(in)) completeStage()
621+
else {
622+
restartState()
623+
// Start draining
624+
if (!hasBeenPulled(in)) pull(in)
625+
}
622626
}
623627
}
624628

629+
private def restartState(): Unit = {
630+
substreamSource = null
631+
substreamWaitingToBePushed = false
632+
substreamCancelled = false
633+
}
634+
625635
override def onPush(): Unit = {
626636
val elem = grab(in)
627637
try {

0 commit comments

Comments
 (0)