kafka入门(4)-java操作kafka

准备工作

创建maven工程

导入Maven Kafka POM依赖

<repositories><!-- 代码库 --><repository><id>central</id><url>http://maven.aliyun.com/nexus/content/groups/public//</url><releases><enabled>true</enabled></releases><snapshots><enabled>true</enabled><updatePolicy>always</updatePolicy><checksumPolicy>fail</checksumPolicy></snapshots></repository>
</repositories><dependencies><!-- kafka客户端工具 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency><!-- 工具类 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-io</artifactId><version>1.3.2</version></dependency><!-- SLF桥接LOG4J日志 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.6</version></dependency><!-- SLOG4J日志 --><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.16</version></dependency>
</dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.7.0</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins>
</build>

导入log4j.properties

将log4j.properties配置文件放入到resources文件夹中

log4j.rootLogger=INFO,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p - %m%n

同步生产消息到Kafka中

需求

编写Java程序,将1-100的数字消息写入到Kafka中。

创建包和类

创建包com.qike.kafka,并创建KafkaProducerTest类

代码开发

  1. 创建连接

    • bootstrap.servers:Kafka的服务器地址
    • acks:表示当生产者生产数据到Kafka中,Kafka中会以什么样的策略返回
    • key.serializer:Kafka中的消息是以key、value键值对存储的,而且生产者生产的消息是需要在网络上传到的,这里指定的是StringSerializer方式,就是以字符串方式发送(将来还可以使用其他的一些序列化框架:Google ProtoBuf、Avro)
    • value.serializer:同上
  2. 创建一个生产者对象KafkaProducer
  3. 调用send方法发送消息(ProducerRecor,封装是key-value键值对)
  4. 调用Future.get表示等带服务端的响应
  5. 关闭生产者

使用同步方式生产消息:

public class KafkaProducerTest {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建用于连接Kafka的Properties配置Properties props = new Properties();props.put("bootstrap.servers", "192.168.88.100:9092");props.put("acks", "all");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 2. 创建一个生产者对象KafkaProducerKafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);// 3. 发送1-100的消息到指定的topic中for(int i = 0; i < 100; ++i) {// 构建一条消息,直接new ProducerRecordProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", null, i + "");Future<RecordMetadata> future = kafkaProducer.send(producerRecord);// 调用Future的get方法等待响应future.get();System.out.println("第" + i + "条消息写入成功!");}// 4.关闭生产者kafkaProducer.close();}
}

使用异步方式生产消息

  • 使用匿名内部类实现Callback接口,该接口中表示Kafka服务器响应给客户端,会自动调用onCompletion方法

    • metadata:消息的元数据(属于哪个topic、属于哪个partition、对应的offset是什么)
    • exception:这个对象Kafka生产消息封装了出现的异常,如果为null,表示发送成功,如果不为null,表示出现异常。
// 二、使用异步回调的方式发送消息
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", null, i + "");
kafkaProducer.send(producerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {// 1. 判断发送消息是否成功if(exception == null) {// 发送成功// 主题String topic = metadata.topic();// 分区idint partition = metadata.partition();// 偏移量long offset = metadata.offset();System.out.println("topic:" + topic + " 分区id:" + partition + " 偏移量:" + offset);}else {// 发送出现错误System.out.println("生产消息出现异常!");// 打印异常消息System.out.println(exception.getMessage());// 打印调用栈System.out.println(exception.getStackTrace());}}
});

从Kafka的topic中消费消息

需求

从 test topic中,将消息都消费,并将记录的offset、key、value打印出来

准备工作

在com.qike.kafka包下创建KafkaConsumerTest类

代码开发

  1. 创建Kafka消费者配置

  2. 创建Kafka消费者

  3. 订阅要消费的主题

  4. 使用一个while循环,不断从Kafka的topic中拉取消息

  5. 将将记录(record)的offset、key、value都打印出来

