The great outrage about a trivial matter
Our story has begun with a "simple" need to join data. It seemed easy, as all of us have done joins in databases before, but there was a catch.
The catch was related to the type and amount of data we have. Here, in Adform, the amount of data is large and we have huge throughputs too. 500K events per second is quite common and most of the data is short-lived: we just need to process it, generate some reports and move along. Our goal was to join vast amount of data fast and almost in real time.
Consider the traditional database join:
- Data needs to be collected;
- Then dumped to some database;
- The join should be performed;
- And finally, joined results should be passed to the further processing chain.
In our case, this traditional approach was not good enough. Data collection and dumping to a database, as well as the join, takes time. So, this approach would lead to reports showing outdated data, i.e. minutes or even hours old rather than seconds. In addition to this, lots of storage is required because it's very inefficient to collect, dump and join small amounts of data in a database. What we really needed was to join two streams of data as close to the real time as possible. With the streams, the collecting and dumping phases are eliminated; and as we are processing data as it comes, the time window of join can be small, thus small amount of storage is needed for the join itself.
Working on a Better Join Solution
For the data source we have chosen Apache Kafka. It is a distributed message broker which is fast and enables streaming data one tuple at the time. You can find plenty of introductions and technical presentations elsewhere.
One important thing to note is that Kafka distributes data to partitions and guarantees the order only at the partition level. This is important to our join implementation. For the join implementation itself, we have chosen a distributed real-time computation system Apache Storm. It is stream-based and has great support for consuming data from Kafka. You can check our presentation on Storm from the Big Data Conference 2015 and there are tons of other resources here.
The first obvious idea was to use Storm Trident join implementation.
Trident is an API on top of the Storm. The core data model in Trident is the "Stream", processed as a series of batches, thus Trident join implementation works only on the same batch level. This means that data can be joined only if it's in the same batch. This didn’t suit our needs because we would have get many un-joined items which would land on other batches. As our requirement was to join all the data in a defined time window, and in Trident we wouldn’t have had the control over items which land to a particular batch, we needed something else. As a result, we had to implement our own reliable join solution with the following technical requirements:
- Guarantee join items within a specified time window;
- Handling complex failure scenarios;
- Handling faster or slower consumption from any of the join sides;
- Ensuring that no data is lost if one side of the join is failing or doesn't produce data;
- Doing it fast, i.e. having the smallest join window possible;
- At-least-once guarantee for the join result.
Having these requirements, we have implemented the first specific join case. We had two Kafka topics which needed to be joined by the same unique ID. One stream had 90% of matches to the other. We had to join one item to at most one from the other stream (1 to 0...1). Let’s call the mandatory join side as “left” and the optional one as “right”. Theleft side item was always earlier than the right one.
Every record had a creation time field meant for the time window management. The typical creation time difference between matching items was less than one second. So our time window for the join was very small, therefore we decided to implement the join entirely in memory.
How We Did It
We came up with the following Storm topology design:
Let’s review all the components in the diagram:
- Left side spout – streams the tuples from the left side.
- Right side spout – streams the tuples from the right side.
- Sync bolt ensures that only tuples created earlier than the current sync time are passed through.
- If the left tuple creation time is earlier than the current sync time, the tuple is emitted to the Join bolt immediately;
- Otherwise, it's put to buffer.
- When the Sync tuple with a new sync time is received:
- Messages from the buffer, which creation time is earlier than the current sync time, are emitted to the Join Bolt.
- These items are removed from the buffer.
- Join bolt– consumes left tuples from the sync bolt and right tuples from the right spout.
- It keeps the right tuples in the join window (buffer) until the right item is joined to the corresponding left item.
- When the right tuple is received, the Sync tuple with the new sync time (the creation date of the right tuple) is emitted to the Sync Bolt. This implementation assumes that the left tuple creation time will always be earlier or equal to the matching right tuple.
- Buffer Cleanup spout – periodicallyemits the clean buffer tuple to expire old unmatched items in the join window.
- Joined output handling bolt – receives joined result and can do whatever it needs to with the joined data.
More detailed explanation is necessary for buffers in the sync and join bolts. As performance is critical in our case, we had to implement our own collection with the following characteristics:
- Iteration step forwards - O(1);
- Iteration step backwards - O(1);
- Inserting item to the arbitrary position - O(1);
- Dropping the beginning of the list at any position - O(1).
A quite simple custom double-linked list implementation met all the necessary characteristics.
Even though this solution might seem pretty straightforward, there was a number of nasty details that we needed to overcome for the join to work in production.
First of all, we rely on the input data streams being (mostly) ordered by time. Here "mostly" means that ordering is not perfect, i.e. some records may "jump" backwards in time. Based on the record's time we synchronize the consumption of joined streams and expire the unmatched records.
Increasing the join window timespan helps coping with unordered records from the left stream, but it comes with the price of reduced performance. To tolerate the jumps in the right stream we have introduced a configurable "sync-delay" setting to delay the synchronization time of the left side stream, as some earlier right side records might still arrive later.
We needed to analyze the data to determine how big the common "jumps" are and come up with an appropriate delay. This was obviously a compromise as increasing the delay would improve the tolerance to unordered data, but it would also increase the overall join latency by the same amount. In case the jumps are larger than our delay setting, the corresponding items could end up being unmatched.
Along with the "sync-delay", there are other join parameters that heavily depend on the quality of input data ordering:
- Join window timespan – it needs to accommodate the time difference between joined records, which are allowed by the business logic, and, in addition, most of the time jumps in the left stream.
- "Max_spout_pending" – it controls how many records could be emitted and not yet fully processed for each of the spouts. When the "sync-delay" is increased, naturally, more records need to be emitted from the right stream to the join window before the left stream is allowed to go through. As a result, the delay needs to be increased as well.
However, with what’s described above we only could fight the consequences. If possible, it’s always better to get to the root cause of unordered data first. If there’s a single producer, it is necessary to reduce the topic partition count for improved results. While if there’re multiple producers, it is a good idea to synchronize the time on producers machines and in this way to reduce the size of producer’s batch. To sum up, the producer side tuning helped us reducing the “sync-delay”, and this allowed having smaller join latencies.
Uneven Kafka Topic Consumption Speed
Kafka is great at handling huge loads. However to achieve that, Kafka divides a topic into several partitions that are written to and read from in parallel. This fact does not help when you depend on the ordering of data: all partitions have to be consumed evenly at the same pace to keep the data ordering as close to original as possible.
There are millions of reasons why consumption from separate partitions could get out of sync and you'd end up processing extremely unordered data. The first thing we’ve noticed when analyzing the uneven consumption was related to the parallelism (number of parallel threads) of the Kafka spout. Although it is possible to specify the parallelism less than topic's partition count, and the spout task would handle several partitions, it seems that tasks would emit quite a lot of records from one partition before going to others. If the parallelism matches partition count, the resulting stream is better aligned across partitions.
However, this is the only improvement and still there is no guarantee for even consumption. Considering a distributed nature of Storm, different consuming threads can run on totally different machines; components running on the same machine could communicate faster than others. As a result, all of this causes uneven consumption speed of different partitions. Also, if suddenly the consumption from a single partition would stop, the join would still go on and just miss matching records.
Consequently, we have decided to track the consumption state from each of the partitions separately. To do that we have extended the Kafka spout to add originating partition ID to each of the emitted tuples. Then it was possible to know the timestamp of the latest tuple we've got from each of the input partitions and synchronize the left stream consumption as well as expire the join window items according to the slowest partitions. The processing of records from the faster partitions would pause until the slowest partitions would catch up. Also, if consumption from some of the partitions would stop, further data would not be processed. And that's exactly what we wanted.
Restarts / Redeployments
The distributed nature of streams also brings some nuances related to topology restarts and redeployments. Let's look into some details:
- Storm ensures at-least-once processing.
- It does that by persisting consumption offset for each of the Kafka’s topic partition before which all the records were fully processed by topology.
- On a restart / redeployment consumption of data resumes from the last persisted offset.
Let's look at the example below.
There are spout emitted records 1, 2, 3, and 4.
Let's say that at the time the topology stops only records 1, 2 and 4 are fully processed. So the persisted consumption offset by Storm is 2. Even though the record 4 was processed, the record 3 was not and the next time topology starts, it starts processing records 3, 4 …
So, the record 4 is processed for the second time producing a duplicate, which is alright according to the at-least-once guarantee.
Imagine the following situation in our join with multiple partitions.
Here the green and yellow cells are fully processed records. At this moment the topology is stopped. As Storm stored the consumption offset "b" for the right side's partition 1, the next time the topology starts, it will again process the right side's record 8. However, the left side record 8 was already processed, so no match will be found during the join.
It can happen on both of the join sides and could cause some duplicate unmatched records in the join results for the same key. Therefore, it is necessary to consider if this is acceptable for specific use cases. In our case, the duplicates are handled during the further stages of processing.
What's next ?
We are already running this join solution in production for one topology. It is capable of coping with our performance requirements with a fairly small amount of resources. In the future, we expect to have use cases of join where this in-memory implementation will not be suitable. For example, when the join time window will be large. Thus, the next steps in our plan is a join implementation with external storage for the join window. Now, when we have dealt with most of the issues, adding the external storage support will be pretty straightforward.
Written by Simonas Gelaževičius and Ernestas Vaiciukevičius