Learning to Configure Spark Jobs
Installation
Go to the official Apache Spark download page, select Spark release 3.1.1, and choose the "Source Code" package type. Unpack the downloaded archive and build Spark with Apache Maven.
./build/mvn -DskipTests clean package
Example Programs
SparkPi
SparkPi is a compute-intensive task that estimates π by "throwing darts" at a circle: random points in the unit square ((0, 0) to (1, 1)) are picked, and we count how many fall inside the unit circle. That fraction should be π/4, so we use it to estimate π.
Usage: pi [partitions]
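For reference, the core of the estimate can be sketched in PySpark as follows. This is a simplified sketch modeled on the bundled examples/src/main/python/pi.py, not necessarily the exact code.

```python
import sys
from random import random
from operator import add

from pyspark.sql import SparkSession

# Simplified sketch of the dart-throwing estimate described above.
spark = SparkSession.builder.appName("PythonPi").getOrCreate()

partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
n = 100000 * partitions  # total number of darts

def throw(_):
    # Pick a random point in the unit square and check whether it lands
    # inside the quarter of the unit circle (x^2 + y^2 <= 1).
    x, y = random(), random()
    return 1 if x * x + y * y <= 1 else 0

count = spark.sparkContext.parallelize(range(1, n + 1), partitions) \
    .map(throw) \
    .reduce(add)

print("Pi is roughly %f" % (4.0 * count / n))
spark.stop()
```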
Partitions | π Estimate | Running Time |
---|---|---|
10 | 3.1392951 | 0.496385s |
100 | 3.1413819 | 3.576282s |
1000 | 3.1416965 | 43.483414s |
10000 | 3.1415915 | 363.989005s |
WordCount
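The bundled examples/src/main/python/wordcount.py implements the classic word count. Roughly, its core logic looks like the following sketch (simplified; the real script takes the input path from the command line).

```python
from operator import add
from pyspark.sql import SparkSession

# Simplified sketch of the bundled wordcount.py logic.
spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()

# Read the input file as lines of text.
lines = spark.read.text("../gutenberg.txt").rdd.map(lambda r: r[0])

# Split into words, emit (word, 1) pairs, and sum the counts per word.
counts = lines.flatMap(lambda x: x.split(' ')) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(add)

for word, count in counts.collect():
    print("%s: %i" % (word, count))

spark.stop()
```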
Dataset
Gutenberg
To get a large quantity of text without repetition, I create a local directory and crawl text files from Project Gutenberg into it.
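The crawler itself is a short Python script. A minimal sketch is shown below; the gutenberg.org URL pattern and the book IDs are assumptions for illustration, not the exact files I downloaded.

```python
import os
import requests

# Minimal crawler sketch. The URL pattern and book IDs below are
# illustrative assumptions, not the exact books used for the benchmark.
os.makedirs("gutenberg", exist_ok=True)

book_ids = [11, 84, 1342, 1661, 2701]  # hypothetical sample of Gutenberg IDs
for book_id in book_ids:
    url = f"https://www.gutenberg.org/files/{book_id}/{book_id}-0.txt"
    resp = requests.get(url, timeout=30)
    if resp.ok:
        with open(os.path.join("gutenberg", f"{book_id}.txt"), "w", encoding="utf-8") as f:
            f.write(resp.text)
```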
After crawling the text data, I remove special characters from the text files and fix the encoding with a short Python script built around the re module.
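A minimal sketch of this cleanup, assuming we keep only letters, digits, and whitespace (the character set and file names are assumptions, not the exact ones used):

```python
import re

# Cleanup sketch: drop special characters and normalize whitespace/encoding.
# File names and the retained character set are illustrative assumptions.
with open("gutenberg_raw.txt", encoding="utf-8", errors="ignore") as f:
    text = f.read()

text = re.sub(r"[^A-Za-z0-9\s]", " ", text)   # remove special characters
text = re.sub(r"\s+", " ", text)              # collapse repeated whitespace

with open("gutenberg.txt", "w", encoding="utf-8") as f:
    f.write(text)
```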
Yelp
Download the Yelp dataset. I focus on the review data, which contains a large quantity of text without repetition and can serve as a benchmark. Before running the example program on it, I first need to convert the JSON file into a CSV file. Since the full dataset is too large, I only process the first 1,000,000 reviews, which is almost 6GB, using a short Python script built around the json module.
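A sketch of the conversion, assuming the review file is line-delimited JSON with a text field (the file names are assumptions):

```python
import csv
import json

# Sketch: convert the first 1,000,000 Yelp reviews from JSON Lines to CSV.
# Input/output file names and the "text" field are assumptions based on the
# Yelp Open Dataset layout.
LIMIT = 1000000

with open("yelp_academic_dataset_review.json", encoding="utf-8") as fin, \
        open("yelp.csv", "w", newline="", encoding="utf-8") as fout:
    writer = csv.writer(fout)
    for i, line in enumerate(fin):
        if i >= LIMIT:
            break
        review = json.loads(line)
        writer.writerow([review.get("text", "")])
```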
After getting the CSV file, I also clean the data by removing special characters, using the same kind of re-based script as for the Gutenberg data.
Result
Gutenberg
Run the following command to count words in the dataset.
time ./bin/spark-submit examples/src/main/python/wordcount.py ../gutenberg.txt
The total running time is 0m6.037s.
Yelp
Run the following command to count words in the dataset.
time ./bin/spark-submit examples/src/main/python/wordcount.py ../yelp.txt
The total running time is 1m56.498s.
Configuration Impact
I choose to run the example MapReduce program sort to see the impact of configuration changes.
Bug Fix
The provided Python code for the sort program has a bug on line 35. Fix it by removing the conversion from string to int.
sortedCount = lines.flatMap(lambda x: x.split(' ')) \
    .map(lambda x: (x, 1)).sortByKey()
Dataset Preparation
I use the Yelp dataset as a benchmark to see the impact of the configuration. However, since the Yelp dataset from the previous section is too large (>5GB), the sort program always reports a buffer overflow, so I only run on the first 500MB chunk (xaa).
split -b 500m yelp.txt
Configuration File
Spark's configuration file is provided as a template, spark-defaults.conf.template. To modify the configuration, first copy the template and rename it to spark-defaults.conf; settings can then be changed by adding properties to this file.
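For reference, a spark-defaults.conf covering the properties explored in the rest of this section might look like the listing below; the values mirror the settings discussed in the following subsections.

```
spark.driver.memory              4g
spark.driver.maxResultSize       10g
spark.reducer.maxSizeInFlight    48m
spark.shuffle.file.buffer        32k
spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max  2047m
```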
Driver Memory
The first properties to change are spark.driver.memory and spark.driver.maxResultSize. spark.driver.maxResultSize is set to 10G, which is larger than the dataset, to ensure that result serialization can complete. The driver memory is the amount of memory to use for the driver process. I have tested four values for it: 1G, 2G, 4G, and 8G. The configuration can be applied in two ways: by setting it directly on the spark-submit command line (e.g. --driver-memory 4g or --conf spark.driver.memory=4g), or through the properties file passed with --properties-file.
1G
The Spark default driver memory is 1G. However, it is too small for this dataset, and the sort program reports a Java heap space out-of-memory error.
2G
After raising the driver memory to 2G, the sort program still reports a Java heap space out-of-memory error.
4G
The sort program runs successfully once the driver memory is raised to 4G. It correctly prints the sorted words, and the total running time is 3m47.166s (including the printing time).
8G
The program correctly prints the sorted words, and the total running time is 3m47.200s (including the printing time), which is similar to 4G.
Reducer Size
The second property to modify is spark.reducer.maxSizeInFlight. It is the maximum size of map outputs to fetch simultaneously from each reduce task, and it represents a fixed memory overhead per reduce task. I would like to see whether the fetch size impacts performance, so I try three sizes: 12M, 48M, and 96M. The property is added to the properties file, and the program is run with the following command.
time ./bin/spark-submit --properties-file conf/spark-defaults.conf examples/src/main/python/sort.py ../xaa
12M
The running time with the 12M fetch size is 3m58.065s, including the result printing time.
48M
The Spark default reducer max size in flight is 48M. The running time with the 48M fetch size is 4m0.725s, including the result printing time.
96M
The running time with the 96M fetch size is 3m56.303s, including the result printing time.
Shuffle Buffer
The third property to change is spark.shuffle.file.buffer. It is the size of the in-memory buffer for each shuffle file output stream. These buffers reduce the number of disk seeks and system calls made when creating intermediate shuffle files, so larger buffers can theoretically yield better performance. I try three buffer sizes: 8K, 32K, and 1M. The following is the running command.
time ./bin/spark-submit --properties-file conf/spark-defaults.conf examples/src/main/python/sort.py ../xaa
8K
The running time with an 8K shuffle buffer is 3m55.807s, including the result printing time.
32K
32K is the Spark default shuffle file buffer size. The running time with a 32K buffer is 3m59.308s, including the result printing time.
1M
The running time with a 1M buffer is 3m59.725s, including the result printing time.
Serializer
The fourth property to change is spark.serializer. It is the class used to serialize objects that will be sent over the network or cached in serialized form. The default JavaSerializer is said not to perform particularly well, so I compare two serializers: the default JavaSerializer and KryoSerializer. The following is the running command.
time ./bin/spark-submit --properties-file conf/spark-defaults.conf examples/src/main/python/sort.py ../xaa
JavaSerializer
The running time for the default serializer, without printing the results, is 6.6012s.
KryoSerializer
The running time for KryoSerializer, without printing the results, is 6.5324s. To run the program without a buffer overflow, I set spark.kryoserializer.buffer.max to 2047M.
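For reference, the same serializer settings can also be applied programmatically when the SparkContext is created; a small sketch using the values above (the app name is arbitrary):

```python
from pyspark import SparkConf, SparkContext

# Sketch: set the serializer options in code instead of the properties file.
# The property values mirror the ones used above.
conf = SparkConf() \
    .setAppName("SortWithKryo") \
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .set("spark.kryoserializer.buffer.max", "2047m")

sc = SparkContext(conf=conf)
print(sc.getConf().get("spark.serializer"))
sc.stop()
```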
Thread Number
Since I am running Spark in local mode, I cannot control how many cores the program uses, but I can set how many threads it runs on, up to the number of logical cores on the machine. I have tried five different thread counts: 1, 2, 4, 8, and 12; the default is 2. The following command runs the program.
time ./bin/spark-submit --master local[1] --properties-file conf/spark-defaults.conf examples/src/main/python/sort.py ../xaa
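The same thread count can also be pinned in code through the master URL; a small sketch (the app name is arbitrary, and local[4] is just one of the tested values):

```python
from pyspark.sql import SparkSession

# Sketch: run with a fixed number of local worker threads (here 4).
spark = SparkSession.builder \
    .master("local[4]") \
    .appName("SortBenchmark") \
    .getOrCreate()

# defaultParallelism reflects the local[N] setting.
print(spark.sparkContext.defaultParallelism)
spark.stop()
```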
Summary
Thread Number | Running Time |
---|---|
1 | 32.909s |
2 | 18.484s |
4 | 9.565s |
8 | 7.100s |
12 | 6.570s |