代码:

  • group.id:消费者组的概念,可以在一个消费组中包含多个消费者。如果若干个消费者的group.id是一样的,表示它们就在一个组中,一个组中的消费者是共同消费Kafka中topic的数据。
  • Kafka是一种拉消息模式的消息队列,在消费者中会有一个offset,表示从哪条消息开始拉取数据
  • kafkaConsumer.poll:Kafka的消费者API是一批一批数据的拉取
/*** 消费者程序** 1.创建Kafka消费者配置* Properties props = new Properties();* props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");* props.setProperty("group.id", "test");* props.setProperty("enable.auto.commit", "true");* props.setProperty("auto.commit.interval.ms", "1000");* props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");* props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");** 2.创建Kafka消费者* 3.订阅要消费的主题* 4.使用一个while循环,不断从Kafka的topic中拉取消息* 5.将将记录(record)的offset、key、value都打印出来*/
public class KafkaConsumerTest {public static void main(String[] args) {// 1.创建Kafka消费者配置Properties props = new Properties();props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");// 消费者组(可以使用消费者组将若干个消费者组织到一起),共同消费Kafka中topic的数据// 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的props.setProperty("group.id", "test");// 自动提交offsetprops.setProperty("enable.auto.commit", "true");// 自动提交offset的时间间隔props.setProperty("auto.commit.interval.ms", "1000");// 拉取的key、value数据的props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 2.创建Kafka消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);// 3. 订阅要消费的主题// 指定消费者从哪个topic中拉取数据kafkaConsumer.subscribe(Arrays.asList("test"));// 4.使用一个while循环,不断从Kafka的topic中拉取消息while(true) {// Kafka的消费者一次拉取一批的数据ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));// 5.将将记录(record)的offset、key、value都打印出来for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {// 主题String topic = consumerRecord.topic();// offset:这条消息处于Kafka分区中的哪个位置long offset = consumerRecord.offset();// key\valueString key = consumerRecord.key();String value = consumerRecord.value();System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);}}}
}

可以参考官网API文档:http://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

异步使用带有回调函数方法生产消息

如果我们想获取生产者消息是否成功,或者成功生产消息到Kafka中后,执行一些其他动作。此时,可以很方便地使用带有回调函数来发送消息。

需求:

  1. 在发送消息出现异常时,能够及时打印出异常信息

  2. 在发送消息成功时,打印Kafka的topic名字、分区id、offset

代码开发

代码:

public class KafkaProducerTest {public static void main(String[] args) {// 1. 创建用于连接Kafka的Properties配置Properties props = new Properties();props.put("bootstrap.servers", "192.168.88.100:9092");props.put("acks", "all");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 2. 创建一个生产者对象KafkaProducerKafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);// 3. 调用send发送1-100消息到指定Topic testfor(int i = 0; i < 100; ++i) {// 一、同步方式// 获取返回值Future,该对象封装了返回值// Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + ""));// 调用一个Future.get()方法等待响应// future.get();// 二、带回调函数异步方式producer.send(new ProducerRecord<String, String>("test", null, i + ""), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if(exception != null) {System.out.println("发送消息出现异常");}else {String topic = metadata.topic();int partition = metadata.partition();long offset = metadata.offset();System.out.println("发送消息到Kafka中的名字为" + topic + "的主题,第" + partition + "分区,第" + offset + "条数据成功!");}}});}// 5. 关闭生产者producer.close();}
}

kafka入门(4)-java操作kafka相关推荐

  1. Java操作Kafka执行不成功

    使用kafka-clients操作kafka始终不成功,原因不清楚,下面贴出相关代码及配置,请懂得指点一下,谢谢! 环境及依赖 <dependency><groupId>org ...

  2. 云计算大数据之 Java 操作 Kafka

    云计算大数据之 Java 操作 Kafka 版权声明: 本文为博主学习整理原创文章,如有不正之处请多多指教. 未经博主允许不得转载.https://blog.csdn.net/qq_42595261/ ...

