Apache Storm has been dubbed the "Hadoop of real-time processing". It is essentially a stream-processing engine that works on streaming data in real time, unlike Hadoop and Spark, which do batch and micro-batch processing respectively.
At AppLift, we use Apache Storm to build a real-time event-processing pipeline for analyzing, aggregating, frequency capping and, most importantly, taking per-ad-campaign budget decisions in real time with the lowest possible latency. Event data is streamed to Apache Storm in real time via an in-house publisher-subscriber mechanism built on ZMQ, where events are processed and persisted.
Brief background of Storm
Apache Storm was originally developed by Nathan Marz and the team at BackType, which was later acquired by Twitter. Storm was subsequently open sourced and eventually became a top-level Apache project.
The Storm framework is built on very efficient message-processing abstractions (stream, tuple, spout, bolt, etc.) that let developers focus their energy on processing logic instead of system-maintenance tasks. Storm's components have close analogues in Hadoop's job-processing system.
Nimbus is similar to Hadoop's JobTracker. It runs on the master node and is responsible for distributing code around the cluster, assigning tasks to worker nodes, and monitoring for failures.
The Supervisor runs on each worker node. It listens for work assigned to its node by Nimbus and starts and stops worker processes accordingly. Each Supervisor can run multiple worker processes on the same node.
Each worker process is a separate JVM instance running on the worker node and can itself run multiple parallel tasks (spouts/bolts).
A typical topology consists of spouts and bolts wired together in a logical flow of processing tasks.
A spout acts as the receiver of data from external source(s) and creates the stream of tuples on which the bolts do the actual processing.
A bolt is the unit that does the actual processing. Bolts can be chained serially or in parallel depending on the kind of processing required. Each bolt is assigned a fixed processing responsibility, such as modifying incoming tuples before passing them on to the next bolt, or persisting them to a database.
Apache Storm does not have its own state-managing capabilities. Instead, it uses Apache ZooKeeper to manage the cluster state. All coordination between Nimbus and the Supervisors, such as message acknowledgements and processing status, is done through a ZooKeeper cluster. The Nimbus and Supervisor daemons are stateless; all state is kept in ZooKeeper or on local disk. This enables Storm to resume right where it left off, even after a restart.
Storm at AppLift
At AppLift, our performance-driven RTB bidding platform does mobile advertising on behalf of our customers. This involves a series of events (bids, won bids, impressions, clicks, conversions and post-conversion events) generated at a rate of millions of events per hour. The challenge is to process these events as fast as possible to get insights from the data. Our Storm topology performs various operations on the incoming events, including:
- Simple filtering and validation
- Unique Count Analysis
- Complex cost calculations
- Device Frequency capping
- User balance updates
- Persisting events to the MySQL database and Aerospike
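To make one of these operations concrete, here is a minimal sketch of per-device frequency capping: allow a device only a fixed number of events per capping window. All names are hypothetical; this is an illustration, not AppLift's actual implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy sketch of per-device frequency capping (hypothetical names).
public class FrequencyCap {
    private final int maxEventsPerWindow;
    private final Map<String, Integer> counts = new ConcurrentHashMap<>();

    public FrequencyCap(int maxEventsPerWindow) {
        this.maxEventsPerWindow = maxEventsPerWindow;
    }

    // Returns true if the event for this device should be processed,
    // false if the device has hit its cap for the current window.
    public boolean allow(String deviceId) {
        int seen = counts.merge(deviceId, 1, Integer::sum); // atomic per key
        return seen <= maxEventsPerWindow;
    }

    // Called at the start of each capping window to reset all counters.
    public void resetWindow() {
        counts.clear();
    }
}
```

A `ConcurrentHashMap` is used so that the same cap can be consulted safely from several bolt threads in one worker.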
The events from exchanges and mobile devices are received by a fleet of event machines and are logged to their respective files. For performance reasons, and to keep the footprint on the event servers as small as possible, we wrote the publisher in Lua, with ZMQ as the backbone for messaging. Messages are published as soon as they are written to the logs.
The ZMQ subscriber is written in Java and uses the ZMQ Java API to receive messages from the publisher. The subscriber performs a one-time handshake with the publisher, after which the exchange continues with the guarantee that a new message is delivered only once an ack for the previous one has been sent back to the publisher. All subscribers run in a thread pool managed by an executor service and push their data onto a large shared BlockingQueue.
The spout keeps polling the shared queue, reads events, transforms them into a stream of tuples after validation, and finally emits them to the bolts as defined in the topology graph.
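The subscriber-to-spout handoff described above can be sketched with plain JDK types (a stand-in for the real ZMQ subscriber and Storm spout; all names here are hypothetical):

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified model of the handoff: subscriber threads put messages on a
// large bounded queue, and the spout polls it and emits validated tuples.
public class SharedQueueHandoff {
    // Shared between subscriber threads and the spout.
    static final BlockingQueue<String> SHARED = new ArrayBlockingQueue<>(100_000);

    // Subscriber side: enqueue a received message, blocking if the queue is full.
    static void onMessage(String raw) {
        try {
            SHARED.put(raw);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Spout side: poll with a timeout so the spout never blocks forever,
    // validate, and "emit" (here: collect) the resulting tuple.
    static List<String> pollOnce(List<String> emitted) {
        try {
            String raw = SHARED.poll(10, TimeUnit.MILLISECONDS);
            if (raw != null && !raw.trim().isEmpty()) { // stand-in for validation
                emitted.add(raw.trim());                // stand-in for tuple transform
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return emitted;
    }
}
```

The bounded queue gives natural backpressure: if the spout falls behind, subscriber threads block on `put()` instead of exhausting heap.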
Each bolt, according to its implementation, processes events, then aggregates and batches the transformed/aggregated data for persistence to MySQL or Aerospike. A batch is persisted once it reaches 50K events or when more than 10 seconds have elapsed since the last batch commit.
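The size-or-time commit rule can be sketched as follows (hypothetical names; the real bolts write the returned batch to MySQL/Aerospike):

```java
import java.util.ArrayList;
import java.util.List;

// Buffer that releases a batch when either a size threshold (e.g. 50_000
// events) or an age threshold (e.g. 10_000 ms) is reached.
public class BatchBuffer<T> {
    private final int maxSize;
    private final long maxAgeMillis;
    private final List<T> buffer = new ArrayList<>();
    private long lastFlush;

    public BatchBuffer(int maxSize, long maxAgeMillis, long nowMillis) {
        this.maxSize = maxSize;
        this.maxAgeMillis = maxAgeMillis;
        this.lastFlush = nowMillis;
    }

    // Add one event; return the batch to persist if either threshold is
    // reached, otherwise null. The clock is passed in for testability.
    public List<T> add(T event, long nowMillis) {
        buffer.add(event);
        boolean sizeHit = buffer.size() >= maxSize;
        boolean ageHit = nowMillis - lastFlush >= maxAgeMillis;
        if (sizeHit || ageHit) {
            List<T> batch = new ArrayList<>(buffer);
            buffer.clear();
            lastFlush = nowMillis;
            return batch; // caller persists this batch
        }
        return null;
    }
}
```

The time threshold bounds latency during quiet hours, while the size threshold bounds memory and database round-trip cost at peak traffic.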
Setup configuration and stats
Server: 64 GB RAM, 24 cores
Memory assigned:
- Nimbus: 1 GB
- Supervisor: 1 GB
- Worker: 24 GB
A particular bolt processes on average ~20-25 million events every hour, depending on the hour's traffic. Each type of bolt runs with a parallelism hint of 4 to perform tasks in parallel. A few things we learned along the way:
- Prefer running bolts as multiple tasks within the same worker process (the same JVM) to get better performance, since tuples passed inside a worker avoid serialization and network transfer.
- Worker memory should be sized properly (24 GB in our case): at peak times, when memory usage surges, the Supervisor/Nimbus may simply restart the Storm topology without logging any exception or error.
- Heartbeat timeout parameters should be configured properly; they default to 30 seconds, which may not be sufficient when a major garbage collection happens. We hit an issue where the whole topology was being restarted by Nimbus/Supervisor because a GC stop-the-world pause exceeded the 30-second heartbeat timeout.
- If running multiple bolts under the same worker, always prefer thread-safe collections such as ConcurrentHashMap or Guava's CacheBuilder over plain HashMap: bolts run as threads, so any shared state must be thread-safe.
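For the heartbeat issue, the relevant timeouts live in storm.yaml. The keys below match the Storm releases we are familiar with, but key names and defaults vary between versions, and the values shown are illustrative rather than a recommendation; check them against your own deployment.

```yaml
# storm.yaml -- widen heartbeat/timeout windows so a long GC pause is not
# mistaken for a dead worker (values are illustrative).
supervisor.worker.timeout.secs: 120   # default 30
nimbus.task.timeout.secs: 120         # default 30
nimbus.supervisor.timeout.secs: 120   # default 60
```

A complementary fix is to tune the worker JVM's GC settings so that stop-the-world pauses stay well under the timeout in the first place.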
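To illustrate the thread-safety point, here is a minimal counter map shared by bolt executors running as threads in one worker JVM (hypothetical names; a sketch, not our production code):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Shared per-key counters updated concurrently by several bolt threads.
// A plain HashMap here could lose updates or corrupt its internal state
// under concurrent access; ConcurrentHashMap makes each update atomic.
public class SharedCounters {
    private static final Map<String, Long> COUNTS = new ConcurrentHashMap<>();

    // merge() is atomic per key -- safe even when several bolt threads
    // count events for the same campaign at the same time.
    public static long increment(String key) {
        return COUNTS.merge(key, 1L, Long::sum);
    }

    public static long get(String key) {
        return COUNTS.getOrDefault(key, 0L);
    }
}
```

The same reasoning applies to any cache or lookup table shared across bolts in a worker, which is why Guava's `CacheBuilder` (whose caches are thread-safe) is a good fit.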