Kafka ships with two sets of consumer APIs: the high-level API and the low-level API. The two styles of consumption differ in both how they are constructed and how they are implemented; the notes below record the differences.

1. High Level Consumer API

The High Level Consumer API is built around the logical concept of the Consumer Group. It hides per-topic, per-partition offset management (the group's last offset is read automatically from ZooKeeper), broker failover, and the load balancing performed when partitions or consumers are added or removed (Kafka rebalances automatically in those cases).
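As a rough, minimal sketch (not from the original post): the "hands-off" behaviour described above is driven almost entirely by consumer configuration. The property names below belong to the old Scala consumer (Kafka 0.8.x); the host, group id, and values are placeholders.

import java.util.Properties;
import kafka.consumer.ConsumerConfig;

// Sketch only: the configuration knobs that make the old high-level consumer "hands-off".
public class HighLevelConfigSketch {
    public static ConsumerConfig create() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181");    // group offsets and partition ownership are tracked here
        props.put("group.id", "my-group");             // the Consumer Group everything revolves around
        props.put("auto.commit.enable", "true");       // commit offsets automatically...
        props.put("auto.commit.interval.ms", "1000");  // ...once per second
        props.put("auto.offset.reset", "smallest");    // where to start when the group has no offset yet
        return new ConsumerConfig(props);
    }
}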

Characteristics:
   1) Data that has already been consumed cannot be consumed again; to re-read it you have to use a different consumer group.
   2) To record how far consumption has progressed, the offset of each TopicAndPartition must be committed. Offset commits support two targets:
      ① commit to ZooKeeper (frequent ZooKeeper operations are relatively inefficient);
      ② commit to the Kafka cluster itself.

Note: in early Kafka versions, offsets were stored in ZooKeeper by default, but tracking consumer/group progress that way forces consumers to read and write ZooKeeper frequently, and frequent reads and writes through the zkclient API are themselves quite inefficient. Newer Kafka versions therefore changed this: offsets are stored by default in an internal topic named __consumer_offsets inside the Kafka cluster (a small sketch of the newer consumer, which commits offsets this way, follows after this list).


   3) The client receives data through streams. A stream is the sequence of messages coming from one or more partitions on one or more brokers, and each stream is consumed by a single thread, so the client can request however many streams fit its needs. In short, one stream may aggregate messages from partitions on several brokers, but each partition feeds exactly one stream.
   4) The relationship between consumers and partitions:
       ① If there are more consumers than partitions, the extra consumers are wasted: Kafka does not allow concurrent consumption within a single partition, so never run more consumers than partitions.
       ② If there are fewer consumers than partitions, one consumer handles several partitions; the number of consumers and partitions should be balanced sensibly, otherwise some partitions will be read unevenly.
       ③ If a consumer reads from several partitions, no ordering is guaranteed across them: Kafka only guarantees order within a single partition, so across partitions the result depends on the order in which you read.
       ④ Adding or removing consumers, brokers, or partitions triggers a rebalance, and after a rebalance the partitions assigned to a consumer may change.
       ⑤ With the high-level API, a read blocks when no data is available.
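As noted above, newer Kafka clients keep group offsets in the internal __consumer_offsets topic rather than in ZooKeeper. Below is a minimal, hedged sketch of the newer Java consumer (org.apache.kafka.clients.consumer) committing offsets that way; the broker address, group id, and topic name are placeholders, not values from the original post.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NewConsumerOffsetSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder broker address
        props.put("group.id", "group-1");                 // consumer group
        props.put("enable.auto.commit", "false");         // we commit manually below
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList("page_visits"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.partition() + "/" + record.offset() + ": " + record.value());
                }
                // Offsets are written to the __consumer_offsets topic, not to ZooKeeper.
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }
}

Whether offsets are committed automatically or with commitSync(), the group's progress survives consumer restarts because it is stored in the cluster itself.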

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * For details see: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
 *
 * @author Fung
 */
public class KafkaHighConsumer {

    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public KafkaHighConsumer(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null)
            consumer.shutdown();
        if (executor != null)
            executor.shutdown();
    }

    public void run(int numThreads) {
        // Map topic -> stream count: the key is the topic, the value is the number of streams N
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(numThreads));
        // Obtain numThreads streams
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        executor = Executors.newFixedThreadPool(numThreads);
        int threadNumber = 0;
        // Start N consumer threads, one per stream
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerMsgTask(stream, threadNumber));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    public static void main(String[] arg) {
        String[] args = {"172.168.63.221:2188", "group-1", "page_visits", "12"};
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);
        KafkaHighConsumer demo = new KafkaHighConsumer(zooKeeper, groupId, topic);
        demo.run(threads);
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
        }
        demo.shutdown();
    }

    public class ConsumerMsgTask implements Runnable {

        private KafkaStream m_stream;
        private int m_threadNumber;

        public ConsumerMsgTask(KafkaStream stream, int threadNumber) {
            m_threadNumber = threadNumber;
            m_stream = stream;
        }

        public void run() {
            // A KafkaStream is essentially an iterator over the network
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
            while (it.hasNext())
                System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
            System.out.println("Shutting down Thread: " + m_threadNumber);
        }
    }
}

2. Low Level Consumer API

As the lower-level consumer API, the Low Level Consumer API gives much finer control over how Kafka messages are consumed:

  • Read a message multiple times (re-reading)
  • Consume only a subset of the partitions in a topic in a process (selective reading)
  • Manage transactions to make sure a message is processed once and only once (exactly-once semantics)

Characteristics

   1) A message can be read multiple times.
   2) Within one run, a process can consume only part of a partition on a specific broker.
   3) The program must track offsets itself.
   4) The program must find the lead broker of the target TopicPartition.
   5) The program must handle broker changes itself.

A client written against this API must follow these steps:
   1) Find an active broker and find out which broker is the leader for your topic and partition.
   2) Determine which brokers hold the replicas of your topic and partition.
   3) Build the request defining what data you are interested in.
   4) Fetch the data.
   5) Identify and recover from leader changes.

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class KafkaSimpleConsumer {

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public KafkaSimpleConsumer() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public static void main(String args[]) {
        KafkaSimpleConsumer example = new KafkaSimpleConsumer();
        // Maximum number of messages to read
        long maxReads = Long.parseLong("3");
        // Topic to subscribe to
        String topic = "mytopic";
        // Partition to read from
        int partition = Integer.parseInt("0");
        // Seed broker IPs
        List<String> seeds = new ArrayList<String>();
        seeds.add("192.168.4.30");
        seeds.add("192.168.4.31");
        seeds.add("192.168.4.32");
        // Broker port
        int port = Integer.parseInt("9092");
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // Fetch the metadata of the given topic partition
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        // Find the leader broker
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        // Connect to the leader broker
        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        // Fetch the earliest available offset of the partition
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            // Consuming boils down to sending a FetchRequest
            FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);
            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For the simple case, ask for
                    // the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                // Handle the case where the partition's leader has changed
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;
            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) { // skip stale messages
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                // Print the message
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }
            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    /**
     * Find a new leader broker; under the hood this just sends a TopicMetadataRequest to the known replicas.
     *
     * @param a_oldLeader
     * @param a_topic
     * @param a_partition
     * @param a_port
     * @return the host of the new leader
     * @throws Exception
     */
    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through, if the leader hasn't changed, give
                // ZooKeeper a second to recover;
                // second time, assume the broker did recover before failover,
                // or it was a non-broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
