The async equivalent to iterators in Rust are streams, which are exactly the same as AsyncIterators in JS or C#; the consumer pulls on the stream but the stream is allowed to defer responding, so both sides must be able to pause. Simple stuff.

In synchronous rust, Iterator::map takes an FnMut, a function which can only be called if you can prove that it’s not already running. This is good because it’s pretty common to want to either use mutable context for a transformation or equivalently perform a sequence of effectful operations and then collect their results into a datastructure, and both of these are obviously expressed as sequence.map(|item| /* some mutation */).collect() which as a bonus propagates size hints. The Orchid codebase is FULL of this pattern.

The async equivalent however has to take a function that returns some type that implements Future because that’s how you statically type an asynchronous function, you parameterize on the state machine the compiler will eventually generate for its paused data. This is again perfectly normal, C++ coroutines do the same as I’m pretty sure every other language that supports any kind of stack-allocated coroutine has to. The problem emerges from lifetimes, because in order for that Future to hold onto a mutable reference, the async equivalent of map (which happens to be called StreamExt::then for reference) has to not only guarantee that the callback will not be running when its next called, but that its return value (the Future instance) will not exist (either because it’s finished or because it’s been freed) by the time the function is called again.

The type of the callback then should be some function which for any lifetime 'a returns some type that is valid for the same lifetime 'a. The type of the return value is parametric!

Since structs and functions can only be parametric on concrete types, not generics, a callback whose return type has a different contract depending on how you called the function is illegal on general. So if you want to access mutable data in an async stream, you have to make an ad-hoc Mutex<&mut T> right there on the stack which the closure and its return value can capture by shared reference and then immediately lock for its entire runtime. Streams are lazy and a new value will not be pulled until the current one is finished so this mutex can never ever be contested, but there is no way at all to explain this to the type system.

There’s an escape hatch, for<'a> Fn(&'a [u8]) -> &'a u8 is a valid type, but this for<'a> syntax is a late addition to the borrow checker which isn’t integrated in the type system at all, so for<'a> Fn(T) -> Box<dyn Future<Output = U> + 'a> is valid but only because the higher-kinded lifetime parametrism has been localized to a single type constraint. Streams don’t do this because it requires a heap allocation and subsequent virtual function calls to interact with the Future, and an uncontested mutex lock-release cycle is vastly faster.

Actually, this kind of contract isn’t inherently impossible to represent in Rust because it has a weird bastard type system where generic parameters to traits (types that pick an implementation such as u32 in impl From<u32> for u64 {}) must be concrete types, but associated types (types that are picked by the implementation such as the str in impl Deref for String { type Target = str }) may themselves be generic. So it’s imaginable for callbacks to be mapped into a bound for a trait with an associated return type that’s generic on a lifetime. Incidentally, this is exactly what the async_closure proposal does.

As a reminder, in ref and mut member functions '_ always refers to the self argument’s lifetime, so The lifetime bound on async_call_mut basically declares that once it’s called, its output is valid for as long as the function is in scope, and the function may not be called again until the return value is freed because for as long as it’s live, so is the mutable borrow. Perfect!

That’s all nice and good but this is unstable and thus a non-solution for crates. So what to do? Well, you can always use a mutex as I mentioned earlier, or the boxed future if you prefer elegance over performance. This is an entirely valid position, I often take it as well. As engineers we often get lost in optimizations and long-term strategy and forget to enjoy our craft or make our products pleasant to work with. The product is certainly important, but can you place a definitive price tag on the willingness of an engineer to touch it, multiplied by engineering hours? How does that compare to the nanosecs of delay caused by an unnecessary allocation and victual call? The Ruby crowd says, very favourably. I think with Rust’s performance, we have plenty of headroom for aesthetics.

It’s also worth considering that eventually async_closure will be standardized and when that happens, the aesthetic choice is trivially convertible, whereas removing the unnecessary mutex may have deeper design implications.

Anyway, philosophy aside, for the specific case of streams we can also use async_stream::stream! with a for loop in a fashion a lot like the below example.

stream! {
  for item in item_seq {
    yield item.process(&mut mutableCtx).await
  }
}
.collect::<Vec<_>>()
.await

This macro is crafted very well; the resulting Stream implementor uses a thread local but no heap allocation or virtual calls. I think it is a solid solution, and just like async/await itself, a fine candidate for eventual inclusion in the language itself.