The code below shows an example of a multi-threaded consumer built on the Kafka high-level consumer API:
package learning.kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import
java.util.Properties;
import java.util.concurrent.ExecutorService;
import
java.util.concurrent.Executors;
import
java.util.concurrent.TimeUnit;
import
kafka.consumer.Consumer;
import
kafka.consumer.ConsumerConfig;
import
kafka.consumer.ConsumerIterator;
import
kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import
kafka.message.MessageAndMetadata;
public class
MultiThreadHLConsumer {
private ExecutorService executorService;
private final ConsumerConnector consumer;
private final String topic;
public MultiThreadHLConsumer(String topicName) {
Properties props = new Properties();
ConsumerConfig config = new ConsumerConfig(props);
consumer = Consumer.createJavaConsumerConnector(config);
topic = topicName;
}
public void shutdown() throws InterruptedException
{
if (consumer != null)
consumer.shutdown();
if (executorService != null){
executorService.shutdown();
executorService.awaitTermination(10000,
TimeUnit.MILLISECONDS);
}
}
private void
testMultiThreadConsumer(int threadCount) {
Map<String, Integer> topicMap = new HashMap<String,
Integer>();
topicMap.put(topic, new Integer(threadCount));
Map<String,
List<KafkaStream<byte[], byte[]>>> consumerStreamsMap = consumer
.createMessageStreams(topicMap);
List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap
.get(topic);
int count = 0;
for (final KafkaStream
sL : streamList) {
final int threadNumber = count;
executorService.submit(new Runnable() {
@Override
public void run() {
try {
ConsumerIterator<byte[], byte[]> conumeItr = sL.iterator();
while (conumeItr.hasNext()) {
MessageAndMetadata
out =conumeItr.next();
System.out.print("---:Partition:"+out.partition()+"---");
System.out.println("Thread
Number " + threadNumber
+
":
"
+
new String((byte[])out.message()));
}
System.out.println("Shutting
down Thread Number: "
+
threadNumber);
} catch (Exception e) {
e.printStackTrace();
}
}
});
count++;
}
}
public static void main(String[] args) throws
InterruptedException {
MultiThreadHLConsumer consumer = new
MultiThreadHLConsumer("replicated-kafkatopic");
consumer.testMultiThreadConsumer(10);
System.out.println("........");
java.util.Scanner s= new
java.util.Scanner(System.in);
while(!s.next().equals("yes")){
//wait
}
consumer.shutdown();
}
}
|
Comments
Post a Comment