Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tokio-stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ pub use stream_ext::{collect::FromStream, StreamExt};
/// Adapters for [`Stream`]s created by methods in [`StreamExt`].
pub mod adapters {
pub use crate::stream_ext::{
Chain, Filter, FilterMap, Fuse, Map, MapWhile, Merge, Peekable, Skip, SkipWhile, Take,
TakeWhile, Then,
Chain, Filter, FilterMap, FilterMapAsync, Fuse, Map, MapWhile, Merge, Peekable, Skip,
SkipWhile, Take, TakeWhile, Then,
};
cfg_time! {
pub use crate::stream_ext::{ChunksTimeout, Timeout, TimeoutRepeating};
Expand Down
53 changes: 53 additions & 0 deletions tokio-stream/src/stream_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ use collect::{Collect, FromStream};
mod filter;
pub use filter::Filter;

mod filter_map_async;
pub use filter_map_async::FilterMapAsync;

mod filter_map;
pub use filter_map::FilterMap;

Expand Down Expand Up @@ -479,6 +482,56 @@ pub trait StreamExt: Stream {
FilterMap::new(self, f)
}

/// Filters the values produced by this stream asynchronously while
/// simultaneously mapping them to a different type according to the
/// provided async closure.
///
/// The provided closure is executed over all elements of this stream as
/// they are made available, and the returned future is executed. Only one
/// future is executed at the time. If the returned future resolves to
/// [`Some(item)`](Some) then the stream will yield the value `item`, but if
/// it resolves to [`None`], then the value will be skipped.
///
/// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to [`Iterator::filter_map`] method in the
/// standard library.
///
/// Be aware that if the future is not `Unpin`, then neither is the `Stream`
/// returned by this method. To handle this, you can use `tokio::pin!` as in
/// the example below or put the stream in a `Box` with `Box::pin(stream)`.
///
/// # Examples
/// ```
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() {
/// use std::time::Duration;
/// use tokio::time;
/// use tokio_stream::{self as stream, StreamExt};
///
/// let stream = stream::iter(0..=7);
/// let odds = stream.filter_map_async(async |x| {
/// time::sleep(Duration::from_millis(100)).await;
/// if x % 2 == 0 { Some(x + 1) } else { None }
/// });
///
/// tokio::pin!(odds);
///
/// assert_eq!(Some(1), odds.next().await);
/// assert_eq!(Some(3), odds.next().await);
/// assert_eq!(Some(5), odds.next().await);
/// assert_eq!(Some(7), odds.next().await);
/// assert_eq!(None, odds.next().await);
/// # }
/// ```
fn filter_map_async<T, F, Fut>(self, f: F) -> FilterMapAsync<Self, Fut, F>
where
F: FnMut(Self::Item) -> Fut,
Fut: Future<Output = Option<T>>,
Self: Sized,
{
FilterMapAsync::new(self, f)
}

/// Creates a stream which ends after the first `None`.
///
/// After a stream returns `None`, behavior is undefined. Future calls to
Expand Down
80 changes: 80 additions & 0 deletions tokio-stream/src/stream_ext/filter_map_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use crate::Stream;

use core::fmt;
use core::future::Future;
use core::pin::Pin;
use core::task::{ready, Context, Poll};
use pin_project_lite::pin_project;

pin_project! {
/// Stream for the [`filter_map_async`](super::StreamExt::filter_map_async) method.
#[must_use = "streams do nothing unless polled"]
pub struct FilterMapAsync<St, Fut, F> {
#[pin]
stream: St,
#[pin]
future: Option<Fut>,
f: F,
}
}

impl<St, Fut, F> fmt::Debug for FilterMapAsync<St, Fut, F>
where
St: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FilterMapAsync")
.field("stream", &self.stream)
.finish()
}
}

impl<St, Fut, F> FilterMapAsync<St, Fut, F> {
pub(super) fn new(stream: St, f: F) -> Self {
FilterMapAsync {
stream,
future: None,
f,
}
}
}

impl<T, St, F, Fut> Stream for FilterMapAsync<St, Fut, F>
where
St: Stream,
Fut: Future<Output = Option<T>>,
F: FnMut(St::Item) -> Fut,
{
type Item = T;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
let mut me = self.project();

loop {
if let Some(future) = me.future.as_mut().as_pin_mut() {
let item = ready!(future.poll(cx));
me.future.set(None);
if let Some(item) = item {
return Poll::Ready(Some(item));
Comment on lines +54 to +58
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So while we are polling the future, we do not poll the stream. It implies that something like stream.buffer_unordered().filter_map_async(...) is really dangerous because ongoing futures inside of the buffer_unordered() just pause out of nowhere.

On the other hand, this is a pre-existing problem with stream.then() too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would the alternative be keeping a buffer of items, so the entire stream can be polled by awaiting on one .next()?

}
}

match ready!(me.stream.as_mut().poll_next(cx)) {
Some(item) => {
me.future.set(Some((me.f)(item)));
}
None => return Poll::Ready(None),
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let future_len = usize::from(self.future.is_some());
let upper = self
.stream
.size_hint()
.1
.and_then(|upper| upper.checked_add(future_len));
(0, upper)
}
}
117 changes: 117 additions & 0 deletions tokio-stream/tests/stream_filter_map_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
use futures::Stream;
use tokio::sync::Notify;
use tokio_stream::{self as stream, StreamExt};
use tokio_test::{assert_pending, assert_ready_eq, task};

mod support {
pub(crate) mod mpsc;
}

use support::mpsc;

#[tokio::test]
async fn basic() {
let (tx, rx) = mpsc::unbounded_channel_stream();

let mut st =
task::spawn(rx.filter_map_async(async |x| if x % 2 == 0 { Some(x + 1) } else { None }));
assert_pending!(st.poll_next());

tx.send(1).unwrap();
assert!(st.is_woken());
assert_pending!(st.poll_next());

tx.send(2).unwrap();
assert!(st.is_woken());
assert_ready_eq!(st.poll_next(), Some(3));

assert_pending!(st.poll_next());

tx.send(3).unwrap();
assert!(st.is_woken());
assert_pending!(st.poll_next());

drop(tx);
assert!(st.is_woken());
assert_ready_eq!(st.poll_next(), None);
}

#[tokio::test]
async fn notify_unbounded() {
let (tx, rx) = mpsc::unbounded_channel_stream();
let notify = Notify::new();

let mut st = task::spawn(rx.filter_map_async(async |x| {
notify.notified().await;
if x % 2 == 0 {
Some(x + 1)
} else {
None
}
}));
assert_pending!(st.poll_next());

tx.send(0).unwrap();
assert!(st.is_woken());
assert_pending!(st.poll_next());

notify.notify_one();
assert!(st.is_woken());
assert_ready_eq!(st.poll_next(), Some(1));

tx.send(1).unwrap();
assert!(!st.is_woken());
assert_pending!(st.poll_next());

notify.notify_one();
assert!(st.is_woken());
assert_pending!(st.poll_next());

tx.send(2).unwrap();
assert!(st.is_woken());
assert_pending!(st.poll_next());

notify.notify_one();
assert!(st.is_woken());
assert_ready_eq!(st.poll_next(), Some(3));

drop(tx);
assert!(!st.is_woken());
assert_ready_eq!(st.poll_next(), None);
}

#[tokio::test]
async fn notify_bounded() {
let notify = Notify::new();
let mut st = task::spawn(stream::iter(0..3).filter_map_async(async |x| {
notify.notified().await;
if x % 2 == 0 {
Some(x + 1)
} else {
None
}
}));
assert_eq!(st.size_hint(), (0, Some(3)));
assert_pending!(st.poll_next());

notify.notify_one();
assert!(st.is_woken());
assert_eq!(st.size_hint(), (0, Some(3)));
assert_ready_eq!(st.poll_next(), Some(1));
assert_eq!(st.size_hint(), (0, Some(2)));

notify.notify_one();
assert!(!st.is_woken());
assert_eq!(st.size_hint(), (0, Some(2)));
assert_pending!(st.poll_next());
assert_eq!(st.size_hint(), (0, Some(1)));

notify.notify_one();
assert!(st.is_woken());
assert_eq!(st.size_hint(), (0, Some(1)));
assert_ready_eq!(st.poll_next(), Some(3));
assert_eq!(st.size_hint(), (0, Some(0)));

assert!(!st.is_woken());
assert_ready_eq!(st.poll_next(), None);
}
Loading