众所周知,Kafka自己实现了一套二进制协议(binary protocol)用于各种功能的实现,比如发送消息,获取消息,提交位移以及创建topic等。具体协议规范参见:Kafka协议  这套协议的具体使用流程为:

  1. 客户端创建对应协议的请求
  2. 客户端发送请求给对应的broker
  3. broker处理请求,并发送response给客户端

  虽然Kafka提供的大量的脚本工具用于各种功能的实现,但很多时候我们还是希望可以把某些功能以编程的方式嵌入到另一个系统中。这时使用Java API的方式就显得异常地灵活了。本文我将尝试给出Java API底层框架的一个范例,同时也会针对“创建topic”和“查看位移”这两个主要功能给出对应的例子。 需要提前说明的是,本文给出的范例并没有考虑Kafka集群开启安全的情况。另外Kafka的KIP4应该一直在优化命令行工具以及各种管理操作,有兴趣的读者可以关注这个KIP。

  本文中用到的API依赖于kafka-clients,所以如果你使用Maven构建的话,请加上:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.10.2.0</version>
</dependency>

如果是gradle,请加上:

compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.2.0'

底层框架

 1 /**
 2      * 发送请求主方法
 3      * @param host          目标broker的主机名
 4      * @param port          目标broker的端口
 5      * @param request       请求对象
 6      * @param apiKey        请求类型
 7      * @return              序列化后的response
 8      * @throws IOException
 9      */
10     public ByteBuffer send(String host, int port, AbstractRequest request, ApiKeys apiKey) throws IOException {
11         Socket socket = connect(host, port);
12         try {
13             return send(request, apiKey, socket);
14         } finally {
15             socket.close();
16         }
17     }
18
19     /**
20      * 发送序列化请求并等待response返回
21      * @param socket            连向目标broker的socket
22      * @param request           序列化后的请求
23      * @return                  序列化后的response
24      * @throws IOException
25      */
26     private byte[] issueRequestAndWaitForResponse(Socket socket, byte[] request) throws IOException {
27         sendRequest(socket, request);
28         return getResponse(socket);
29     }
30
31     /**
32      * 发送序列化请求给socket
33      * @param socket            连向目标broker的socket
34      * @param request           序列化后的请求
35      * @throws IOException
36      */
37     private void sendRequest(Socket socket, byte[] request) throws IOException {
38         DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
39         dos.writeInt(request.length);
40         dos.write(request);
41         dos.flush();
42     }
43
44     /**
45      * 从给定socket处获取response
46      * @param socket            连向目标broker的socket
47      * @return                  获取到的序列化后的response
48      * @throws IOException
49      */
50     private byte[] getResponse(Socket socket) throws IOException {
51         DataInputStream dis = null;
52         try {
53             dis = new DataInputStream(socket.getInputStream());
54             byte[] response = new byte[dis.readInt()];
55             dis.readFully(response);
56             return response;
57         } finally {
58             if (dis != null) {
59                 dis.close();
60             }
61         }
62     }
63
64     /**
65      * 创建Socket连接
66      * @param hostName          目标broker主机名
67      * @param port              目标broker服务端口, 比如9092
68      * @return                  创建的Socket连接
69      * @throws IOException
70      */
71     private Socket connect(String hostName, int port) throws IOException {
72         return new Socket(hostName, port);
73     }
74
75     /**
76      * 向给定socket发送请求
77      * @param request       请求对象
78      * @param apiKey        请求类型, 即属于哪种请求
79      * @param socket        连向目标broker的socket
80      * @return              序列化后的response
81      * @throws IOException
82      */
83     private ByteBuffer send(AbstractRequest request, ApiKeys apiKey, Socket socket) throws IOException {
84         RequestHeader header = new RequestHeader(apiKey.id, request.version(), "client-id", 0);
85         ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf());
86         header.writeTo(buffer);
87         request.writeTo(buffer);
88         byte[] serializedRequest = buffer.array();
89         byte[] response = issueRequestAndWaitForResponse(socket, serializedRequest);
90         ByteBuffer responseBuffer = ByteBuffer.wrap(response);
91         ResponseHeader.parse(responseBuffer);
92         return responseBuffer;
93     }

  有了这些方法的铺垫,我们就可以创建具体的请求了。

创建topic

 1 /**
 2      * 创建topic
 3      * 由于只是样例代码,有些东西就硬编码写到程序里面了(比如主机名和端口),各位看官自行修改即可
 4      * @param topicName             topic名
 5      * @param partitions            分区数
 6      * @param replicationFactor     副本数
 7      * @throws IOException
 8      */
 9     public void createTopics(String topicName, int partitions, short replicationFactor) throws IOException {
10         Map<String, CreateTopicsRequest.TopicDetails> topics = new HashMap<>();
11         // 插入多个元素便可同时创建多个topic
12         topics.put(topicName, new CreateTopicsRequest.TopicDetails(partitions, replicationFactor));
13         int creationTimeoutMs = 60000;
14         CreateTopicsRequest request = new CreateTopicsRequest.Builder(topics, creationTimeoutMs).build();
15         ByteBuffer response = send("localhost", 9092, request, ApiKeys.CREATE_TOPICS);
16         CreateTopicsResponse.parse(response, request.version());
17     }

