
How to determine Spark JOB Resources over YARN

This is a question which, I believe, cannot be answered in a direct way, because it depends upon various run-time factors -


  1. Data Distribution
  2. Skewed Data
  3. Operations that we are performing - Join, Windowing, etc.
  4. Shuffle Time, GC Overhead, etc
A run-time analysis of DAG & JOB execution helps us tune optimal resources for a Spark Job.

But we will try to provide a very basic answer to this question in this blog. Note that this is run-time behavior, so the answer may not fit all use cases.

Suppose you have a multi-tenant cluster and you have determined how much hardware is available for your YARN queue or user.

Say you have -

  1. 6 Nodes, each with 16 cores and 64 GB RAM
Also, note the configuration of your Edge Node from where you will trigger the Spark JOB. As multiple Jobs will be spawned from the same edge node, the resources of the edge node can be a bottleneck too.


  1. 1 core and 1 GB are needed for the OS and Hadoop daemons. So, you have 6 machines, each with 15 cores and 63 GB RAM.

Number of cores = Concurrent tasks an executor can run 

So we might think that more concurrent tasks per executor will give better performance. But experience shows that
any application with more than 5 concurrent tasks per executor tends to perform poorly (largely because the HDFS client achieves full write throughput with at most about 5 concurrent tasks). So stick to 5.

This number comes from the throughput an executor can sustain, not from how many cores the system has. So the number 5 stays the same
even if you have double (32) the cores in the CPU.

Note that - 
  1. Analyze the DAG; if you see that an executor is processing 1 task at a time, then there is no need to provide 5 cores.
  2. Also, try to reduce the number of cores and observe the performance variation. If reducing the number of cores doesn't increase execution time drastically, or you don't have hard SLA requirements, then you might not require 5 cores per executor.
  3. For some use cases you may require Fat executors (fewer executors, but each with more cores), and for others you may require Slim executors (fewer cores, but a larger number of executors); see the sketch below.
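To make point 3 concrete, here is a purely illustrative Scala sketch of two ways the same 15-core / 63 GB node could be carved up; the numbers are assumptions for illustration, not a recommendation.

  // Illustrative only: two ways to carve up a single 15-core / 63 GB node.
  case class ExecutorShape(executorsPerNode: Int, coresPerExecutor: Int, memoryPerExecutorGb: Int)

  val fat  = ExecutorShape(executorsPerNode = 3, coresPerExecutor = 5, memoryPerExecutorGb = 21)  // fewer, bigger executors
  val slim = ExecutorShape(executorsPerNode = 7, coresPerExecutor = 2, memoryPerExecutorGb = 9)   // more, smaller executors
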
Number of executors:

Coming back to the next step: with 5 cores per executor, and 15 total available cores in one node (CPU), we come to
3 executors per node.

So with 6 nodes and 3 executors per node, we get 18 executors. Out of those 18 we need 1 executor (Java process) for the AM/Driver in YARN, which leaves us 17 executors.

This 17 is the number we give to Spark using --num-executors while running the spark-submit shell command.
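As a minimal Scala sketch (assuming the 6-node, 16-core, 64 GB layout above), the whole calculation can be written out, ending in the flags we would pass to spark-submit:

  // Sketch of the executor-count arithmetic above (illustrative only; adjust to your cluster).
  val nodes            = 6
  val usableCores      = 16 - 1            // 1 core reserved per node for OS / Hadoop daemons
  val coresPerExecutor = 5                 // the "at most 5 concurrent tasks" rule of thumb

  val executorsPerNode = usableCores / coresPerExecutor    // 15 / 5 = 3
  val totalExecutors   = nodes * executorsPerNode          // 6 * 3 = 18
  val numExecutors     = totalExecutors - 1                // minus 1 for the YARN AM/Driver = 17

  println(s"--num-executors $numExecutors --executor-cores $coresPerExecutor")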

Note that - 

  1. This is a run-time factor as well. If your job is as simple as applying a window function or a group-by function, then, if you think about it logically, all the tasks may end up being processed by 1 executor. So, having too many executors doesn't make sense.
  2. Even if you can spawn 17 executors, you may not want to give all 17 to one JOB. There might be other jobs running in parallel at the same moment.
  3. You would also want to know the execution or completion time for this Job, so that you can schedule other Jobs accordingly. That way you will know when resources will be released by this Job and can be utilized by other Jobs.
Memory for each executor:

From the above step, we have 3 executors per node, and the available RAM per node is 63 GB.

So memory for each executor is 63/3 = 21 GB. This is just rough arithmetic; you can use less memory - don't occupy more than you need.

However, a small overhead memory is also needed to determine the full memory request to YARN for each executor.
The formula for that overhead is max(384 MB, 0.07 * spark.executor.memory).

Calculating that overhead: 0.07 * 21 GB (where 21 is the 63/3 computed above) = 1.47 GB

Since 1.47 GB > 384 MB, the overhead is 1.47 GB.
Subtracting that from each 21 GB above => 21 - 1.47 ~ 19 GB

So executor memory - 19 GB
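A minimal Scala sketch of the same arithmetic (values in MB). In practice, YARN is asked for spark.executor.memory plus this overhead, which is why we leave the headroom inside the 21 GB per-executor budget:

  // Sketch of the overhead calculation above (all values in MB; illustrative only).
  val perExecutorBudgetMb = 21 * 1024                                   // 63 GB per node / 3 executors
  val overheadMb = math.max(384L, (0.07 * perExecutorBudgetMb).toLong)  // max(384 MB, 7% of executor memory)
  val executorMemoryMb = perExecutorBudgetMb - overheadMb               // ~19.5 GB; round down to 19 GB
  println(s"--executor-memory 19g   (YARN container ~ 19g + ${overheadMb}m overhead)")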


Note that -

  1. This is also determined by the operations one is performing. For example, if it's a map-side (broadcast) join, then an executor should have sufficient memory for the data. In such a scenario, we may require Fat executors, meaning that if you want to allocate a total of 50 GB RAM then we can have - 
    1. 10 executors, each with 5 GB RAM, or
    2. 5 executors, each with 10 GB RAM. The second option is what we meant by Fat executors.
  2. If there is too much shuffling, then we can increase the shuffle partitions and the number of executors. That way, we will have a better distribution of keys among the executors executing the reduce tasks.
  3. Normally, executor memory affects the execution time of a task: if we have sufficient memory, data will not be spilled to disk.
  4. Again, determining optimal memory is a run-time analysis for a JOB. For example - 
    1. If we are executing a window operation like row_number() and the data is skewed, i.e. we have many more rows for one partition, then it may be required to have a big executor with sufficient memory. Note that processing might be slow in this case, but we can increase the memory in such a way that the job won't fail because of memory problems.
    2. In some scenarios, we might have a huge data set that is well distributed among unique keys. In that case, we may require a larger number of executors, but each with a small fraction of the memory.
    3. Also note that we should define the container/executor size in such a way that, if it's a daily running Job, it gets sufficient resources at its scheduled execution time. Otherwise, it may disturb the stability of the system, i.e. a Job may execute successfully today but fail tomorrow due to resource contention.
    4. If needed, salting should be done so as to distribute the data set more evenly.
    5. Also, say for a Join we have too many NULL values causing data skew. Then,
      1. we can filter out all the NULL records,
      2. do the JOIN, and
      3. then union the data set having NULL records (#1) with the output data set of the JOIN (#2); see the sketch after this list.
    6. One has to analyze the problem and come up with a solution to efficiently utilize memory. Just increasing the executor memory is not a solution in most cases.
    7. Consider a situation wherein we have given 45 GB memory to each executor and each executor has 10 cores. At run-time, this job is failing with "Out of Memory - Required memory exceeds available". When we analyze the tasks, we get a better picture - 
      1. There can be data skew leading to task failures, or
      2. Say a task is reading 4 GB of data and joining it with some other data set. Now, this executor can execute 10 tasks in parallel (10 cores), so chances are that it will go out of memory because, as the tasks execute, 10 * 4 ~ 40 GB or more of memory will be used. A possible solution can be - 
        1. Reduce the parallelism (cores per executor) to 4,
        2. reduce the memory to 21 GB, and
        3. increase the number of executors by a corresponding amount.
        4. That way, tasks will be parallelized across executors, rather than one executor executing too many tasks.
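A minimal sketch of the NULL-handling steps from point 5 above, assuming hypothetical DataFrames df and lookup joined on a key column (the names and the left-join choice are assumptions for illustration):

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.functions.{col, lit}

  // Sketch: keep NULL join keys out of the join, then union those rows back in.
  // `df`, `lookup` and the key column name are hypothetical.
  def joinSkippingNullKeys(df: DataFrame, lookup: DataFrame, key: String): DataFrame = {
    val nullRows    = df.filter(col(key).isNull)
    val nonNullRows = df.filter(col(key).isNotNull)

    // 1. Join only the rows that can actually match.
    val joined = nonNullRows.join(lookup, Seq(key), "left")

    // 2. Add the lookup-side columns (as NULLs) to the NULL-key rows so the schemas line up.
    val lookupCols = lookup.columns.filterNot(_ == key)
    val nullsAligned = lookupCols.foldLeft(nullRows) { (d, c) =>
      d.withColumn(c, lit(null).cast(lookup.schema(c).dataType))
    }

    // 3. Union the two parts back together.
    joined.unionByName(nullsAligned)
  }
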
Driver Cores: 

Mostly, having a small number of driver cores (the default, spark.driver.cores, is 1) is sufficient. But in case we have some parallel computations, i.e. if there is more than 1 JOB executing in parallel within a Spark application, then to speed up task allocation to executors we may want more cores for the driver.
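As an illustration of "more than 1 JOB executing in parallel within a Spark application", a common pattern is to fire independent actions from separate threads on the driver, e.g. with Scala Futures. This is only a sketch; dfA, dfB and the output paths are hypothetical:

  import scala.concurrent.{Await, Future}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration.Duration
  import org.apache.spark.sql.DataFrame

  // Sketch: two independent actions fired concurrently from the driver - one way in which
  // more than 1 JOB ends up running in parallel inside a single Spark application.
  def runTwoJobsInParallel(dfA: DataFrame, dfB: DataFrame): Unit = {
    val jobA = Future { dfA.write.mode("overwrite").parquet("/tmp/outA") }  // hypothetical output path
    val jobB = Future { dfB.write.mode("overwrite").parquet("/tmp/outB") }  // hypothetical output path
    Await.result(Future.sequence(Seq(jobA, jobB)), Duration.Inf)
  }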

Driver Memory:

The --driver-memory flag controls the amount of memory allocated to the driver, which is 1 GB by default and should be increased in case you call a collect() or take(N) action on a large RDD inside your application.

The Spark driver is the cockpit of job and task execution. It also hosts the Web UI for the environment. It splits a Spark application into tasks and schedules them to run on executors; the driver is where the task scheduler lives, and it spawns tasks across workers and coordinates their overall execution. The driver also runs additional services such as the ShuffleManager, HTTPFileServer, etc.

Thus, driver memory should be set appropriately, so as to avoid out-of-memory errors.

Dynamic Allocation
If you are still not sure, then you can try dynamic allocation of resources. I avoid using it because, firstly, a JOB can eat up the whole cluster (which may not be required). Secondly, you will turn on Fair Scheduling with preemption: that might result in lost executors or lost computation for a JOB, which can cause unpredictable behavior, i.e. it might complete in 10 minutes, or 1 hour, or may sometimes fail.

It's a good feature to try. But as your cluster grows (in terms of the number of running Jobs) you might want to fine-tune your applications; or, if you have sufficient budget to add more resources or nodes to the cluster, then maybe fine-tuning is not needed.
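If you do want to try it, a minimal sketch of enabling dynamic allocation looks like the following (illustrative bounds; the YARN external shuffle service must be enabled on the NodeManagers so that executors can be released safely):

  import org.apache.spark.sql.SparkSession

  // Sketch: dynamic allocation with an upper bound so one JOB cannot eat the whole queue.
  // The app name and the min/max values are illustrative assumptions.
  val spark = SparkSession.builder()
    .appName("dynamic-allocation-example")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.shuffle.service.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "17")
    .getOrCreate()

Capping spark.dynamicAllocation.maxExecutors addresses the first concern above: a single JOB taking over the whole cluster.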
