Kafka中的消费者有两套API,分别是high level的和low level的。两种消费方式在构造和实现上都是不同的,在此记录一下:

一、High level consumer API

High Level Consumer API围绕着Consumer Group这个逻辑概念展开,它屏蔽了每个Topic的每个Partition的Offset管理(自动读取zookeeper中该Consumer group的last offset)、Broker失败转移以及增减Partition、Consumer时的负载均衡(当Partition和Consumer增减时,Kafka自动进行负载均衡)。

①将offset提交至ZK(频繁操作ZK的效率比较低)





import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
/**  * 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example  *  * @author Fung  */
public class KafkaHighConsumer {  private final ConsumerConnector consumer;  private final String topic;  private ExecutorService executor;  public KafkaHighConsumer(String a_zookeeper, String a_groupId, String a_topic) {  consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));  this.topic = a_topic;  }  public void shutdown() {  if (consumer != null)  consumer.shutdown();  if (executor != null)  executor.shutdown();  }  public void run(int numThreads) {  Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
//设计topic和stream的关系,即K为topic,V为stream的个数N  topicCountMap.put(topic, new Integer(numThreads));
//获取numThreads个stream  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer  .createMessageStreams(topicCountMap);  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);  executor = Executors.newFixedThreadPool(numThreads);  int threadNumber = 0;
//开启N个消费组线程消费这N个stream  for (final KafkaStream stream : streams) {  executor.submit(new ConsumerMsgTask(stream, threadNumber));  threadNumber++;  }  }  private static ConsumerConfig createConsumerConfig(String a_zookeeper,  String a_groupId) {  Properties props = new Properties();  props.put("zookeeper.connect", a_zookeeper);  props.put("group.id", a_groupId);  props.put("zookeeper.session.timeout.ms", "400");  props.put("zookeeper.sync.time.ms", "200");  props.put("auto.commit.interval.ms", "1000");  return new ConsumerConfig(props);  }  public static void main(String[] arg) {  String[] args = {"", "group-1", "page_visits", "12"};  String zooKeeper = args[0];  String groupId = args[1];  String topic = args[2];  int threads = Integer.parseInt(args[3]);  KafkaHighConsumer demo = new KafkaHighConsumer(zooKeeper, groupId, topic);  demo.run(threads);  try {  Thread.sleep(10000);  } catch (InterruptedException ie) {  }  demo.shutdown();  }  public class ConsumerMsgTask implements Runnable {  private KafkaStream m_stream;  private int m_threadNumber;  public ConsumerMsgTask(KafkaStream stream, int threadNumber) {  m_threadNumber = threadNumber;  m_stream = stream;  }  public void run() {// KafkaStream的本质就是一个网络迭代器  ConsumerIterator<byte[], byte[]> it = m_stream.iterator();  while (it.hasNext())  System.out.println("Thread " + m_threadNumber + ": "  + new String(it.next().message()));  System.out.println("Shutting down Thread: " + m_threadNumber);  }  }  /**  * Created by Administrator on 2016/4/11.  */  public static class KafkaProducer {  }

二、Low level consumer API

Low Level Consumer API,作为底层的Consumer API,提供了消费Kafka Message更大的控制:

  • Read a message multiple times(重复读取)
  • Consume only a subset of the partitions in a topic in a process(在一个进程中只消费某个topic的部分partition)
  • Manage transactions to make sure a message is processed once and only once(通过管理事务保证消息被处理且仅被处理一次,即Exactly Once语义)


   使用Low Level Consumer API时,必须自行找出指定Topic Partition的leader broker,基本步骤如下:

   1)Find an active Broker and find out which Broker is the leader for your topic and partition(从所有活跃的broker中找出哪个是指定TopicPartition中的leader broker)

   2)Determine who the replica Brokers are for your topic and partition(找到你所需要topic和partition的副本brokers)
   3)Build the request defining what data you are interested in(构造请求)
   4)Fetch the data(发送请求查询数据)
   5)Identify and recover from leader changes(处理leader broker变更)

