Releases: ReactiveX/RxJava
0.20.2
- Pull 1637 Optimize single BlockingObservable operations
Artifacts: Maven Central
0.20.1
0.20.0
RxJava 0.20.0 is a major release that adds "reactive pull" support for backpressure along with several other enhancements leading into the 1.0 release.
Reactive Pull for Backpressure
Solutions for backpressure was the major focus of this release. A "reactive pull" implementation was implemented. Documentation on this and other options for backpressure are found in the wiki: https://github.com/ReactiveX/RxJava/wiki/Backpressure
The reactive pull solution evolved out of several prototypes and interaction with many people over the months.
Signature Changes
A new type Producer has been added:
public interface Producer {
public void request(long n);
}The Subscriber type now has these methods added:
public abstract class Subscriber<T> implements Observer<T>, Subscription {
public void onStart();
protected final void request(long n);
public final void setProducer(Producer producer);
}Examples
This trivial example shows requesting values one at a time:
Observable.from(1, 2, 3, 4).subscribe(new Subscriber<Integer>() {
@Override
public void onStart() {
// on start this tells it to request 1
// otherwise it defaults to request(Long.MAX_VALUE)
request(1);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer t) {
System.out.println(t);
// as each onNext is consumed, request another
// otherwise the Producer will not send more
request(1);
}
});The OnSubscribeFromIterable operator shows how an Iterable is consumed with backpressure.
Some hi-lights (modified for simplicity rather than performance and completeness):
public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {
@Override
public void call(final Subscriber<? super T> o) {
final Iterator<? extends T> it = is.iterator();
// instead of emitting directly to the Subscriber, it emits a Producer
o.setProducer(new IterableProducer<T>(o, it));
}
private static final class IterableProducer<T> implements Producer {
public void request(long n) {
int _c = requested.getAndAdd(n);
if (_c == 0) {
while (it.hasNext()) {
if (o.isUnsubscribed()) {
return;
}
T t = it.next();
o.onNext(t);
if (requested.decrementAndGet() == 0) {
// we're done emitting the number requested so return
return;
}
}
o.onCompleted();
}
}
}
}The observeOn operator is a sterotypical example of queuing on one side of a thread and draining on the other, now with backpressure.
private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
@Override
public void onStart() {
// signal that this is an async operator capable of receiving this many
request(RxRingBuffer.SIZE);
}
@Override
public void onNext(final T t) {
try {
// enqueue
queue.onNext(t);
} catch (MissingBackpressureException e) {
// fail if the upstream has not obeyed our backpressure requests
onError(e);
return;
}
// attempt to schedule draining if needed
schedule();
}
// the scheduling polling will then drain the queue and invoke `request(n)` to request more after draining
}Many use cases will be able to use Observable.from, Observable.onBackpressureDrop and Observable.onBackpressureBuffer to achieve "reactive pull backpressure" without manually implementing Producer logic. Also, it is optional to make an Observable support backpressure. It can remain completely reactive and just push events as it always has. Most uses of RxJava this works just fine. If backpressure is needed then it can be migrated to use a Producer or several other approaches to flow control exist such as throttle, sample, debounce, window, buffer, onBackpressureBuffer, and onBackpressureDrop.
The wiki provides further documentation.
Relation to Reactive Streams
Contributors to RxJava are involved in defining the Reactive Streams spec. RxJava 1.0 is trying to comply with the semantic rules but is not attempting to comply with the type signatures. It will however have a separate module that acts as a bridge between the RxJava Observable and the Reactive Stream types.
The reasons for this are:
- Rx has
Observer.onCompletedwhereas Reactive Streams hasonComplete. This is a massive breaking change to remove a "d". - The RxJava
Subscriptionis used also a "Closeable"/"Disposable" and it does not work well to make it now also be used forrequest(n), hence the separate typeProducerin RxJava. It was attempted to reuserx.Subscriptionbut it couldn't be done without massive breaking changes. - Reactive Streams uses
onSubscribe(Subscription s)whereas RxJava injects theSubscriptionas theSubscriber. Again, this change could not be done without major breaking changes. - RxJava 1.0 needs to be backwards compatible with the major Rx contracts established during the 0.x roadmap.
Considering these things, the major semantics of request(long n) for backpressure are compatible and this will allow interop with a bridge between the interfaces.
New Features
Compose/Transformer
The compose operator is similar to lift but allows custom operator implementations that are chaining Observable operators whereas lift is directly implementing the raw Subscriber logic.
Here is a trival example demonstrating how using compose is a better option than lift when existing Observable operators can be used to achieve the custom behavior.
import rx.Observable;
import rx.Observable.Operator;
import rx.Observable.Transformer;
import rx.Subscriber;
public class ComposeExample {
public static void main(String[] args) {
Observable.just("hello").compose(appendWorldTransformer()).forEach(System.out::println);
Observable.just("hello").lift(appendWorldOperator()).forEach(System.out::println);
}
// if existing operators can be used, compose with Transformer is ideal
private static Transformer<? super String, String> appendWorldTransformer() {
return o -> o.map(s -> s + " world!").finallyDo(() -> {
System.out.println(" some side-effect");
});
}
// whereas lift is more low level
private static Operator<? super String, String> appendWorldOperator() {
return new Operator<String, String>() {
@Override
public Subscriber<? super String> call(Subscriber<? super String> child) {
return new Subscriber<String>(child) {
@Override
public void onCompleted() {
child.onCompleted();
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onNext(String t) {
child.onNext(t + " world!");
System.out.println(" some side-effect");
}
};
}
};
}
}retryWhen/repeatWhen
New operators retryWhen and repeatWhen were added which offer support for more advanced recursion such as retry with exponential backoff.
Here is an example that increases delay between each retry:
Observable.create((Subscriber<? super String> s) -> {
System.out.println("subscribing");
s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
System.out.println("delay retry by " + i + " second(s)");
return Observable.timer(i, TimeUnit.SECONDS);
});
}).toBlocking().forEach(System.out::println);Breaking Changes
The use of Producer has been added in such a way that it is optional and additive, but some operators that used to have unbounded queues are now bounded. This means that if a source Observable emits faster than the Observer can consume them, a MissingBackpressureException can be emitted via onError.
This semantic change can break existing code.
There are two ways of resolving this:
- Modify the source
Observableto useProducerand support backpressure. - Use newly added operators such as
onBackpressureBufferoronBackpressureDropto choose a strategy for the sourceObservableof how to behave when it emits more data than the consumingObserveris capable of handling. Use ofonBackpressureBuffereffectively returns it to having an unbounded buffer and behaving like version 0.19 or earlier.
Example:
sourceObservable.onBackpressureBuffer().subscribe(slowConsumer);Deprecations
Various methods, operators or classes have been deprecated and will be removed in 1.0. Primarily they have been done to remove ambiguity, remove nuanced functionality that is easy to use wrong, clear out superfluous methods and eliminate cruft that was add...
0.20.0-RC6
Further fixes and enhancements bringing us close to completing 0.20.0 and almost ready for 1.0.
A more major change in this release is the deprecation of Observable.from(T). The full discussion can be seen in #1563.
- Pull 1575 combineLatest with backpressure
- Pull 1569 Compose/Transform Covariance
- Pull 1577 Fix the compose covariance
- Pull 1581 zip(Iterable) -> zipWith(Iterable)
- Pull 1582 Deprecate GroupedObservable.from
- Pull 1583 Redo/Repeat Backpressure
- Pull 1576 from(T) -> just(T)
- Pull 1545 Make Android ViewObservable.input observe TextView instead of String
Artifacts: Maven Central
0.20.0-RC5
Version 0.20.0-RC5 updates parallel, buffer(size), switchOnNext, repeat, and retry to support "reactive pull" backpressure. It adds a groupBy overload with an element selector, a new compose method as an alternative to lift for custom operators, fixes bugs and other general improvements.
There are still oustanding items being tracked for 0.20 that need to be completed for the final release.
- Pull 1573 Backpressure: parallel
- Pull 1572 Remove Timeout in Blocking Iterator
- Pull 1570 RxClojure: Fix for mapcat
- Pull 1568 Compose/Transformer
- Pull 1567 groupBy with element selector
- Pull 1565 Fixing Kotlin Defer
- Pull 1507 BufferWithSize with Backpressure Support
- Pull 1557 SwitchOnNext with backpressure support
- Pull 1562 TakeLastTimed with backpressure support
- Pull 1564 RxScala: Fix errors in Completeness.scala and also improve it
- Pull 1548 Adding backpressure to OnSubscribeRedo
- Pull 1559 More consistent hooks for scheduler plugins
- Pull 1561 Remove Variance on Defer
- Pull 1537 recursive scheduling in RxScala
- Pull 1560 flatMap overloads
- Pull 1558 mergeMap generics
- Pull 1552 Fixing a bug and a potential for other concurrency issues
- Pull 1555 RxScala: Add retryWhen/repeatWhen methods
Artifacts: Maven Central
0.20.0-RC4
Version 0.20.0-RC4 continues bug fixes and completing work related to "reactive pull" backpressure. This release updates amb and concat to connect the backpressure request.
Internal uses of RxRingBuffer migrated to using SpmcArrayQueue which significantly reduces object allocations. See #1526 for details.
The multicast operators were updated to use a Subject factory so that Observable sequences can be reused. See #1515 for details.
- Pull 1534 Concat Backpressure
- Pull 1533 Amb + Backpressure
- Pull 1527 Failing unit test for reduce, showing it does not implement backpressure correctly
- Pull 1528 Add operators to create Observables from BroadcastReceiver (rebased)
- Pull 1523 Fix issue #1522: takeLast
- Pull 1530 Fix the unbounded check for merge
- Pull 1526 Restore use of SpmcArrayQueue in RxRingBuffer
- Pull 1468 RxScala: Update CompletenessTest.scala
- Pull 1515 Support Subject Factory with Multicast
- Pull 1518 Fix typos in javadoc comments
- Pull 1521 Fix toIterator Exception Handling
- Pull 1520 Fix non-deterministic RxRingBuffer test
Artifacts: Maven Central
0.20.0-RC3
Version 0.20.0-RC3 preview release fixes several bugs related to backpressure and adds retryWhen, repeatWhen for more advanced recursion use cases like retry with exponential backoff.
This version passed the Netflix API production canary process. Please test this against your code to help us find any issues before we release 0.20.0.
- Pull 1493 retryWhen/repeatWhen
- Pull 1494 zipWith
- Pull 1501 blocking synchronous next
- Pull 1498 non-deterministic testUserSubscriberUsingRequestAsync
- Pull 1497 spsc ring buffer concurrency test
- Pull 1496 Change RxRingBuffer Queue Usage
- Pull 1491 Concat Outer Backpressure
- Pull 1490 non-deterministic timeouts on slow machines
- Pull 1489 Backpressure Fixes and Docs
- Pull 1474 Ignore backpressure for OperatorToObservableSortedList
- Pull 1473 OperatorAny needs to handle backpressure
- Pull 1472 Add test of backpressure to OperatorAll
- Pull 1469 ToList operator needs to ignore backpressure
- Pull 1393 Add cache(int capacity) to Observable
- Pull 1431 CompositeException fix for Android
- Pull 1436 Correct warnings
Artifacts: Maven Central
0.20.0-RC2
Version 0.20.0-RC2 preview release adds support for backpressure to the zip operators, fixes bugs and removes the Subscribe.onSetProducer method.
This means signature changes are modified to be:
The new type Producer ->
public interface Producer {
public void request(long n);
}New methods added to Subscriber ->
public abstract class Subscriber<T> implements Observer<T>, Subscription {
public void onStart();
protected final void request(long n);
public final void setProducer(Producer producer);
}- Pull 1448 RxScala: Add Scala idiomatic methods
- Pull 1446 Zip with Backpressure Support
- Pull 1454 doOnEachObserver fix
- Pull 1457 MergeDelayError & OnErrorFlatMap w/ Merge
- Pull 1458 Remove Pivot Operator
- Pull 1459 Remove Subscriber.onSetProducer
- Pull 1462 Merge Perf Fix: Re-enable fast-path
- Pull 1463 Merge Bug: Missing Emissions
Artifacts: Maven Central
0.20.0-RC1
Version 0.20.0-RC1 is a preview release that adds backpressure support to RxJava as per issue #1000. It has been done in a way that is mostly additive and most existing code will not be affected by these additions. A section below on "Breaking Changes" will discuss use cases that do break and how to deal with them.
This release has been tested successfully in Netflix production canaries, but that does not exercise all use cases or operators, nor does it leverage the newly added backpressure functionality (though the backpressure code paths are used).
Outstanding Work
- The
zipoperator has not yet been upgraded to support backpressure. The work is almost done and it will be included in the next release. - Not all operators have yet been reviewed for whether they need to be changed in any way.
- Temporal operators (like
buffer,window,sample, etc) need to be modified to disable backpressure upstream (usingrequest(Long.MAX_VALUE)) and a decision made about how downstream backpressure requests will be supported. - Ensure all code works on Android. New data structures rely on
sun.misc.Unsafebut are conditionally used only when it is available. We need to ensure those conditions are working and the alternative implementations are adequate. The default buffer size of 1024 also needs to be reviewed for whether it is a correct default for all systems, or needs to be modified by environment (such as smaller for Android). - Ensure use cases needing backpressure all work.
Signature Changes
A new type Producer has been added:
public interface Producer {
public void request(long n);
}The Subscriber type now has these methods added:
public abstract class Subscriber<T> implements Observer<T>, Subscription {
public void onStart();
public final void request(long n);
public final void setProducer(Producer producer);
protected Producer onSetProducer(Producer producer);
}Examples
This trivial example shows requesting values one at a time:
Observable.from(1, 2, 3, 4).subscribe(new Subscriber<Integer>() {
@Override
public void onStart() {
request(1);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer t) {
request(1);
}
});The OnSubscribeFromIterable operator shows how an Iterable is consumed with backpressure.
Some hi-lights (modified for simplicity rather than performance and completeness):
public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {
@Override
public void call(final Subscriber<? super T> o) {
final Iterator<? extends T> it = is.iterator();
// instead of emitting directly to the Subscriber, it emits a Producer
o.setProducer(new IterableProducer<T>(o, it));
}
private static final class IterableProducer<T> implements Producer {
public void request(long n) {
int _c = requested.getAndAdd(n);
if (_c == 0) {
while (it.hasNext()) {
if (o.isUnsubscribed()) {
return;
}
T t = it.next();
o.onNext(t);
if (requested.decrementAndGet() == 0) {
// we're done emitting the number requested so return
return;
}
}
o.onCompleted();
}
}
}
}The observeOn operator is a sterotypical example of queuing on one side of a thread and draining on the other, now with backpressure.
private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
@Override
public void onStart() {
// signal that this is an async operator capable of receiving this many
request(RxRingBuffer.SIZE);
}
@Override
public void onNext(final T t) {
try {
// enqueue
queue.onNext(t);
} catch (MissingBackpressureException e) {
// fail if the upstream has not obeyed our backpressure requests
onError(e);
return;
}
// attempt to schedule draining if needed
schedule();
}
// the scheduling polling will then drain the queue and invoke `request(n)` to request more after draining
}Breaking Changes
The use of Producer has been added in such a way that it is optional and additive, but some operators that used to have unbounded queues are now bounded. This means that if a source Observable emits faster than the Observer can consume them, a MissingBackpressureException can be emitted via onError.
This semantic change can break existing code.
There are two ways of resolving this:
- Modify the source
Observableto useProducerand support backpressure. - Use newly added operators such as
onBackpressureBufferoronBackpressureDropto choose a strategy for the sourceObservableof how to behave when it emits more data than the consumingObserveris capable of handling. Use ofonBackpressureBuffereffectively returns it to having an unbounded buffer and behaving like version 0.19 or earlier.
Example:
sourceObservable.onBackpressureBuffer().subscribe(slowConsumer);Relation to Reactive Streams
Contributors to RxJava are involved in defining the Reactive Streams spec. RxJava 1.0 is trying to comply with the semantic rules but is not attempting to comply with the type signatures. It will however have a separate module that acts as a bridge between the RxJava Observable and the Reactive Stream types.
The reasons for this are:
- Rx has
Observer.onCompletedwhereas Reactive Streams hasonComplete. This is a massive breaking change to remove a "d". - The RxJava
Subscriptionis used also a "Closeable"/"Disposable" and it does not work well to make it now also be used forrequest(n), hence the separate typeProducerin RxJava. It was attempted to reuserx.Subscriptionbut it couldn't be done without massive breaking changes. - Reactive Streams uses
onSubscribe(Subscription s)whereas RxJava injects theSubscriptionas theSubscriber. Again, this change could not be done without major breaking changes. - RxJava 1.0 needs to be backwards compatible with the major Rx contracts established during the 0.x roadmap.
- Reactive Streams is not yet 1.0 and despite significant progress, it is a moving target.
Considering these things, the major semantics of request(long n) for backpressure are compatible and this will allow interop with a bridge between the interfaces. As the Reactive Streams spec matures, RxJava 2.0 may choose to fully adopt the types in the future while RxJava 1.x retains the current signatures.
How to Help
First, please test this release against your existing code to help us determine if we have broken anything.
Second, try to solve backpressure use cases and provide feedback on what works and what doesn't work.
Thank you!
Artifacts: Maven Central
0.19.6
Inclusion of rxjava-scalaz in release.
Artifacts: Maven Central