Increasing Spark Throughput When Working With S3
I would like to share a short description of a simple design change I recently made for one of Tikal’s customers, which greatly improved the throughput of their Spark processing on their BigData lake.
In the last few months I had to build a BigData infrastructure for one of our customers. This customer’s domain deals with travelers and their trip bookings. As part of this infrastructure, we gathered all the information the enterprise had accumulated over the years into a “Single Source of Truth”: a kind of “Data Lake” that BI and machine learning can use to produce intelligence and insights.

The data includes both structured and semi-structured data. An important part of the semi-structured data is the travelers’ requests for their trips, generated by the ordering sub-system. These requests are processed online by the system and also stored as files in S3 on the AWS cloud. Since each request describes various kinds of data regarding the traveler’s trip (e.g. the traveler names, the trip details and much other transaction data), and since the request is formatted as XML, each XML turned out to be quite big: about 4KB. When the customer started accumulating the requests a few years ago, they decided to save each request as a single S3 object.

When I came to build the “data lake” a few months ago and wanted to include these S3 objects in it, the natural thought was to add this S3 bucket, with the trip information, to our data lake.

We use AWS EMR as our Data Platform infrastructure, and specifically we use Spark as its processing engine. So the first step was to check whether we could simply load all the XML files and apply some simple logic to them. As our “Hello World” POC we chose to simply count the number of trips (i.e. count the number of files). Since each trip is stored as a single XML object in S3, that would be an extremely short (one-line) POC: load the files and count them (we knew the result should equal the number of files):
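import org.apache.spark.sql.SparkSession
val sc = SparkSession.builder().getOrCreate().sparkContext
// textFile yields one record per line, so this equals the trip count only if each XML request sits on a single line
println(sc.textFile("s3://bucket-name/").count)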
The expected result was about 400M: the number of trip orders so far. Since I knew the processing would take a while, I launched the code and went to get some coffee. I came back and nothing had happened; it was still running. After waiting a while longer without any change, I understood we had a problem…
After Googling it I found this resource, which explains why you can’t simply read from the bucket with Spark, and why you actually need to loop over the objects yourself from the master node: first get the keys, and then let the worker nodes load the objects. With this method, the files are loaded in parallel by all Spark executors.

Nevertheless, when I wrote and launched this program, I found that just loading the files and applying simple logic (i.e. counting the lines) was still extremely slow. A quick, rough calculation pointed to a throughput of about 100 objects/sec. Since we have 400M objects (trips in our case), it would take roughly 40 days just to load them and apply a simple count…

Then it finally hit me: we have a problem with our source layout design. The trip-per-file approach is simply not appropriate for batch processing when you use S3. As explained above, S3 has no analogue of “SELECT * FROM TABLE” as you would have in any DB. S3 is an object store (and NOT a database), which gives you an API to load a page of keys (1000 at most), after which you need to “GET” each object by its key. It is a kind of N+1 problem, like the one you get when you work with an RDBMS the wrong way. Since we have about 400M objects in the bucket, and the GET operation is affected by the number of objects in the bucket, we got a poor throughput of about 120 files per second, no matter what the cluster size was!
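To make the rough numbers above concrete, here is a minimal back-of-the-envelope sketch in Scala. It only does arithmetic and never touches S3. The object name `S3CostModel` and its helpers are made up for illustration, while the 1000-key page size and the 100-120 objects/sec throughput are the figures quoted in this post.

```scala
object S3CostModel {
  // Figures taken from the post: ~400M objects, at most 1000 keys per listing page.
  val Objects: Long = 400000000L
  val KeysPerListPage: Long = 1000L

  // Number of listing calls needed to enumerate all keys (rounded up).
  def listCalls(objects: Long): Long =
    (objects + KeysPerListPage - 1) / KeysPerListPage

  // Listing calls plus one GET per object: the "N+1" pattern.
  def totalRequests(objects: Long): Long =
    listCalls(objects) + objects

  // Wall-clock days needed at a fixed overall throughput (objects per second).
  def daysAt(objects: Long, objectsPerSec: Double): Double =
    objects / objectsPerSec / 86400.0

  def main(args: Array[String]): Unit = {
    println(s"LIST calls:     ${listCalls(Objects)}")
    println(s"Total requests: ${totalRequests(Objects)}")
    println(f"Days at 100 objects/sec: ${daysAt(Objects, 100.0)}%.1f")
    println(f"Days at 120 objects/sec: ${daysAt(Objects, 120.0)}%.1f")
  }
}
```

At 100 objects/sec this comes to about 46 days, and at 120 objects/sec to about 39 days, so the "roughly 40 days" estimate holds either way. The listing calls are a negligible part of the total; almost all of the cost is the one GET per trip.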
So I decided to “change the ground we stand on”. Now that I had pinpointed the problem, the solution was straightforward. I wrote a simple program that gathers all the objects from the S3 bucket and puts them in batches in a new bucket (I put 10K trips in each new object). Running the same one-line Spark program on the new bucket returned the expected result in less than 30 minutes. For production, it’s OK for us to run our job for 30-40 minutes (these are daily jobs). It’s true that the import process itself still had to run for a long time (a 40-day run) due to the constraint above, but this is a “one shot” process. Now that it’s done, we will use the output bucket, with 10K trips per object, as part of our data lake.
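The batching step can be sketched roughly as follows. This is not the actual program used for the customer: it only shows the grouping logic on in-memory strings and leaves reading from and writing to S3 out. The names `TripBatcher`, `toSingleLine` and `batches` are hypothetical, and flattening each XML onto one line assumes that line breaks inside the requests are not significant.

```scala
object TripBatcher {
  // Batch size from the post: 10K trips per output object.
  val DefaultBatchSize: Int = 10000

  // Assumption: line breaks inside a request carry no meaning, so each
  // request can be collapsed onto a single line (one record per line).
  def toSingleLine(xml: String): String =
    xml.replaceAll("[\\r\\n]+", " ").trim

  // Lazily group the requests and join every group into the body of one
  // large output object, with one request per line.
  def batches(requests: Iterator[String],
              batchSize: Int = DefaultBatchSize): Iterator[String] =
    requests.map(toSingleLine).grouped(batchSize).map(_.mkString("\n"))

  def main(args: Array[String]): Unit = {
    // Tiny stand-in for the real XML requests.
    val sample = Iterator("<trip>\n  <id>1</id>\n</trip>", "<trip><id>2</id></trip>")
    batches(sample, batchSize = 2).foreach(println)
  }
}
```

Each string returned by `batches` would be written as one object in the new bucket. With one request per line, a line count over the new bucket matches the trip count, and reading 400M trips takes on the order of 40K GET requests instead of 400M.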
I already knew that small files negatively affect BigData processing frameworks (like Spark or Hadoop). What surprised me, however, was the scale: the improvement from 40 days to 30 minutes (for the same one-line code) was not expected. That is a throughput improvement of roughly 2,000x, more than three orders of magnitude.

The reason is that S3 doesn’t have ‘batch reading’ (as it is an object store and NOT a DB), so in practice reading the 400M files meant 400M HTTP API calls to the S3 object store. When an S3 bucket holds so many objects, the throughput becomes unrealistic for BigData processing, and you are better off accumulating the data in much bigger objects.