Kafka从上手到实践 - 实践真知:Kafka Java Producer | 凌云时刻
凌云时刻 · 技术
导读:这一节来看看如何使用Java编写Kafka Producer。
作者 | 计缘
来源 | 凌云时刻(微信号:linuxpk)
Creating Kafka Project
创建Maven工程,在POM文件中加入如下两个依赖:
|
第一个是Kafka的依赖包,用于创建Producer、ProducerRecord、Consumer等。第二个是Log4J的依赖包,用于输出日志。
Java Producer
首先创建Producer需要的配置信息,最基本的有三个信息:
Kafka集群的地址。
发送的Message中Key的序列化方式。
发送的Message中Value的序列化方式。
Properties properties = new Properties();properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:Port");properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
然后传入上面实例化好的配置信息,实例化Producer:
|
然后实例化Record对象,该对象承载了要往哪个Topic发送以及Message内容的信息:
|
再然后发送Record:
|
最后刷新和关闭Producer:
|
以上就是最简单的Kafka Java Producer的编写方法。运行一下,可以看到类似如下的信息:
|
Java Producer With Callback
如果我们希望在发送Message后,能监控发送状态,或者在发送异常时对异常进行处理。那么我们就可以使用带有Callback的发送方法:
|
这样每次发送Message后,都会进入onCompletion这个方法中,然后可以使用RecordMetadata中记录的各种元数据做一些跟踪和监控的事情,同时如果发送异常了,也可以对异常进行处理。
|
Java Producer With Keys
在前文中,Partition的Compaction Cleanup Policy一节中介绍到,在压缩策略时,就涉及到了Message的Key和Value。我们来看看如何在发送Message时带着Key。
首先来看看ProducerRecord
的另一个构造函数:
|
可以看到,刚才我们只使用了topic
和value
两个参数,其中还有一个key
,所以我们在实例化ProducerRecord
时传入Key就可以了:
|
小结
之前三个章节介绍了如何使用Kafka CLI操作Kafka,其中包括Producer CLI和Consumer CLI。这一章节主要带大家实践如何使用Kafka提供的API编写Java Producer,希望可以给使用Java语言的小伙伴们带来帮助。
END
往期精彩文章回顾
Kafka CLI:Reseting Offset & Config CLI
Kafka CLI:Consumer CLI & Producer CLI
Kafka CLI:Topic CLI & Producer CLI
Kafka从上手到实践 - 实践真知:搭建单机Kafka
Kafka从上手到实践 - 庖丁解牛:Consumer
Kafka从上手到实践 - 庖丁解牛:Producer
Kafka从上手到实践 - 庖丁解牛:Partition
Kafka从上手到实践 - 庖丁解牛:Topic & Broker
Kafka从上手到实践 - 初步认知:MQ系统
进阶之路:深入解读 Java 堆外内存
长按扫描二维码关注凌云时刻
每日收获前沿技术与科技洞见
Kafka从上手到实践 - 实践真知:Kafka Java Producer | 凌云时刻相关推荐
- 新书《深入理解Kafka:核心设计与实践原理》上架,感谢支持~
新书上架 初识 Kafka 时,笔者接触的还是 0.8.1 版本,Kafka 发展到目前的 2.x 版本,笔者也见证了Kafka的蜕变,比如旧版客户端的淘汰.新版客户端的设计.Kafka 控制器的迭代 ...
- kafka 在 360 商业化的实践
精选30+云产品,助力企业轻松上云!>>> 本文参考闫锁鹏老师在2019DAMS上海站关于Kafka在360的商业化实践分享. 关于作者:近10年基础架构与大数据开发经验,2013年 ...
- 【kafka系列】kafka之生产者发送消息实践
目录 一.准备工作 二.终端命令 生产者命令 消费者命令 三.Java实践 搭建项目 异步发送-无回调 异步发送-有回调 同步发送 一.准备工作 进入实战之前先熟悉一下topic的相关命令,使用终端命 ...
- 超详细!一文详解 SparkStreaming 如何整合 Kafka !附代码可实践
来源 | Alice菌 责编 | Carol 封图 | CSDN 下载于视觉中国 出品 | CSDN(ID:CSDNnews) 相信很多小伙伴已经接触过 SparkStreaming 了,理论就不讲 ...
- kafka partition java,kafka中partition数量与消费者对应关系以及Java实践
kafka中partition数量与消费者对应关系以及Java实践 kafka中partition数量与消费者对应关系以及Java实践 kafka是由Apache软件基金会开发的一个开源流处理平台.k ...
- Kafka的原理介绍及实践
一.官方定义 根据官网的介绍,kafka是一个提供统一的.高吞吐.低延迟的,用来处理实时数据的流式平台,它具备以下三特性: 流式记录的发布和订阅:类似于消息系统. 存储:在一个分布式.容错的集群中安全 ...
- 超详细!一文告诉你 SparkStreaming 如何整合 Kafka !附代码可实践
来源 | Alice菌 责编 | Carol 封图 | CSDN 下载于视觉中国 相信很多小伙伴已经接触过 SparkStreaming 了,理论就不讲太多了,今天的内容主要是为大家带来的是 Spa ...
- kafka Confluent Schema Registry 简单实践
解释及目的: 使用传统的Avro API自定义序列化类和反序列化类或者使用Twitter的Bijection类库实现Avro的序列化与反序列化,这两种方法都有一个缺点:在每条Kafka记录里都嵌入了s ...
- 反思供应链项目:实践出真知 多反思提升效率的方法
获得的提升: 代码能力 沟通能力 思维能力 变通能力 使用代码工具的能力 知识面 都有了提升 得到的认知: 1.实践出真知 2.实际做了才是自己的,只是看明白了,不是自己的 3.加班加的也是 ...
- 实践出真知之Spring Cloud之基于Eureka、Ribbon、Feign的真实案例
转载自 实践出真知之Spring Cloud之基于Eureka.Ribbon.Feign的真实案例 Eureka是Spring Cloud Eureka的简称,是Netflix提供的组件之一.通过E ...
最新文章
- (二)Docker中以redis.conf配置文件启动Redis
- 数组排序(选择排序和冒泡排序)
- Kubernetes Client-go Informer 源码分析
- Oracle 游标的练习
- 相关和因果是一回事吗?R值低就是不相关?终于有人讲明白了
- 软件测试:web渗透测试怎样入门!讲透了...
- 服装erp软件如何提高企业利润
- 怎样在photoshop中把字体加粗并倒影
- 计量经济学笔记——自相关的检验和处理(转载)
- HDMI设计1--HDMI 1.4b SPEC的阅读个人总结
- docker搭建xui
- MYSQL 基本练习
- 2 Robotics: Computational Motion Planning 第2+3+4周 课后习题解答
- Java基础-面试题精华(2021最新)
- Google街景车在台湾香港出现
- 调查发现女人比男人更喜欢使用社交网站(组图)
- 网易视频云: 网易平台级视频服务存储技术
- 【项目二】爱奇艺分类点击实时统计
- 最新影视双端最全视频教程+源码
- 【毕业设计】 微信小程序购物商城系统 【含代码】