
Posts

Spark - Creating Sub Folder while writing to Partitioned Hive Table

We had been writing to a partitioned Hive table and realized that the data being written was landing in an extra sub-folder. For example, consider the table definition below:

Create table T1 (name string, address string)
Partitioned by (process_date string)
stored as parquet
location '/mytable/a/b/c/org=employee';

While writing to the table, the HDFS path actually written looks something like this:

/mytable/a/b/c/org=employee/process_date=20220812/org=employee

The unnecessary org=employee folder after the process_date partition appears because the table location contains an "=" sign, which Hive treats as partition-column syntax. Re-defining the table with a location free of "=" resolves the problem:

Create table T1 (name string, address string)
Partitioned by (process_date string)
stored as parquet
location '/mytable/a/b/c/employee';
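For context, a minimal sketch (not from the post; the sample rows, Spark settings, and write mode are assumptions, only the T1 schema comes from the DDL above) of the kind of partitioned Spark write that surfaces the extra folder described above:

import org.apache.spark.sql.SparkSession

// Hedged sketch: sample data and configuration are assumptions, only the T1 schema is from the post.
val spark = SparkSession.builder()
  .appName("partitioned-write-sketch")
  .enableHiveSupport()
  .getOrCreate()

// Allow dynamic partitioning so process_date is taken from the data itself.
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

// Rows matching the T1 schema; the partition column must come last for insertInto.
val df = spark.createDataFrame(Seq(
  ("alice", "street 1", "20220812"),
  ("bob",   "street 2", "20220812")
)).toDF("name", "address", "process_date")

// With the first DDL the files land under .../org=employee/process_date=20220812/org=employee;
// with the corrected location they land under .../employee/process_date=20220812.
df.write.mode("append").insertInto("T1")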

Hive QL Spark SQL - Transform Rows into Columns

For structured, tabular data it is often necessary to transform rows into columns. This blog explains a step-by-step process that can be executed as a single SQL statement. Let's understand it with the example below, where we want to transform the input table into the output table structure.

INPUT_TABLE

topic  groupId  batchTimeMs  partition  offset  count
t1     g001     1658173779   0          123     122
t1     g001     1658173779   1          2231    100
t2     g001     1658173779   0          12      11

OUTPUT_TABLE

rowkey:key          offset:0  count:0  offset:1  count:1
t1:g001:1658173779  123       122      2231      100
t2:g001:1658173779  12        11       NULL      NULL

FIRST STEP - Concatenate topic, groupId, and batchTimeMs to create the row key. Create the columns offset:0, count:0, offset:1, count:1 such that each column has a value only when the respective partition value matches the column name. SQL as below - select ...
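The excerpt's final SQL is cut off, so here is a minimal sketch, assuming the column names shown in INPUT_TABLE and that INPUT_TABLE exists as a table or temp view, of one common way to express the described pivot as a single conditional-aggregation query (CASE WHEN buckets each row into its partition-specific column, MAX collapses the rows per key):

import org.apache.spark.sql.SparkSession

// Hedged sketch of the rows-to-columns pivot; not the post's own (truncated) SQL.
val spark = SparkSession.builder().appName("rows-to-columns-sketch").getOrCreate()

val output = spark.sql("""
  SELECT concat_ws(':', topic, groupId, batchTimeMs)        AS `rowkey:key`,
         MAX(CASE WHEN `partition` = 0 THEN `offset` END)   AS `offset:0`,
         MAX(CASE WHEN `partition` = 0 THEN `count`  END)   AS `count:0`,
         MAX(CASE WHEN `partition` = 1 THEN `offset` END)   AS `offset:1`,
         MAX(CASE WHEN `partition` = 1 THEN `count`  END)   AS `count:1`
  FROM INPUT_TABLE
  GROUP BY topic, groupId, batchTimeMs
""")

output.show(false)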

spark-sftp com.springml.spark.sftp org.apache.spark.sql.AnalysisException: Path does not exist:

We had been facing an issue using the spark-sftp jar to download a remote SFTP file and create a DataFrame from it. Code being executed:

val df = spark.read.format("com.springml.spark.sftp")
  .option("host", HOST)
  .option("port", PORT)
  .option("username", UN)
  .option("password", PWD)
  .option("fileType", "csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load(FILENAME)

Error response:

org.apache.spark.sql.AnalysisException: Path does not exist: wasb://...
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:612)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:595)
at scala.collection.TraversableLike$$anonfun$flatMap$1.ap...
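The excerpt ends before the resolution, so the following is only a hedged sketch: with this connector, one thing worth checking is where it stages the downloaded file before Spark reads it, since the "Path does not exist: wasb://..." error suggests the staging path is being resolved against the cluster's default filesystem. The tempLocation and hdfsTempLocation option names below are assumptions taken from the springml/spark-sftp project documentation, not from the post; HOST, PORT, UN, PWD, FILENAME are the same placeholders used above.

// Hedged sketch only: "tempLocation" / "hdfsTempLocation" are assumed option names from the
// springml/spark-sftp docs. The idea is to stage the downloaded file at an explicit path that
// exists on the filesystem Spark will read from.
val df = spark.read.format("com.springml.spark.sftp")
  .option("host", HOST)
  .option("port", PORT)
  .option("username", UN)
  .option("password", PWD)
  .option("fileType", "csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .option("tempLocation", "/tmp")        // assumed option: local staging directory
  .option("hdfsTempLocation", "/tmp")    // assumed option: staging directory on the cluster filesystem
  .load(FILENAME)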

Spark - Distributed Computing and Multi-Threading

Spark on Hadoop works on the concept of data locality: the code is sent to the data rather than the data being shuffled to the code. We specify the following resources when creating a Spark job (see the sketch after this list):

- executor cores
- executor memory
- driver cores
- driver memory
- number of executors

These properties determine the number of executors, and each executor is essentially a JVM process with the specified cores and memory. The executors connect back to your driver program, and the driver can then send them commands.

Data is distributed on HDFS in blocks, and an input split largely determines the size of a data partition. YARN tries to collocate executors as close to the data as possible to minimize network transfer. A task is a command sent from the driver to an executor by serializing your Function object; the executor deserializes the command and executes it on a partition. The number of Spark tasks in a single stage equals the number of RDD partitions. Now, "executor cores" or "spark.ex...
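As a hedged illustration (the numbers are arbitrary examples, not values from the post), these resources map onto Spark configuration roughly as follows, and together they bound how many tasks of a stage can run at once:

import org.apache.spark.sql.SparkSession

// Illustrative values only. Driver memory/cores usually have to be supplied at launch time
// (e.g. spark-submit --driver-memory / --driver-cores), since the driver JVM is already
// running by the time this code executes.
val spark = SparkSession.builder()
  .appName("resource-config-sketch")
  .config("spark.executor.instances", "10")  // number of executors
  .config("spark.executor.cores", "4")       // concurrent task slots per executor
  .config("spark.executor.memory", "8g")     // heap per executor JVM
  .getOrCreate()

// With 10 executors x 4 cores (and the default of one core per task), up to 40 tasks run in
// parallel; a stage with 200 RDD partitions therefore runs its 200 tasks in waves of at most 40.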

Spark Error - Caused by: org.apache.spark.SparkException: Could not execute broadcast in 300 secs

While running Spark applications you may have seen the error below:

Caused by: org.apache.spark.SparkException: Could not execute broadcast in 300 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:150)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:154)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:150)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:165)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:162)
at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:150)
at org.apache.spark.s...
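The message itself names the two knobs; as a minimal sketch (the timeout value is an arbitrary example, not a recommendation from the post), they can be applied like this:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("broadcast-timeout-sketch")
  .getOrCreate()

// Option 1: give the broadcast more time than the 300-second default.
spark.conf.set("spark.sql.broadcastTimeout", "1200")

// Option 2: disable automatic broadcast joins entirely, forcing a shuffle-based join instead.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")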