Posts

Apache Kafka: MultiThreaded High Level Consumer

The code below shows an example of a multi-threaded consumer built on the high-level consumer API:

    package learning.kafka;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;

    public class MultiThreadHLConsumer {

        private ExecutorService executorService;
        private final ConsumerConnector consumer;
        private final String topic;

        public MultiThreadHLConsumer(String topicName) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "
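
The threading pattern behind a consumer like this can be sketched without a running cluster. The example below is only an illustration under stated assumptions: plain in-memory lists stand in for Kafka streams, and the names StreamWorkerSketch and consumeAll are hypothetical and not part of the Kafka API. It shows one worker per stream being submitted to an ExecutorService, followed by an orderly shutdown.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class StreamWorkerSketch {

    // Processes each stand-in stream on its own pool thread and
    // returns the number of messages handled before shutdown.
    public static int consumeAll(List<List<String>> streams) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, streams.size()));
        AtomicInteger processed = new AtomicInteger();
        int threadNumber = 0;
        for (final List<String> stream : streams) {
            final int id = threadNumber++;
            pool.submit(() -> {
                // In a real consumer this loop would iterate over a stream's messages.
                for (String message : stream) {
                    System.out.println("Thread " + id + ": " + message);
                    processed.incrementAndGet();
                }
            });
        }
        // Stop accepting new tasks, then wait for the workers to finish.
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        List<List<String>> streams = Arrays.asList(
                Arrays.asList("m1", "m2"),
                Arrays.asList("m3"));
        System.out.println("Processed: " + consumeAll(streams));
    }
}
```

Sizing the pool to the number of streams keeps each stream on a dedicated thread, so messages within one stream are handled in order while different streams proceed in parallel.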

Apache Kafka: Producer with Custom Partitioner

The sample code below shows how to write a producer with a custom partitioner.

PartitionProducer

    package ds.kafka;

    import java.util.Date;
    import java.util.Properties;
    import java.util.Random;

    import kafka.producer.ProducerConfig;
    import kafka.producer.KeyedMessage;
    import kafka.javaapi.producer.Producer;

    public class PartitionProducer {

        Producer<String, String> producer;

        public PartitionProducer() {
            Properties props = new Properties();
            props.put("metadata.broker.list", "192.168.56.101:9092, 192.168.56.101:9093, 192.168.56.101:9094");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            // determine the partition in the topic where the message needs to be sent
            props.put("partitioner.class", "ds.SimplePartitioner");
            props.p
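
The partitioner class named in partitioner.class is not shown in this excerpt. As a rough illustration of what such a class typically decides, here is a minimal hash-based sketch. It is not the post's SimplePartitioner: the class name HashPartitionSketch is hypothetical, it does not implement the Kafka interface, and the method shape (a key plus the number of partitions, returning a partition index) is only modelled on the 0.8-era partitioner, which is an assumption about the version in use.

```java
public class HashPartitionSketch {

    // Maps a message key to a partition index in the range [0, numPartitions).
    public static int partition(Object key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (key == null) {
            // Assumption for this sketch: keyless messages go to partition 0.
            return 0;
        }
        // Clear the sign bit so a negative hashCode cannot yield a negative index.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        String[] keys = {"192.168.2.10", "192.168.2.11", "192.168.2.12"}; // sample keys
        for (String key : keys) {
            System.out.println(key + " -> partition " + partition(key, 3));
        }
    }
}
```

Because the index depends only on the key, all messages with the same key land in the same partition, which preserves per-key ordering.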

Apache Kafka: Sample Producer

To write a producer we need the following classes:

class Producer<K, V>
class KeyedMessage[K, V](val topic: String, val key: K, val message: V)
class ProducerConfig

Major properties:

metadata.broker.list :- The list of brokers (host:port) that the producer contacts to bootstrap. The producer determines the lead broker for each topic partition by raising a metadata request, and connects to the correct broker before it publishes any message.

serializer.class :- The serializer class used to prepare the message for transmission from the producer to the broker. By default, the same serializer class is used for the key and the message. The producer configuration key.serializer.class can be used to set a custom encoder for the key.

request.required.acks :- The value 1 means the producer receives an acknowledgment once the lead replica has received the data. By default (value 0), the producer works in "fire and forget" mode and is not informed when a message is lost.

Sample Code:
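
As a minimal sketch (not the post's original sample), the three properties above could be assembled into a java.util.Properties object like this. The class name ProducerSettings and the broker host names are hypothetical placeholders. The Kafka client itself is left out so the example runs on its own.

```java
import java.util.Properties;

public class ProducerSettings {

    // Collects the producer properties discussed above.
    // brokerList is a comma-separated list of host:port pairs supplied by the caller.
    public static Properties build(String brokerList) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // 1 = wait for an acknowledgment from the lead replica.
        props.put("request.required.acks", "1");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker addresses, for illustration only.
        Properties props = build("broker1:9092,broker2:9092");
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + " = " + props.getProperty(name));
        }
        // With the 0.8-era Kafka client on the classpath, these properties
        // would then be wrapped in a ProducerConfig and passed to a Producer.
    }
}
```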