The Apache Spark ecosystem has revolutionised the Big Data processing world with its unified programming model, which covers batch processing, stream (near-real-time) processing and machine learning. This has made Spark a must-have technology for companies such as AppLift, which rely heavily on open source technologies for their data engineering and data science needs.
At AppLift, we use Apache Spark to build a Lambda Architecture processing pipeline for deep analytics on our data. Alongside a very stable Storm system that has been processing real-time event logs for the last 2 years, we have developed Spark Streaming applications that give our customers deep real-time analytics on how their campaigns are performing, by processing both Bid Request logs and User Event logs as soon as they are generated.
Spark Streaming Architecture at AppLift:
Our Spark Streaming applications do complex computations like cost calculations, unique event counts, etc. to show data metrics to our customers in real-time.
We have built an Apache Kafka system (a low-latency, distributed, replayable, fault-tolerant message broker) that serves as the single source of data for our various processing systems. To ingest data from Kafka into Spark Streaming, we use the DirectKafka approach.
We have 2 types of logs to process:
- Request Logs: contain Bid Request data logs.
- Event Logs: contain different sub-types of event logs like Notify, Impression/Win, Click, Conversion/Install, Video, Post-Install.
We have a separate Kafka topic for the Request log and for each Event log sub-type.
There is a huge difference between the incoming data rates of Bid Request logs and Event logs. Bid Request logs arrive at around 20-30K/sec on average, depending on the business hour, while Event logs, with all sub-types combined, arrive at around 2-4K/sec.
The processing of Request and Event logs is similar and produces a similar output format. But because of the 10-15 times gap in data rate between Bid Request logs and Event logs, we have configured 2 streaming applications for performance reasons:
- One for processing the Request log.
- One for processing all Event log types.
Both applications save their data to the same persistent store after processing, where it can be queried from the UI dashboard.
With a batch interval of 15 secs, we are able to give customers insight into their campaign data in real time, within 30-60 secs of the events being generated, taking into account the average latency of the ingestion and processing systems.
We also have a Spark batch job in place, which runs hourly on past data with a lag of 2 hours.
Spark Streaming is for speed while Spark Batch is for correctness.
Apache Mesos is used as the resource manager for our streaming applications.
How AppLift Tuned Spark Streaming for Performance:
Coding in Spark Streaming is easy; tuning is challenging.
This is because Spark offers a lot of configurable parameters, and one needs to arrive at the right combination of values for the use case. The key to tuning is an incremental approach: tune parameters one at a time, log properly in the code, and keep a hawk's eye on the Spark UI to spot bottlenecks. The approach that we followed at AppLift can be summarised in the points below:
Kafka Ingestion Approach:
We preferred the DirectKafka approach over the receiver-based approach for its better efficiency, parallelism and exactly-once semantics. This approach keeps a 1:1 mapping between each Kafka partition and an RDD partition in the stream. It is therefore better to have more partitions in total than CPU cores (noOfTopics*partitionsPerTopic > totalCoresInCluster), so that all CPU cores are fully used. The general suggestion is to have around 2-3 times as many partitions as cores, depending on the processing. Also, to reduce network calls, we pack 100 log lines into one Kafka message, which is split back into individual lines once received inside Spark Streaming.
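The two rules of thumb above can be sketched in plain Scala. This is a minimal illustration rather than our production code: the helper names (`enoughPartitions`, `splitBatchedMessage`) and the newline delimiter between the packed log lines are assumptions made for the example.

```scala
object KafkaIngestionSketch {
  // Rule of thumb from the text: total Kafka partitions should exceed total cores
  // (noOfTopics * partitionsPerTopic > totalCoresInCluster).
  def enoughPartitions(noOfTopics: Int, partitionsPerTopic: Int, totalCoresInCluster: Int): Boolean =
    noOfTopics.toLong * partitionsPerTopic > totalCoresInCluster

  // Unpack one Kafka message that carries many log lines.
  // Assumption: lines are joined with '\n'; the real delimiter may differ.
  def splitBatchedMessage(message: String): Seq[String] =
    message.split("\n").toSeq.map(_.trim).filter(_.nonEmpty)
}

// Inside a streaming job this would typically be applied with flatMap, e.g.
//   directStream.flatMap { case (_, value) => KafkaIngestionSketch.splitBatchedMessage(value) }
```

Keeping the split as a pure function makes it easy to unit test independently of the streaming job.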
Custom Kafka Offset Management:
Initially we chose the checkpointing feature, which saves Kafka offsets in a specified directory, for fault recovery; combined with the idempotent behaviour of our processing, this guarantees exactly-once semantics. But checkpointing makes code upgrades difficult, because it also saves state and configuration and does not pick up changes on restart. So we switched to custom Kafka offset management: we now save the starting Kafka offsets of each partition to ZooKeeper in every microbatch, and fetch them to restart from the right position during an upgrade.
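The per-microbatch bookkeeping can be sketched as follows. Everything here is illustrative: `OffsetRange` is a hypothetical stand-in that mirrors the fields of Spark's Kafka `OffsetRange`, and `OffsetStore`, `InMemoryOffsetStore` and the path layout are assumptions. A real implementation would write to ZooKeeper through a client library instead of a map.

```scala
import scala.collection.mutable

// Hypothetical stand-in with the same fields as Spark's Kafka OffsetRange.
final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// Hypothetical storage abstraction; in production this would be backed by ZooKeeper.
trait OffsetStore {
  def write(path: String, offset: Long): Unit
  def read(path: String): Option[Long]
}

final class InMemoryOffsetStore extends OffsetStore {
  private val data = mutable.Map.empty[String, Long]
  def write(path: String, offset: Long): Unit = data.update(path, offset)
  def read(path: String): Option[Long] = data.get(path)
}

object OffsetManager {
  // Assumed path layout: one node per application, topic and partition.
  def path(app: String, topic: String, partition: Int): String =
    s"/offsets/$app/$topic/$partition"

  // Called once per microbatch: record where this batch started, so that a restart
  // re-reads the batch from its beginning (safe because processing is idempotent).
  def saveStartingOffsets(app: String, ranges: Seq[OffsetRange], store: OffsetStore): Unit =
    ranges.foreach(r => store.write(path(app, r.topic, r.partition), r.fromOffset))

  // On restart: the offsets to resume from, keyed by (topic, partition).
  def loadStartingOffsets(app: String, partitions: Seq[(String, Int)], store: OffsetStore): Map[(String, Int), Long] =
    partitions.flatMap { case (t, p) => store.read(path(app, t, p)).map(o => (t, p) -> o) }.toMap
}
```

In a real job the ranges would come from each direct-stream RDD via `HasOffsetRanges`, and the loaded map would be passed as the `fromOffsets` argument when the direct stream is recreated.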
Batch Interval Parameter:
We started with an intuitive batch interval, say 5 seconds, and experimented with different values to get an idea of which batch interval suits the processing. In our case, 15 seconds worked better.
val ssc = new StreamingContext(sc, Seconds(15))
There is another parameter, spark.streaming.blockInterval (default: 200ms), which can be confusing. It is relevant only for the receiver-based Kafka approach, not the direct approach.
ConcurrentJobs Parameter:
To keep the cores fully utilised, we found a very useful parameter, spark.streaming.concurrentJobs, which significantly improved the processing.
By default the number of concurrent jobs is 1, which means only 1 job is active at a time, and until it finishes, other jobs are queued up even if resources are available and idle. However, this parameter is intentionally left out of the Spark docs, as it may sometimes cause weird behaviour, so it should be tuned with the side effects in mind. Running concurrent jobs brought down the overall processing time by 2-3 times by fully utilising the available cores.
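A configuration sketch for this setting follows. The property name `spark.streaming.concurrentJobs` is the one read by Spark Streaming's job scheduler; the value 4 and the application name are purely illustrative assumptions, not the values we run with.

```scala
import org.apache.spark.SparkConf

// Illustrative values only: the right number depends on batch cost and free cores.
val sparkConf = new SparkConf()
  .setAppName("streaming-app")                // hypothetical application name
  .set("spark.streaming.concurrentJobs", "4") // default is 1
```

Since batches can then overlap, it is worth checking that the output step tolerates out-of-order completion before raising this value.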
maxRatePerPartition Parameter:
A particular partition can contain significantly more data than the other partitions in its microbatch, so it takes longer to process than its peers and single-handedly increases the batch processing time even though the other partitions are done. In the worst case, all the concurrent jobs take longer because of a few skewed partitions, while new jobs are queued up and many CPU cores sit idle. To avoid this, always use the spark.streaming.kafka.maxRatePerPartition parameter to limit the maximum rate of messages/sec read from a partition.
So with a batch interval of 10 sec, setting this parameter to 25 allows a partition to contribute a maximum of 25*10=250 messages per batch. It should be tuned to the maximum rate at which the application can process without inducing delay.
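The arithmetic behind this cap is easy to check in code. The sketch below is plain Scala with hypothetical helper names; the partition count used in the usage note is an assumption for illustration.

```scala
object RateCapSketch {
  // Upper bound on messages read from ONE Kafka partition in ONE microbatch.
  def maxMessagesPerPartition(maxRatePerPartition: Long, batchIntervalSec: Long): Long =
    maxRatePerPartition * batchIntervalSec

  // Upper bound for the whole microbatch across all partitions.
  def maxMessagesPerBatch(maxRatePerPartition: Long, batchIntervalSec: Long, partitions: Int): Long =
    maxMessagesPerPartition(maxRatePerPartition, batchIntervalSec) * partitions
}

// The property itself is spark.streaming.kafka.maxRatePerPartition, e.g.
//   sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "25")
```

For example, `RateCapSketch.maxMessagesPerPartition(25, 10)` gives 250, and with an assumed 24 partitions the whole batch is capped at 6000 messages. Note that the cap counts Kafka messages, so when each message carries 100 log lines, 250 messages can mean up to 25,000 log lines.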
Uniform data distribution:
Sometimes some Kafka topics/partitions send data at a very high rate while others send at a very low one. Such scenarios should be avoided if possible, as skewed data distribution in any distributed environment leads to weird behaviour. It is ideal to have a uniform data rate across all partitions. Initially we started with a single streaming application processing both Request and Event logs, but because of the huge difference in data rate between the two, processing was slow. So instead we tried running 2 separate Spark Streaming jobs on the same cluster, one for the slower Event topics and another for the faster Request topic, and assigned a different maxRatePerPartition to each. This clearly boosted performance several times over, but it came with the overhead of maintaining 2 similar code bases.
Mesos Parameters:
By default, the first streaming app submitted takes all the cores in the Mesos cluster, so the 2nd app cannot run and reports that not enough resources are available. To avoid this, assign a maximum number of cores (spark.cores.max) to each application after tuning.
It is generally advisable to run Mesos in coarse-grained mode instead of fine-grained mode, as it has lower overhead and offers better performance.
Mesos in coarse-grained mode allows only 1 executor to run on each node (unlike YARN), so the executor cores and executor instances parameters are irrelevant on Mesos.
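As a configuration sketch, the two applications could cap their cores like this. `spark.cores.max` and `spark.mesos.coarse` are standard Spark properties; the application names are hypothetical, and the core counts are the Max Cores figures given in the setup section of this post.

```scala
import org.apache.spark.SparkConf

// Request application: cap its share so the Event application can also get offers.
val requestConf = new SparkConf()
  .setAppName("request-streaming")   // hypothetical name
  .set("spark.mesos.coarse", "true") // coarse-grained mode
  .set("spark.cores.max", "64")

// Event application, submitted separately.
val eventConf = new SparkConf()
  .setAppName("event-streaming")     // hypothetical name
  .set("spark.mesos.coarse", "true")
  .set("spark.cores.max", "8")
```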
Memory Parameters:
Driver and Executor memory should be tuned keeping machine configuration and processing needs in mind.
The executor memory setting determines the memory assigned to each executor, and since Mesos allows only 1 executor per machine, we can set it relatively high, in line with the machine's RAM. It is good to unpersist the DStream as soon as processing ends for the associated batch.
Also, the persist() call on a DStream should be avoided unless the same RDD is going to be used multiple times in the processing; otherwise it only increases memory consumption.
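A minimal sketch of both points, assuming Spark is on the classpath. The executor memory value is the one from the setup section of this post; the function `countTotalAndDistinct` is a hypothetical example of a batch step that genuinely reuses an RDD.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

val sparkConf = new SparkConf()
  .set("spark.executor.memory", "15g") // one executor per node, so sized against node RAM
// Driver memory usually has to be given at launch time (for example through
// spark-submit's --driver-memory option), because in client mode the driver JVM
// is already running by the time this code executes.

// Persist only when the same RDD feeds more than one action, and release it afterwards.
def countTotalAndDistinct(lines: RDD[String]): (Long, Long) = {
  lines.persist()                         // reused by the two actions below
  val total = lines.count()
  val distinct = lines.distinct().count()
  lines.unpersist()                       // free executor memory once the batch is done
  (total, distinct)
}
```

If the RDD were used by only one action, dropping the `persist()`/`unpersist()` pair would be the cheaper choice.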
There are some other memory parameters, like spark.memory.fraction and spark.memory.storageFraction, which are best left at their default values. They should be changed only after reading up on how Spark manages memory.
Backpressure Parameter:
With the backpressure property (spark.streaming.backpressure.enabled) enabled, Spark Streaming slows down the rate at which it fetches messages from Kafka when the processing time exceeds the batch interval and the scheduling delay is increasing. It helps in cases such as a sudden surge in data flow, and is a must-have in production to keep the application from being overburdened. However, this property should be disabled during the development and staging phases; otherwise we cannot test the maximum load our application can and should handle.
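One way to express the production-versus-staging split is a small switch in the configuration code. The function name and the boolean flag are assumptions for illustration; `spark.streaming.backpressure.enabled` is the actual Spark property.

```scala
import org.apache.spark.SparkConf

// Hypothetical switch: enable backpressure in production, disable it for load tests.
def streamingConf(production: Boolean): SparkConf =
  new SparkConf().set("spark.streaming.backpressure.enabled", production.toString)
```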
Additional Parameters:
In addition to the above parameters, it is suggested to use the Kryo serializer, and G1GC for garbage collection in the driver and executors.
sparkConf.set("spark.driver.extraJavaOptions", "-XX:+UseG1GC")
sparkConf.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
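The Kryo half of that suggestion could be set along these lines. This is a minimal sketch: only the serializer class is configured, and registering application classes with Kryo is left out.

```scala
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
```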
Setup Configuration and Stats:
We currently have a 3-node Spark Streaming cluster running under Mesos. The Mesos master is set up on one of the machines, with a Mesos slave on each of the machines. The cluster is under continuous evaluation and upgrade, as the data rate keeps increasing day by day with the addition of new customers and increased bidding budgets. The setup and stats at the time of writing this post are:
Configuration of each node (3 nodes in total):
Cores: 24
Memory: 64 GB
Model: Intel(R) Xeon(R) CPU E5-2620 v3 @ 2.40GHz
Request Streaming Application:
Max Cores: 64
Driver Memory: 10 GB
Executor Memory: 15 GB
Event Streaming Application:
Max Cores: 8
Driver Memory: 10 GB
Executor Memory: 15 GB
Data Processing Rate:
Request Streaming App: 20-30K logs/sec, Batch Interval = 15 sec, Processing Time = 11 sec
Event Streaming App: 2-4K logs/sec, Batch Interval = 15 sec, Processing Time = 3 sec
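These figures can be turned into a quick headroom check. The sketch below is plain Scala with hypothetical helper names; it only restates the numbers above as ratios.

```scala
object HeadroomSketch {
  // Fraction of each batch interval spent processing; it must stay below 1.0,
  // otherwise batches queue up and the scheduling delay grows.
  def utilisation(processingSec: Double, batchIntervalSec: Double): Double =
    processingSec / batchIntervalSec

  // Log lines arriving during one batch interval at a given rate.
  def logsPerBatch(logsPerSec: Long, batchIntervalSec: Long): Long =
    logsPerSec * batchIntervalSec
}
```

With the numbers above, the Request application spends about 73% of each 15-second interval processing (11/15) and the Event application about 20% (3/15). A 15-second batch at 20-30K logs/sec holds roughly 300,000-450,000 request log lines.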
The tuning details and results shared above are specific to AppLift's use case, but they should still give a very good reference point to start from.
Also, when we started development, Spark 1.5.x was the stable build. Spark has since released many advanced features with the 2.x releases, such as Structured Streaming, SparkSession and Datasets, which we will be evaluating and adopting as and when needed.