Learning to Configure Spark Jobs

Installation

Go to the official Apache Spark download page, select Spark release 3.1.1 with package type "Source Code", unzip the downloaded package, and build Spark with Apache Maven.

./build/mvn -DskipTests clean package

Example Programs

SparkPi

SparkPi is a compute-intensive task that estimates pi by “throwing darts” at a circle: random points are picked in the unit square ((0, 0) to (1, 1)) and we count how many fall inside the unit circle. That fraction should be pi/4, so multiplying it by 4 gives the estimate.
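
The same idea can be sketched in a few lines of PySpark. The snippet below follows the description above (points in the unit square, counting those that land inside the quarter circle); the app name and the number of samples per partition are illustrative choices, not taken from the bundled example.

from operator import add
from random import random

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PiSketch").getOrCreate()

partitions = 10
n = 100000 * partitions  # illustrative sample count

def inside(_):
    # Throw one dart into the unit square and check whether it lands
    # inside the quarter circle of radius 1 centered at the origin.
    x, y = random(), random()
    return 1 if x * x + y * y <= 1.0 else 0

count = spark.sparkContext.parallelize(range(n), partitions).map(inside).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))

spark.stop()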

# Usage: pi [partitions]
./bin/run-example SparkPi 10   # also run with 100, 1000, and 10000 partitions

The estimate and running time for each partition count:

Partitions   Pi Estimate   Running Time
10           3.1392951     0.496385s
100          3.1413819     3.576282s
1000         3.1416965     43.483414s
10000        3.1415915     363.989005s

WordCount
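
The bundled example examples/src/main/python/wordcount.py splits each line on spaces and counts the occurrences of each word; in essence it is the classic flatMap/map/reduceByKey pipeline. A minimal sketch of that idea (the input path comes from the command line; the app name is illustrative):

import sys
from operator import add

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WordCountSketch").getOrCreate()

# Read the input file as an RDD of lines
lines = spark.read.text(sys.argv[1]).rdd.map(lambda row: row[0])

# Split into words, pair each word with 1, and sum the counts per word
counts = lines.flatMap(lambda line: line.split(' ')) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(add)

for word, count in counts.collect():
    print(word, count)

spark.stop()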

Dataset

Gutenberg

To get a large quantity of text without repetition, I crawl text files from Project Gutenberg.

# Create directory
mkdir Download/temp/
cd Download/temp/
# Crawl text files
wget -bqc -w 2 -m -H 'http://www.gutenberg.org/robot/harvest?filetypes[]=txt&langs[]=en'
# Extract text data
mkdir extracted/
find . -name '*.zip' -exec sh -c 'unzip -d extracted {}' ';'
cat extracted/*.txt > temp.txt

After crawling the text data, remove special characters from the combined text file and fix the encoding.

import re

# Read the concatenated Gutenberg text with its original encoding
text = open('temp.txt', encoding="ISO-8859-1").read()
# Keep only letters, digits, newlines, and periods
cleaned = re.sub(r'[^a-zA-Z0-9\n.]', ' ', text)
open('gutenberg.txt', 'w').write(cleaned)

Yelp

Download the Yelp dataset. I will focus on the review dataset, which contains a large quantity of text without repetition and works well as a benchmark. Before running the example program on it, I first need to convert the JSON file into a CSV file. Since the dataset is very large (almost 6GB), I process it in chunks of 1,000,000 reviews.

import pandas as pd

review_json_path = 'yelp_academic_dataset_review.json'
chunk_size = 1000000

# Read the review JSON line by line in chunks of 1,000,000 rows
review = pd.read_json(review_json_path, lines=True,
                      dtype={'review_id': str, 'user_id': str, 'business_id': str,
                             'stars': int, 'date': str, 'text': str,
                             'useful': int, 'funny': int, 'cool': int},
                      chunksize=chunk_size)

# Keep only the review text column; drop everything else
chunk_list = []
for chunk_review in review:
    chunk_review = chunk_review.drop(
        ['review_id', 'user_id', 'business_id', 'stars', 'date', 'useful', 'funny', 'cool'], axis=1)
    chunk_list.append(chunk_review)

df = pd.concat(chunk_list, ignore_index=True, join='outer', axis=0)
df.to_csv('result.csv', index=False)

After getting the CSV file, I also need to clean the data by removing special characters.

import re

# Read the CSV produced above and strip special characters to create
# the plain-text file (yelp.txt) used by the benchmarks below
text = open('result.csv').read()
cleaned = re.sub(r'[^a-zA-Z0-9\n]', ' ', text)
open('yelp.txt', 'w').write(cleaned)

Result

Gutenberg

Run the following command to count words in the dataset.

time ./bin/spark-submit examples/src/main/python/wordcount.py ../gutenberg.txt

The total running time is 0m6.037s.

Yelp

Run the following command to count words in the dataset.

time ./bin/spark-submit examples/src/main/python/wordcount.py ../yelp.txt

The total running time is 1m56.498s.

Configuration Impact

To see the impact of configuration changes, I run the example MapReduce-style sort program (examples/src/main/python/sort.py).

Bug Fix

The provided Python code for the sort program has a bug in line 35. Fix it by removing the conversion from string to int:

sortedCount = lines.flatMap(lambda x: x.split(' ')) \
    .map(lambda x: (x, 1)) \
    .sortByKey()

Dataset Preparation

I will use the Yelp dataset as a benchmark to see the impact of configuration. However, since the Yelp text file from the previous section is too large (>5GB), the sort program always reports a buffer overflow. So I will only run on the first 500MB chunk (xaa) produced by split.

split -b 500m yelp.txt

Configuration File

Spark's configuration file is provided as a template, conf/spark-defaults.conf.template. To modify the configuration, first copy the template to conf/spark-defaults.conf; settings can then be changed by adding properties to this file.
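
For example, the file can be created and properties appended like this (a sketch; the values shown are the driver settings tested in the next section):

cp conf/spark-defaults.conf.template conf/spark-defaults.conf
# Properties are whitespace-separated key/value pairs, one per line
echo "spark.driver.memory        4g"  >> conf/spark-defaults.conf
echo "spark.driver.maxResultSize 10g" >> conf/spark-defaults.conf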

Driver Memory

The first properties to change are spark.driver.memory and spark.driver.maxResultSize. spark.driver.maxResultSize is set to 10G, which is larger than the dataset, so that serialization of the results can complete. spark.driver.memory is the amount of memory to use for the driver process. I have tested four values for the driver memory: 1G, 2G, 4G, and 8G. The program can be run with either of two commands.

# set config directly in command line
time ./bin/spark-submit --driver-memory 1g examples/src/main/python/sort.py ../xaa
# set config in config file
time ./bin/spark-submit --properties-file conf/spark-defaults.conf examples/src/main/python/sort.py ../xaa

1G

The Spark default driver memory is 1G. However, it is too small for the test dataset, and the sort program reports a Java heap space out-of-memory error.

2G

After raising the driver memory to 2G, the sort program still reports a Java heap space out-of-memory error.

4G

The sort program runs successfully when the driver memory is raised to 4G. It correctly prints the sorted words, and the total running time is 3m47.166s (including the printing time).

8G

The program correctly prints the sorted words, and the total running time is 3m47.200s (including the printing time), similar to the 4G run.

Reducer Size

The second property to modify is spark.reducer.maxSizeInFlight, the maximum size of map outputs to fetch simultaneously from each reduce task; it represents a fixed memory overhead per reduce task. I would like to see whether the fetch size impacts performance, so I try three sizes: 12M, 48M, and 96M. The property is added to the property file, and the program is run with the following command.

time ./bin/spark-submit --properties-file conf/spark-defaults.conf examples/src/main/python/sort.py ../xaa
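
Between runs, only the corresponding line in conf/spark-defaults.conf changes, for example:

spark.reducer.maxSizeInFlight 12m
# then 48m and 96m for the other two runs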

12M

The running time with a 12M fetch size is 3m58.065s, including the result printing time.

48M

The Spark default reducer max size in flight is 48M. The running time with a 48M fetch size is 4m0.725s, including the result printing time.

96M

The running time with a 96M fetch size is 3m56.303s, including the result printing time.

Shuffle Buffer

The third property to change is spark.shuffle.file.buffer, the size of the in-memory buffer for each shuffle file output stream. These buffers reduce the number of disk seeks and system calls made when creating intermediate shuffle files, so larger buffers can in theory give better performance. I try three buffer sizes: 8K, 32K, and 1M. The following is the running command.

time ./bin/spark-submit --properties-file conf/spark-defaults.conf examples/src/main/python/sort.py ../xaa
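
Equivalently, the buffer size can be overridden for a single run with spark-submit's --conf flag, which takes precedence over values in the property file, for example:

time ./bin/spark-submit --properties-file conf/spark-defaults.conf --conf spark.shuffle.file.buffer=1m examples/src/main/python/sort.py ../xaa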

8K

The running time with an 8K shuffle buffer is 3m55.807s, including the result printing time.

32K

32K is the Spark default value for the shuffle file buffer. The running time with a 32K shuffle buffer is 3m59.308s, including the result printing time.

1M

The running time with a 1M shuffle buffer is 3m59.725s, including the result printing time.

Serializer

The fourth property to change is spark.serializer, the class used for serializing objects that will be sent over the network or need to be cached in serialized form. It is said that the default JavaSerializer does not perform well enough, so I will try two serializers: the default JavaSerializer and KryoSerializer. The following is the running command.

time ./bin/spark-submit --properties-file conf/spark-defaults.conf examples/src/main/python/sort.py ../xaa

JavaSerializer

The running time (without printing the results) for the default JavaSerializer is 6.6012s.

KryoSerializer

The running time (without printing the results) for KryoSerializer is 6.5324s. To run the program without a buffer overflow, I set spark.kryoserializer.buffer.max to 2047M.
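
In the property file, the entries for this run are essentially:

spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max  2047m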

Thread Number

Since I am running Spark in local mode, I can't control how many cores the program uses to run the benchmark, but I can set how many threads the local master runs with, up to the number of logical cores of the machine. I have tried five different thread counts: 1, 2, 4, 8, and 12. The default thread number is 2. The following is the command that runs the program (the number in local[N] is varied per run).

time ./bin/spark-submit --master local[1] --properties-file conf/spark-defaults.conf examples/src/main/python/sort.py ../xaa
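
As a quick sanity check (not part of the benchmark), the effective number of worker threads can be confirmed from PySpark through the default parallelism of the local master:

from pyspark.sql import SparkSession

# local[4] is just an example; defaultParallelism reports N for local[N]
# unless spark.default.parallelism is set explicitly.
spark = SparkSession.builder.master("local[4]").appName("ParallelismCheck").getOrCreate()
print(spark.sparkContext.defaultParallelism)
spark.stop()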

Summary

Thread Number   Running Time
1               32.909s
2               18.484s
4               9.565s
8               7.100s
12              6.570s