查看位移

 1 /**
 2      * 获取某个consumer group下的某个topic分区的位移
 3      * @param groupID           group id
 4      * @param topic             topic名
 5      * @param parititon         分区号
 6      * @throws IOException
 7      */
 8     public void getOffsetForPartition(String groupID, String topic, int parititon) throws IOException {
 9         TopicPartition tp = new TopicPartition(topic, parititon);
10         OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, singletonList(tp))
11                 .setVersion((short)2).build();
12         ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH);
13         OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
14         OffsetFetchResponse.PartitionData partitionData = resp.responseData().get(tp);
15         System.out.println(partitionData.offset);
16     }

 1 /**
 2      * 获取某个consumer group下所有topic分区的位移信息
 3      * @param groupID           group id
 4      * @return                  (topic分区 --> 分区信息)的map
 5      * @throws IOException
 6      */
 7     public Map<TopicPartition, OffsetFetchResponse.PartitionData> getAllOffsetsForGroup(String groupID) throws IOException {
 8         OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, null).setVersion((short)2).build();
 9         ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH);
10         OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
11         return resp.responseData();
12     }

okay, 上面就是“创建topic”和“查看位移”的样例代码,各位看官可以参考着这两个例子构建其他类型的请求。

转载于:https://www.cnblogs.com/huxi2b/p/6508274.html

Java API方式调用Kafka各种协议相关推荐

  1. java接口方式调用海康大华摄像机预览。

    客户有海康和大华的监控设备,没有买各类安防平台,国标方式需要预留给其他需要接入的系统,得兼容高版本chrome,询问了大华的客服人员,最后选择了该方案进行解决,记录下曲折的过程.延迟大约10秒的样子, ...

  2. request参数升序排序 md5加密 防重播 header信息 java API接口调用 切片机制实现

    api接口大多都支持访问信息的验证,其中参数的排序,加密都是经常用到的.有时候还需要将验证信息放到header中. 将api调用者的参数的key及头信息(时间戳.随机串,调用者标识)按照ascii码升 ...

  3. 使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)--转载...

    原文:http://blog.csdn.net/changong28/article/details/39325079 使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指 ...

  4. kafka java api 删除_使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)...

    使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指定分区数和副本数等信息,如果将这些属性配置到server.properties文件中,以后调用Java API生成的主题 ...

  5. swagger2markup(maven方式及java代码方式)

    任务:通过同事的json文件生成相应的html和pdf文档 前言     开始时swagger2markup和asciidoctorj是什么都不知道,只能百度,看官方文档(翻译...), 遇到问题就一 ...

  6. java调用kafka接口发送数据_Java调用Kafka生产者,消费者Api及相关配置说明

    本次的记录内容包括: 1.Java调用生产者APi流程 2.Kafka生产者Api的使用及说明 3.Kafka消费者Api的使用及说明 4.Kafka消费者自动提交Offset和手动提交Offset ...

  7. java kafka api_kafka java API的使用

    Kafka包含四种核心的API: 1.Producer API支持应用将数据流发送到Kafka集群的主题 2.Consumer API支持应用从Kafka集群的主题中读取数据流 3.Streams A ...

  8. Elasticsearch Java API四种实现方式

    0.题记 之前Elasticsearch的应用比较多,但大多集中在关系型.非关系型数据库与Elasticsearch之间的同步.以上内容完成了Elasticsearch所需要的基础数据量的供给.但想要 ...

  9. kafka详解(JAVA API操作kafka、kafka原理、kafka监控)-step2

    1.JAVA API操作kafka  修改Windows的Host文件: 目录:C:\Windows\System32\drivers\etc (win10) 内容: 192.168.40.150 k ...

  10. 2021年大数据Kafka(五):❤️Kafka的java API编写❤️

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 Kafka的java API编写 一.生产者代码 第一步: ...

最新文章

  1. JAVA课上动手动脑问题以及课后测试1,2总结
  2. 简洁好用的项目管理工具推荐~马起来
  3. Android下的Linux
  4. 任务调度与上下文切换时间测试
  5. 空间谱专题12:二维测向的基本方法
  6. Keras——用Keras搭建自编码神经网络(AutoEncoder)
  7. HTML5 -canvas拖拽、移动 绘制图片可操作移动,拖动
  8. scala的三个排序方法
  9. POJ 2502 Subway dij
  10. 提取Flash源文件中的素材
  11. 如何在VS2013中配置一个DirectX开发环境
  12. 错误: 此上下文中不允许函数定义。
  13. JS方法 数组倒序排列
  14. 虚拟电厂 3D 可视化,节能减排绿色发展
  15. [Erlang危机](3.2)限制输入
  16. 拼多多 标题 html,拼多多的创意图和创意标题怎么测试?为什么要测试?怎样测试呢?...
  17. 网络socket编程--多路复用
  18. 跨专业考浙大计算机考研难度,0基础跨专业计算机考研经验-2013浙大
  19. MT6762/MT6765处理器参数比较/芯片资料介绍
  20. Linux 磁盘- 存储

热门文章

  1. django settings 定义的变量不存在_【Django】第一期|初识Django以及基本安装方法和配置...
  2. git 设置忽略文件类型 gitignore
  3. Calvin: Fast Distributed Transactions for Partitioned Database Systems研读
  4. linux 实时查看日志 最新最后100行 tail
  5. 变换型设计与事务型设计
  6. java 正则 非贪婪_正则表达式中贪婪模式与非贪婪模式的区别
  7. oracle虚拟件不活动,BOM 中的虚拟件
  8. Could not get constructor for org.hibernate.persister.entity.SingleTableEntityPersister
  9. 阶段3 2.Spring_05.基于XML的IOC的案例1_4 注解IOC案例-把自己编写的类使用注解配置...
  10. LGOJP2831 愤怒的小鸟