凌云时刻 · 技术

导读:这一节来看看如何使用Java编写Kafka Producer。

作者 | 计缘

来源 | 凌云时刻(微信号:linuxpk)

Creating Kafka Project

创建Maven工程,在POM文件中加入如下两个依赖:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.0.0</version>
</dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.25</version>
</dependency>

第一个是Kafka的依赖包,用于创建Producer、ProducerRecord、Consumer等。第二个是Log4J的依赖包,用于输出日志。

Java Producer

首先创建Producer需要的配置信息,最基本的有三个信息:

  • Kafka集群的地址。

  • 发送的Message中Key的序列化方式。

  • 发送的Message中Value的序列化方式。

    Properties properties = new Properties();properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:Port");properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    

然后传入上面实例化好的配置信息,实例化Producer:

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

然后实例化Record对象,该对象承载了要往哪个Topic发送以及Message内容的信息:

ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("first_topic", "hello world!");

再然后发送Record:

kafkaProducer.send(producerRecord);

最后刷新和关闭Producer:

kafkaProducer.flush();
kafkaProducer.close();

以上就是最简单的Kafka Java Producer的编写方法。运行一下,可以看到类似如下的信息:

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: 4nh_0r5iQ_KsR_Fzf1HTGg
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.Process finished with exit code 0

Java Producer With Callback

如果我们希望在发送Message后,能监控发送状态,或者在发送异常时对异常进行处理。那么我们就可以使用带有Callback的发送方法:

kafkaProducer.send(producerRecord, new Callback() {public void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {logger.info("Received new metadata. \n" +"Topic: " + recordMetadata.topic()  + "\n" +"Partition: " + recordMetadata.partition() + "\n" +"Offset: " + recordMetadata.offset() + "\n" +"Timestamp: " + recordMetadata.timestamp());} else {logger.error("Error while producing: ", e);}}
});

这样每次发送Message后,都会进入onCompletion这个方法中,然后可以使用RecordMetadata中记录的各种元数据做一些跟踪和监控的事情,同时如果发送异常了,也可以对异常进行处理。

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: 4nh_0r5iQ_KsR_Fzf1HTGg
[kafka-producer-network-thread | producer-1] INFO com.devtalking.jacefu.kafka.tutorial1.ProducerDemoWithCallback - Received new metadata.
Topic: first_topic
Partition: 0
Offset: 22
Timestamp: 1546421392063
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.Process finished with exit code 0

Java Producer With Keys

在前文中,Partition的Compaction Cleanup Policy一节中介绍到,在压缩策略时,就涉及到了Message的Key和Value。我们来看看如何在发送Message时带着Key。

首先来看看ProducerRecord的另一个构造函数:

public ProducerRecord(String topic, K key, V value) {this(topic, (Integer)null, (Long)null, key, value, (Iterable)null);
}

可以看到,刚才我们只使用了topicvalue两个参数,其中还有一个key,所以我们在实例化ProducerRecord时传入Key就可以了:

ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("first_topic", "This is the key", "hello world!");

小结

之前三个章节介绍了如何使用Kafka CLI操作Kafka,其中包括Producer CLI和Consumer CLI。这一章节主要带大家实践如何使用Kafka提供的API编写Java Producer,希望可以给使用Java语言的小伙伴们带来帮助。

END

往期精彩文章回顾

Kafka CLI:Reseting Offset & Config CLI

Kafka CLI:Consumer CLI & Producer CLI

Kafka CLI:Topic CLI & Producer CLI

Kafka从上手到实践 - 实践真知:搭建单机Kafka

Kafka从上手到实践 - 庖丁解牛:Consumer

Kafka从上手到实践 - 庖丁解牛:Producer

Kafka从上手到实践 - 庖丁解牛:Partition

Kafka从上手到实践 - 庖丁解牛:Topic & Broker

Kafka从上手到实践 - 初步认知:MQ系统

进阶之路:深入解读 Java 堆外内存

长按扫描二维码关注凌云时刻

每日收获前沿技术与科技洞见

