Streams are not a good replacement for futures

Streams (a.k.a. observables) are a generalization of futures and can do all the things that futures do and more. In fact, many stream libraries will claim that they are a replacement for futures. Unfortunately, this sometimes leads to people completely dumping futures from their code base in favor of streams.

While I have nothing against streams, I am of the opinion that using streams as futures is very inappropriate, for the reasons explained in this post.

Running Example

Let’s illustrate the problem with a simple example of using a stream where a future would have been more appropriate.

Imagine we are writing the client side of an application that has users. Let’s say we already have the username of a user and we simply want to fetch that user’s information from the server, which may include their email, real name, profile picture, etc. This is how it will look in code using a Stream:

def fetchUserInfo(username: Username): Stream[UserInfo] = {...}

Already, the first problem with this approach is evident: mis-communication. Fetching the UserInfo of a user by their username returns a “stream” of UserInfos? How many UserInfos are going to be returned? How many does it make sense to return anyway? Probably just one … right? Enjoy having this conversation in your head everytime you see this function’s signature. It’s annoying, and can occasionally even break the reader’s train of thought.

Had we declared the return type of this function as a Future[UserInfo] however, the intent of this function would have been perfectly clear.

Even more importantly, it would have made it impossible for the implementation of that function to accidentally yield more than one UserInfo, a mistake that is easier to make than one may imagine, specially if a loop is somehow involved. Which brings us to the next set of problems with this approach: the mistakes the implementation of fetchUserInfo can make.

The mistakes the implementation of fetchUserInfo can make

The implementation side of things, that is, the part of code that returns a stream, has to make sure to avoid a number of problems that are sure to confuse the part of code that consumes that stream (a.k.a. the call / usage side of things). Before we can go through these mistakes though, let’s quickly remind ourselves that most stream libraries are more or less based on the following Event ADT:

sealed trait Event[A]
case class Next[A](a: A) extends Event[A]
case class Complete[A]() extends Event[A]
case class Failure[A](error: Throwable) extends Event[A]

A stream, then, is (or consists of) zero or more Nexts containing As, followed by either a Complete or a Failure. Parts of code that want to consume a stream, at a very primitive level of abstraction, will have to “subscribe” to it and provide a callback / closure that will be invoked whenever a new Event is available.

With that out of the way, here are the mistakes that the implementation of a function that returns a stream (instead of a future) can make:

  • Send more than one Next. We are simulating a future, so it doesn’t make sense to send more than one Next, and the client won’t know how to handle that.
  • Send Complete before Next. Best case scenario: the stream library will throw an exception for you. Second best case scenario: you will handle this at the call site and throw an exception your-self. Worst case scenario: you have only defined a callback for the Next event, which means the callback may never be invoked.
  • Send Next, but then forget to send Complete. It’s important to send Complete after we have sent the Next, because, among other things, it signals to the stream library that there won’t be any new Events in this stream, so it can proceed to clear up its collection of listeners / closures. In other words, failing to send Complete may cause the listeners / closures of the stream to be never garbage collected, meaning you will end up with a memory leak. A naively implemented stream library may even require you to “unsubscribe” manually.

Call site torture

The tortures of consuming a future disguised as a stream mirror the problems of creating it (explained above). Basically the client side will have to handle all the weird cases and (since there is no way to recover from these errors) blow up the program in response, say, by throwing a runtime exception.

See what happened here? We threw away the compile time type safety guarantees of futures and now have to throw runtime errors to make up for it. This is just sad!

Recap: the drawbacks of using streams as futures

Before going any further let’s review the drawbacks of using streams as futures (because there are so many of them):

  • Mis-communication
  • More than one Next in the stream.
  • Sending Complete before the Next.
  • Not sending Complete after the Next.
  • Handling the above at call site mistakes.

The root cause of these problems is that streams are a much more general & expressive concept than futures, and with that generality and expressiveness comes a big can of worms. Worms about which we wouldn’t need to worry if we used a more constrained and specialized abstraction like futures wherever appropriate.

Interoperability between Futures and Streams

Obviously, streams have their uses, and in a sufficiently complex project, we’ll want to use them alongside futures.

We will also usually want to mix and match our use of futures and streams. For instance, we may want to treat a set of futures as a stream, processing each as it becomes available. On the other hand, maybe we don’t really need to represent every incoming post request to our server as a Stream[Byte] and instead prefer to treat some as Future[PostRequest]. We should be able to make these conversions easily.

This mixed usage of streams and futures means that we will require some sort of interoperability between them. The actual mechanism used for achieving this interoperability is outside the scope of this post. The point to be made here is that a stream library worth its salt should have great support for futures as well. Ideally, the stream library will support the most popular future library of the language / runtime.

Conclusion

Streams and Futures complement each-other and should ideally be interoperable.