Tuning Spark parameters is not a trivial task. In this short post I will explain how to tune some of the important ones.
Imagine you have a very large file stored in a file system (HDFS, for example) and you would like to start processing it. First, you need to tell Spark how many parts the file should be divided into. This is controlled by the minPartitions parameter of the function that reads the file, for example hadoopFile.
For example, you have a 10 GB file and you specify minPartitions = 1000. Each chunk of the file will then contain approximately 10 GB / 1000 = 10 MB of data. Hence you will have about 1000 chunks to process, and these chunks form a processing queue.
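The chunk-size arithmetic above can be sketched in a few lines of Python. This is only an illustration: the helper name chunk_size_mb is made up for this post, and it assumes 1 GB = 1000 MB, as the calculation above does.

```python
def chunk_size_mb(file_size_gb: float, min_partitions: int) -> float:
    """Approximate size of one chunk in MB, assuming 1 GB = 1000 MB."""
    return file_size_gb * 1000 / min_partitions


# 10 GB file split into 1000 partitions, as in the example above
size = chunk_size_mb(10, 1000)
print(f"{size:.0f} MB per chunk")
```

Keep in mind that minPartitions is a lower bound that Spark treats as a hint, so the real number of chunks can be somewhat higher. In PySpark, the textFile method of the SparkContext accepts a minPartitions argument of the same kind.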
Another parameter that influences the processing queue is spark.default.parallelism. It sets the default number of partitions, and therefore tasks, that Spark uses for operations such as join, reduceByKey and parallelize when you do not specify a number yourself. If we also set it to 1000, we get 1 task per chunk.
Next, you need to configure the computation engine that processes the chunks of data defined above. Below we specify the parameters for the spark-submit script. The computation engine in Spark consists of the following parts:
The driver is the main program. The main workflow of the application is defined here. For example, when you call a Spark action (collect), the parallel computation starts, and when it finishes you receive the results back in the driver.
For the driver you can configure the memory (--driver-memory) and the number of cores (--driver-cores).
Set the memory parameter according to the size of the result you expect to receive in the driver. Typically, depending on the size of your cluster, it can be 64 GB.
The driver-cores parameter does not need to be large either, since the computation happens in the executors, not in the driver. For example, the value can be 1 or 2 cores.
More important for the computation is the definition of the computation engine: how many executors you need, and how much memory and how many cores they will use.
For example, we have a cluster of 4 nodes with 48 cores each. In total we have 4 * 48 = 192 cores. Every node has 256 GB of RAM, so in total we have 256 * 4 = 1024 GB. We need to assign these resources to the Spark computation engine.
It is recommended to use at most 5 cores (CPUs) per executor. This means every executor will run up to 5 threads, or tasks, at a time. If we assign more cores to an executor, we lose time managing these threads.
Using this number, we can easily calculate how many executors we can have in the cluster:
192 cores in cluster / 5 = (approx) 38 executors
Then we can calculate the memory per executor:
1024 GB in cluster / 38 = (approx) 26 GB
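The same calculation can be written as a small Python helper. This is a minimal sketch: the function name size_executors is invented for this post, and it divides up the whole cluster, leaving nothing for the operating system, the cluster manager or the driver. In practice you would usually hold back a few cores and some memory for those.

```python
def size_executors(nodes, cores_per_node, ram_gb_per_node, cores_per_executor=5):
    """Return (number of executors, memory per executor in GB), rounded down."""
    total_cores = nodes * cores_per_node        # 4 * 48 = 192
    total_ram_gb = nodes * ram_gb_per_node      # 4 * 256 = 1024
    num_executors = total_cores // cores_per_executor
    memory_per_executor_gb = total_ram_gb // num_executors
    return num_executors, memory_per_executor_gb


# The cluster from the example: 4 nodes, 48 cores and 256 GB of RAM each
executors, memory_gb = size_executors(4, 48, 256)
print(executors, "executors with", memory_gb, "GB each")
```

Rounding down is deliberate here: asking for slightly less than the cluster has is safer than asking for more.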
We can then configure --num-executors 38, --executor-cores 5 and --executor-memory 26G.
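To see how the driver and executor settings fit together, here is a sketch that assembles a spark-submit command in Python. The helper build_spark_submit and the application file my_job.py are hypothetical; the flags themselves are standard spark-submit options, although which of them take effect depends on your cluster manager and deploy mode (--num-executors, for example, is used on YARN).

```python
import shlex


def build_spark_submit(app_path, driver_memory_gb, driver_cores,
                       num_executors, executor_cores, executor_memory_gb,
                       default_parallelism):
    """Return the spark-submit command as a list of arguments."""
    return [
        "spark-submit",
        "--driver-memory", f"{driver_memory_gb}G",
        "--driver-cores", str(driver_cores),
        "--num-executors", str(num_executors),
        "--executor-cores", str(executor_cores),
        "--executor-memory", f"{executor_memory_gb}G",
        "--conf", f"spark.default.parallelism={default_parallelism}",
        app_path,
    ]


command = build_spark_submit(
    app_path="my_job.py",        # hypothetical application file
    driver_memory_gb=64,         # driver values from the text
    driver_cores=2,
    num_executors=38,            # executor values calculated above
    executor_cores=5,
    executor_memory_gb=26,
    default_parallelism=1000,
)
print(shlex.join(command))
```

Building the command as a list keeps every flag next to its value, and shlex.join turns it into a line you can paste into a shell.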
Let's put everything together. We have a large job, processing 10 GB of data, which is divided into 1000 small tasks. We have configured a processing engine that can run up to 190 tasks at a time (38 executors with 5 cores each, out of the 192 cores in the cluster). It is recommended that the number of tasks in the queue is 3-5 times larger than the number the engine can process at once; with 1000 tasks we have about 5 times the capacity. The reason is that tasks are typically not equal: one task can take 1 minute, another 10 minutes. With a one-to-one relation between tasks and cores, most of the cluster sits idle while it waits for the slowest tasks to finish. If the processing queue is larger than the capacity of the cluster, then as soon as one task finishes, the freed core can pick up the next task in the queue.
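The effect of a longer queue can be illustrated with a toy simulation. It is a simplified model and not how the Spark scheduler is actually implemented: each task simply goes to the core that becomes free first, the cluster is shrunk to 4 cores, and the task durations (in minutes) are made up to match the 1-minute and 10-minute example above.

```python
import heapq


def makespan(durations, slots):
    """Total run time when each task, in queue order, takes the first free slot."""
    free_at = [0] * slots            # time at which each slot becomes free
    heapq.heapify(free_at)
    for duration in durations:
        start = heapq.heappop(free_at)          # earliest free slot
        heapq.heappush(free_at, start + duration)
    return max(free_at)


def utilization(durations, slots):
    """Share of the available slot time that is spent doing work."""
    return sum(durations) / (slots * makespan(durations, slots))


slots = 4
one_to_one = [10, 1, 1, 1]           # as many tasks as slots
larger_queue = [10] + [1] * 15       # 4 times as many tasks as slots

print(f"one task per slot: {utilization(one_to_one, slots):.3f}")
print(f"4x larger queue:   {utilization(larger_queue, slots):.3f}")
```

In the first case three cores finish after 1 minute and then wait 9 minutes for the long task. In the second case they keep pulling short tasks from the queue while the long one runs, so a much larger share of the cluster time is spent on real work.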