If you want greater control over partition consumption than the high-level API (consumer groups) gives you, you can implement your consumer with the low-level API. This requires you to do extra work that a consumer group handles for you, such as:
- Keeping track of the offset where the consumer left off (a minimal sketch of this follows the list).
- Identifying the lead broker for a partition and adjusting when the broker leader changes.
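For example, here is a minimal sketch of the first point: persisting the last-consumed offset so the consumer can resume after a restart. The class and file naming here are illustrative, not part of the Kafka API; real deployments would typically keep this state in ZooKeeper or another shared store.

// Illustrative only: a tiny file-based checkpoint so a restarted consumer
// can resume from its last offset.
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class OffsetCheckpoint {
    private final Path file;

    public OffsetCheckpoint(String topic, int partition) {
        // one checkpoint file per topic-partition
        this.file = Paths.get("offset-" + topic + "-" + partition + ".txt");
    }

    public void save(long offset) throws IOException {
        Files.write(file, Long.toString(offset).getBytes(Charset.forName("UTF-8")));
    }

    public long loadOrDefault(long fallback) throws IOException {
        if (!Files.exists(file)) return fallback;
        return Long.parseLong(new String(Files.readAllBytes(file), Charset.forName("UTF-8")).trim());
    }
}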
Steps for implementing it:
- Find an active Broker and find out which Broker is the leader for your topic and partition
- Determine who the replica Brokers are for your topic and partition
- Build the request defining what data you are interested in
- Fetch the data
- Identify and recover from leader changes
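The complete example below puts these steps together using SimpleConsumer. It expects five arguments: the maximum number of messages to read, the topic, the partition, a seed broker host, and the broker port.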
package learning.kafka;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
public class SimpleExample {
public static void main(String[] args) {
SimpleExample example = new SimpleExample();
// max messages to read
long maxReads = Long.parseLong(args[0]);
String topic = args[1];
int partition = Integer.parseInt(args[2]);
List<String> seeds = new ArrayList<String>();
// seed broker used to determine the lead broker
seeds.add(args[3]);
// broker port
int port = Integer.parseInt(args[4]);
try {
example.run(maxReads, topic, partition, seeds, port);
} catch (Exception e) {
System.out.println("Oops:" + e);
e.printStackTrace();
}
}
public void run(long maxReads, String topic, int partition,
List<String> seeds, int port) throws Exception {
PartitionMetadata metadata = findLeader(seeds, port, topic, partition);
if (metadata == null) {
System.err.println("Can't find
metadata for Topic and Partition. Exiting");
return;
}
if (metadata.leader() == null) {
System.err.println("Can't find Leader for Topic and Partition. Exiting");
return;
}
String leadBroker = metadata.leader().host();
String clientName = "Client_" + topic + "_" + partition;
SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
long readoffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
int numErrors = 0;
// main fetch loop: re-create the consumer if needed, fetch, handle errors, process messages
while (maxReads > 0) {
if (consumer == null) {
consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
}
kafka.api.FetchRequest req = new FetchRequestBuilder().clientId(clientName)
.addFetch(topic, partition, readoffset, 100000)
.build();
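// send the fetch request to the lead broker and wait for the response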
FetchResponse fetchResponse = consumer.fetch(req);
if (fetchResponse.hasError()) {
numErrors++;
// Something went wrong!
short code = fetchResponse.errorCode(topic, partition);
System.out.println("Error fetching data from the Broker: " + leadBroker + " Reason: " + code);
if (numErrors > 5) break;
if (code == ErrorMapping.OffsetOutOfRangeCode()) {
// We asked for an invalid offset. For the simple case, ask for the latest offset to reset
readoffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
continue;
}
consumer.close();
consumer = null;
leadBroker = findNewLeader(leadBroker, topic, partition, port);
continue;
}
numErrors = 0;
long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
long currentOffset = messageAndOffset.offset();
// Safety check: we may receive old messages (for example with compressed message sets), so just discard them
if (currentOffset < readoffset) {
System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readoffset);
continue;
}
// advance the offset past this message so the next fetch starts after it
readoffset = messageAndOffset.nextOffset();
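// read the message payload into a byte array and print it as a UTF-8 string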
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
numRead++;
maxReads--;
}
if (numRead == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
}
}
if (consumer != null) consumer.close();
}
// hosts of the replica brokers for the partition, filled in during leader lookup
private List<String> m_replicaBrokers = new ArrayList<String>();
// query each seed broker for topic metadata to find the leader of the given partition
private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
PartitionMetadata returnMetaData = null;
loop: for (String seed : a_seedBrokers) {
SimpleConsumer consumer = null;
try {
consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
List<String> topics = Collections.singletonList(a_topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
TopicMetadataResponse resp = consumer.send(req);
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
if (part.partitionId() == a_partition) {
returnMetaData = part;
break loop;
}
}
}
} catch (Exception e) {
System.out.println("Error
communicating with Broker [" + seed
+ "] to find
Leader for [" + a_topic + ", "
+ a_partition + "] Reason:
"
+ e);
} finally {
if (consumer != null)
consumer.close();
}
}
if (returnMetaData != null) {
m_replicaBrokers.clear();
for (Broker replica : returnMetaData.replicas()) {
m_replicaBrokers.add(replica.host());
}
}
return returnMetaData;
}
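// Asks the broker for an offset of the partition via the offset API;
// whichTime is kafka.api.OffsetRequest.EarliestTime() or LatestTime()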
public static long getLastOffset(SimpleConsumer consumer, String topic,
int partition, long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
OffsetRequest request = new OffsetRequest(requestInfo,
kafka.api.OffsetRequest.CurrentVersion(),
clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out.println("Error fetching offset data from the Broker. Reason: " + response.errorCode(topic, partition));
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}
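// When a fetch fails because the leader moved, re-run the metadata lookup
// against the known replica brokers, retrying up to three times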
private String findNewLeader(String a_oldLeader, String a_topic,
int a_partition, int a_port) throws Exception {
for (int i = 0; i < 3; i++) {
boolean goToSleep = false;
PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port,
a_topic, a_partition);
if (metadata == null) {
goToSleep = true;
} else if (metadata.leader() == null) {
goToSleep = true;
} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())
&& i == 0) {
// first time through, if the leader hasn't changed, give
// ZooKeeper a second to recover;
// second time, assume the broker did recover before failover,
// or it was a non-Broker issue
goToSleep = true;
} else {
return metadata.leader().host();
}
if (goToSleep) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
}
}
System.out.println("Unable to find new leader after Broker failure. Exiting");
throw new Exception("Unable to find new leader after Broker failure. Exiting");
}
}
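To try it out, put the Kafka 0.8 client jars on the classpath and pass the five arguments described above. For example (the topic name test and the broker localhost:9092 are placeholders for your own setup):

java learning.kafka.SimpleExample 10 test 0 localhost 9092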