I have been using Spark and Hive to insert data into a Hive table.
I have the following table in Hive -
CREATE TABLE `ds_test`(
`name` string)
PARTITIONED BY (
`company` string,
`market` string,
`eventdate` string,
`processdate` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
'hdfs://hdpprod/apps/hive/warehouse/ds_test'
TBLPROPERTIES (
'transient_lastDdlTime'='1524769102')
I was inserting data into the table using Spark SQL against Hive, like below -
sqlContext.sql("INSERT OVERWRITE TABLE ds_test PARTITION(COMPANY = 'MCOM', MARKET, EVENTDATE, PROCESSDATE) Select name, MARKET, EVENTDATE, PROCESSDATE from Table1")
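This relies on Hive dynamic partitioning for the MARKET, EVENTDATE, and PROCESSDATE columns. A minimal sketch of the session settings we assume were in place (these are not shown in the original job, and your cluster defaults may already cover them) -
// Assumed settings for dynamic-partition inserts; adjust to your cluster.
sqlContext.sql("SET hive.exec.dynamic.partition = true")
sqlContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")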
The above method was working fine, but we were facing performance problems -
1) The application was running for too long.
2) The Spark UI showed that all tasks were completed, but the YARN application kept running.
3) On further debugging we analyzed that, after the Spark tasks completed, the data was present in the ".hiveStaging" directory but was not being moved quickly to the final output location on HDFS.
Then we updated our code to save the data as follows -
val sql1 = "Select name, COMPANY, MARKET, EVENTDATE, PROCESSDATE from Table1"
// Write through the DataFrameWriter API instead of INSERT OVERWRITE
val df1 = sqlContext.sql(sql1)
df1.write.partitionBy("company", "market", "eventdate", "processdate").insertInto("ds_test")
But the above was throwing the following error -
18/04/26 14:10:47 ERROR ApplicationMaster: User class threw exception: java.util.NoSuchElementException: key not found: company
java.util.NoSuchElementException: key not found: company
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:58)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$8.apply(InsertIntoHiveTable.scala:172)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$8.apply(InsertIntoHiveTable.scala:172)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:172)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:127)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:276)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:189)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:239)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:221)
On further analysis we identified that the above error results from column name case sensitivity:
1) Hive had the columns in lower case.
2) The DataFrame had its schema in UPPER case.
3) So we converted the DataFrame schema to lower case and tried again, and it worked (see the sketch below).
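A minimal sketch of that fix, assuming the same sqlContext, query, and table name as above (the exact renaming step in our job may differ) -
// Lower-case the DataFrame column names so they match Hive's lower-case columns.
val sql1 = "Select name, COMPANY, MARKET, EVENTDATE, PROCESSDATE from Table1"
val df = sqlContext.sql(sql1)
// toDF(names: _*) returns a new DataFrame with the given column names.
val dfLower = df.toDF(df.columns.map(_.toLowerCase): _*)
dfLower.write.partitionBy("company", "market", "eventdate", "processdate").insertInto("ds_test")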
And, last but not least, the performance of the application improved drastically.