Quite a long time ago, we wrote about how we were joining streams in Apache Storm (read here).
The ideas described there are still valid and being used. However, that was only the beginning—many new nuances have come up while join was running in production. Also, we wanted to overcome the limitations pointed out in the first article.
The initial join version that was described in the first article was quite stable... under usual circumstances. But because it’s real life, unusual stuff started to happen, be it Kafka brokers going down, malfunctioning of producers writing to join’s source topic, or crashing Storm nodes.
In many cases, join topology recovered as expected. But in others, it did not—it simply stalled and did not process any items. To recover and start the join process again, it needed manual recovery. Luckily, we had a tool for that created in advance. The tool did binary scans of the Kafka topics looking for the offsets corresponding to specific time stamps and wrote them to ZooKeeper, where Kafka spouts store their states. That way, we could restart topologies and force them to reprocess the source topics from the time we wanted. Using this tool, we just forced the positions in left and right source streams to be at the same time, and this allowed join to proceed further.
After several such cases, a pattern started to emerge: the deadlock would happen if the right (optional) stream position is at a time stamp much older than the left stream. The join bolt performed window expiration based on the time stamps of incoming left stream items. But the synchronization mechanism would not allow any left stream items to pass through to trigger join window expiration, and, at the same time, join window size is limited by the amount of RAM and max-spout-pending settings—the window could not grow large enough to move the left stream synchronization point enough. Deadlock!
After the cause was clarified, the solution came not long afterwards. We extended the SyncBolt to periodically report to JoinBolt time stamps of the most recent items for each partition. While JoinBolt will additionally expire the items from the join window based on these new reports, thus allowing the right stream to catch up to the state where the normal join would resume.
After those improvements, join become much more stable.
Due to Kafka spout design, duplicate items are produced during topology restarts and redeployments. And this is perfectly fine, having in mind “at-least-once” guarantees by Storm and Kafka spout. This was known and mentioned in the first article. However, there are cases where we want to avoid duplication as much as possible, so we started to look for a generic solution to this duplication problem.
Changing or adjusting Kafka spout functionality and later maintaining custom changes seemed too complex and expensive. On the other hand, we knew exactly when duplicates are likely to be produced—in general worker restarts (which happen during topology restarts, partial topology or cluster failures, and redeployments). Many topologies produce output that goes to another Kafka topic or other external system and resulting items often have a way to determine from which source record it was produced (such as having the same id or part of a new id).
Knowing this, we created the following scheme to detect and avoid duplicates rather than constantly ensure stream uniqueness (implemented as a separate bolt we called “deduplicator”):
- Deduplication would automatically start on any worker restart and would last for some time until duplicates are being detected.
- Deduplicator bolt would:
- load some amount of recent results produced by topology
- start passing messages through, filtering out messages that already produced some results (checking messages against the result list loaded previously)
- Deduplication would be disabled automatically after no duplicates are detected for some time.
Obviously, performance degrades while deduplication is active, but once the deduplication period passes and is disabled, there’s no more impact on performance.
Our current join implementation was best suited to small join windows. Some use cases started to come in that would require windows of tens of minutes. Combined with throughputs of tens of thousands of items per second, join windows grew enough to not fit in topology RAM.
On the other hand, in-memory join implementation does not allow pauses in the right stream (it would stop the join until an item comes from the right stream to move the left-side items through).
These arguments led to creation of another flavor of join—external storage join. The main difference, as the name suggests, is that the join window is not maintained in the topology itself; rather, external storage is used. We wanted to use Aerospike as a backing storage, but it was abstracted by the REST API.
We made our API quite general and not tied to a specific use case:
- It allowed storing and retrieving arbitrary data by key and
- supported batching many items in a single request for performance.
The join could be split into these parts:
- Feeding the right-side stream to the join window (through API)
- Processing left-side stream doing lookups to the join window (again through API)
- Synchronization of streams
Feeding the right-side stream
This is quite trivial—just consume the data, accumulate batches, and call the REST API.
Processing left-side stream
This is nothing complicated either (apart from the synchronization part)—accumulate batches, make a request to REST API to find the matching right-side items, and emit results.
The most interesting part is how to synchronize the streams. In other words, we do not want the left-side stream to be processed until we are sure that corresponding right-side items are already saved to Storage API.
As all of our records have the associated time stamps, we again used them for synchronization. Also, we had existing components that we used elsewhere to track the data consumption state.
Those existing components are called Data Delay Collector and Data Delay API. Data Delay is a term we use to describe a data source consumption delay compared to current time. So if we say that a topic is consumed with a 1-minute data delay, that means that all the data that is older than 1 minute should be already consumed and more recent data is still in the queue.
For the Kafka topic data source, the scheme is as below.
Here, Data Delay Collector periodically resolves the time stamp of data consumed by a particular spout from the topic in the following steps:
1. For each partition:
a. Read the Spout state from ZooKeeper to determine latest consumed offset
b. Read the record from Kafka topic’s partition at that offset
c. Determine the time stamp of the record
2. After consumption,time stamps for all partitions are known; submit the minimum time stamp to Data Delay API. The combined Data Delay for the topic can now be provided by Data Delay API based on this time stamp.
Now, having the Data Delay collection configured for our right-side stream spout, we could periodically request the right-side spout consumption time stamp to know at which point in time data was saved to Storage API. As DataDelaySyncBolt would only allow items later than that point in time to pass through, we can be sure that matching items will either be in Storage API or not exist at all while making a REST request in RestApiJoinBolt.
Now the complete picture would look like this:
Although this join flavor requires more external components and has no ability to detect unjoined right-side items, it has its own advantages:
- No need for complex tracking of consumption in each Kafka partition and deadlock prevention
- Support for significantly larger join windows that can span hours
- Support for right-side streams with no continuous throughput (allowing pauses in right-side stream)
As we are running a Storm-based platform and provide it as a service for other teams in Adform, it’s now much more likely that one of these ready-to-use join solutions will fit our consumers’ needs.
One improvement that can be made to this new solution is clear—we could add the ability to handle unjoined right-side items. However, let’s leave that for the next blog post :)