A custom partitioner can be created by implementing the org.apache.kafka.clients.producer.Partitioner interface.
It can be used with the Spark SQL Kafka data source by setting the property "kafka.partitioner.class".
For example:
df.write.format("kafka").option("kafka.partitioner.class", "com.mycustom.ipartitioner")
We implemented one such custom partitioner by extending org.apache.kafka.clients.producer.RoundRobinPartitioner.
The complete source code is available at https://github.com/dinesh028/engineering/blob/master/Kafka/com/aquaifer/producer/KeyPartitioner.scala
This partitioner:
- Reads a configuration file that maps each Kafka key to a primary-key field name. The Kafka message value is a JSON document that contains this primary key with a unique value.
- The idea is to partition messages on this unique value, so that messages with the same primary-key value land in the same partition.
- Once the configuration is loaded, for each byte-array message it (see the sketch after this list):
  - converts the bytes to a JSON string,
  - parses the JSON,
  - extracts the unique value of the primary key,
  - takes the positive hash code of that value modulo the number of partitions available in the Kafka topic,
  - uses the result of the modulus as the partition for the message.
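A minimal sketch of that logic is shown below. The Jackson-based JSON parsing, the in-code mapping (standing in for the configuration file), and names such as "orders" and "order_id" are assumptions for illustration; the real implementation is in the GitHub link above.

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.producer.RoundRobinPartitioner
import org.apache.kafka.common.Cluster

class KeyPartitioner extends RoundRobinPartitioner {
  private val mapper = new ObjectMapper()
  // Assumed mapping of Kafka key -> primary-key field name (loaded from a config file in the real code).
  private val primaryKeyField: Map[String, String] = Map("orders" -> "order_id")

  override def partition(topic: String, key: Any, keyBytes: Array[Byte],
                         value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
    val numPartitions = cluster.partitionsForTopic(topic).size()
    val kafkaKey = if (keyBytes != null) new String(keyBytes, "UTF-8") else ""
    primaryKeyField.get(kafkaKey) match {
      case Some(field) =>
        val json = mapper.readTree(new String(valueBytes, "UTF-8")) // byte array -> parsed JSON
        val pkValue = json.get(field).asText()                      // unique primary-key value
        (pkValue.hashCode & Integer.MAX_VALUE) % numPartitions      // positive hash mod #partitions
      case None =>
        // No mapping for this key: fall back to round-robin behaviour.
        super.partition(topic, key, keyBytes, value, valueBytes, cluster)
    }
  }
}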
We also noticed that custom properties are not passed through to the partitioner's configuration. We raised a defect with Spark for this; refer to https://issues.apache.org/jira/browse/SPARK-45666
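For context, a partitioner receives the producer configuration through its configure() method. The sketch below shows what we expected: a "kafka."-prefixed Spark option (the option name "kafka.partitioner.mapping.file" is hypothetical) arriving in that map with the prefix stripped. Per the JIRA above, such custom properties do not reach the partitioner.

import java.util
import org.apache.kafka.clients.producer.RoundRobinPartitioner

class KeyPartitioner extends RoundRobinPartitioner {
  // Same partitioner class as sketched above, showing only the configure() override.
  override def configure(configs: util.Map[String, _]): Unit = {
    // Hypothetical: .option("kafka.partitioner.mapping.file", "/path/mapping.properties")
    // was expected to arrive here as "partitioner.mapping.file", but it is not passed through.
    val mappingFile = configs.get("partitioner.mapping.file")
    // ... load the Kafka-key -> primary-key mapping from mappingFile ...
  }
}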