通过Java程序来进行Kafka收发消息的演示

Kafka自身提供的Java客户端来演示消息的收发,与Kafka的Java客户端相关的Maven依赖如下:

<properties><scala.version>2.11</scala.version><slf4j.version>1.7.21</slf4j.version><kafka.version>2.0.0</kafka.version><lombok.version>1.18.8</lombok.version><junit.version>4.11</junit.version><gson.version>2.2.4</gson.version><protobuff.version>1.5.4</protobuff.version><spark.version>2.3.1</spark.version></properties><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka.version}</version>
</dependency>

创建生产者

package com.demo.kafkademo.ch1;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
/*** Kafka 消息生产者*/
public class ProducerFastStart {// Kafka集群地址private static final String brokerList = "192.168.33.129:9092";// 主题名称-之前已经创建private static final String topic = "topicone";public static void main(String[] args) {Properties properties = new Properties();// 设置key序列化器properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");//另外一种写法//properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 设置重试次数properties.put(ProducerConfig.RETRIES_CONFIG, 10);// 设置值序列化器properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 设置集群地址properties.put("bootstrap.servers", brokerList);// KafkaProducer 线程安全KafkaProducer<String, String> producer = new KafkaProducer<>(properties);ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Kafka-demo-001", "hello, Kafka!");try {producer.send(record);//RecordMetadata recordMetadata = producer.send(record).get();//System.out.println("part:" + recordMetadata.partition() + ";topic:" + recordMetadata.topic());} catch (Exception e) {e.printStackTrace();}producer.close();}
}

消费者

package com.demo.kafkademo.ch1;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** Kafka 消息消费者*/
public class ConsumerFastStart {// Kafka集群地址private static final String brokerList = "192.168.33.129:9092";// 主题名称-之前已经创建private static final String topic = "topicone";// 消费组private staticfinal String groupId = "group.demo";public static void main(String[] args) {Properties properties = new Properties();properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("bootstrap.servers", brokerList);properties.put("group.id", groupId);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);consumer.subscribe(Collections.singletonList(topic));while (true) {ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis(5000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());}}}
}

先启动消费端,再启动生产端进行消息的发送

附:

注意 : waring:使用java连接linux下kafka集群需要设置hosts绑定;
kafka 安装目录 config/server.properties 文件 其中 listeners=PLAINTEXT://:9092
改为listeners=PLAINTEXT://192.168.33.129:9092 (加上kafka服务所在虚拟机ip)
否则会出现异常: Connection to node 1 (localhost/127.0.0.1:9092) could not be established

Java操作Kafka收发消息demo相关推荐

  1. Java操作Kafka执行不成功

    使用kafka-clients操作kafka始终不成功,原因不清楚,下面贴出相关代码及配置,请懂得指点一下,谢谢! 环境及依赖 <dependency><groupId>org ...

  2. 云计算大数据之 Java 操作 Kafka

    云计算大数据之 Java 操作 Kafka 版权声明: 本文为博主学习整理原创文章,如有不正之处请多多指教. 未经博主允许不得转载.https://blog.csdn.net/qq_42595261/ ...

  3. kafka入门(4)-java操作kafka

    kafka入门(4)-java操作kafka 准备工作 创建maven工程 导入Maven Kafka POM依赖 <repositories><!-- 代码库 -->< ...

  4. 使用spring集成的kafka收发消息

    1. 引入maven依赖 <dependency><groupId>org.springframework.integration</groupId><art ...

  5. java整合kafka做消息消费

    1. 导入依赖 <!--kafka 3.2.0,版本根据自己kafka服务端版本选择--> <dependency><groupId>org.apache.kafk ...

  6. Java操作Kafka创建Topic、Producer、Consumer

    环境 JDK 1.8 Zookeeper 3.6.1 Kafka 2.6.0 引入依赖 <dependency><groupId>org.apache.kafka</gr ...

  7. Kafka实现消息生产和消费

    文章目录 一.Kafka测试消息生产与消费 二.Java程序进行Kafka收发消息 1.消息生产者 2.消息消费者 一.Kafka测试消息生产与消费 # 首先创建一个主题 [root@192 kafk ...

  8. kafka java_Java操作Kafka

    java操作kafka非常的简单,然后kafka也提供了很多缺省值,一般情况下我们不需要修改太多的参数就能使用.下面我贴出代码. pom.xml org.apache.kafka kafka-clie ...

  9. 用串口操作手机收发短信总结

    终于完成了用Java操作手机的模块,遇到了不少麻烦,在这里总结一下,也希望对大家有所帮助. 可能有很多人会问,现在的手机与计算机连接都是USB口,那研究串口手机通讯有什么意义?我开始也是这样考虑,所以 ...

最新文章

  1. 浅谈Greenplum的Boolean类型与Text类型之间的转换
  2. 如何从0写一个服务网关?
  3. [原]关于鼠标滚轮的编程
  4. EJB(四)JPA 分布式事务处理
  5. Powershell基础(一)
  6. Installshield获取安装包版本的系统变量是IFX_PRODUCT_VERSION
  7. pat 乙级 1015 德才论(C++)
  8. java试讲题目,常见的Java面试题汇总
  9. Python Imaging Library: ImageFile Module(图像文件模块)
  10. [Go] 通过 17 个简短代码片段,切底弄懂 channel 基础
  11. Linux内核入门(五)——必要的硬件知识
  12. 计算机vb基础知识试题及答案,2014年计算机二级VB试题及答案
  13. 英语单词词性顺口溜_英语十大词性口诀
  14. JPEG添加EXIF
  15. 参加江大白手把手教你-----AidLux智慧安防AI训练营
  16. 我在51CTO微职位学软考——我是mata宇我为自己代言
  17. MIUI——添加学校邮箱到电子邮件解决方案
  18. 推荐25种自媒体运营必备工具 (建议收藏)
  19. 联想Thinkpad E15 息屏后 无法唤醒
  20. linux 路由转发 ipv6,IPv6路由

热门文章

  1. php获取 url 井号,php获取url井号后的参数(描点#后参数)
  2. PMP考试没过的十大原因!你中招了吗
  3. python爬虫获取图片
  4. Vue 进阶系列丨vuex持久化
  5. Postman调用上传文件接口提示Error: read ECONNRESET
  6. 以太坊中web3j调用公链超时问题,重构httpservice即可
  7. Android 仿微信朋友圈拍照原理解读,技术分析
  8. html5 保存图片,H5 dom元素保存为图片
  9. 浙江人事考试网计算机考试科目,『浙江事业单位』2020浙江省事业单位统考笔试科目及考试内容介绍...
  10. d3dx9_43.dll是什么文件、d3dx9_43.dll缺失的解决方法