Description
Summary
CallAll::unordered() is documented as only being valid before the stream is polled, and the docs say it panics if poll was called.
However, the current implementation only checks:
assert!(self.queue.is_empty() && !self.eof);
That guard does not account for curr_req, which is the slot used by poll_next to hold a request that has already been pulled from the input stream but not yet dispatched to the inner service.
As a result, if an ordered call_all stream is partially polled, consumes one request from the input stream, and then hits Poll::Pending in svc.poll_ready, calling unordered() succeeds and drops that buffered request.
Reproduction
This can be reproduced with the public API:
use futures::{stream, Stream};
use futures::task::{noop_waker_ref, Context};
use std::{
    cell::RefCell,
    convert::Infallible,
    pin::Pin,
    rc::Rc,
    task::Poll,
};
use tower::{Service, ServiceExt};

#[derive(Clone, Default)]
struct PendingReadySvc {
    seen: Rc<RefCell<Vec<u8>>>,
}

impl Service<u8> for PendingReadySvc {
    type Response = u8;
    type Error = Infallible;
    type Future = std::future::Ready<Result<u8, Self::Error>>;

    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Pending
    }

    fn call(&mut self, req: u8) -> Self::Future {
        self.seen.borrow_mut().push(req);
        std::future::ready(Ok(req))
    }
}

#[test]
fn unordered_after_partial_poll_drops_buffered_request() {
    let svc = PendingReadySvc::default();
    let seen = svc.seen.clone();
    let reqs = stream::iter([42u8]);
    let mut ordered = svc.call_all(reqs);
    let waker = noop_waker_ref();
    let mut cx = Context::from_waker(waker);

    // First poll consumes `42` from the request stream, stores it in `curr_req`,
    // then returns `Pending` because `poll_ready` is pending.
    assert!(matches!(Pin::new(&mut ordered).poll_next(&mut cx), Poll::Pending));

    // This is documented to panic after polling has started, but it does not.
    let mut unordered = ordered.unordered();

    // The converted stream is now exhausted, and the request was never dispatched.
    assert!(matches!(Pin::new(&mut unordered).poll_next(&mut cx), Poll::Ready(None)));
    assert!(seen.borrow().is_empty(), "request was consumed from the input stream but never forwarded");
}
To explain the test case:
If the following sequence happens:
- poll_next() pulls a request from the input stream,
- stores it in curr_req,
- svc.poll_ready() returns Poll::Pending,
- the caller then invokes unordered(),
then:
- unordered() does not panic,
- the buffered request in curr_req is dropped,
- the request is never forwarded to the service,
- the unordered stream can terminate cleanly with None.
This causes silent request loss. A caller can begin polling an ordered call_all stream, hit backpressure, convert to unordered, and lose one already-consumed request without any error.
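The sequence above can also be modelled without tower. The sketch below is a simplified, std-only stand-in: the Model struct and its poll_once and into_unordered methods are hypothetical names that only mirror the fields described in this issue (queue, eof, curr_req). It is not tower's implementation.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the state discussed in this issue.
/// It is not tower's `CallAll`; it only mirrors the fields involved.
struct Model {
    input: VecDeque<u8>,  // requests not yet pulled from the input stream
    curr_req: Option<u8>, // pulled from the input, not yet dispatched
    queue: Vec<u8>,       // requests already dispatched to the service
    eof: bool,            // input stream exhausted
}

impl Model {
    fn new(reqs: &[u8]) -> Self {
        Model {
            input: reqs.iter().copied().collect(),
            curr_req: None,
            queue: Vec::new(),
            eof: false,
        }
    }

    /// One simplified poll step; `service_ready` plays the role of `poll_ready`.
    /// Returns true if a request was dispatched.
    fn poll_once(&mut self, service_ready: bool) -> bool {
        if self.curr_req.is_none() {
            match self.input.pop_front() {
                Some(req) => self.curr_req = Some(req),
                None => {
                    self.eof = true;
                    return false;
                }
            }
        }
        if !service_ready {
            // Backpressure: the request stays parked in `curr_req`.
            return false;
        }
        let req = self.curr_req.take().expect("curr_req was just filled");
        self.queue.push(req);
        true
    }

    /// Conversion guarded as quoted in this issue: only `queue` and `eof`
    /// are checked, so a parked `curr_req` is silently discarded.
    /// Returns the requests still reachable after the conversion.
    fn into_unordered(self) -> VecDeque<u8> {
        assert!(self.queue.is_empty() && !self.eof);
        self.input // `self.curr_req` is dropped here
    }
}
```

With a single request and a service that is not ready, one call to poll_once parks the request, the guard in into_unordered still passes, and nothing is left to dispatch afterwards.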
Why this happens
poll_next() can populate curr_req before the request is dispatched:
if this.curr_req.is_none() {
    *this.curr_req = match ready!(this.stream.as_mut().poll_next(cx)) {
        Some(next_req) => Some(next_req),
        None => {
            *this.eof = true;
            continue;
        }
    };
}
if let Err(e) = ready!(svc.poll_ready(cx)) {
    *this.eof = true;
    return Poll::Ready(Some(Err(e)));
}
this.queue.push(svc.call(this.curr_req.take().unwrap()));
But unordered() does not check or preserve curr_req.
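One possible way to close the gap, sketched here as an assumption rather than as the project's chosen fix, is to make the guard also reject a populated curr_req. The GuardState struct and both functions below are hypothetical and only mirror the fields named in this issue.

```rust
/// Hypothetical mirror of the three fields relevant to the guard.
struct GuardState {
    queue_len: usize,
    eof: bool,
    curr_req: Option<u8>,
}

/// The check as quoted in this issue: `queue` and `eof` only.
fn current_guard_passes(s: &GuardState) -> bool {
    s.queue_len == 0 && !s.eof
}

/// A stricter check (assumption): also require that no request is parked.
fn stricter_guard_passes(s: &GuardState) -> bool {
    s.queue_len == 0 && !s.eof && s.curr_req.is_none()
}
```

An alternative would be to carry curr_req over into the unordered stream instead of panicking. Which behaviour is preferable is a design decision for the maintainers.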