To write a producer, we need the following classes:
- class Producer<K, V>
- class KeyedMessage[K, V](val topic: String, val key: K, val message: V)
- class ProducerConfig
Major properties:
- metadata.broker.list :- A comma-separated list of brokers (host:port) that the producer contacts to fetch metadata. The producer automatically determines the lead broker for each topic partition by sending a metadata request, and connects to the correct broker before it publishes any message.
- serializer.class :- The serializer class used to prepare the message for transmission from the producer to the broker. By default, the key uses the same serializer class as the message. Set the producer configuration key.serializer.class to use a custom encoder for the key.
- request.required.acks :- Controls whether the producer waits for an acknowledgment from the broker. The value 1 means the producer receives an acknowledgment once the lead replica has received the data. By default (value 0), the producer works in “fire and forget” mode and is not informed if a message is lost.
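As a minimal sketch, the three properties above can be collected into a java.util.Properties object before it is passed to ProducerConfig. The class name ProducerProps, the build helper, and the localhost broker addresses are hypothetical and used only for illustration; the key.serializer.class line is optional and simply makes the key encoder explicit.

```java
import java.util.Properties;

public class ProducerProps {

    // Hypothetical helper: gathers the producer settings described above.
    static Properties build(String brokerList) {
        Properties props = new Properties();
        // Brokers used to bootstrap metadata, as host:port pairs separated by commas
        props.put("metadata.broker.list", brokerList);
        // Encoder for the message payload
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Optional: encoder for the key (falls back to serializer.class when unset)
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // 1 = wait for an acknowledgment from the lead replica
        props.put("request.required.acks", "1");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker addresses; replace with the brokers of your cluster
        Properties props = build("localhost:9092,localhost:9093");
        System.out.println(props);
    }
}
```

The resulting object is what the sample producer hands to new ProducerConfig(properties).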
Sample Code:
package ds.kafka;

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SimpleProducer {

    /**
     * @param args
     * @throws InterruptedException
     * @throws NumberFormatException
     */
    public static void main(String[] args) throws NumberFormatException, InterruptedException {
        int argsCount = args.length;
        if (argsCount != 2)
            throw new IllegalArgumentException("Please provide topic name and Message count as arguments");

        //define properties
        Properties properties = new Properties();
        properties.put("metadata.broker.list", "192.168.56.101:9092,192.168.56.101:9093,192.168.56.101:9094");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(properties);
        Producer<String, String> producer = new Producer<String, String>(config);
        try {
            buildMessage(args[0], producer, Integer.parseInt(args[1]));
        } finally {
            //close producer pool connection to all
            //kafka brokers. Also, closes zookeeper
            //client connection if any
            producer.close();
        }
    }

    private static void buildMessage(String topic, Producer<String, String> producer,
            int msgCount) throws InterruptedException {
        for (int i = 0; i < msgCount; i++) {
            String runtime = new Date().toString();
            String msg = "Message Publishing Time - " + runtime;
            //String.format("Dinesh%d", i);
            KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, msg);
            producer.send(data);
            Thread.sleep(2000);
        }
    }
}