  3. Java操作Kafka收发消息demo

    通过Java程序来进行Kafka收发消息的演示 Kafka自身提供的Java客户端来演示消息的收发,与Kafka的Java客户端相关的Maven依赖如下: <properties>< ...

  4. kafka实战教程(python操作kafka),kafka配置文件详解

    全栈工程师开发手册 (作者:栾鹏) 架构系列文章 应用往Kafka写数据的原因有很多:用户行为分析.日志存储.异步通信等.多样化的使用场景带来了多样化的需求:消息是否能丢失?是否容忍重复?消息的吞吐量 ...

  5. redis入门及java操作

    redis 命令可以去菜鸟教程http://www.runoob.com/redis/redis-tutorial.html 或者以下地址去学习http://www.cnblogs.com/huang ...

  6. ffmpeg入门及java操作ffmpeg对视频进行处理

    一.ffmpeg 1.简介 FFmpeg是一个开源免费跨平台的视频和音频流方案,属于自由软件,采用LGPL或GPL许可证(依据你选择的组件).它提供了录制.转换以及流化音视频的完整解决方案.它包含了非 ...

  7. Java操作Kafka创建Topic、Producer、Consumer

    环境 JDK 1.8 Zookeeper 3.6.1 Kafka 2.6.0 引入依赖 <dependency><groupId>org.apache.kafka</gr ...

  8. elasticSearch入门到java操作api一套搞定

    目录 写在前面 一.下载地址 二.solr与es比较 三.安装elasticsearch 四.安装可视化界面(hand插件) 使用 五.安装kibana 六.学习es核心概念 七.IK分词器插件 八. ...

  9. kafka为什么用java重写,kafka怎么发布订阅 怎么在java中实现

    匿名用户 1级 2017-03-28 回答 这是我们项目中用到的代码 public class ProducerService { private static Logger log = Logger ...

最新文章

  1. 超级实用的linux 下shell快捷键汇总
  2. mac添加取消开机启动
  3. python一行输入多个值用空格隔开_2020-09-22-Python-函数嵌套、filter()函数、一行输入多个整数(空格分隔)、多维列表的输入...
  4. UPS不间断电源放电时间计算方法
  5. 基于Nginx的媒体服务器技术
  6. lamp配置python_LAMP搭建笔记
  7. ios自定义控件,使UIScrollView自己处理输入时键盘遮挡控件
  8. 9大门类,99个系列课程,几乎所有AI免费课程都在这里啦
  9. RxJava操作符lift笔记25
  10. 关于javaweb中sql语句中使用变量的情况
  11. 波士顿动力SpotMini改造有胳膊半人马,这家意大利创企打造极致机械手臂
  12. qq附近的人怎么引流?如何利用手机QQ附近功能引流
  13. poi excel密码加密
  14. SCRM升级--企业微信数字营销解决方案
  15. MySQL必知必会6
  16. BinaryWriter
  17. char*、char**和char***的使用
  18. ILRuntime热更的小技巧
  19. css设置文本斜体代码实例
  20. Access和VBA

热门文章

  1. Web端解码视频流:(Part1 纯Java Script 解码[不建议大家这样做,因为普适性不强])
  2. 速记计算机键盘,速记键盘的原理
  3. k8s中kubectl命令的使用
  4. 信息论杂谈之文理科融合与东西方思维差异
  5. Android开发新手入门教程!阿里面试100%会问到的JVM,一线互联网公司面经总结
  6. dcs系统服务器配置,DCS系统中动态数据服务器设置方法
  7. Matplotlib笔记 · 绘图区域的结构和子图布局与划分(figure, axes, subplots)
  8. [csp2019]Emiya家今天的饭
  9. 【luoguP5664】 Emiya 家今天的饭 动态规划
  10. Docker容器与系统时间同步