While running our Spark Streaming application, we observed the following exception -
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 410, mymachine.com, executor 2): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {mygrouidid=233318826}
at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1260)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:607)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1313)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.poll(KafkaDataConsumer.scala:200)
at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:129)
at org.apache.spark.streaming.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:36)
at org.apache.spark.streaming.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.get(KafkaDataConsumer.scala:218)
at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:261)
at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:229)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
Cause -
Stated simply, this error means the Spark application is trying to fetch messages from an offset that no longer exists in the topic - the records have already been deleted by Kafka's retention - which leads to the error above.
What can be the reason for such behavior -
- Our application is processing messages slower than Kafka retains them in the topic, so messages expire before they are read.
What can be done -
- Enable Spark checkpointing, so processed offsets survive restarts and the job resumes from a known position (see the checkpointing sketch after this list).
- Increase the Kafka retention for the topic (retention.ms / retention.bytes), so messages stay available long enough to be processed.
- Instead of "Assign"-ing to the specific last-read offset, "Subscribe" to the topic with the offset reset policy set to EARLIEST. This can still fail if data keeps expiring, so one can reset the offset to LATEST instead. Note - this will lead to loss of some data (see the Subscribe sketch after this list).
- You can "Assign" to some known offset which is not about to expire. Note - this will also lead to loss of some data (see the Assign sketch after this list).
- You can reduce the batch window size and/or cap the ingestion rate, so each batch holds fewer messages and is processed faster (see the rate-limiting sketch after this list).
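
A minimal checkpointing sketch in Scala, against the spark-streaming-kafka-0-10 API seen in the stack trace; the application name, batch interval and checkpoint directory are placeholder values, not taken from the failing job:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical checkpoint directory - use a durable location (HDFS/S3) in practice.
val checkpointDir = "hdfs:///checkpoints/my-streaming-app"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("my-streaming-app")
  val ssc = new StreamingContext(conf, Seconds(30))
  ssc.checkpoint(checkpointDir) // persists metadata, including the offset ranges of processed batches
  // ... build the Kafka DStream and the processing graph here ...
  ssc
}

// Restore from the checkpoint if it exists, otherwise build a fresh context.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()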
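
A sketch of the "Subscribe" approach with an offset reset policy, assuming an existing StreamingContext ssc and placeholder broker, group id and topic names; with "auto.offset.reset" set to "earliest" (or "latest"), the consumer falls back to a valid offset instead of failing when the stored offset has expired:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "broker1:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "mygroupid",
  "auto.offset.reset"  -> "earliest",   // or "latest" - either way some expired data is skipped
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Seq("mytopic"), kafkaParams)
)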
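
A sketch of the "Assign" approach, pinning the stream to explicit starting offsets and reusing ssc and kafkaParams from the sketch above; the topic, partition and offset here are hypothetical, and in practice the starting offset should be looked up from the broker rather than hard-coded:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign

// Hypothetical partition and starting offset - pick one safely inside the retained range.
val startOffsets = Map(new TopicPartition("mytopic", 0) -> 233400000L)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Assign[String, String](startOffsets.keys.toSeq, kafkaParams, startOffsets)
)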
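
A sketch of shrinking the batch window and capping the per-partition ingestion rate; the interval and rate values are illustrative assumptions, not recommendations for any particular workload:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("my-streaming-app")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000") // max records pulled per partition per second
  .set("spark.streaming.backpressure.enabled", "true")      // let Spark adapt the rate to processing speed
val ssc = new StreamingContext(conf, Seconds(10))           // smaller batch window than before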