While running Spark applications, you may have seen the error below:
Caused by: org.apache.spark.SparkException: Could not execute broadcast in 300 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:150)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:154)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:150)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:165)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:162)
    at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:150)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doExecute(BroadcastNestedLoopJoinExec.scala:343)
Most often, when people receive this error they simply set spark.sql.autoBroadcastJoinThreshold to -1, which turns off broadcast joins entirely and results in poor performance of Spark jobs.
Ideally, we should instead analyze why Spark is unable to broadcast the data within 300 seconds (5 minutes), given that the data is small: at most spark.sql.autoBroadcastJoinThreshold, which defaults to 10 MB.
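For reference, below is a minimal sketch (assuming an existing SparkSession named spark) of how these two settings are typically inspected and overridden at runtime; the app name is hypothetical.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("broadcast-timeout-demo")   // hypothetical app name
  .getOrCreate()

// Defaults: broadcast timeout is 300 seconds, threshold is 10 MB.
println(spark.conf.get("spark.sql.broadcastTimeout"))            // "300"
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))  // "10485760" (10 MB)

// The commonly suggested "fix": disable automatic broadcast joins entirely.
// This avoids the timeout but forces a SortMergeJoin, which is usually slower.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)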
A few situations we have seen are:
- Network slowness hindering the broadcast of data. In enterprise applications, we rarely see this behavior.
- For one of our applications, we found that tasks were spending most of their time scanning files from HDFS/Hive. There were 612 tasks that had to complete before the data could be broadcast.
- We observed that the Spark stage failed after completing only about 300 of the 617 tasks within 5 minutes. We had been running with 5 executors, each with 3 cores.
- Thus, we had two options (sketched in code after this list):
- First, increase spark.sql.broadcastTimeout to 600 seconds.
- Second, increase the number of executors/cores to raise parallelism. We increased the job to 10 executors, each with 4 cores.
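The sketch below shows roughly how we tried both options; it assumes an existing SparkSession named spark and a YARN-style spark-submit deployment, and the numbers simply mirror our job rather than a general recommendation.

// Option 1: raise the broadcast timeout from the default 300 to 600 seconds.
spark.conf.set("spark.sql.broadcastTimeout", 600)

// Option 2: give the job more parallelism so the 600+ scan tasks finish sooner.
// With 5 executors x 3 cores we had 15 task slots; 10 executors x 4 cores gives 40.
// spark-submit \
//   --num-executors 10 \
//   --executor-cores 4 \
//   ... rest of the submit command ...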
The second option worked for us, providing the best performance and meeting our SLA, at the expense of allocating more resources to the job. It also avoided the delay a SortMergeJoin would have introduced had we disabled the broadcast join.
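For completeness, another way to keep a broadcast join without relying on the automatic threshold is an explicit broadcast hint. The sketch below uses hypothetical table and column names and shows how to check in the physical plan whether Spark chose a BroadcastHashJoin or a SortMergeJoin.

import org.apache.spark.sql.functions.broadcast

// Hypothetical DataFrames: a large fact table and a small dimension table.
val factDf = spark.table("fact_events")      // assumed table name
val dimDf  = spark.table("dim_customers")    // assumed table name

// Explicitly ask Spark to broadcast the small side, regardless of the threshold.
val joined = factDf.join(broadcast(dimDf), Seq("customer_id"))

// Inspect the physical plan: look for BroadcastHashJoin vs SortMergeJoin.
joined.explain()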