
Posts

Spark - Distributed Computing and Multi-Threading

Spark on Hadoop works on the concept of data locality: Spark tries to send the code to the data rather than shuffle the data to the code. We specify the following resources when creating a Spark job: executor cores, executor memory, driver cores, driver memory, and number of executors. Together, these properties determine how many executors run. Each executor is a JVM process that uses the specified cores and memory. The executors connect back to your driver program, and the driver can then send them commands. Data is distributed on HDFS in blocks, and an input split roughly determines the size of a data partition. YARN tries to co-locate executors as close to the data as possible to minimize network transfer. A task is a command that the driver sends to an executor by serializing your function object. The executor deserializes the command and executes it on one partition. The number of Spark tasks in a single stage equals the number of RDD partitions. Now, "executor cores" or "spark.ex
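The relationship between partitions, tasks, and executor cores can be sketched as plain arithmetic. This is an illustration under stated assumptions, not Spark's scheduler. It assumes a 128 MB HDFS block size (the `dfs.blocksize` default in Hadoop 2.x and later) and one input split, and therefore one RDD partition, per block. It also assumes `spark.task.cpus` is at its default of 1, so each executor core runs one task at a time. The helper names `estimate_partitions` and `stage_waves` are hypothetical.

```python
import math

# Assumption: 128 MB HDFS block size (dfs.blocksize default in Hadoop 2.x+)
# and roughly one input split, hence one RDD partition, per block.
HDFS_BLOCK_SIZE_MB = 128


def estimate_partitions(file_size_mb: float, block_size_mb: float = HDFS_BLOCK_SIZE_MB) -> int:
    """Hypothetical helper: approximate partition count for a single HDFS file."""
    return max(1, math.ceil(file_size_mb / block_size_mb))


def stage_waves(num_partitions: int, num_executors: int, executor_cores: int,
                task_cpus: int = 1) -> int:
    """Hypothetical helper: how many rounds of tasks a stage needs.

    A stage has one task per partition; each executor runs
    executor_cores // task_cpus tasks concurrently (spark.task.cpus defaults to 1).
    """
    slots = num_executors * (executor_cores // task_cpus)
    if slots == 0:
        raise ValueError("executor_cores must be >= task_cpus")
    return math.ceil(num_partitions / slots)


if __name__ == "__main__":
    partitions = estimate_partitions(10 * 1024)  # a 10 GB file
    print(partitions)                            # 80 partitions -> 80 tasks in the stage
    print(stage_waves(partitions, num_executors=5, executor_cores=4))  # 20 slots -> 4 waves
```

With 5 executors of 4 cores each, 20 tasks can run at once, so an 80-partition stage needs roughly four waves. Adding executors or cores shortens the stage only up to the point where there are as many slots as partitions.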

Spark Error - Caused by: org.apache.spark.SparkException: Could not execute broadcast in 300 secs

While running Spark applications, you may have seen the following error:

    Caused by: org.apache.spark.SparkException: Could not execute broadcast in 300 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:150)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:154)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:150)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:165)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:162)
        at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:150)
        at org.apache.spark.sql.execut
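The error message names two settings. The sketch below illustrates how they interact. It assumes the documented Spark SQL defaults: `spark.sql.broadcastTimeout` = 300 seconds, which matches the "300 secs" in the message, and `spark.sql.autoBroadcastJoinThreshold` = 10 MB. It mirrors the size rule in simplified form: a join side whose estimated size is at most the threshold is broadcast automatically, and a threshold of -1 turns automatic broadcasting off. The helpers `will_auto_broadcast` and `broadcast_conf_flags` are hypothetical and only build `spark-submit --conf` arguments. They do not call Spark.

```python
# Documented Spark SQL defaults (assumption: no overrides in spark-defaults.conf).
DEFAULT_BROADCAST_TIMEOUT_S = 300                    # the "300 secs" in the error
DEFAULT_AUTO_BROADCAST_THRESHOLD = 10 * 1024 * 1024  # 10 MB


def will_auto_broadcast(estimated_size_bytes: int,
                        threshold: int = DEFAULT_AUTO_BROADCAST_THRESHOLD) -> bool:
    """Hypothetical, simplified size check: a join side at or under the
    threshold is broadcast; a threshold of -1 disables automatic broadcasts."""
    return threshold >= 0 and 0 <= estimated_size_bytes <= threshold


def broadcast_conf_flags(timeout_s=None, disable_auto_broadcast=False):
    """Hypothetical helper: spark-submit --conf arguments for the two
    settings named in the error message."""
    flags = []
    if timeout_s is not None:
        flags += ["--conf", f"spark.sql.broadcastTimeout={timeout_s}"]
    if disable_auto_broadcast:
        flags += ["--conf", "spark.sql.autoBroadcastJoinThreshold=-1"]
    return flags


if __name__ == "__main__":
    print(will_auto_broadcast(5 * 1024 * 1024))  # True: 5 MB is under the 10 MB default
    print(" ".join(broadcast_conf_flags(timeout_s=600, disable_auto_broadcast=True)))
```

Raising the timeout helps when the broadcast side is legitimately slow to build. Disabling automatic broadcasting makes Spark choose another join strategy instead. Note that a threshold of -1 only disables automatic broadcast joins; explicit broadcast hints in a query are still honoured.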

Load Balancing or Dynamic Discovery of HiveServer2 Connections from Beeline or Hive Shell

To provide high availability or load balancing for HiveServer2, Hive offers dynamic service discovery, in which multiple HiveServer2 instances register themselves with ZooKeeper. Instead of connecting to a specific HiveServer2 directly, clients connect to ZooKeeper, which returns a randomly selected registered HiveServer2 instance. For example, the following command connects directly to the HiveServer2 on machineA:

    beeline -u "jdbc:hive2://machineA:10000"

The following command connects to the ZooKeeper nodes to find one of the available HiveServer2 instances:

    beeline -u "jdbc:hive2://machineA:2181,machineB:2181,machineC:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2-mob-batch?tez.queue.name=myyarnqueue"

We can create the ZNode in ZooKeeper as follows:

Open the ZooKeeper command-line interface:

    zookeeper-client

Connect to the ZooKeeper servers:

    connect machineA:2181,machineB:2181,machineC:2181

Create the ZNode:

    create /hiveserver2-mob-batch

Manually,
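The discovery URL has a regular shape: a comma-separated ZooKeeper quorum, then the `serviceDiscoveryMode` and `zooKeeperNamespace` session settings, then optional Hive configuration after `?` as semicolon-separated `key=value` pairs. The sketch below builds that string. It is a hypothetical helper (`hs2_zk_jdbc_url`) that assumes every ZooKeeper node listens on the same port, which is 2181 in the example.

```python
def hs2_zk_jdbc_url(zk_hosts, namespace, hive_conf=None, zk_port=2181):
    """Hypothetical helper: build a HiveServer2 JDBC URL that uses ZooKeeper
    service discovery instead of a fixed HiveServer2 host.

    zk_hosts  -- ZooKeeper host names (assumed to share one port)
    namespace -- the ZNode under which HiveServer2 instances register
    hive_conf -- optional Hive settings appended after '?', separated by ';'
    """
    quorum = ",".join(f"{host}:{zk_port}" for host in zk_hosts)
    url = (f"jdbc:hive2://{quorum}/"
           f";serviceDiscoveryMode=zooKeeper;zooKeeperNamespace={namespace}")
    if hive_conf:
        url += "?" + ";".join(f"{key}={value}" for key, value in hive_conf.items())
    return url


if __name__ == "__main__":
    # Reproduces the beeline URL used above.
    print(hs2_zk_jdbc_url(["machineA", "machineB", "machineC"],
                          "hiveserver2-mob-batch",
                          {"tez.queue.name": "myyarnqueue"}))
```

The printed string can be passed to `beeline -u "…"`. The namespace must match the ZNode the HiveServer2 instances register under.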

org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow

We were running an application that failed with the following error:

    Job aborted due to stage failure: Task 137 in stage 5.0 failed 4 times, most recent failure: Lost task 137.3 in stage 5.0 (TID 2090, ncABC.hadoop.com, executor 1): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 59606960. To avoid this, increase spark.kryoserializer.buffer.max value.
        at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:330)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:456)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 59606960
        at com.esotericsoftware.kryo.io.Output.require(Output.java:167)
        at com.esotericsoftware.kryo.io.Output.writeBytes(
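The numbers in the message can be turned into a rough new value for `spark.kryoserializer.buffer.max`. This sketch assumes that "Available" is the space left before the current maximum is reached, so the bytes already written are the current maximum minus the available space. That reading of Kryo's `Output.require` is an assumption. It also assumes Spark's documented default of 64m for the setting and the documented rule that the value must stay below 2048m. `suggest_kryo_buffer_max_mib` is a hypothetical helper, and rounding up to a power of two is only a choice to leave some headroom.

```python
import math

MIB = 1024 * 1024
# Spark documents a 64m default for spark.kryoserializer.buffer.max and
# requires the value to be less than 2048m.
DEFAULT_BUFFER_MAX_MIB = 64
HARD_LIMIT_MIB = 2048


def suggest_kryo_buffer_max_mib(available_bytes: int, required_bytes: int,
                                current_max_mib: int = DEFAULT_BUFFER_MAX_MIB) -> int:
    """Hypothetical helper: suggest a new spark.kryoserializer.buffer.max in MiB.

    Assumes 'Available' is the room left before the current maximum, so the
    bytes already written are current_max - available.
    """
    used_bytes = current_max_mib * MIB - available_bytes
    needed_mib = math.ceil((used_bytes + required_bytes) / MIB)
    if needed_mib >= HARD_LIMIT_MIB:
        raise ValueError("needed size is beyond the allowed buffer.max range")
    # Round up to the next power of two for headroom, staying below the hard limit.
    return min(1 << max(needed_mib - 1, 0).bit_length(), HARD_LIMIT_MIB - 1)


if __name__ == "__main__":
    # Values from the error above: Available: 0, required: 59606960
    new_max = suggest_kryo_buffer_max_mib(available_bytes=0, required_bytes=59606960)
    print(f"--conf spark.kryoserializer.buffer.max={new_max}m")
```

With the values from this error, about 64 MiB is already used and about 57 MiB more is needed, so the sketch suggests `128m`. If the suggested value approaches the 2048m ceiling, the record being serialized is probably too large, and restructuring the data is likely a better option than raising the buffer further.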

Spark MongoDB Connector - Solution for Fetched BSON Documents Missing Fields

The Spark MongoDB connector does not fetch all the fields present in the BSON documents stored in a collection. This is because a MongoDB collection can hold documents with different schemas. Typically, all documents in a collection serve a similar or related purpose. A document is a set of key-value pairs, and documents have a dynamic schema: documents in the same collection do not need to have the same set of fields or structure, and common fields across a collection's documents may hold different types of data. So, when we read a MongoDB collection using the Spark connector, it infers the schema from the documents it reads first, which might not contain fields that appear only in subsequent documents. Suppose we have a MongoDB collection, default.fruits, with the following documents:

    { "_id" : 1, "type" : "apple" }
    { "_id" : 2, "type" : "orange", "qty" : 10 }
    { "_id" : 3, "type" : "ban
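The effect of inferring a schema from too few documents can be reproduced without Spark. The sketch below treats documents as Python dicts and compares a schema taken from the first document with one built from the union of fields across all documents. Both functions (`schema_from_first`, `schema_from_union`) are hypothetical illustrations and not the connector's actual inference code. Only the two complete documents from the example are used.

```python
def schema_from_first(docs):
    """Hypothetical: field -> type name, taken from the first document only."""
    return {field: type(value).__name__ for field, value in docs[0].items()}


def schema_from_union(docs):
    """Hypothetical: field -> sorted type names, merged across every document."""
    schema = {}
    for doc in docs:
        for field, value in doc.items():
            schema.setdefault(field, set()).add(type(value).__name__)
    return {field: sorted(types) for field, types in schema.items()}


if __name__ == "__main__":
    fruits = [
        {"_id": 1, "type": "apple"},
        {"_id": 2, "type": "orange", "qty": 10},
    ]
    print(schema_from_first(fruits))  # {'_id': 'int', 'type': 'str'}  ('qty' is missing)
    print(schema_from_union(fruits))  # {'_id': ['int'], 'type': ['str'], 'qty': ['int']}
```

Inference from the first document drops `qty`, which matches the symptom described above. In Spark, one common way to avoid relying on inference is to pass an explicit schema to the DataFrame reader (`DataFrameReader.schema(...)`) so that fields absent from the early documents are still read.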