The sample code below shows how to write a Kafka producer that uses a custom partitioner.
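If partition() returns an integer that is not a valid partition index for the topic (for example, a value greater than or equal to the actual number of partitions), the producer code below will not be able to publish messages to the topic and will fail with the exception trace shown after the code: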
PartitionProducer
|
package ds.kafka;
import java.util.Date;
import
java.util.Properties;
import java.util.Random;
import
kafka.producer.ProducerConfig;
import
kafka.producer.KeyedMessage;
import
kafka.javaapi.producer.Producer;
public class PartitionProducer
{
Producer<String, String> producer;
public PartitionProducer() {
Properties props = new Properties();
props.put("metadata.broker.list", "192.168.56.101:9092,
192.168.56.101:9093, 192.168.56.101:9094");
props.put("serializer.class", "kafka.serializer.StringEncoder");
// determine the partition in the
topic where message needs to be sent
props.put("partitioner.class", "ds.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
producer = new
Producer<String, String>(config);
}
public static void main(String[] args) {
if (args.length != 2) {
throw new
IllegalArgumentException(
"Please
provide topic name and Message count as arguments");
}
PartitionProducer partitionProducer = new
PartitionProducer();
try {
partitionProducer
.publishMessage(args[0], Integer.parseInt(args[1]));
} finally {
partitionProducer.close();
}
}
private void close() {
producer.close();
}
private void
publishMessage(String topic, int msgCount) {
Random random = new Random();
for (int i = 0; i < msgCount; i++) {
String clientIP = "192.168.56." + random.nextInt(256);
String accessTime = new Date().toString();
String message = accessTime + ",kafka.apache.org," + clientIP;
System.out.println(message);
KeyedMessage<String,
String> keyedMessage = new KeyedMessage<String, String>(
topic, clientIP, message);
// Publish the
message
producer.send(keyedMessage);
}
}
}
|
Partitioner
|
package ds.kafka;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class SimplePartitioner implements Partitioner {
public
SimplePartitioner(VerifiableProperties properties) {
System.out.println("Creating
Object of Simple Partitioner");
}
@Override
public int
partition(Object key, int a_numPartitions) {
int
part = 0;
String
partitionKey = (String) key;
int
offset = partitionKey.lastIndexOf('.');
if
(offset > 0) {
part
= Integer.parseInt(partitionKey.substring(offset + 1))% a_numPartitions;
}
return
part;
}
}
|
Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at learning.kafka.PartitionProducer.publishMessage(PartitionProducer.java:62)
Comments
Post a Comment