SPARK running explained - 2
1. YARN Cluster Manager – The basic YARN architecture is described below, and it is similar to the Spark Standalone cluster manager.
Main components –
a. Resource Manager – (just like the Spark master process)
b. Node Manager – (similar to Spark's worker processes)
Unlike running on Spark's standalone cluster, applications on YARN run in containers (JVM processes to which CPU and memory resources are granted).
There is an Application Master for each application, running in its own container; it is responsible for requesting application resources from the Resource Manager.
Node managers track resources used by containers and report to the resource manager.
The steps below depict a Spark application (cluster-deploy mode) running on a YARN cluster with two nodes –
1. The client submits the application to the Resource Manager.
2. The Resource Manager asks one node manager to allocate a container for the Application Master.
3. The node manager launches a container for the Application Master; the Spark driver runs in the Application Master.
4. The Application Master asks the Resource Manager for more containers for executors.
5. After the Resource Manager grants the resources, the Application Master asks the node managers to launch new containers for executors.
6. The node managers start executors on behalf of the Spark Application Master.
7. After that, the executors and the driver communicate independently.
Unlike Spark's workers, YARN's node managers can launch more than one container (executor) per application.
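As a concrete illustration, a minimal application like the sketch below (the object name and application name are placeholders, not from the original text) could be packaged into a jar and submitted to YARN in cluster-deploy mode, so that the driver in main() runs inside the Application Master's container:

import org.apache.spark.sql.SparkSession

// Minimal Spark application for YARN cluster-deploy mode; the master ("yarn")
// and the deploy mode are supplied at submission time, not in the code.
object YarnClusterExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("yarn-cluster-example") // placeholder name
      .getOrCreate()

    // Trivial job executed on the executors that the Application Master
    // obtained from the Resource Manager.
    val count = spark.sparkContext
      .parallelize(1 to 1000000)
      .filter(_ % 3 == 0)
      .count()

    println(s"Numbers divisible by 3: $count")
    spark.stop()
  }
}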
Three possible modes of running YARN are described below –
1. Standalone (local) mode – runs as a single Java process
2. Pseudo-distributed mode – runs all Hadoop daemons (several Java processes) on a single machine
3. Fully distributed mode – runs the various Java processes on multiple machines
Resource scheduling in YARN – There are 3 main schedulers that can be plugged in –
1. FIFO scheduler – If two applications require the same resources, the first application that requests them will be served first (FIFO).
2. CAPACITY scheduler – It guarantees capacity for different organizations using the same cluster. The main unit of resources scheduled by YARN is a queue. Each queue's capacity determines the percentage of cluster resources that can be used by applications submitted to it. A hierarchy of queues can be set up to reflect a hierarchy of capacity requirements by organizations, so that sub-queues (sub-organizations) can share the resources of a single queue and thus not affect others. Within a single queue, the resources are scheduled in FIFO fashion. If enabled, capacity scheduling can be elastic, meaning it allows organizations to use any excess capacity not used by others. But preemption isn't supported.
3. FAIR scheduler – The fair scheduler tries to assign resources in such a way that all applications get (on average) an equal share. Like the capacity scheduler, it also organizes applications into queues. This scheduler supports application priorities and minimum capacity requirements. It enables preemption, meaning that when an application demands resources, the fair scheduler can take some resources from other running applications.
Configuring resources for Spark jobs
Set the following properties –
1. --num-executors – Changes the number of executors
2. --executor-cores – Changes the number of cores per executor
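As a sketch (spark-shell style; the values are arbitrary examples, not recommendations), the same two settings can also be supplied programmatically through their configuration-parameter equivalents, spark.executor.instances and spark.executor.cores:

import org.apache.spark.sql.SparkSession

// Sketch: executor count and cores set via configuration parameters;
// the values below are arbitrary examples.
val spark = SparkSession.builder()
  .appName("executor-config-example")      // placeholder name
  .config("spark.executor.instances", "4") // equivalent of --num-executors 4
  .config("spark.executor.cores", "2")     // equivalent of --executor-cores 2
  .getOrCreate()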
Driver memory can be set via –
1. --driver-memory command-line option
2. "spark.driver.memory" configuration parameter
3. SPARK_DRIVER_MEMORY environment variable
Spark executor memory can be set via –
1. "spark.executor.memory" configuration parameter
2. SPARK_EXECUTOR_MEMORY environment variable
3. --executor-memory command-line option
4. "spark.executor.memoryOverhead" – an additional parameter that determines the memory beyond the Java heap that will be available to YARN containers running Spark executors. This memory is for the JVM process itself.
Note – If your executor uses more memory than spark.executor.memory + spark.executor.memoryOverhead, YARN will shut down the container, and your jobs will repeatedly fail. Failing to set spark.executor.memoryOverhead to a sufficiently high value can lead to problems that are hard to diagnose. Make sure to specify at least 1024 MB.
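A corresponding sketch for the memory settings (spark-shell style; the numbers are arbitrary examples; note that in client-deploy mode the driver JVM is already running by the time application code executes, so driver memory is normally set on the command line or in spark-defaults.conf rather than in code):

import org.apache.spark.sql.SparkSession

// Sketch: memory settings expressed as configuration parameters;
// the values are arbitrary examples.
val spark = SparkSession.builder()
  .appName("memory-config-example")                // placeholder name
  .config("spark.driver.memory", "2g")             // equivalent of --driver-memory 2g
  .config("spark.executor.memory", "4g")           // equivalent of --executor-memory 4g
  .config("spark.executor.memoryOverhead", "1024") // in MB; at least 1024 as noted above
  .getOrCreate()

// The YARN container for each executor is then roughly
// spark.executor.memory + spark.executor.memoryOverhead = 4 GB + 1 GB.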
Thus, overall container memory can be segmented into –
1. Memory overhead
2. Spark memory
a. Storage memory (including safety fraction)
b. Shuffle memory (including safety fraction)
c. Rest of heap for Java objects
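A rough worked example of that segmentation, assuming the legacy (pre-unified) memory fractions of older Spark releases (storage fraction 0.6 with a 0.9 safety fraction, shuffle fraction 0.2 with a 0.8 safety fraction); newer Spark versions use a unified memory manager, so the exact fractions here are an assumption for illustration only:

// Rough, illustrative arithmetic only; the fraction values are assumed
// defaults from older Spark releases and differ in newer versions.
val executorMemoryMb = 4096.0                    // spark.executor.memory
val memoryOverheadMb = 1024.0                    // spark.executor.memoryOverhead

val storageMb = executorMemoryMb * 0.6 * 0.9     // storage memory incl. safety fraction, approx. 2212 MB
val shuffleMb = executorMemoryMb * 0.2 * 0.8     // shuffle memory incl. safety fraction, approx. 655 MB
val restOfHeapMb = executorMemoryMb - storageMb - shuffleMb // rest of heap for Java objects, approx. 1229 MB

val totalContainerMb = executorMemoryMb + memoryOverheadMb  // 5120 MB requested from YARN
println(f"container=$totalContainerMb%.0f MB, storage=$storageMb%.0f MB, shuffle=$shuffleMb%.0f MB, rest=$restOfHeapMb%.0f MB")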
In cluster-deploy mode, "spark.yarn.driver.memoryOverhead" determines the memory overhead of the driver's container.
In client-deploy mode, "spark.yarn.am.memoryOverhead" determines the memory overhead of the application master.
Other YARN configuration parameters –
1. "yarn.scheduler.maximum-allocation-mb" – Determines the upper memory limit of YARN containers. The resource manager won't allow allocation of larger amounts of memory. The default value is 8192 MB.
2. "yarn.scheduler.minimum-allocation-mb" – Determines the minimum amount of memory the resource manager can allocate. The resource manager allocates memory only in multiples of this parameter. The default value is 1024 MB. This parameter should be set to a value small enough not to waste memory unnecessarily (for example, 256 MB).
3. "yarn.nodemanager.resource.memory-mb" – Determines the maximum amount of memory YARN can use on a node overall. The default value is 8192 MB. It should be set to the total memory available on the node minus the memory needed for the OS.
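To make the effect of these limits concrete, here is a simplified model (not YARN's actual code) of how a container request is rounded up to a multiple of the minimum allocation and capped by the maximum:

// Simplified model of YARN container sizing, for intuition only.
def containerSizeMb(requestedMb: Int, minAllocMb: Int = 1024, maxAllocMb: Int = 8192): Int = {
  val rounded = math.ceil(requestedMb.toDouble / minAllocMb).toInt * minAllocMb
  require(rounded <= maxAllocMb, s"$rounded MB exceeds yarn.scheduler.maximum-allocation-mb ($maxAllocMb MB)")
  rounded
}

// An executor asking for 4 GB heap + 1 GB overhead fits exactly into a
// 5120 MB container with the default 1024 MB minimum allocation.
println(containerSizeMb(4096 + 1024)) // 5120
// A 4500 MB request is rounded up to the next multiple of 1024 MB.
println(containerSizeMb(4500))        // 5120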
Dynamic resource allocation –
Enables applications to release executors temporarily so that other applications can use the allocated resources. Set "spark.dynamicAllocation.enabled" to true.
You should also enable Spark's shuffle service, which is used to serve executors' shuffle files even after the executors are no longer available. If the service isn't enabled and an executor's shuffle files are requested while the executor isn't available, the shuffle files will need to be recalculated, which wastes resources. Therefore, you should always enable the shuffle service when enabling dynamic allocation. To enable it –
1. Add spark-<version>-shuffle.jar to the classpath of all node managers.
2. Set "yarn.nodemanager.aux-services" -> { mapreduce_shuffle, spark_shuffle }
3. Set "yarn.nodemanager.aux-services.spark_shuffle.class" -> "org.apache.spark.network.yarn.YarnShuffleService"
4. Set "spark.shuffle.service.enabled" -> true
You can control the number of executors with these parameters:
1. "spark.dynamicAllocation.minExecutors" – Minimum number of executors for your application
2. "spark.dynamicAllocation.maxExecutors" – Maximum number of executors for your application
3. "spark.dynamicAllocation.initialExecutors" – Initial number of executors for your application
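Putting the dynamic-allocation settings together, a configuration sketch might look like the following (spark-shell style; the executor counts are arbitrary examples, and the yarn-site.xml changes from the steps above still have to be made on every node manager):

import org.apache.spark.sql.SparkSession

// Sketch: dynamic allocation with the external shuffle service;
// the executor counts are arbitrary examples.
val spark = SparkSession.builder()
  .appName("dynamic-allocation-example")                   // placeholder name
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.shuffle.service.enabled", "true")         // requires the YARN shuffle service configured as above
  .config("spark.dynamicAllocation.minExecutors", "2")
  .config("spark.dynamicAllocation.maxExecutors", "20")
  .config("spark.dynamicAllocation.initialExecutors", "4")
  .getOrCreate()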