原文地址:http://blog.csdn.net/honglei915/article/details/37697655

Kafka Producer APIs

旧版的Procuder API有两种:kafka.producer.SyncProducer和kafka.producer.async.AsyncProducer.它们都实现了同一个接口:

class Producer {/* 将消息发送到指定分区 */  public void send(kafka.javaapi.producer.ProducerData<K,V> producerData);/* 批量发送一批消息 */  public void send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);/* 关闭producer */   public void close();}

新版的Producer API提供了下面功能:

  1. 能够将多个消息缓存到本地队列里。然后异步的批量发送到broker,能够通过參数producer.type=async做到。缓存的大小能够通过一些參数指定:queue.timebatch.size。一个后台线程((kafka.producer.async.ProducerSendThread)从队列中取出数据并让kafka.producer.EventHandler将消息发送到broker,也能够通过參数event.handler定制handler。在producer端处理数据的不同的阶段注冊处理器,比方能够对这一过程进行日志追踪。或进行一些监控。仅仅需实现kafka.producer.async.CallbackHandler接口,并在callback.handler中配置。
  2. 自己编写Encoder来序列化消息,仅仅需实现以下这个接口。默认的Encoder是kafka.serializer.DefaultEncoder
    interface Encoder<T> {public Message toMessage(T data);
    }
  3. 提供了基于Zookeeper的broker自己主动感知能力,能够通过參数zk.connect实现。假设不使用Zookeeper。也能够使用broker.list參数指定一个静态的brokers列表,这样消息将被随机的发送到一个broker上,一旦选中的broker失败了,消息发送也就失败了。
  4. 通过分区函数kafka.producer.Partitioner类对消息分区
    interface Partitioner<T> {int partition(T key, int numPartitions);
    }

    分区函数有两个參数:key和可用的分区数量。从分区列表中选择一个分区并返回id。默认的分区策略是hash(key)%numPartitions.假设key是null,就随机的选择一个。

    能够通过參数partitioner.class定制分区函数。

新的api完整实比例如以下:

import java.util.*;import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;public class TestProducer {public static void main(String[] args) {long events = Long.parseLong(args[0]);Random rnd = new Random();Properties props = new Properties();props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");props.put("serializer.class", "kafka.serializer.StringEncoder");props.put("partitioner.class", "example.producer.SimplePartitioner");props.put("request.required.acks", "1");ProducerConfig config = new ProducerConfig(props);Producer<String, String> producer = new Producer<String, String>(config);for (long nEvents = 0; nEvents < events; nEvents++) { long runtime = new Date().getTime();  String ip = “192.168.2.” + rnd.nextInt(255); String msg = runtime + “,www.example.com,” + ip; KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);producer.send(data);}producer.close();}
}

以下这个是用到的分区函数:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;public class SimplePartitioner implements Partitioner<String> {public SimplePartitioner (VerifiableProperties props) {}public int partition(String key, int a_numPartitions) {int partition = 0;int offset = key.lastIndexOf('.');if (offset > 0) {partition = Integer.parseInt( key.substring(offset+1)) % a_numPartitions;}return partition;}}

KafKa Consumer APIs

Consumer API有两个级别。低级别的和一个指定的broker保持连接。并在接收完消息后关闭连接,这个级别是无状态的,每次读取消息都带着offset。

高级别的API隐藏了和brokers连接的细节,在不必关心服务端架构的情况下和服务端通信。还能够自己维护消费状态。并能够通过一些条件指定订阅特定的topic,比方白名单黑名单或者正則表達式。

低级别的API

class SimpleConsumer {/*向一个broker发送读取请求并得到消息集 */ public ByteBufferMessageSet fetch(FetchRequest request);/*向一个broker发送读取请求并得到一个对应集 */ public MultiFetchResponse multifetch(List<FetchRequest> fetches);/*** 得到指定时间之前的offsets* 返回值是offsets列表。以倒序排序* @param time: 时间,毫秒,*              假设指定为OffsetRequest$.MODULE$.LATIEST_TIME(), 得到最新的offset.*              假设指定为OffsetRequest$.MODULE$.EARLIEST_TIME(),得到最老的offset.*/public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
}

低级别的API是高级别API实现的基础,也是为了一些对维持消费状态有特殊需求的场景,比方Hadoop consumer这种离线consumer。

高级别的API

/* 创建连接 */
ConsumerConnector connector = Consumer.create(consumerConfig);interface ConsumerConnector {/*** 这种方法能够得到一个流的列表。每一个流都是MessageAndMetadata的迭代,通过MessageAndMetadata能够拿到消息和其它的元数据(眼下之后topic)  *  Input: a map of <topic, #streams>*  Output: a map of <topic, list of message streams>*/public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap); /**    * 你也能够得到一个流的列表,它包括了符合TopicFiler的消息的迭代,* 一个TopicFilter是一个封装了白名单或黑名单的正則表達式。*/public List<KafkaStream> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);/* 提交眼下消费到的offset */public commitOffsets()/* 关闭连接 */public shutdown()
}

