java实现kafka发送消息和接收消息(java无注解方式+springBoot注解方式)
条件:搭建好kafka环境
搭建zookeeper+kafka地址:https://www.cnblogs.com/weibanggang/p/12377055.html
1、java无注解方式
加入kafka包:
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.0.0</version></dependency>
消费者代码
package com.wbg.springboot_kafka;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.Collections; import java.util.Properties;public class Consumer extends Thread {KafkaConsumer<Integer,String> consumer;String topic;public Consumer(String topic){Properties properties=new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.198.129:9092,192.168.198.130:9092,192.168.198.131:9092");properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"consumer");properties.put(ConsumerConfig.GROUP_ID_CONFIG,"consumer");properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000");properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); //自动提交(批量确认)properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//一个新的group的消费者去消费一个topicproperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); //这个属性. 它能够消费昨天发布的数据 consumer=new KafkaConsumer<Integer, String>(properties); this.topic = topic; } @Override public void run() { consumer.subscribe(Collections.singleton(this.topic)); while (true){ ConsumerRecords<Integer,String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); consumerRecords.forEach(record ->{ System.out.println(record.key()+"->"+record.value()+"->"+record.offset()); }); } } public static void main(String[] args) { new Consumer("test_partition").start(); } }
生产者代码
package com.wbg.springboot_kafka;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties; import java.util.concurrent.TimeUnit;public class Producer extends Thread {KafkaProducer<Integer, String> producer;String topic;public Producer(String topic) {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.198.129:9092,192.168.198.130:9092,192.168.198.131:9092");properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());producer = new KafkaProducer<Integer, String>(properties);this.topic = topic;}@Overridepublic void run() {int num = 0;while (num < 20) {try {String msg = "kafka msg " + num;producer.send(new ProducerRecord<>(topic, 3, msg), ((recordMetadata, e) -> {System.out.println(recordMetadata.offset() + "->" + recordMetadata.partition());}));TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {new Producer("test_partition").start();} }
启动生产者
启动消费者
2、SpringBoot注解方式
pom依赖:
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.0.RELEASE</version></dependency>
application.properties文件
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializerspring.kafka.bootstrap-servers=192.168.198.129:9092,192.168.198.130:9092,192.168.198.131:9092spring.kafka.consumer.group-id=springboot-groupid spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=truespring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
消费者代码
@Component public class KafkaMyConsumer {@KafkaListener(topics = {"test"})public void listener(ConsumerRecord record){Optional msg = Optional.ofNullable(record.value());if(msg.isPresent()){System.out.println(msg.get());;}} }
View Code
生产者代码
@Component public class KafkaMyProducer {@Autowiredprivate KafkaTemplate<Integer,String> kafkaTemplate;public void send(){kafkaTemplate.send("test",1,"msgData");} }
View Code
启动
@SpringBootApplication public class SpringbootKafkaApplication {public static void main(String[] args) throws InterruptedException {ConfigurableApplicationContext context = SpringApplication.run(SpringbootKafkaApplication.class,args);KafkaMyProducer kafkaMyProducer = context.getBean(KafkaMyProducer.class);for (int i = 0; i < 10; i++) {kafkaMyProducer.send();TimeUnit.SECONDS.sleep(3);}}}
java实现kafka发送消息和接收消息(java无注解方式+springBoot注解方式)相关推荐
- 【Java从0到架构师】RocketMQ 使用 - 发送消息、接收消息、特殊消息
RocketMQ 消息中间件 入门案例 NameServer 地址 发送消息 同步发送消息 异步发送消息 一次性发送消息 生产者组.消息封装 接收消息 消费方式:推式消费.拉式消费 消息方式:集群模式 ...
- 简单Rabbitmq 发送消息和接收消息
简单Rabbitmq 发送消息和接收消息 1 先在Rabbitmq配置文件中预先创建好交换器,队列,路由等信息. 2 创建生产者发送消息 @Autowiredprivate RabbitTemplat ...
- 企业微信 接收消息服务器,接收消息与事件
[TOC] 关于接收消息 为了能够让自建应用和企业微信进行双向通信,企业可以在应用的管理后台开启接收消息模式. 开启接收消息模式的企业,需要提供可用的接收消息服务器URL. 开启接收消息模式后,用户在 ...
- java后台解析json并保存到数据库_[Java教程]ajax 发送json 后台接收 遍历保存进数据库...
[Java教程]ajax 发送json 后台接收 遍历保存进数据库 0 2017-09-25 15:00:23 前台怎么拿参数的我就不管了我也不会 反正用这个ajax没错 ajax 代码 一定要写 ...
- Java微信公众平台开发(三)--接收消息的分类及实体的创建
转自:http://www.cuiyongzhi.com/post/41.html 前面一篇有说道应用服务器和腾讯服务器是通过消息进行通讯的,并简单介绍了微信端post的消息类型,这里我们将建立消息实 ...
- C# 企业微信:开启消息接受接收消息推送消息
前言:微信吧!接触的人都会100%各种踩坑,就算同样东西去年做过,今年来一样踩坑,因为太多你稍微不记得一点点的细节就能让你研究N久.为此,我要把这个过程详细的记录下来. 一.开启消息接受 1.拿到企业 ...
- RabbitMQ与SpringMVC集成并实现发送消息和接收消息(持久化)方案一
RabbitMQ是用于应用程序之间或者程序的不同组件之间的消息通信,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量,也就是生产-消费模型,一端往消息队列中 ...
- 浅析 postMessage 方法介绍、如何接收数据(监听message事件及其属性介绍)、使用postMessage的安全注意事项、具体使用方式(父子页面如何互发消息、接收消息)
postMessage 是 html5 引入的API,postMessage()方法允许来自不同源的脚本采用异步方式进行有效的通信,可以实现跨文本文档.多窗口.跨域消息传递,多用于窗口间数据通信,这也 ...
- java实现邮件发送_基于JavaMail的Java实现简单邮件发送功能
电子邮件的应用非常广泛,例如在某网站注册了一个账户,自动发送一封欢迎邮件,通过邮件找回密码,自动批量发送活动信息等.但这些应用不可能和我们自己平时发邮件一样,先打开浏览器,登录邮箱,创建邮件再发送.本 ...
- kafka为什么用java重写,kafka怎么发布订阅 怎么在java中实现
匿名用户 1级 2017-03-28 回答 这是我们项目中用到的代码 public class ProducerService { private static Logger log = Logger ...
最新文章
- Java实现文件的预览
- 谷歌最新视频抠图术:影子烟雾都能抠,添加水印更顺滑,UP主剪辑利器 | 开源...
- flink on yarn部分源码解析 (FLIP-6 new mode)
- PHP对象的内存模型
- 一切转型始于数据和模型 | 2020 MATLAB EXPO 中国线上用户大会:即将上线
- 数据库系统实训——实验五——存储过程
- mysql中的rman备份与恢复_RMAN备份与恢复实践(转)
- 计算机408重点知识及其他(面试)
- JSP入门必须了解的知识详解
- ENVI监督分类错误:分离度为0.00000解决办法
- 50个最新漂亮的国外网站模板下载
- cve_2019_0708_bluekeep复现采坑
- 阿里开源软件替换指南 1
- Ellisys Bluetooth Vanguard - 软件
- 想做AR/VR相关创新项目,有什么好方向?要怎么做?
- 广义表的概念及存储表示
- c语言绝对值——abs和fabs
- 从决策树学习谈到贝叶斯分类算法、EM、HMM
- dell刷sn_戴尔笔记本怎么查询sn码
- 【科普篇】云存储与传统存储
热门文章
- win10+64位 安装Theano并实现GPU加速
- mysql like in 组合_mysql like in 组合 黄小柔junior分手原因
- android python .xlsx_python读写xlsx
- python开发技术文档范文_常用python编程模板汇总
- Hyperledger fabric v2.3 通道channel 翻译
- linux拨号上网的命令,CentOS 6.4 电信ADSL拨号上网网络配置
- 阿里云ddns解决动态IP问题
- 51nod1712 区间求和
- 阶段1 语言基础+高级_1-3-Java语言高级_08-JDK8新特性_第2节 Stream流式思想概述_3_流式思想概述...
- Xml解析作业与Xml建模andXml建模作业