
Posts

Showing posts from 2020

java.lang.NoClassDefFoundError: org/apache/hbase/thirdparty/com/google/common/cache/CacheLoader

User class threw exception: java.lang.NoClassDefFoundError: org/apache/hbase/thirdparty/com/google/common/cache/CacheLoader
    at org.apache.spark.sql.execution.datasources.hbase.HBaseConnectionKey.liftedTree1$1(HBaseConnectionCache.scala:188)
    at org.apache.spark.sql.execution.datasources.hbase.HBaseConnectionKey.<init>(HBaseConnectionCache.scala:187)
    at org.apache.spark.sql.execution.datasources.hbase.HBaseConnectionCache$.getConnection(HBaseConnectionCache.scala:144)
    at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.createTableIfNotExist(HBaseRelation.scala:126)
    at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:63)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)

Reason - ClassNotFound or NoClassDefFoundError Exception occurs when a third party

Spark - java.lang.AssertionError: assertion failed

Spark SQL fails to read data from an ORC Hive table that has had a new column added to it, giving this exception -

java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:165)
    at org.apache.spark.sql.execution.datasources.LogicalRelation$$anonfun$1.apply(LogicalRelation.scala:39)
    at org.apache.spark.sql.execution.datasources.LogicalRelation$$anonfun$1.apply(LogicalRelation.scala:38)
    at scala.Option.map(Option.scala:145)

This happens when the following property is set -
    spark.sql.hive.convertMetastoreOrc=true

Solution - If the property is being set explicitly, comment it out or set it to false. Refer to https://issues.apache.org/jira/browse/SPARK-18355

org.apache.spark.sql.AnalysisException: Cannot overwrite a path that is also being read from.;

Caused by: org.apache.spark.sql.AnalysisException: Cannot overwrite a path that is also being read from.;
    at org.apache.spark.sql.execution.command.DDLUtils$.verifyNotReadPath(ddl.scala:906)
    at org.apache.spark.sql.execution.datasources.DataSourceAnalysis$$anonfun$apply$1.applyOrElse(DataSourceStrategy.scala:192)
    at org.apache.spark.sql.execution.datasources.DataSourceAnalysis$$anonfun$apply$1.applyOrElse(DataSourceStrategy.scala:134)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
    at org.apache.spark.sql.execution.datasources.DataSourceAnalysis.apply(DataSourceStrategy.scala:134)
    at org.apache.spark.sql.execution.datasource

Kylin- Building Cube using MR Job throws java.lang.ArrayIndexOutOfBoundsException

Caused by: java.lang.ArrayIndexOutOfBoundsException
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.java:1453)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.java:1349)
    at java.io.DataOutputStream.writeInt(DataOutputStream.java:197)
    at org.apache.hadoop.io.BytesWritable.write(BytesWritable.java:188)

Solution - Set kylin.engine.mr.config-override.mapreduce.task.io.sort.mb to 1024

Spark 2 Application Errors & Solutions

Exception - Exception in thread "broadcast-exchange-0" java.lang.OutOfMemoryError: Not enough memory to build and broadcast
This is a Driver exception and can be solved by
    setting spark.sql.autoBroadcastJoinThreshold to -1
    Or, increasing --driver-memory

Exception - Container is running beyond physical memory limits. Current usage: X GB of Y GB physical memory used; X GB of Y GB virtual memory used. Killing container
YARN killed the container because it was exceeding memory limits. Increase --driver-memory or --executor-memory.

Exception - ERROR Executor: Exception in task 600 in stage X.X (TID 12345) java.lang.OutOfMemoryError: GC overhead limit exceeded
This means that the Executor JVM was spending more time in garbage collection than in actual execution.
    This JVM feature can be disabled by adding -XX:-UseGCOverheadLimit
    Increasing Executor memory may help: --executor-memory
    Make data more distributed so that it is not skewed to one executor.
    Might use parallel GC -XX:+UseParallelGC or -XX

Spark 2 - DataFrame.withColumn() takes time - Performance Optimization

We wrote a program that iterates and adds columns to a Spark DataFrame. We already had a table with 364 columns and we wanted a final DataFrame with 864 columns. Thus, we wrote a program like the one below -

    existingColumns.filter(x => !colList.contains(x)).foreach { x =>
      stagingDF = stagingDF.withColumn(x, lit(null).cast(StringType))
    }

We realized that this caused the program to take almost 20 minutes to add 500 columns to the DataFrame. On analysis, we found that Spark updates the Projection after every withColumn() invocation. So, instead of updating the DataFrame column by column, we created a StringBuilder, appended the columns to it, and finally built a Select SQL out of it and executed it on the DataFrame:

    val comma = ","
    val select = "select "
    val tmptable = "tmptable"
    val from = " from "
    val strbuild = new StringBuilder()
    val col = " cast(null as string) as "
    val colStr = colList.mkString(comma)
    strbui
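The excerpt above is cut off, so the following is only a minimal sketch of the same idea in Python: build one SELECT statement that adds every missing column as a typed NULL, instead of calling withColumn() once per column. The function name, parameter names and sample column names are hypothetical and not taken from the original post.

```python
def build_select_sql(current_columns, target_columns, table_name="tmptable"):
    """Return one SELECT that keeps the current columns and adds the
    missing target columns as NULL strings (hypothetical helper)."""
    current = set(current_columns)
    # Columns that are wanted but not present yet become typed NULL placeholders.
    missing = [c for c in target_columns if c not in current]
    projections = list(current_columns) + [
        f"cast(null as string) as {c}" for c in missing
    ]
    return "select " + ", ".join(projections) + " from " + table_name


sql = build_select_sql(["id", "name"], ["id", "name", "city", "zip"])
print(sql)
```

In Spark, a string like this would be run once against a temporary view registered under the same table name, so the plan gets a single projection rather than hundreds of nested ones.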

Hive SQL: Multi column explode, Creating Map from Array & Rows to Column

Say, you have a table like the one below -

id | attributes         | value
A  | [K1,K2,K3]         | [V1,V2,V3]
B  | [K2]               | [V2]
C  | [K1,K2,K3,K4,K5]   | [V1,V2,V3,V4,V5]

And you want a final table like the one below:

id | K1 | K2 | K3 | K4 | K5
A  | V1 | V2 | V3 |    |
B  |    | V2 |    |    |
C  | V1 | V2 | V3 | V4 | V5

It can be done with SQL -

    select id, arrMap['K1'] K1, arrMap['K2'] K2, arrMap['K3'] K3, arrMap['K4'] K4, arrMap['K5'] K5
    from (select id, str_to_map(concat_ws(',', (collect_list(kv)))) arrMap
          from (select id, n.pos, concat_ws(':', n.attribute, value[n.pos]) kv
                from INPUT_TABLE lateral view posexplode(attributes) n as pos, attribute) T
          group by id) T1;

Explanation: 1) Inner query explodes multip
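Since the explanation above is truncated, here is a small Python sketch of what the query appears to compute: pair each attribute with the value at the same position, collect the pairs into a map per id, then read the fixed keys K1 to K5 out of that map. It is plain Python rather than Hive, and the variable and function names are illustrative assumptions.

```python
rows = [
    ("A", ["K1", "K2", "K3"], ["V1", "V2", "V3"]),
    ("B", ["K2"], ["V2"]),
    ("C", ["K1", "K2", "K3", "K4", "K5"], ["V1", "V2", "V3", "V4", "V5"]),
]
output_keys = ["K1", "K2", "K3", "K4", "K5"]


def pivot(rows, output_keys):
    result = []
    for row_id, attributes, values in rows:
        # Like posexplode + value[pos]: pair each attribute with the value
        # at the same position, then gather the pairs into one map per id.
        arr_map = dict(zip(attributes, values))
        # Like arrMap['K1'], ...: a missing key gives None (NULL in Hive).
        result.append((row_id, *[arr_map.get(k) for k in output_keys]))
    return result


pivoted = pivot(rows, output_keys)
for line in pivoted:
    print(line)
```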

Spark: java.lang.ClassCastException: org.apache.hadoop.hive.serde2.io.DoubleWritable cannot be cast to org.apache.hadoop.io.Text

Reason - This occurs because the underlying Parquet or ORC file has a column with Double type, whereas Hive reads it as String.

Exception -
Caused by: java.lang.ClassCastException: org.apache.hadoop.hive.serde2.io.DoubleWritable cannot be cast to org.apache.hadoop.io.Text
    at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector.getPrimitiveWritableObject(WritableStringObjectInspector.java:41)
    at org.apache.spark.sql.hive.HiveInspectors$$anonfun$unwrapperFor$23.apply(HiveInspectors.scala:547)
    at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$15.apply(TableReader.scala:426)
    at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$15.apply(TableReader.scala:426)
    at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:442)
    at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:433)

Solution - One way is to correct Schema, either update Hiv

Error in Installing Zeppelin Interpreter

Error -
    Caused by: org.sonatype.aether.transfer.ArtifactNotFoundException: Could not find artifact org.apache.zeppelin:zeppelin-shell:jar:0.8.0

Solution -
    export JAVA_TOOL_OPTIONS="-Dzeppelin.interpreter.dep.mvnRepo=http://insecure.repo1.maven.org/maven2/"

Then execute or install the interpreter -
    /usr/hdp/current/zeppelin-server/bin/install-interpreter.sh --name shell

Spark HBase Connector (SHC) - Unsupported Primitive datatype null

While writing a Spark DataFrame to an HBase table, you may observe the following exception -

Caused by: java.lang.UnsupportedOperationException: PrimitiveType coder: unsupported data type null
    at org.apache.spark.sql.execution.datasources.hbase.types.PrimitiveType.toBytes(PrimitiveType.scala:61)
    at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation$$anonfun$org$apache$spark$sql$execution$datasources$hbase$HBaseRelation$$convertToPut$1$1.apply(HBaseRelation.scala:213)
    at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation$$anonfun$org$apache$spark$sql$execution$datasources$hbase$HBaseRelation$$convertToPut$1$1.apply(HBaseRelation.scala:209)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

There are suggestions to upgrade the SHC-Core jar file, but that didn't work for us. Rather, it started giving the following error -

Caused by: org.apache.spark.sql.execution.datasources.hbase.InvalidRegionNumberE

Spark reading Parquet table gives Null records whereas it works from Hive

If we read a Parquet table from Hive, it works -

    select * from Table_Parq limit 7;
    1
    2
    3
    4
    5
    6
    7

Whereas the same query doesn't work with Spark -

    select * from Table_Parq limit 7;
    NULL
    NULL
    NULL
    NULL
    NULL
    NULL
    NULL

This may be because the Parquet file has a different schema than the Hive Metastore; for example, the column names may be in a different case.

Solution - Read the Parquet file on HDFS directly rather than through the Hive table, or set the following properties -

    set spark.sql.caseSensitive=false;
    set spark.sql.hive.convertMetastoreParquet=false;

SQL to find Unit whose value equals and not in

Say, we have table_1 like -

unit | req_type
A    | 1
A    | 2
B    | 2
B    | 3
D    | 2
E    | 2
E    | 4

We have to write a SQL that selects each Unit with Req_Type=2, provided that Unit does not also have any other Req_Type from a given list. Ex - 1,3

Solution - We can add another column such that -

    case when req_type in (1,3) then 1 else 0 end col1

unit | req_type | col1
A    | 1        | 1
A    | 2        | 0
B    | 2        | 0
B    | 3        | 1
D    | 2        | 0
E    | 2        | 0
E    | 4        | 0

Then we can do a group by and take sum(col1) -

unit | c
A    | 1
B    | 1
D    | 0
E    | 0

Then we can select all rows which have c = 0.

Complete SQL -

    select unit from (select unit, sum(col1) c from (select unit, req_type, case when req_type in (1,3) then 1 else 0 end col1 from table_1) t group by unit) s where c=0
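To check the query against the sample rows, here is a small sketch that runs it in an in-memory SQLite database from Python. SQLite stands in for Hive/Spark here, which is an assumption about dialect compatibility; the subquery aliases t and s and the order by are additions for portability and deterministic output. Note that the query relies on every unit in the sample having a req_type = 2 row.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table table_1 (unit text, req_type integer)")
conn.executemany(
    "insert into table_1 values (?, ?)",
    [("A", 1), ("A", 2), ("B", 2), ("B", 3), ("D", 2), ("E", 2), ("E", 4)],
)

query = """
select unit from (
    select unit, sum(col1) c from (
        select unit, req_type,
               case when req_type in (1, 3) then 1 else 0 end col1
        from table_1
    ) t
    group by unit
) s
where c = 0
order by unit
"""

# Units whose rows never hit the excluded list (1, 3) sum to 0 and are kept.
units = [row[0] for row in conn.execute(query)]
print(units)  # ['D', 'E']
```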

Non-Equi Left Join with Hive & Spark

A non-equi left join is a join between 2 tables in which the join condition is given by operators other than equal to ("="). For example -

    Equi Join - "Table1 join Table2 on (T1.c1=T2.c1)"
    Non-Equi Join - "Table1 join Table2 on (T2.c1 > T1.c1 and T2.c1<T1.c2)"

Hive does not support Non-Equi join. Spark supports Non-Equi join, but we have seen that it only works when there is a combination of Equi & Non-Equi columns. For example -

    Table1 join Table2 on (T1.c3=T2.c3 and T2.c1 > T1.c1 and T2.c1<T1.c2)

But if you have only a Non-Equi condition, the Spark job fails after some time with a "Future Timeout" exception -

    Table1 join Table2 on (T2.c1 > T1.c1 and T2.c1<T1.c2)

In that scenario, we have to identify an additional column on which an Equi condition can be specified. It can be a hash value column or something else that matches in Table 1 and Table 2. For example - We had 2 tables and wanted to check if IP is between sta

Spark - Data Skew - Join Optimization

Sometimes data is unevenly distributed, leading to data skew. This means that one partition holds more data than the others because of the same or related keys. In Joins and Aggregations, all data for the same key must be co-located and may be processed by one container/executor. This may lead to slowness of the application.

Solution -

If one data set is small, the smaller data set can be broadcast, increasing join efficiency. This is governed by the property spark.sql.autoBroadcastJoinThreshold

Identify whether there are too many NULL values; if so, filter them out before joining, process the records with NULL keys separately, then do a union with the remaining data set.

Salting - To understand salting, let's understand the problem with an example -

Table 1
Key
1
1
1

Table 2
Key
1
1

On joining Table 1 with Table 2, since this is the same key, all data should be shuffled to the same container or one JVM, which will return 3*2 rows, like K1   K2 1 * 1
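The excerpt stops before the salting steps, so the following is a generic sketch of salting in plain Python, not necessarily the exact method the post goes on to describe. It assumes the usual form of the technique: add a random salt to each row of the larger table, replicate each row of the smaller table once per salt value, and join on (key, salt). All names and the number of salts are illustrative.

```python
import random
from collections import defaultdict


def salted_join(large_keys, small_keys, num_salts=3, seed=0):
    rng = random.Random(seed)
    # Large side: a random salt spreads one hot key over num_salts buckets.
    large = [(key, rng.randrange(num_salts)) for key in large_keys]
    # Small side: replicate every row once per salt so each bucket still matches.
    small = [(key, salt) for key in small_keys for salt in range(num_salts)]

    buckets = defaultdict(list)
    for key, salt in small:
        buckets[(key, salt)].append(key)

    # Join on (key, salt); each (key, salt) bucket could go to a different task.
    joined = []
    for key, salt in large:
        for right_key in buckets.get((key, salt), []):
            joined.append((key, right_key))
    return joined


result = salted_join([1, 1, 1], [1, 1])
print(len(result))  # 6, the same 3*2 rows as the unsalted join
```

The row count is unchanged because every salted row on the large side still meets exactly one copy of each row from the small side; only the distribution of work changes.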

Spark - Metastore slow, or stuck or Hanged.

We have seen that when Spark connects to the Hive Metastore, it sometimes takes too long to get connected because the Metastore is slow. We have also seen that there is no automatic load balancing between Hive Metastores.

Solution - Out of all available Hive Metastores, we can find a working one externally in a script, then set that URL while invoking the Spark job as follows -

    spark-sql --conf "spark.hadoop.hive.metastore.uris=thrift://myMetaIp:9083" -e "show databases"

Note - This can also be used to set Hive Metastore or other properties externally from Spark.
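A minimal sketch of such an external check, written in Python under stated assumptions: the helper names are hypothetical, and a successful TCP connection only shows that the port is open, not that the Metastore is healthy or fast. The host name in the example is a placeholder.

```python
import socket


def first_reachable(candidates, timeout=2.0):
    """Return the first (host, port) that accepts a TCP connection, else None."""
    for host, port in candidates:
        try:
            with socket.create_connection((host, port), timeout=timeout):
                return host, port
        except OSError:
            # Refused, timed out or unresolvable: try the next candidate.
            continue
    return None


def spark_sql_command(host, port, statement):
    """Build the spark-sql argument list that pins the given Metastore URI."""
    uri = f"thrift://{host}:{port}"
    return ["spark-sql", "--conf", f"spark.hadoop.hive.metastore.uris={uri}", "-e", statement]


# Example with a placeholder host; no network call is made here.
command = spark_sql_command("myMetaIp", 9083, "show databases")
print(" ".join(command))
```

A wrapper script could call first_reachable with the list of Metastore hosts and pass the returned pair to spark_sql_command before launching the job.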

Setup DbVisualizer or Dbeaver or Data grip or other tools to access Secure Kerberos hadoop Cluster from remote windows machine

If you are on Windows 10, you should already have utilities like kinit, ktutil, etc. available on your machine. If not, install MIT Kerberos - http://web.mit.edu/kerberos/

Here are the steps -

1) Copy /etc/krb5.conf from any node of your cluster to your local machine.
2) Also, copy the *.keytab file from the cluster to your local machine.
3) Rename krb5.conf to krb5.ini
4) Copy krb5.ini to -
       <Java_home>\jre\lib\security\
       C:\Users\<User_name>\
5) Copy the keytab file to -
       C:\Users\<User_name>\
6) On the Hadoop cluster, get the principal name from the keytab file -
       ktutil
       ktutil:  read_kt keytab
       ktutil:  list
       slot KVNO Principal
       ---- ---- ---------------------------------------------------------------------
          1    1 <username>@HADOOP.domain.COM
       ktutil:  quit
   The principal shown in the list output is your principal name.
7) On Windows, execute:
       kinit -k -t C:\Users\<User_name>\*.keytab <username>@HADOOP.domain.COM
   This will generate a ticket in user h

Un-escape HTML/ XML character entities

The characters below are escaped in XML. For example -

Escape Characters | Actual Character
&gt;              | >
&lt;              | <
&#x00E1;          | Latin small letter a with acute (á)

To un-escape the characters, use the Apache Commons library. For example -

    //line of code
    println(org.apache.commons.lang3.StringEscapeUtils.unescapeHtml4("&lt; S&#x00E1;chdev Dinesh &gt;"))

    //This will result -
    < Sáchdev Dinesh >
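For comparison, the same un-escaping can be done in Python with the standard library's html.unescape. This is an equivalent in a different language, not the Apache Commons call used above.

```python
import html

escaped = "&lt; S&#x00E1;chdev Dinesh &gt;"
# html.unescape resolves named (&lt;) and numeric (&#x00E1;) character references.
unescaped = html.unescape(escaped)
print(unescaped)  # < Sáchdev Dinesh >
```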

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results is bigger than spark.driver.maxResultSize

Exception -
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 122266 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)

Cause - This happens when we try to collect a DataFrame / RDD on the driver and the size of the data is more than the limit set by the property.

Solution - Set: --conf "spark.driver.maxResultSize=4g"

Copy Phoenix (HBase) Table with different name

It can be done in 2 steps -

1) Create a new Phoenix table with a different name but the same schema as the existing table.
2) Use the HBase command below, which will eventually execute an MR job to copy the data -
       hbase org.apache.hadoop.hbase.mapreduce.CopyTable --new.name="<Name of new HBase Table>" "<Name of existing HBase Table>"

Access AWS S3 or HCP HS3 (Hitachi) using Hadoop or HDFS or Distcp

Create Credentials File for S3 Keys

    hadoop credential create fs.s3a.access.key -value <Access_KEY> -provider localjceks://file/$HOME/aws-dev-keys.jceks
    hadoop credential create fs.s3a.secret.key -value <Secret_KEY> -provider localjceks://file/$HOME/aws-dev-keys.jceks

Where -
    <Access_KEY> - S3 access key
    <Secret_KEY> - S3 secret key

Note - This will create a file on the local file system, in the home directory, named aws-dev-keys.jceks. Put this file on HDFS for distributed access.

To list the details, execute the command below -

    hadoop credential list -provider localjceks://file/$HOME/aws-dev-keys.jceks

List files in S3 Bucket with hadoop Shell

    hdfs dfs -Dhadoop.security.credential.provider.path=jceks://hdfs/myfilelocation/aws-dev-keys.jceks -ls s3a://s3bucketname/
    hdfs dfs -Dfs.s3a.access.key=<Access_KEY> -Dfs.s3a.secret.key=<Secret_KEY> -ls s3a://aa-daas-ookla/

Note - Similarly, other hadoop/ hdfs commands

Install AWS Cli in a Virtual Environment

Create a Virtual Environment for your project

    mkdir $HOME/py36venv
    python3 -m venv $HOME/py36venv

Activate 3.6 virtual Environment

    source $HOME/py36venv/bin/activate

Install AWS Commandline

    pip install awscli
    chmod 755 $HOME/py36venv/bin/aws
    aws --version
    aws configure
    AWS Access Key ID [None]: ----------------------
    AWS Secret Access Key [None]: ----+----+---------------
    Default region name [None]: us-east-2
    Default output format [None]:
    aws s3 ls
    aws s3 sync local_dir/ s3://my-s3-bucket
    aws s3 sync s3://my-s3-bucket local_dir/

spark.sql.utils.AnalysisException: cannot resolve 'INPUT__FILE__NAME'

I have a Hive SQL -

    select regexp_extract(`unenriched`.`input__file__name`,'[^/]*$',0) `SRC_FILE_NM` from dl.table1;

This query fails when run with Spark -

    spark.sql.utils.AnalysisException: u"cannot resolve 'INPUT__FILE__NAME' given input columns:

Analysis - INPUT__FILE__NAME is a Hive-specific virtual column and it is not supported in Spark.

Solution - Spark provides the input_file_name function, which should work in a similar way:

    SELECT input_file_name() FROM df

It requires Spark 2.0 or later to work correctly.
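As a quick illustration of what the pattern '[^/]*$' in the query extracts from a full file path, here is a small Python check using the re module. The sample path is made up for illustration.

```python
import re


def file_name_from_path(path):
    # '[^/]*$' matches the trailing run of non-slash characters,
    # i.e. everything after the last '/' in the path.
    return re.search(r"[^/]*$", path).group(0)


name = file_name_from_path("hdfs://namenode/warehouse/dl.db/table1/part-00000.orc")
print(name)  # part-00000.orc
```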

Spark / Hive - org.apache.hadoop.hive.serde2.io.DoubleWritable cannot be cast to org.apache.hadoop.io.Text

Exception -
java.lang.ClassCastException: org.apache.hadoop.hive.serde2.io.DoubleWritable cannot be cast to org.apache.hadoop.io.Text
    at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector.getPrimitiveWritableObject(WritableStringObjectInspector.java:41)
    at org.apache.spark.sql.hive.HiveInspectors$$anonfun$unwrapperFor$23.apply(HiveInspectors.scala:547)
    at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$15.apply(TableReader.scala:426)
    at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$15.apply(TableReader.scala:426)
    at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:442)

This exception may occur because the underlying ORC file has a column with data type Double, whereas the Hive table has the column type as String. The error can be rectified by correcting the data type.