Kafka从上手到实践 - 实践真知:Kafka Java Producer | 凌云时刻相关推荐

  1. 新书《深入理解Kafka:核心设计与实践原理》上架,感谢支持~

    新书上架 初识 Kafka 时,笔者接触的还是 0.8.1 版本,Kafka 发展到目前的 2.x 版本,笔者也见证了Kafka的蜕变,比如旧版客户端的淘汰.新版客户端的设计.Kafka 控制器的迭代 ...

  2. kafka 在 360 商业化的实践

    精选30+云产品,助力企业轻松上云!>>> 本文参考闫锁鹏老师在2019DAMS上海站关于Kafka在360的商业化实践分享. 关于作者:近10年基础架构与大数据开发经验,2013年 ...

  3. 【kafka系列】kafka之生产者发送消息实践

    目录 一.准备工作 二.终端命令 生产者命令 消费者命令 三.Java实践 搭建项目 异步发送-无回调 异步发送-有回调 同步发送 一.准备工作 进入实战之前先熟悉一下topic的相关命令,使用终端命 ...

  4. 超详细!一文详解 SparkStreaming 如何整合 Kafka !附代码可实践

    来源 | Alice菌 责编 | Carol 封图 |  CSDN 下载于视觉中国 出品 | CSDN(ID:CSDNnews) 相信很多小伙伴已经接触过 SparkStreaming 了,理论就不讲 ...

  5. kafka partition java,kafka中partition数量与消费者对应关系以及Java实践

    kafka中partition数量与消费者对应关系以及Java实践 kafka中partition数量与消费者对应关系以及Java实践 kafka是由Apache软件基金会开发的一个开源流处理平台.k ...

  6. Kafka的原理介绍及实践

    一.官方定义 根据官网的介绍,kafka是一个提供统一的.高吞吐.低延迟的,用来处理实时数据的流式平台,它具备以下三特性: 流式记录的发布和订阅:类似于消息系统. 存储:在一个分布式.容错的集群中安全 ...

  7. 超详细!一文告诉你 SparkStreaming 如何整合 Kafka !附代码可实践

    来源 | Alice菌 责编 | Carol 封图 |  CSDN 下载于视觉中国 相信很多小伙伴已经接触过 SparkStreaming 了,理论就不讲太多了,今天的内容主要是为大家带来的是 Spa ...

  8. kafka Confluent Schema Registry 简单实践

    解释及目的: 使用传统的Avro API自定义序列化类和反序列化类或者使用Twitter的Bijection类库实现Avro的序列化与反序列化,这两种方法都有一个缺点:在每条Kafka记录里都嵌入了s ...

  9. 反思供应链项目:实践出真知 多反思提升效率的方法

    获得的提升: 代码能力  沟通能力  思维能力  变通能力  使用代码工具的能力  知识面 都有了提升 得到的认知: 1.实践出真知 2.实际做了才是自己的,只是看明白了,不是自己的 3.加班加的也是 ...

  10. 实践出真知之Spring Cloud之基于Eureka、Ribbon、Feign的真实案例

    转载自  实践出真知之Spring Cloud之基于Eureka.Ribbon.Feign的真实案例 Eureka是Spring Cloud Eureka的简称,是Netflix提供的组件之一.通过E ...

最新文章

  1. (二)Docker中以redis.conf配置文件启动Redis
  2. 数组排序(选择排序和冒泡排序)
  3. Kubernetes Client-go Informer 源码分析
  4. Oracle 游标的练习
  5. 相关和因果是一回事吗?R值低就是不相关?终于有人讲明白了
  6. 软件测试:web渗透测试怎样入门!讲透了...
  7. 服装erp软件如何提高企业利润
  8. 怎样在photoshop中把字体加粗并倒影
  9. 计量经济学笔记——自相关的检验和处理(转载)
  10. HDMI设计1--HDMI 1.4b SPEC的阅读个人总结
  11. docker搭建xui
  12. MYSQL 基本练习
  13. 2 Robotics: Computational Motion Planning 第2+3+4周 课后习题解答
  14. Java基础-面试题精华(2021最新)
  15. Google街景车在台湾香港出现
  16. 调查发现女人比男人更喜欢使用社交网站(组图)
  17. 网易视频云: 网易平台级视频服务存储技术
  18. 【项目二】爱奇艺分类点击实时统计
  19. 最新影视双端最全视频教程+源码
  20. 【毕业设计】 微信小程序购物商城系统 【含代码】

热门文章

  1. 动手才能进步(冒泡法示例)
  2. 作为一个程序员,数学对你到底有多重要
  3. 2009年十大Java技术解决方案
  4. 样式中的url加载探疑
  5. jQuery动态网址标签
  6. 对输入法的人机交互设计评价
  7. ActiveMQ实现负载均衡+高可用部署方案 -转载
  8. 怎样把网站js文件合并成一个?几种方法可以实现
  9. linux ubuntu 11.10 下的android开发环境的搭建!
  10. 20191222每日一句