这个API环绕着由KafkaStream实现的迭代器展开,每一个流代表一系列从一个或多个分区多和broker上汇聚来的消息。每一个流由一个线程处理。所以client能够在创建的时候通过參数指定想要几个流。一个流是多个分区多个broker的合并。可是每一个分区的消息仅仅会流向一流。

每次通话createMessageStreams会consumer注册到topic上,此consumer和brokers负载平衡将之间调节。

API每次调用创建激励许多人topic流动,以减少这种调整。createMessageStreamsByFilter方法来注册监听器可以感知一个新雅阁filter的tipic。

转载于:https://www.cnblogs.com/zfyouxi/p/4590435.html

漫游Kafka实战篇clientAPI相关推荐

  1. 漫游Kafka实战篇之客户端编程实例

    原文地址:http://blog.csdn.net/honglei915/article/details/37697655 Kafka视频教程同步首发,欢迎观看! Kafka Producer API ...

  2. 漫游kafka实战篇之搭建Kafka开发环境

    转载注明出处:http://blog.csdn.net/honglei915/article/details/37563647 Kafka视频教程同步首发,欢迎观看! 上篇文章中我们搭建了kafka的 ...

  3. 漫游Kafka实战篇之搭建Kafka运行环境

    原文地址:http://blog.csdn.net/honglei915/article/details/37564329 Kafka视频教程同步首发,欢迎观看! 接下来一步一步搭建Kafka运行环境 ...

  4. kafka实战篇(二):消息消费实战

    写在前面:我是「且听风吟」,目前是某上市游戏公司的大数据开发工程师,热爱大数据开源技术,喜欢分享自己的所学所悟,现阶段正在从头梳理大数据体系的知识,以后将会把时间重点放在Spark和Flink上面. ...

  5. 漫游Kafka设计篇之性能优化(7)

    Kafka在提高效率方面做了很大努力.Kafka的一个主要使用场景是处理网站活动日志,吞吐量是非常大的,每个页面都会产生好多次写操作.读方面,假设每个消息只被消费一次,读的量的也是很大的,Kafka也 ...

  6. 漫游Kafka设计篇之性能优化

    原文地址:http://blog.csdn.net/honglei915/article/details/37564757 Kafka视频教程同步首发,欢迎观看! Kafka在提高效率方面做了很大努力 ...

  7. 漫游Kafka实现篇之分布式

    原文地址:http://blog.csdn.net/honglei915/article/details/37932819 Zookeeper节点标记 当路径中的元素包括在方括号里比如[xyz],则表 ...

  8. 漫游Kafka实现篇之消息和日志

    原文地址:http://blog.csdn.net/honglei915/article/details/37760631 Kafka视频教程同步首发,欢迎观看! 消息格式 日志 一个叫做" ...

  9. 漫游Kafka设计篇之主从同步

    原文地址:http://blog.csdn.net/honglei915/article/details/37565289 Kafka视频教程同步首发,欢迎观看! Kafka允许topic的分区拥有若 ...

最新文章

  1. golang中的互斥锁
  2. python html parse
  3. 动态规划 dp04 凸n边形的三角形划分 c代码
  4. Linux - How to Take ‘Snapshot of Logical Volume and Restore’ in LVM
  5. Windows7系统资源怎么看?
  6. 拥抱变化——从Atlas到ASP.NET AJAX(4):大大简化的了的Extender扩展器控件
  7. 是Excel的图,不!是R的图
  8. 哈工大理论力学第八版电子版_理论力学哈工大第八版1第一章思考题课后题
  9. linux make项目管理器,Linux中makefile项目管理
  10. 【数据结构(C语言)】数据结构-表
  11. 对996的一些看法与个人价值实现
  12. 正确认识P2P,从容面对风暴
  13. native mysql 分区_MySQL-表分区
  14. Linux中Bin文件压缩包解压运行
  15. ios怎么ftp上传文件到服务器,Mac OS通过 FTP工具上传文件的方法
  16. 三种最常见的框架解析 | 如何创建JUnit参数化测试
  17. Nicholas C. Zakas谈怎样才能成为优秀的前端工程师
  18. 中国科学: 信息科学 中文模板2019 CCT-LaTeX texlive2019 成功编译
  19. 【蓝桥杯经典数学题】杨辉三角形
  20. 蒜头君是一位高中电脑老师,这学期正在教学生写 \text{C++}C++ 程序。他的评分标准是依照每一位学生在蒜厂 \text{OJ}OJ 上解出的题数,去计算出对应的得分。为了不让分数落差太大,因此

热门文章

  1. python的时间序列,Python时间序列
  2. 图论(二)--各种图介绍
  3. 图像融合(四)-- 对比度金字塔
  4. mysql数据库视图_MySQL数据库8(二十)视图
  5. python为什么会出现无响应怎么办_python定时检测无响应进程并重启的实例代码
  6. 大学计算机 学生成绩表格,学生成绩分析系统的设计与实现
  7. 享元模式在文本编辑器中的应用
  8. scala json处理入门
  9. Hbase shell练习题
  10. openresty查看log