Skip to main content

Spark Streaming Kafka Errors

 

We observed couple of errors / info messages while running Spark Streaming Applications, which might come  handy for debugging in future. Refer below - 

1) We noticed that Spark streaming Job was running but consuming nothing. We could see following messages being iteratively printed in logs - 


22/11/18 08:13:53 INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=difnrt-uat-001] Group coordinator mymachine.com:9093 (id: 601150796 rack: null) is unavailable or invalid, will attempt rediscovery

22/11/18 08:13:53 INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=difnrt-uat-001] Discovered group coordinator mymachine.com:9093 (id: 601150796 rack: null)

22/11/18 08:13:53 INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=difnrt-uat-001] (Re-)joining group


Possible Causes / Fixes - 

  • May be due to Kafka Coordinator service. Try restarting Kafka it may fix the issue. 
  • In our case, we couldn't get help from Admin Team. So, we changed "group.id" which resolved issue for us.

2) Noticed Spark Streaming Job giving below error while producing messages to Kafka - 


org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.


Possible Causes / Fixes - 
  • It means that Client thread is being timed out before data is being written to Bootstrap server. There can be more reasons which can actually be derived from Kafka Server logs. Client logs don't provide much information.
  • In our case, it was firewall issue with some nodes in cluster which could not connect to Kafka Bootstrap servers. So, it used to work but when tasks used to execute on machine which doesn't have firewall open - that's when application used to fail. To identify it - 
    • we did SSH to those nodes 
    • executed below command's to know if connection is possible -
                    telnet <machine> <port> 
                    or,
                    nc -zv <machine> <port>

  • Opening firewall ports helped solve this intermittent problem. 

3) Spark Streaming with Kafka failing with below error -

java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.streaming.scheduler.StreamInputInfo.<init>(InputInfoTracker.scala:38)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:231)

Possible Causes / Fixes - 
  • The reason for this error is that we are trying to fetch an offset which is greater then the current latest offset known by the broker. There can be multiple reasons for this behavior - 
  • On Kafka Side - 
    • set clean.leader.election.enable to false, such that there are no in-sync replicas.
    • Restart Kafka after there are no in-sync replicas.
    • Now, if offset was maintained by Kafka it should get corrected and re-running Spark Streaming application should work.
  • In my case - it was due to coding error - 
    • We were maintaining offset manually in an HBase Table. 
    • Due to a coding error - we inserted wrong value in HBase Table. 
    • Next time when our Job ran it selected large offset from HBase Table which did not exists for the topic resulting in this error.
    • We corrected our code and offset entries which resolved the issue.

Comments

Popular posts

Hive Parse JSON with Array Columns and Explode it in to Multiple rows.

 Say we have a JSON String like below -  { "billingCountry":"US" "orderItems":[       {          "itemId":1,          "product":"D1"       },   {          "itemId":2,          "product":"D2"       }    ] } And, our aim is to get output parsed like below -  itemId product 1 D1 2 D2   First, We can parse JSON as follows to get JSON String get_json_object(value, '$.orderItems.itemId') as itemId get_json_object(value, '$.orderItems.product') as product Second, Above will result String value like "[1,2]". We want to convert it to Array as follows - split(regexp_extract(get_json_object(value, '$.orderItems.itemId'),'^\\["(.*)\\"]$',1),'","') as itemId split(regexp_extract(get_json_object(value, '$.orderItems.product'),'^\\["(.*)\\"]$',1),&




Read from a hive table and write back to it using spark sql

In context to Spark 2.2 - if we read from an hive table and write to same, we get following exception- scala > dy . write . mode ( "overwrite" ). insertInto ( "incremental.test2" ) org . apache . spark . sql . AnalysisException : Cannot insert overwrite into table that is also being read from .; org . apache . spark . sql . AnalysisException : Cannot insert overwrite into table that is also being read from .; 1. This error means that our process is reading from same table and writing to same table. 2. Normally, this should work as process writes to directory .hiveStaging... 3. This error occurs in case of saveAsTable method, as it overwrites entire table instead of individual partitions. 4. This error should not occur with insertInto method, as it overwrites partitions not the table. 5. A reason why this happening is because Hive table has following Spark TBLProperties in its definition. This problem will solve for insertInto met




Caused by: java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary

Exception -  Caused by: java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:44) at org.apache.spark.sql.execution.vectorized.ColumnVector.getUTF8String(ColumnVector.java:645) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) Analysis - This might occur because of data type mismatch between Hive Table & written Parquet file. Solution - Correct the data type to match between Hive Table & Parquet




Hadoop Distcp Error Duplicate files in input path

  One may face following error while copying data from one cluster to other, using Distcp  Command: hadoop distcp -i {src} {tgt} Error: org.apache.hadoop.toolsCopyListing$DulicateFileException: File would cause duplicates. Ideally there can't be same file names. So, what might be happening in your case is you trying to copy partitioned table from one cluster to other. And, 2 different named partitions have same file name. Your solution is to correct Source path  {src}  in your command, such that you provide path uptil partitioned sub directory, not the file. For ex - Refer below : /a/partcol=1/file1.txt /a/partcol=2/file1.txt If you use  {src}  as  "/a/*/*"  then you will get the error  "File would cause duplicates." But, if you use  {src}  as  "/a"  then you will not get error in copying.




org.apache.spark.sql.AnalysisException: Cannot overwrite a path that is also being read from.;

  Caused by: org.apache.spark.sql.AnalysisException: Cannot overwrite a path that is also being read from.; at org.apache.spark.sql.execution.command.DDLUtils$.verifyNotReadPath(ddl.scala:906) at org.apache.spark.sql.execution.datasources.DataSourceAnalysis$$anonfun$apply$1.applyOrElse(DataSourceStrategy.scala:192) at org.apache.spark.sql.execution.datasources.DataSourceAnalysis$$anonfun$apply$1.applyOrElse(DataSourceStrategy.scala:134) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256) at org.apache.spark.sql.execution.datasources.DataSourceAnalysis.apply(DataSourceStrategy.scala:134) at org.apache.spark.sql.execution.datasource