Problem Description


A Kafka streaming job for a source table fails with the below error in IWX v2.6.1.


infoworks.tools.ExceptionHandling.IWException: Error while converting record to in memory format
    at consumers.spark.message.mappers.SemiStructuredMessageMapper.call(SemiStructuredMessageMapper.java:196)
    at consumers.spark.message.mappers.SemiStructuredMessageMapper.call(SemiStructuredMessageMapper.java:42)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$4$1.apply(JavaDStreamLike.scala:205)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$4$1.apply(JavaDStreamLike.scala:205)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions:{M.DAS.ETD.TRANSACTION.FULLTRANS.XML.V1-1=8338}

 

Cause


This issue occurs when the streaming job is submitted after Kafka's retention period has elapsed and the messages after offset 8338 have been deleted for that partition on the Kafka side. The offset stored by the job then falls outside the range of offsets still available on the broker, and because no offset reset policy is configured, the consumer fails with OffsetOutOfRangeException.
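The failure condition can be pictured as a simple range check. The sketch below is purely illustrative (it is not Infoworks or Kafka code, and the offset values other than 8338 are made up); it shows how a stored offset that was once valid ends up outside the partition's available range after retention deletes older messages.

```python
def offset_in_range(stored_offset, earliest_available, latest):
    """Return True if the stored offset still points at readable data
    in the partition, i.e. it has not been deleted by retention."""
    return earliest_available <= stored_offset <= latest

# Before retention kicks in: suppose the partition holds offsets 0..10000,
# so the stored offset 8338 is still readable.
assert offset_in_range(8338, earliest_available=0, latest=10000)

# After retention deletes everything below (say) offset 9000, the stored
# offset 8338 no longer exists on the broker. This is the situation that
# raises OffsetOutOfRangeException when no reset policy is configured.
assert not offset_in_range(8338, earliest_available=9000, latest=10000)
```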


Solution


Perform the steps below to remove the kafka_partition_offset_info field from the MongoDB document for the source table. Once the field is removed, the offset is reset to 0 and all messages are fetched from the beginning.



a) Get the table id of the source table that is configured with Kafka streaming.

b) Log in to MongoDB by running the below command.

      mongo -uinfoworks -pIN11**rk infoworks-new

c) Run the below query to remove the stored offset information.

      db.tables.update({ _id: ObjectId("provide the table id") }, { $unset: { kafka_partition_offset_info: 1 } })

d) Submit the job; the offset will be set to 0 and all messages will be fetched from the beginning.
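The effect of the $unset in step (c) can be sketched in plain Python. The document fields and id below are hypothetical placeholders, not real Infoworks data; the sketch only demonstrates that the update drops the stored offset field while leaving the rest of the table document intact.

```python
def apply_unset(document, field):
    """Mimic MongoDB's { $unset: { field: 1 } } on an in-memory document:
    return a copy with the field removed if it is present."""
    updated = dict(document)
    updated.pop(field, None)
    return updated

# Hypothetical table document; field names besides
# kafka_partition_offset_info are illustrative.
table_doc = {
    "_id": "provide the table id",
    "name": "example_source_table",
    "kafka_partition_offset_info": {"M.DAS.ETD.TRANSACTION.FULLTRANS.XML.V1-1": 8338},
}

cleaned = apply_unset(table_doc, "kafka_partition_offset_info")
assert "kafka_partition_offset_info" not in cleaned
assert cleaned["name"] == "example_source_table"
```

If you prefer a driver over the mongo shell, the equivalent pymongo call (assuming a connected client) would be along the lines of `db.tables.update_one({"_id": ObjectId(table_id)}, {"$unset": {"kafka_partition_offset_info": ""}})`.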


Starting from v2.6.2, the streaming offset can be reset by running the “truncate table data” job instead.

 

Applicable Infoworks DataFoundry Versions

v2.3.x, v2.4.x, v2.5.x