SelectFrom

A vocal community of enthusiastic developers. We speak all things data, code and engineering.


Throttling Made Easy — Back Pressure in Akka Streams

Jay Reddy
Published in SelectFrom · 6 min read · Dec 27, 2020


Photo by Moritz Mentges on Unsplash

Big data has been the buzzword lately, but fast data is gaining traction too. If you work with data streaming, you know things get tedious when they are not done right. Worse, mistakes can lead to memory leaks and OutOfMemoryError exceptions!

If you are building a service or product today, users are willing to pay a premium for content delivered with millisecond latency.

Akka Streams

Akka Streams is a streaming module that is part of the Akka toolkit. It is designed to process streams of data concurrently. It builds on Akka actors, but you do not have to define actor behaviors and message handling explicitly. Its abstractions hide what goes on under the hood, so you can focus on the business logic.

Through Alpakka, its open-source connector project, Akka Streams can integrate with a wide range of data sources, sinks and technologies.

When you think of a stream, you picture a starting point where the stream begins and an ending point where it ends. I want you to think about Akka Streams in the same way. In the terms of the Reactive Streams specification, which Akka Streams implements:

  • The starting point of the stream is called the Publisher
  • The ending point of the stream is called the Subscriber

Akka Streams uses the following operators to handle the ingestion, processing, transformation, and storage of data.

  • Source — An operator with exactly one output emitting data elements whenever downstream operators are ready to receive them
  • Flow — An operator that has exactly one input and output, which connects its upstream and downstream by transforming the data elements flowing through it
  • Sink — An operator with exactly one input, requesting and accepting data elements

Let’s write some code and break it down into what’s happening.

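import akka.stream.scaladsl.{Flow, Sink, Source}

// Emits the numbers 1 to 1000
val source = Source(1 to 1000)
// Multiplies every element by 10
val multiply = Flow[Int].map(x => x * 10)
// Prints every element it receives
val sink = Sink.foreach[Int](println)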

The apply methods of Source and Flow look like this. Sink does not have an apply method.

// Source
def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed]
// Flow
def apply[T]: Flow[T, T, NotUsed]

Here, our Source emits the values 1 to 1000 in order. Each value is multiplied by 10 by the map function in the Flow and then printed to the console at the Sink. This is quite a small computation and will not run into latency or back-pressure issues.

We can connect the operators using the Akka Streams DSL:

source.via(multiply).to(sink).run()

You have created a graph here, and it is runnable.

When a Flow has both ends "attached" to a Source and a Sink respectively, it is ready to be run(). Such a closed graph is called a RunnableGraph.

How does this work without you configuring the program and telling it how to run the graph?

Constructing the RunnableGraph by connecting the Source, the Sink and the other operators does not, by itself, make any data flow through it.

This is where Materialization comes into action!

Stream Materialization

When constructing flows and graphs in Akka Streams, think of it as preparing a blueprint/execution plan. Stream Materialization is the process of taking a stream description and allocating the resources it needs to run. This means starting up Actors that power the processing, and much more, depending on what the stream needs.
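The blueprint idea can be observed directly. Below is a minimal sketch, assuming Akka 2.6 with a typed ActorSystem in implicit scope. MaterializationDemo and its pulled counter are hypothetical names for illustration, and the printing Sink is swapped for Sink.fold so the result can be checked. Building blueprint processes nothing; every call to run() materializes a new, independent copy of it.

```scala
import java.util.concurrent.atomic.AtomicInteger
import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object (not from the article)
object MaterializationDemo {
  // Counts how many elements have actually passed through the Flow
  val pulled = new AtomicInteger(0)

  val source: Source[Int, NotUsed] = Source(1 to 5)
  val multiply: Flow[Int, Int, NotUsed] =
    Flow[Int].map { x => pulled.incrementAndGet(); x * 10 }

  // Only a description: building it processes no elements.
  // Keep.right keeps the Sink's materialized value, a Future of the sum.
  val blueprint: RunnableGraph[Future[Int]] =
    source.via(multiply).toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)

  /** Returns (pulled before any run, sum of 1st run, sum of 2nd run, pulled after both runs). */
  def run(): (Int, Int, Int, Int) = {
    val before = pulled.get()
    implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "materialization-demo")
    try {
      // Each run() materializes a fresh, independent stream from the same blueprint
      val first = Await.result(blueprint.run(), 5.seconds)
      val second = Await.result(blueprint.run(), 5.seconds)
      (before, first, second, pulled.get())
    } finally system.terminate()
  }
}
```

Running the same blueprint twice yields two separate streams, which is why the counter ends at 10 rather than 5.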

After running (materializing) the RunnableGraph, we get back a materialized value of a specific type. Every stream operator can produce a materialized value, and it is your responsibility to decide how they are combined into a new one. Akka provides .toMat to choose how the materialized values of the left side (here the Source and Flow) and the Sink are combined, and the convenient functions Keep.left, Keep.right, Keep.both and Keep.none describe the combination; Keep.right, for example, says that we are only interested in the materialized value of the Sink.

import akka.stream.scaladsl.Keep

// keep right: returns the Sink's Future[Done]
source.via(multiply).toMat(sink)(Keep.right).run()
// keep left: returns the Source's NotUsed
source.via(multiply).toMat(sink)(Keep.left).run()
// keep both: returns (NotUsed, Future[Done])
source.via(multiply).toMat(sink)(Keep.both).run()
// keep none: returns NotUsed
source.via(multiply).toMat(sink)(Keep.none).run()
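To see what Keep.both hands back, here is a sketch assuming Akka 2.6. KeepDemo is a hypothetical name, and Sink.fold replaces the printing Sink so that the Sink side has a value worth inspecting. The Source side materializes to NotUsed, the Sink side to a Future of the sum.

```scala
import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object (not from the article)
object KeepDemo {
  val source: Source[Int, NotUsed] = Source(1 to 4)
  val multiply: Flow[Int, Int, NotUsed] = Flow[Int].map(x => x * 10)
  // Sink.fold materializes to a Future holding the final sum
  val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  /** Returns the Source's materialized value and the Sink's result. */
  def run(): (NotUsed, Int) = {
    implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "keep-demo")
    try {
      // Keep.both pairs the left (Source) and right (Sink) materialized values
      val (left, sumFuture): (NotUsed, Future[Int]) =
        source.via(multiply).toMat(sumSink)(Keep.both).run()
      (left, Await.result(sumFuture, 5.seconds))
    } finally system.terminate()
  }
}
```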

You can use .viaMat to do the same when attaching a Flow.

//keep right
source.viaMat(multiply)(Keep.right).to(sink).run()
//keep left
source.viaMat(multiply)(Keep.left).to(sink).run()
//keep both
source.viaMat(multiply)(Keep.both).to(sink).run()
//keep none
source.viaMat(multiply)(Keep.none).to(sink).run()
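With multiply, every variant above only ever returns NotUsed values (a pair of them for Keep.both), because a map stage has no meaningful materialized value. A stage that does materialize something makes viaMat worthwhile. The sketch below assumes Akka 2.6; ViaMatDemo is a hypothetical name, and it swaps in KillSwitches.single, whose materialized value is a UniqueKillSwitch that can stop an otherwise endless stream.

```scala
import akka.Done
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object (not from the article)
object ViaMatDemo {
  /** Starts an endless stream, stops it with the kill switch and waits for completion. */
  def run(): Done = {
    implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "viamat-demo")
    try {
      val (killSwitch, done): (UniqueKillSwitch, Future[Done]) =
        Source.repeat(1)                           // endless: never completes on its own
          .viaMat(KillSwitches.single)(Keep.right) // keep the middle stage's value: the kill switch
          .toMat(Sink.ignore)(Keep.both)           // pair it with the Sink's Future[Done]
          .run()
      killSwitch.shutdown()                        // complete the stream from the middle
      Await.result(done, 5.seconds)
    } finally system.terminate()
  }
}
```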

By default, via and to keep the left-hand materialized value, so source.via(multiply).to(sink).run() returns the Source's value (NotUsed).
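The default is easy to check, and runWith is the usual shortcut when you want the Sink's value instead, since it keeps the right-hand (Sink) materialized value. A sketch assuming Akka 2.6; DefaultMatDemo is a hypothetical name, and Sink.seq collects the elements so they can be compared.

```scala
import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object (not from the article)
object DefaultMatDemo {
  val source = Source(1 to 5)
  val multiply = Flow[Int].map(x => x * 10)

  def run(): (NotUsed, immutable.Seq[Int]) = {
    implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "default-mat-demo")
    try {
      // to(...) keeps the left (Source) value, so run() returns NotUsed
      val left: NotUsed = source.via(multiply).to(Sink.ignore).run()
      // runWith(...) keeps the Sink's value: here a Future of all collected elements
      val collected: Future[immutable.Seq[Int]] = source.via(multiply).runWith(Sink.seq[Int])
      (left, Await.result(collected, 5.seconds))
    } finally system.terminate()
  }
}
```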

In Akka versions before 2.6, you had to create the Materializer manually and make it implicit:

implicit val actorMaterializer = ActorMaterializer()

Since Akka 2.6 this is no longer needed: an implicit ActorSystem in scope (classic or typed) provides the system-wide Materializer, so creating an ActorMaterializer yourself is deprecated.

Note: Method applied in object ActorMaterializer is deprecated (since 2.6.0). Use the system-wide materializer with stream attributes or configuration settings to change defaults.
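Here is a sketch of the 2.6-style setup, assuming Akka 2.6 and the Typesafe Config library it ships with. No ActorMaterializer is created; the implicit typed ActorSystem is all runWith needs, and one stream default, the maximum input buffer size, is changed through configuration instead of materializer settings. SystemMaterializerDemo is a hypothetical name, and the value 4 is an arbitrary illustrative choice.

```scala
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.scaladsl.{Sink, Source}
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object (not from the article)
object SystemMaterializerDemo {
  // Override a stream default through configuration (illustrative value)
  private val config = ConfigFactory
    .parseString("akka.stream.materializer.max-input-buffer-size = 4")
    .withFallback(ConfigFactory.load())

  /** Returns (configured max input buffer size, sum of 1 to 10 computed by a stream). */
  def run(): (Int, Int) = {
    // No ActorMaterializer(): the implicit typed ActorSystem supplies the materializer
    implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "materializer-demo", config)
    try {
      val bufferSize = system.settings.config.getInt("akka.stream.materializer.max-input-buffer-size")
      val sum = Await.result(Source(1 to 10).runWith(Sink.fold[Int, Int](0)(_ + _)), 5.seconds)
      (bufferSize, sum)
    } finally system.terminate()
  }
}
```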

Let’s talk about the magic that happens under the hood now.

Code Snippet

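import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

object BackPressure extends App {
  // The implicit typed ActorSystem also provides the Materializer that run() needs
  implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "backPressure-example")
  import system.executionContext
  val source = Source(1 to 10)
  val multiply = Flow[Int].map(x => x * 10)
  val sink = Sink.foreach[Int](println)
  // Keep the Sink's Future[Done] so the system can be shut down once the stream completes
  source.via(multiply).toMat(sink)(Keep.right).run().onComplete(_ => system.terminate())
}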

Back Pressure

Let's say the Source produces data fast while the Sink consumes it slowly, with a 1-second delay per element.

val source = Source(1 to 1000)
val multiply = Flow[Int].map(x => x * 10)
val sink = Sink.foreach[Int] { x =>
  Thread.sleep(1000) // simulate a slow consumer: about 1 second per element
  println(x)
}

Here, the Source sends data through the Flow to the Sink, but Akka Streams only lets it emit elements that downstream has asked for. The slow Sink requests new elements only when it is ready for them, and the Flow passes that demand on to the Source, so the Source ends up emitting at the Sink's pace instead of flooding it. All of this back pressure happens under the hood.
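Back pressure can be made visible by counting elements on both sides of an asynchronous boundary. The sketch below assumes Akka 2.6; SlowConsumerDemo and its counters are hypothetical, the Sink is slowed to about 100 ms per element (instead of 1 second) so the demo finishes quickly, and .async puts producer and consumer in different Actors so the producer could race ahead if nothing held it back. After about a second, only a small number of the 1000 elements have left the Source.

```scala
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.scaladsl.{Sink, Source}

// Hypothetical demo object (not from the article)
object SlowConsumerDemo {
  /** Returns (elements emitted by the fast side, elements finished by the slow Sink) after about one second. */
  def run(): (Int, Int) = {
    implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "slow-consumer-demo")
    val emitted = new AtomicInteger(0)
    val consumed = new AtomicInteger(0)
    try {
      Source(1 to 1000)
        .map { x => emitted.incrementAndGet(); x } // counts what the fast side has produced
        .async                                     // producer and consumer run in different Actors
        .runWith(Sink.foreach[Int] { _ =>
          Thread.sleep(100)                        // slow consumer; blocking sleep is for the demo only
          consumed.incrementAndGet()
        })
      Thread.sleep(1000)                           // let the stream run for about a second
      (emitted.get(), consumed.get())
    } finally system.terminate()                   // aborts the still-running stream
  }
}
```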

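By default, the input buffer of an asynchronous operator holds at most 16 elements (the akka.stream.materializer.max-input-buffer-size setting).

A consumer behind such a boundary buffers incoming elements until that buffer is full and then stops requesting more, which is the signal for upstream to slow down.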
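The buffer size can also be set per section with stream attributes. In this sketch (Akka 2.6 assumed; SmallBufferDemo is hypothetical), a slow step is marked .async and given an input buffer of a single element via Attributes.inputBuffer, following the pattern shown in the Akka documentation. As a result, the producer can only run a couple of elements ahead of the slow step.

```scala
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.Attributes
import akka.stream.scaladsl.{Flow, Sink, Source}

// Hypothetical demo object (not from the article)
object SmallBufferDemo {
  /** Returns (elements emitted by the producer, elements finished by the slow step) after about one second. */
  def run(): (Int, Int) = {
    implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "small-buffer-demo")
    val emitted = new AtomicInteger(0)
    val processed = new AtomicInteger(0)

    // Slow step in its own Actor whose input buffer holds at most 1 element
    val slowStep = Flow[Int]
      .map { x => Thread.sleep(100); processed.incrementAndGet(); x } // blocking sleep: demo only
      .async
      .addAttributes(Attributes.inputBuffer(initial = 1, max = 1))

    try {
      Source(1 to 1000)
        .map { x => emitted.incrementAndGet(); x }
        .via(slowStep)
        .runWith(Sink.ignore)
      Thread.sleep(1000) // let the stream run for about a second
      (emitted.get(), processed.get())
    } finally system.terminate() // aborts the unfinished stream
  }
}
```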

Here, all the operators are fused and run inside a single Actor, one element after another.
We can break this up and run the operators on different Actors, and therefore on different threads.

async

It is as simple as calling the .async method on an operator, which makes it run in its own Actor. Being run asynchronously means that an operator, after handing out an element to its downstream consumer, can immediately process the next one.

source.via(multiply.async) // multiply runs in its own Actor
  .to(sink.async)          // so does the Sink; the Source keeps the original Actor
  .run()
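A quick way to convince yourself that asynchronous boundaries do not reorder elements: in this sketch (Akka 2.6 assumed; AsyncOrderDemo is hypothetical, and Sink.seq stands in for the printing Sink), the Flow and the Sink each get their own Actor, and the collected output still comes back in source order.

```scala
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object (not from the article)
object AsyncOrderDemo {
  def run(): immutable.Seq[Int] = {
    implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "async-order-demo")
    try {
      val source = Source(1 to 100)
      val multiply = Flow[Int].map(x => x * 10)
      // multiply and the collecting Sink each run in their own Actor
      val collected = source.via(multiply.async).runWith(Sink.seq[Int].async)
      Await.result(collected, 5.seconds)
    } finally system.terminate()
  }
}
```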

The order of elements is always preserved, but processing is no longer strictly sequential: because of the asynchronous execution, the work of different operators interleaves.

For example:

Source(1 to 3)
  .map { i =>
    println(s"A: $i"); i
  }
  .async
  .map { i =>
    println(s"B: $i"); i
  }
  .async
  .map { i =>
    println(s"C: $i"); i
  }
  .async
  .runWith(Sink.ignore)

One possible output is:

A: 1
A: 2
B: 1
A: 3
B: 2
C: 1
B: 3
C: 2
C: 3

A: 1, A: 2 and A: 3 are always printed in that order, and the same holds for the B and C lines, but lines from other operators, such as B: 1, may appear in between because the operators run asynchronously.

Note that the order is not A:1, B:1, C:1, A:2, B:2, C:2. That would correspond to the normal fused synchronous execution model of flows where an element completely passes through the processing pipeline before the next element enters the flow. The next element is processed by an asynchronous operator as soon as it has emitted the previous one.
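Because the interleaving differs from run to run, printing is not a reliable way to check the guarantee. This sketch (Akka 2.6 assumed; InterleavingDemo is hypothetical) records the same A/B/C events in a thread-safe queue instead, so the per-operator order can be verified after the stream completes.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object (not from the article)
object InterleavingDemo {
  def run(): Vector[String] = {
    implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "interleaving-demo")
    val log = new ConcurrentLinkedQueue[String]() // written from several Actors, so it must be thread-safe
    try {
      val done = Source(1 to 3)
        .map { i => log.add(s"A: $i"); i }
        .async
        .map { i => log.add(s"B: $i"); i }
        .async
        .map { i => log.add(s"C: $i"); i }
        .async
        .runWith(Sink.ignore)
      Await.result(done, 5.seconds)
      log.toArray(Array.empty[String]).toVector // snapshot in insertion order
    } finally system.terminate()
  }
}
```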

Akka Streams uses a windowed, batching back-pressure strategy internally. It is windowed because as opposed to a Stop-And-Wait protocol, multiple elements might be “in-flight” concurrently with requests for elements. It is also batching because a new element is not immediately requested once an element has been drained from the window-buffer. Multiple elements are requested after multiple elements have been drained.

This batching strategy reduces the communication cost of propagating the back-pressure signal through the asynchronous boundary.
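The difference between stop-and-wait and windowed, batched demand can be illustrated with a toy model in plain Scala. This is a simplified sketch, not Akka's internal implementation: the hypothetical WindowedBatching.requests function assumes a consumer that fills a window of window elements up front and then asks for window / 2 more each time that many have been drained, with elements arriving instantly. It returns the size of each demand signal sent upstream.

```scala
// Hypothetical toy model of windowed, batched demand (not Akka's actual code)
object WindowedBatching {
  /** Sizes of the demand signals a consumer sends upstream while draining `total` elements. */
  def requests(total: Int, window: Int): Vector[Int] = {
    val batch = math.max(1, window / 2)           // re-request in batches of half a window
    var signals = Vector(math.min(window, total)) // initial demand fills the window
    var requested = signals.head
    var drained = 0
    var drainedSinceLastRequest = 0
    while (drained < total) {
      drained += 1                                // consumer takes one element out of the window
      drainedSinceLastRequest += 1
      if (drainedSinceLastRequest == batch && requested < total) {
        val n = math.min(batch, total - requested)
        signals :+= n                             // one signal covers a whole batch of elements
        requested += n
        drainedSinceLastRequest = 0
      }
    }
    signals
  }
}
```

For 100 elements, the stop-and-wait case (window = 1) sends 100 demand signals, while a window of 16 sends only 12, and the number of requested but not yet drained elements never exceeds the window.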

Conclusion

Akka Streams is an effective way to work with data streams: it takes care of the complexity of flow control so you can focus on your business logic. It comes with a rich library of built-in operators, built on top of Akka Actors, that run asynchronously and concurrently and handle most of the work under the hood.

Thanks for reading and the support.🍻
Happy Holidays!


Written by Jay Reddy

I write about Data, AI, Startup, and Entrepreneurship. Life without challenges and risks is mediocre. databracket.substack.com youtube.com/@data_bracket