public class KafkaSimpleConsumer {  private List<String> m_replicaBrokers = new ArrayList<String>();  public KafkaSimpleConsumer() {  m_replicaBrokers = new ArrayList<String>();  }  public static void main(String args[]) {  KafkaSimpleConsumer example = new KafkaSimpleConsumer();  // 最大读取消息数量  long maxReads = Long.parseLong("3");  // 要订阅的topic  String topic = "mytopic";  // 要查找的分区  int partition = Integer.parseInt("0");  // broker节点的ip  List<String> seeds = new ArrayList<String>();  seeds.add("");  seeds.add("");  seeds.add("");  // 端口  int port = Integer.parseInt("9092");  try {  example.run(maxReads, topic, partition, seeds, port);  } catch (Exception e) {  System.out.println("Oops:" + e);  e.printStackTrace();  }  }  public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {  // 获取指定Topic partition的元数据  PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);  if (metadata == null) {  System.out.println("Can't find metadata for Topic and Partition. Exiting");  return;  }  if (metadata.leader() == null) {  System.out.println("Can't find Leader for Topic and Partition. Exiting");  return;  }  //找到leader broker  String leadBroker = metadata.leader().host();  String clientName = "Client_" + a_topic + "_" + a_partition;
//链接leader broker  SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
//获取topic的最新偏移量  long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);  int numErrors = 0;  while (a_maxReads > 0) {  if (consumer == null) {  consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);  }
//本质上就是发送FetchRequest请求  FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();  FetchResponse fetchResponse = consumer.fetch(req);  if (fetchResponse.hasError()) {  numErrors++;  // Something went wrong!  short code = fetchResponse.errorCode(a_topic, a_partition);  System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);  if (numErrors > 5)  break;  if (code == ErrorMapping.OffsetOutOfRangeCode()) {  // We asked for an invalid offset. For simple case ask for  // the last element to reset  readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);  continue;  }  consumer.close();  consumer = null;  //处理topic的partition的leader发生变更的情况  leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);  continue;  }  numErrors = 0;  long numRead = 0;  for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {  long currentOffset = messageAndOffset.offset();  if (currentOffset < readOffset) {//过滤旧的数据  System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);  continue;  }  readOffset = messageAndOffset.nextOffset();  ByteBuffer payload = messageAndOffset.message().payload();  byte[] bytes = new byte[payload.limit()];  payload.get(bytes);
//打印消息  System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));  numRead++;  a_maxReads--;  }  if (numRead == 0) {  try {  Thread.sleep(1000);  } catch (InterruptedException ie) {  }  }  }  if (consumer != null)  consumer.close();  }  public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {  TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);  Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();  requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));  kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);  OffsetResponse response = consumer.getOffsetsBefore(request);  if (response.hasError()) {  System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));  return 0;  }  long[] offsets = response.offsets(topic, partition);  return offsets[0];  }  /**  * @param a_oldLeader  * @param a_topic  * @param a_partition  * @param a_port  * @return String  * @throws Exception  *找一个leader broker,其实就是发送TopicMetadataRequest请求  */  private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {  for (int i = 0; i < 3; i++) {  boolean goToSleep = false;  PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);  if (metadata == null) {  goToSleep = true;  } else if (metadata.leader() == null) {  goToSleep = true;  } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {  // first time through if the leader hasn't changed give  // ZooKeeper a second to recover  // second time, assume the broker did recover before failover,  // or it was a non-Broker issue  //  goToSleep = true;  } else {  return 
metadata.leader().host();  }  if (goToSleep) {  try {  Thread.sleep(1000);  } catch (InterruptedException ie) {  }  }  }  System.out.println("Unable to find new leader after Broker failure. Exiting");  throw new Exception("Unable to find new leader after Broker failure. Exiting");  }  private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {  PartitionMetadata returnMetaData = null;  loop: for (String seed : a_seedBrokers) {  SimpleConsumer consumer = null;  try {  consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");  List<String> topics = Collections.singletonList(a_topic);  TopicMetadataRequest req = new TopicMetadataRequest(topics);  kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);  List<TopicMetadata> metaData = resp.topicsMetadata();  for (TopicMetadata item : metaData) {  for (PartitionMetadata part : item.partitionsMetadata()) {  if (part.partitionId() == a_partition) {  returnMetaData = part;  break loop;  }  }  }  } catch (Exception e) {  System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);  } finally {  if (consumer != null)  consumer.close();  }  }  if (returnMetaData != null) {  m_replicaBrokers.clear();  for (kafka.cluster.Broker replica : returnMetaData.replicas()) {  m_replicaBrokers.add(replica.host());  }  }  return returnMetaData;  }

