Skip to content

stream: add StreamExt::filter_map_async#7971

Open
figsoda wants to merge 6 commits intotokio-rs:masterfrom
figsoda:async-closure
Open

stream: add StreamExt::filter_map_async#7971
figsoda wants to merge 6 commits intotokio-rs:masterfrom
figsoda:async-closure

Conversation

@figsoda
Copy link
Contributor

@figsoda figsoda commented Mar 13, 2026

Motivation

This is like the async closure version of StreamExt::filter_map, similar to StreamExt::then for StreamExt::map.

Solution

The code is largely adapted from StreamExt::then and StreamExt::filter_map. I'm not super sure about the name filter_map_async, as this will probably be what other async closure variants will be named after.

The only test is the doctest, since I couldn't find any standalone tests for the surrounding functions. I have tested it for my use case, but it might still be helpful to have more tests in tree.

I also wanted to implement StreamExt::filter_async, but that seemed to require AsyncFnMut due to lifetime issues, which hasn't been in stable Rust for 6 months yet which is the MSRV policy. So I will hold on to that until then.

@figsoda figsoda changed the title stream: add StreamExt::filter_map_async stream: add StreamExt::filter_map_async Mar 13, 2026
Copy link
Member

@ADD-SP ADD-SP left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also wanted to implement StreamExt::filter_async, but that seemed to require AsyncFnMut due to lifetime issues, which hasn't been in stable Rust for 6 months yet which is the MSRV policy. So I will hold on to that until then.

Yeah, the StreamExt::filter_async is tricky due to lifetime issue and it might be less useful before resolving this lifetime issue. I think filter_map_async is useful, but I prefer to hold off on implementing of filter_async as it is less useful at this stage.

Copy link
Member

@ADD-SP ADD-SP left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only test is the doctest, since I couldn't find any standalone tests for the surrounding functions. I have tested it for my use case, but it might still be helpful to have more tests in tree.

Is tokio-stream/tests a good place for a new test file?

@figsoda
Copy link
Contributor Author

figsoda commented Mar 16, 2026

added some tests and fixed a few oversights

@mattiapitossi mattiapitossi added A-tokio-stream Area: The tokio-stream crate M-stream Module: tokio/stream labels Mar 17, 2026
Comment on lines +54 to +58
if let Some(future) = me.future.as_mut().as_pin_mut() {
let item = ready!(future.poll(cx));
me.future.set(None);
if let Some(item) = item {
return Poll::Ready(Some(item));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So while we are polling the future, we do not poll the stream. It implies that something like stream.buffer_unordered().filter_map_async(...) is really dangerous because ongoing futures inside of the buffer_unordered() just pause out of nowhere.

On the other hand, this is a pre-existing problem with stream.then() too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would the alternative be keeping a buffer of items, so the entire stream can be polled by awaiting on one .next()?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

A-tokio-stream Area: The tokio-stream crate M-stream Module: tokio/stream

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants