
How to determine Spark JOB Resources over YARN

This is a question which, I believe, is not answerable in a direct way, because it depends upon various run-time factors -


  1. Data Distribution
  2. Skewed Data
  3. Operations that we are performing - Join, Windowing, etc.
  4. Shuffle Time, GC Overhead, etc.
A run-time analysis of the DAG & JOB execution helps us tune optimal resources for a Spark Job.

But, we will try to provide a very basic answer to this question in this blog. Note that resource usage is a run-time behavior, so the answer may not fit all use cases.

Suppose you have a multi-tenant cluster, and you have determined how much hardware is available for your YARN queue or user.

Say you have -

  1. 6 Nodes, each with 16 cores and 64 GB RAM
Also, note the configuration of your Edge Node, from where you will trigger the Spark JOB. As multiple Jobs will be spawned from the same edge node, the resources of the edge node can become a bottleneck too.


  1. 1 core and 1 GB are needed for the OS and Hadoop daemons. So, you are left with 6 machines, each with 15 cores and 63 GB RAM.

Number of cores per executor:

Number of cores = concurrent tasks an executor can run.

So we might think that more concurrent tasks per executor will give better performance. But research shows that any application with more than 5 concurrent tasks per executor tends to perform badly. So stick to 5.

This number comes from an executor's ability to run concurrent tasks, not from how many cores the system has. So the number 5 stays the same even if you have double (32) the cores in the CPU.

Note that - 
  1. Analyze the DAG; if you see that an executor is processing 1 task at a time, then there is no need to provide 5 cores.
  2. Also, try reducing the number of cores and observe the performance variation. If reducing the number of cores doesn't increase execution time drastically, or you don't have a hard SLA requirement, then you might not require 5 cores per executor.
  3. For some use cases, you may require Fat executors (fewer executors, but each having more cores), and for some you may require Slim executors (fewer cores, but a greater number of executors).
Number of executors:

Coming back to the next step: with 5 cores per executor, and 15 total available cores per node (CPU), we come to 3 executors per node.

So with 6 nodes and 3 executors per node, we get 18 executors. Out of these 18, we need 1 executor (Java process) for the AM/Driver in YARN, which leaves us with 17 executors.

This 17 is the number we give to Spark using --num-executors while running the spark-submit shell command.
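
As a quick sketch, the arithmetic above can be written out as follows (values taken from this example cluster; adjust them for yours) -

  // Worked calculation for the example cluster (6 nodes, 16 cores and 64 GB RAM each).
  val nodes = 6
  val usableCoresPerNode = 16 - 1                               // 1 core reserved for OS / Hadoop daemons
  val coresPerExecutor = 5                                      // max concurrent tasks per executor
  val executorsPerNode = usableCoresPerNode / coresPerExecutor  // 15 / 5 = 3
  val totalExecutors = nodes * executorsPerNode - 1             // 18 - 1 (AM/Driver) = 17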

Note that - 

  1. This is a run-time factor as well. If your job is as simple as applying a window function or a group-by where the data collapses into a single partition, then logically all of that work will be processed by 1 executor. So, having too many executors doesn't make sense.
  2. Even if you can spawn 17 executors, you may not want to give all 17 to a single JOB; there might be other jobs running in parallel at the same moment.
  3. You would also want to know the execution or completion time of this Job, so that you can schedule other Jobs accordingly. That way, you will know when resources will be released by this Job and can be utilized by other Jobs.
Memory for each executor:

From the above step, we have 3 executors per node, and the available RAM is 63 GB.

So memory for each executor is 63/3 = 21 GB. This is just rough arithmetic; you can use less memory. Don't occupy more than you need.

However, a small overhead memory also needs to be added to determine the full memory request to YARN for each executor.
The formula for that overhead is max(384 MB, 0.07 * spark.executor.memory).

Calculating that overhead: 0.07 * 21 GB = 1.47 GB (here 21 is the 63/3 calculated above).

Since 1.47 GB > 384 MB, the overhead is 1.47 GB.
Subtracting that from each 21 GB gives 21 - 1.47 ~ 19 GB.

So executor memory - 19 GB
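
Putting the numbers together, here is a minimal sketch of the whole calculation (again, the values come from this example cluster and are not universal) -

  // Overhead formula: max(384 MB, 0.07 * executor memory)
  val memoryPerExecutorSlotGb = 63.0 / 3                                    // 21 GB
  val overheadGb = math.max(0.384, 0.07 * memoryPerExecutorSlotGb)          // 1.47 GB > 384 MB
  val executorMemoryGb = math.floor(memoryPerExecutorSlotGb - overheadGb)   // ~19 GB

  // Equivalent spark-submit flags for this example:
  // spark-submit --num-executors 17 --executor-cores 5 --executor-memory 19G ...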


Note that -

  1. This is also determined by the operations one is performing. For example - if it's a map-side join, then an executor should have sufficient memory for the data. In such a scenario, we may require Fat executors, meaning if you want to allocate a total of 50 GB RAM then we can have - 
    1. 10 executors, each with 5 GB RAM, or
    2. 5 executors, each with 10 GB RAM. This is what we mean by Fat executors.
  2. If there is too much shuffling, then we can increase the shuffle partitions and increase the number of executors. That way, we will have a better distribution of keys among the executors executing reduce tasks.
  3. Normally, executor memory controls the execution time of a task: if we have sufficient memory, data will not be spilled to disk.
  4. Again, determining the optimal memory is a run-time analysis for a JOB. For example - 
    1. If we are executing a window operation like row_number() and the data is skewed, i.e. some partitions have many more rows than others, then we may require a big executor with sufficient memory. Note, the processing might be slow in this case, but we can increase the memory in such a way that the job won't fail because of memory problems.
    2. In some scenarios, we might have a huge data set that is well distributed among unique keys. In that case, we may require a greater number of executors, each with a small fraction of the memory.
    3. Also note that we should define the container/executor size in such a way that, if it's a daily running Job, the Job gets sufficient resources at its scheduled time of execution. Otherwise, it may disturb the stability of the system, i.e. a Job may execute successfully today but fail tomorrow due to resource contention.
    4. If needed, salting should be done so as to distribute the data set.
    5. Also, say for a Join we have too many NULL values causing data skew. Then (see the sketch after this list), 
      1. we can filter out all the NULL records,
      2. do the JOIN,
      3. then union the data set having NULL records (#1) with the output data set of the JOIN (#2).
    6. One has to analyze the problem and come up with a solution to efficiently utilize memory. Just increasing the executor memory is not a solution in most cases.
    7. Consider a situation wherein we have given 45 GB memory to each executor and each executor has 10 cores. At run-time, this job is failing with "Out of Memory - Required memory exceeds available". When we analyze the tasks, we get a better picture - 
      1. There can be data skew leading to task failures, or
      2. Say a task is reading 4 GB of data and joining it with some other data set. Now, this executor can execute 10 tasks in parallel (10 cores). Chances are that it will go out of memory because, as the tasks execute, 10*4 ~ 40 GB or more memory will be used. A possible solution can be - 
        1. Reduce the parallelism (cores per executor) to 4,
        2. reduce the memory to 21 GB,
        3. and increase the number of executors by a corresponding amount.
        4. That way, tasks will be parallelized across executors, rather than one executor executing too many tasks.
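
For point 5 above (NULL keys causing join skew), here is a hedged sketch of the filter-join-union approach. The function and parameter names are made up for illustration, and it assumes the left and right data sets do not share non-key column names -

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.functions.{col, lit}

  // 1. Filter out rows whose join key is NULL (they can never match).
  // 2. Join only the non-NULL portion, so the NULL "hot key" never lands on one reducer.
  // 3. Union the NULL-key rows back, padding the columns that would have come from the right side.
  def joinWithoutNullSkew(left: DataFrame, right: DataFrame, key: String): DataFrame = {
    val leftNulls    = left.filter(col(key).isNull)
    val leftNonNulls = left.filter(col(key).isNotNull)

    val joined = leftNonNulls.join(right, Seq(key), "left_outer")

    val rightOnlyCols = right.columns.filterNot(_ == key)
    val paddedNulls = rightOnlyCols.foldLeft(leftNulls) { (df, c) =>
      df.withColumn(c, lit(null).cast(right.schema(c).dataType))
    }

    joined.union(paddedNulls.select(joined.columns.map(col): _*))
  }
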
Driver Cores: 

Mostly, the default number of driver cores is sufficient. But, in case we have parallel computations, i.e. if there is more than 1 JOB executing in parallel within a Spark application, then to speed up task allocation to executors we may want to have more cores for the driver.
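
For illustration, a hedged sketch of "more than 1 JOB executing in parallel within a Spark application" - two independent actions fired from separate driver threads (the app name and data sizes below are made-up examples). With several such threads, extra driver cores (e.g. spark-submit --driver-cores 4 in cluster mode) help the driver keep up with scheduling -

  import org.apache.spark.sql.SparkSession
  import scala.concurrent.{Await, Future}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration.Duration

  val spark = SparkSession.builder().appName("parallel-jobs-sketch").getOrCreate()

  // Two independent actions submitted concurrently => two Spark jobs scheduled in parallel.
  val jobA = Future { spark.range(0L, 100000000L).selectExpr("sum(id)").collect() }
  val jobB = Future { spark.range(0L, 100000000L).selectExpr("count(*)").collect() }

  Await.result(Future.sequence(Seq(jobA, jobB)), Duration.Inf)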

Driver Memory:

The --driver-memory flag controls the amount of memory to allocate for the driver, which is 1 GB by default, and should be increased in case you call a collect() or take(N) action on a large RDD inside your application.

The Spark driver is the cockpit of job and task execution. It also hosts the Web UI for the environment. It splits a Spark application into tasks and schedules them to run on executors. The driver is where the task scheduler lives and from where tasks are spawned across workers; it coordinates the workers and the overall execution of tasks. The driver also requires many additional services like the ShuffleManager, HTTPFileServer, etc.

Thus, driver memory should be set appropriately, so as to avoid out-of-memory errors.
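
As a minimal sketch of the above (driver memory has to be fixed at launch, e.g. spark-submit --driver-memory 4G, because the driver JVM is already running by the time application code executes; 4G here is just an assumed example) -

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("driver-memory-sketch").getOrCreate()
  val df = spark.range(0L, 1000000000L)   // a large data set

  // take(N) pulls only N rows to the driver - cheap on driver memory.
  val preview = df.take(10)

  // collect() pulls every row to the driver - the usual cause of driver OOM
  // unless --driver-memory has been sized for it.
  // val everything = df.collect()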

Dynamic Allocation:

If you are still not sure, then you can try dynamic allocation of resources. I avoid using it because, firstly, a JOB can eat up the whole cluster (which may not be required). Secondly, you will likely turn on fair scheduling with preemption, which might result in lost executors or lost computation for a JOB. That can lead to unpredictable behavior for a JOB, i.e. it might complete in 10 minutes, or in 1 hour, or may sometimes fail.

It's a good feature to try. But, as your cluster grows (in terms of the number of running Jobs), you might want to fine-tune your applications. Or, if you have a sufficient budget to add more resources or nodes to the cluster, then maybe fine-tuning is not needed.
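
If you do decide to try it, here is a hedged sketch (the limits are assumed examples) that at least caps how much of the queue one JOB can take. Note that the external shuffle service must already be enabled on the YARN NodeManagers -

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("dynamic-allocation-sketch")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.shuffle.service.enabled", "true")        // needs the YARN external shuffle service
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "17")   // cap so one JOB cannot eat the whole queue
    .getOrCreate()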
