diff --git a/README.md b/README.md index a007e283f..ca3442ee3 100644 --- a/README.md +++ b/README.md @@ -188,6 +188,7 @@ Dart provides the [StreamController](https://api.dart.dev/stable/dart-async/Stre - [BehaviorSubject](https://pub.dev/documentation/rxdart/latest/rx/BehaviorSubject-class.html) - A broadcast StreamController that caches the latest added value or error. When a new listener subscribes to the Stream, the latest value or error will be emitted to the listener. Furthermore, you can synchronously read the last emitted value. - [ReplaySubject](https://pub.dev/documentation/rxdart/latest/rx/ReplaySubject-class.html) - A broadcast StreamController that caches the added values. When a new listener subscribes to the Stream, the cached values will be emitted to the listener. +- [BufferedSubject](https://pub.dev/documentation/rxdart/latest/rx/BufferedSubject-class.html) - A broadcast StreamController that caches the added values while there are no listeners. When the first new listener subscribes to the Stream, the cached values will be emitted to the listener. ## Rx Observables vs Dart Streams diff --git a/lib/src/subjects/buffered_subject.dart b/lib/src/subjects/buffered_subject.dart new file mode 100644 index 000000000..bed138dd2 --- /dev/null +++ b/lib/src/subjects/buffered_subject.dart @@ -0,0 +1,184 @@ +import 'dart:async'; +import 'dart:collection'; + +import 'package:rxdart/rxdart.dart'; + +abstract class _BufferEntry { + void addToSink(StreamSink controller); +} + +class _BufferedEvent extends _BufferEntry { + final T value; + _BufferedEvent(this.value); + + @override + void addToSink(StreamSink controller) => controller.add(value); +} + +class _BufferedError extends _BufferEntry { + final Object error; + final StackTrace? stackTrace; + _BufferedError(this.error, this.stackTrace); + + @override + void addToSink(StreamSink controller) => controller.addError(error, stackTrace); +} + +/// A special StreamController that captures all of the items that are +/// added to the controller if it has no listener, and emits those as the +/// first items to the first new listener. +/// +/// This subject allows sending data, error and done events to the listener. +/// As items are added to the subject, the BufferedSubject will store them. +/// When the stream is listened to, those recorded items will be emitted to +/// the listener. After that, any new events will be appropriately sent to the +/// listeners. It is possible to cap the number of stored events by setting +/// a maxSize value. +/// +/// BufferedSubject is, by default, a broadcast (aka hot) controller, in order +/// to fulfill the Rx Subject contract. This means the Subject's `stream` can +/// be listened to multiple times. +/// +/// ### Example +/// +/// final subject = BufferedSubject(); +/// +/// subject.add(1); +/// subject.add(2); +/// subject.add(3); +/// +/// final completer = Completer(); +/// final StreamSubscription subscription = subject.stream.listen((event) { +/// print(event); +/// if (event == 3) { +/// completer.complete(); +/// } +/// }); // prints 1, 2, 3 +/// +/// await completer.future; +/// await subscription.cancel(); +/// +/// subject.add(4); +/// subject.stream.listen(print); // prints 4 +/// +/// ### Example with maxSize +/// +/// final subject = BufferedSubject(maxSize: 2); +/// +/// subject.add(1); +/// subject.add(2); +/// subject.add(3); +/// +/// final completer = Completer(); +/// final StreamSubscription subscription = subject.stream.listen((event) { +/// print(event); +/// if (event == 3) { +/// completer.complete(); +/// } +/// }); // prints 2, 3 +/// +/// await completer.future; +/// await subscription.cancel(); +/// +/// subject.add(4); +/// subject.stream.listen(print); // prints 4 +class BufferedSubject extends Subject { + bool _isAddingStreamItems = false; + final int? _maxSize; + final Queue<_BufferEntry> _buffer; + final StreamController _controller; + @override + void Function()? onListen; + + BufferedSubject._(this._controller, Stream stream, this._maxSize, this._buffer, this.onListen) + : super(_controller, stream) { + _controller.onListen = () { + for (final el in _buffer) { + el.addToSink(_controller); + } + _buffer.clear(); + onListen?.call(); + }; + } + + /// Constructs a [BufferedSubject], optionally pass handlers for + /// [onListen], [onCancel] and a flag to handle events [sync]. + /// + /// See also [StreamController.broadcast] + factory BufferedSubject({void Function()? onListen, void Function()? onCancel, bool sync = false, int? maxSize}) { + final Queue<_BufferEntry> buffer = Queue(); + final controller = StreamController.broadcast(onCancel: onCancel, sync: sync); + + return BufferedSubject._(controller, controller.stream, maxSize, buffer, onListen); + } + + @override + void add(T event) { + if (hasListener) { + super.add(event); + } else { + _verifyState(); + _buffer.add(_BufferedEvent(event)); + _truncateBuffer(); + } + } + + @override + void addError(Object error, [StackTrace? stackTrace]) { + if (hasListener) { + super.addError(error, stackTrace); + } else { + _verifyState(); + _buffer.add(_BufferedError(error, stackTrace)); + _truncateBuffer(); + } + } + + @override + Future addStream(Stream source, {bool? cancelOnError}) async { + if (hasListener) { + return super.addStream(source, cancelOnError: cancelOnError); + } else { + _verifyState(); + final completer = Completer(); + _isAddingStreamItems = true; + + source.listen( + (T event) { + _buffer.add(_BufferedEvent(event)); + _truncateBuffer(); + }, + cancelOnError: cancelOnError, + onDone: completer.complete, + onError: (Object e, StackTrace s) { + _buffer.add(_BufferedError(e, s)); + _truncateBuffer(); + if (cancelOnError == true) completer.complete(); + }, + ); + + return completer.future.then((_) { + _isAddingStreamItems = false; + }); + } + } + + void _truncateBuffer() { + final max = _maxSize; + while (max != null && _buffer.length > max) { + _buffer.removeFirst(); + } + } + + void _verifyState() { + if (_isAddingStreamItems) { + throw StateError('You cannot add items while items are being added from addStream'); + } + } + + @override + Future close() async { + if (!hasListener) _verifyState(); + return super.close(); + } +} diff --git a/lib/subjects.dart b/lib/subjects.dart index 77bc47250..1c105f2f3 100644 --- a/lib/subjects.dart +++ b/lib/subjects.dart @@ -1,6 +1,7 @@ library rx_subjects; export 'src/subjects/behavior_subject.dart'; +export 'src/subjects/buffered_subject.dart'; export 'src/subjects/publish_subject.dart'; export 'src/subjects/replay_subject.dart'; export 'src/subjects/subject.dart'; diff --git a/test/subject/buffered_subject_test.dart b/test/subject/buffered_subject_test.dart new file mode 100644 index 000000000..8b53b0f34 --- /dev/null +++ b/test/subject/buffered_subject_test.dart @@ -0,0 +1,394 @@ +import 'dart:async'; + +import 'package:rxdart/rxdart.dart'; +import 'package:test/test.dart'; + +import '../utils.dart'; + +void main() { + group('BufferedSubject', () { + test('replays the buffered items to first subscriber', () async { + final subject = BufferedSubject(); + + subject.add(1); + subject.add(2); + subject.add(3); + await expectLater(subject.stream, emitsInOrder(const [1, 2, 3])); + + subject.add(4); + subject.add(5); + await expectLater(subject.stream, emitsInOrder(const [4, 5])); + }); + + test('replays the buffered items to first subscriber, includes null', () async { + final subject = BufferedSubject(); + + subject.add(null); + subject.add(1); + subject.add(2); + subject.add(3); + subject.add(null); + + await expectLater( + subject.stream, + emitsInOrder(const [null, 1, 2, 3, null]), + ); + subject.add(4); + await expectLater( + subject.stream, + emitsInOrder(const [4]), + ); + }); + + test('replays the buffered errors to first subscriber', () async { + final subject = BufferedSubject(); + + subject.addError(Exception()); + subject.addError(Exception()); + subject.addError(Exception()); + + await expectLater(subject.stream, + emitsInOrder([emitsError(isException), emitsError(isException), emitsError(isException)])); + subject.addError(Exception()); + await expectLater(subject.stream, emitsInOrder([emitsError(isException)])); + }); + + test('replays the buffered items to first subscriber that directly subscribes to the Subject', () async { + final subject = BufferedSubject(); + + subject.add(1); + subject.add(2); + subject.add(3); + + await expectLater(subject, emitsInOrder(const [1, 2, 3])); + subject.add(4); + await expectLater(subject, emitsInOrder(const [4])); + }); + + test('replays the buffered items and errors to the first subscriber directly subscribing to the Subject', () async { + final subject = BufferedSubject(); + + subject.add(1); + subject.addError(Exception()); + subject.addError(Exception()); + subject.add(2); + + await expectLater(subject, emitsInOrder([1, emitsError(isException), emitsError(isException), 2])); + subject.addError(Exception()); + subject.add(3); + await expectLater(subject, emitsInOrder([emitsError(isException), 3])); + }); + + test('replays the most recently emitted items up to a max size', () async { + final subject = BufferedSubject(maxSize: 2); + + subject.add(1); // Should be dropped + subject.add(2); + subject.add(3); + + await expectLater(subject.stream, emitsInOrder(const [2, 3])); + subject.add(4); + await expectLater(subject.stream, emitsInOrder(const [4])); + }); + + test('emits done event to listeners when the subject is closed', () async { + final subject = BufferedSubject(); + + await expectLater(subject.isClosed, isFalse); + + subject.add(1); + scheduleMicrotask(() => subject.close()); + + await expectLater(subject.stream, emitsInOrder([1, emitsDone])); + await expectLater(subject.isClosed, isTrue); + }); + + test('emits error events to subscribers', () async { + final subject = BufferedSubject(); + + scheduleMicrotask(() => subject.addError(Exception())); + + await expectLater(subject.stream, emitsError(isException)); + }); + + test('replays the buffered items from addStream', () async { + final subject = BufferedSubject(); + + await subject.addStream(Stream.fromIterable(const [1, 2, 3])); + + await expectLater(subject.stream, emitsInOrder(const [1, 2, 3])); + subject.add(4); + await expectLater(subject.stream, emitsInOrder(const [4])); + }); + + test('allows items to be added once addStream is complete and there is no listener', () async { + final subject = BufferedSubject(); + + await subject.addStream(Stream.fromIterable(const [1, 2])); + subject.add(3); + + await expectLater(subject.stream, emitsInOrder(const [1, 2, 3])); + }); + + test('allows items to be added once addStream completes with an error and there is no listener', () async { + final subject = BufferedSubject(); + await subject.addStream(Stream.error(Exception()), cancelOnError: true).whenComplete(() => subject.add(1)); + await expectLater(subject.stream, emitsInOrder([emitsError(isException), emits(1)])); + }); + + test('does not allow events to be added when addStream is active and there is no listener', () async { + final subject = BufferedSubject(); + + // Purposely don't wait for the future to complete, then try to add items + // ignore: unawaited_futures + subject.addStream(Stream.fromIterable(const [1, 2, 3])); + + await expectLater(() => subject.add(1), throwsStateError); + }); + + test('does not allow errors to be added when addStream is active and there is no listener', () async { + final subject = BufferedSubject(); + + // Purposely don't wait for the future to complete, then try to add items + // ignore: unawaited_futures + subject.addStream(Stream.fromIterable(const [1, 2, 3])); + + await expectLater(() => subject.addError(Error()), throwsStateError); + }); + + test('does not allow subject to be closed when addStream is active and there is no listener', () async { + final subject = BufferedSubject(); + + // Purposely don't wait for the future to complete, then try to add items + // ignore: unawaited_futures + subject.addStream(Stream.fromIterable(const [1, 2, 3])); + + await expectLater(() => subject.close(), throwsStateError); + }); + + test('does not allow addStream to add items when previous addStream is active and there is no listener', () async { + final subject = BufferedSubject(); + + // Purposely don't wait for the future to complete, then try to add items + // ignore: unawaited_futures + subject.addStream(Stream.fromIterable(const [1, 2, 3])); + + await expectLater(() => subject.addStream(Stream.fromIterable(const [1])), throwsStateError); + }); + + test('allows items to be added once addStream is complete and there are listeners', () async { + final subject = BufferedSubject(); + final result = expectLater(subject.stream, emitsInOrder(const [1, 2, 3])); + + await subject.addStream(Stream.fromIterable(const [1, 2])); + subject.add(3); + + await result; + }); + + test('allows items to be added once addStream completes with an error and there are listeners', () async { + final subject = BufferedSubject(); + final result = expectLater(subject.stream, emitsInOrder([emitsError(isException), emits(1)])); + await subject.addStream(Stream.error(Exception()), cancelOnError: true).whenComplete(() => subject.add(1)); + await result; + }); + + test('does not allow events to be added when addStream is active and there are listeners', () async { + final subject = BufferedSubject(); + final sub = subject.listen((_) {}); + + // Purposely don't wait for the future to complete, then try to add items + unawaited(subject.addStream(Stream.fromIterable(const [1, 2, 3]))); + + await expectLater(() => subject.add(1), throwsStateError); + await sub.cancel(); + }); + + test('does not allow errors to be added when addStream is active and there are listeners', () async { + final subject = BufferedSubject(); + final sub = subject.listen((_) {}); + + // Purposely don't wait for the future to complete, then try to add items + unawaited(subject.addStream(Stream.fromIterable(const [1, 2, 3]))); + + await expectLater(() => subject.addError(Error()), throwsStateError); + await sub.cancel(); + }); + + test('does not allow subject to be closed when addStream is active and there are listeners', () async { + final subject = BufferedSubject(); + final sub = subject.listen((_) {}); + + // Purposely don't wait for the future to complete, then try to add items + unawaited(subject.addStream(Stream.fromIterable(const [1, 2, 3]))); + + await expectLater(() => subject.close(), throwsStateError); + await sub.cancel(); + }); + + test('does not allow addStream to add items when previous addStream is active and there are listeners', () async { + final subject = BufferedSubject(); + final sub = subject.listen((_) {}); + + // Purposely don't wait for the future to complete, then try to add items + // ignore: unawaited_futures + subject.addStream(Stream.fromIterable(const [1, 2, 3])); + + await expectLater(() => subject.addStream(Stream.fromIterable(const [1])), throwsStateError); + await sub.cancel(); + }); + + test('returns onListen callback set in constructor', () async { + void testOnListen() {} + + final subject = BufferedSubject(onListen: testOnListen); + + await expectLater(subject.onListen, testOnListen); + }); + + test('sets onListen callback', () async { + void testOnListen() {} + + final subject = BufferedSubject(); + + await expectLater(subject.onListen, isNull); + + subject.onListen = testOnListen; + + await expectLater(subject.onListen, testOnListen); + }); + + test('returns onCancel callback set in constructor', () async { + Future onCancel() => Future.value(null); + + final subject = BufferedSubject(onCancel: onCancel); + + await expectLater(subject.onCancel, onCancel); + }); + + test('sets onCancel callback', () async { + void testOnCancel() {} + + final subject = BufferedSubject(); + + await expectLater(subject.onCancel, isNull); + + subject.onCancel = testOnCancel; + + await expectLater(subject.onCancel, testOnCancel); + }); + + test('reports if a listener is present', () async { + final subject = BufferedSubject(); + + await expectLater(subject.hasListener, isFalse); + + subject.stream.listen(null); + + await expectLater(subject.hasListener, isTrue); + }); + + test('onPause unsupported', () { + final subject = BufferedSubject(); + + expect(subject.isPaused, isFalse); + expect(() => subject.onPause, throwsUnsupportedError); + expect(() => subject.onPause = () {}, throwsUnsupportedError); + }); + + test('onResume unsupported', () { + final subject = BufferedSubject(); + + expect(() => subject.onResume, throwsUnsupportedError); + expect(() => subject.onResume = () {}, throwsUnsupportedError); + }); + + test('returns controller sink', () async { + final subject = BufferedSubject(); + + await expectLater(subject.sink, const TypeMatcher>()); + }); + + test('correctly closes done Future', () async { + final subject = BufferedSubject(); + + scheduleMicrotask(subject.close); + + await expectLater(subject.done, completes); + }); + + test('can be listened to multiple times', () async { + final subject = BufferedSubject(); + final stream = subject.stream; + + subject.add(1); + subject.add(2); + + await expectLater(stream, emitsInOrder(const [1, 2])); + subject.add(3); + await expectLater(stream, emitsInOrder(const [3])); + }); + + test('always returns the same stream', () async { + final subject = BufferedSubject(); + + await expectLater(subject.stream, equals(subject.stream)); + }); + + test('is always treated as a broadcast Stream', () async { + final subject = BufferedSubject(); + final stream = subject.asyncMap((event) => Future.value(event)); + + expect(subject.isBroadcast, isTrue); + expect(stream.isBroadcast, isTrue); + }); + + test('rxdart issue/419: sync behavior', () async { + final subject = BufferedSubject(sync: true)..add(1); + final mappedStream = subject.map((event) => event).shareValue(); + + mappedStream.listen(null); + + expect(mappedStream.value, equals(1)); + + await subject.close(); + }); + + test('rxdart issue/419: sync throughput', () async { + final subject = BufferedSubject(sync: true)..add(1); + final mappedStream = subject.map((event) => event).shareValue(); + + mappedStream.listen(null); + + subject.add(2); + + expect(mappedStream.value, equals(2)); + + await subject.close(); + }); + + test('rxdart issue/419: async behavior', () async { + final subject = BufferedSubject()..add(1); + final mappedStream = subject.map((event) => event).shareValue(); + + mappedStream.listen(null, onDone: () => expect(mappedStream.value, equals(1))); + + expect(mappedStream.valueOrNull, isNull); + + await subject.close(); + }); + + test('rxdart issue/419: async throughput', () async { + final subject = BufferedSubject()..add(1); + final mappedStream = subject.map((event) => event).shareValue(); + + mappedStream.listen(null, onDone: () => expect(mappedStream.value, equals(2))); + + subject.add(2); + + expect(mappedStream.valueOrNull, isNull); + + await subject.close(); + }); + }); +}