条件:搭建好kafka环境

搭建zookeeper+kafka地址:https://www.cnblogs.com/weibanggang/p/12377055.html

1、java无注解方式

加入kafka包:

 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.0.0</version></dependency>

消费者代码

package com.wbg.springboot_kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class Consumer extends Thread {KafkaConsumer<Integer,String> consumer;String topic;public Consumer(String topic){Properties properties=new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.198.129:9092,192.168.198.130:9092,192.168.198.131:9092");properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"consumer");properties.put(ConsumerConfig.GROUP_ID_CONFIG,"consumer");properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000");properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); //自动提交(批量确认)properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//一个新的group的消费者去消费一个topicproperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); //这个属性. 它能够消费昨天发布的数据 consumer=new KafkaConsumer<Integer, String>(properties); this.topic = topic; } @Override public void run() { consumer.subscribe(Collections.singleton(this.topic)); while (true){ ConsumerRecords<Integer,String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); consumerRecords.forEach(record ->{ System.out.println(record.key()+"->"+record.value()+"->"+record.offset()); }); } } public static void main(String[] args) { new Consumer("test_partition").start(); } }

生产者代码

package com.wbg.springboot_kafka;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.TimeUnit;public class Producer extends Thread {KafkaProducer<Integer, String> producer;String topic;public Producer(String topic) {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.198.129:9092,192.168.198.130:9092,192.168.198.131:9092");properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());producer = new KafkaProducer<Integer, String>(properties);this.topic = topic;}@Overridepublic void run() {int num = 0;while (num < 20) {try {String msg = "kafka msg " + num;producer.send(new ProducerRecord<>(topic, 3, msg), ((recordMetadata, e) -> {System.out.println(recordMetadata.offset() + "->" + recordMetadata.partition());}));TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {new Producer("test_partition").start();}
}

启动生产者

启动消费者

2、SpringBoot注解方式

pom依赖:

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.0.RELEASE</version></dependency>

application.properties文件

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializerspring.kafka.bootstrap-servers=192.168.198.129:9092,192.168.198.130:9092,192.168.198.131:9092spring.kafka.consumer.group-id=springboot-groupid
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=truespring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

消费者代码

@Component
public class KafkaMyConsumer {@KafkaListener(topics = {"test"})public void listener(ConsumerRecord record){Optional msg = Optional.ofNullable(record.value());if(msg.isPresent()){System.out.println(msg.get());;}}
}

View Code

生产者代码

@Component
public class KafkaMyProducer {@Autowiredprivate KafkaTemplate<Integer,String> kafkaTemplate;public void send(){kafkaTemplate.send("test",1,"msgData");}
}

View Code

启动

@SpringBootApplication
public class SpringbootKafkaApplication {public static void main(String[] args) throws InterruptedException {ConfigurableApplicationContext context = SpringApplication.run(SpringbootKafkaApplication.class,args);KafkaMyProducer kafkaMyProducer = context.getBean(KafkaMyProducer.class);for (int i = 0; i < 10; i++) {kafkaMyProducer.send();TimeUnit.SECONDS.sleep(3);}}}

java实现kafka发送消息和接收消息(java无注解方式+springBoot注解方式)相关推荐

  1. 【Java从0到架构师】RocketMQ 使用 - 发送消息、接收消息、特殊消息

    RocketMQ 消息中间件 入门案例 NameServer 地址 发送消息 同步发送消息 异步发送消息 一次性发送消息 生产者组.消息封装 接收消息 消费方式:推式消费.拉式消费 消息方式:集群模式 ...

  2. 简单Rabbitmq 发送消息和接收消息

    简单Rabbitmq 发送消息和接收消息 1 先在Rabbitmq配置文件中预先创建好交换器,队列,路由等信息. 2 创建生产者发送消息 @Autowiredprivate RabbitTemplat ...

  3. 企业微信 接收消息服务器,接收消息与事件

    [TOC] 关于接收消息 为了能够让自建应用和企业微信进行双向通信,企业可以在应用的管理后台开启接收消息模式. 开启接收消息模式的企业,需要提供可用的接收消息服务器URL. 开启接收消息模式后,用户在 ...

  4. java后台解析json并保存到数据库_[Java教程]ajax 发送json 后台接收 遍历保存进数据库...

    [Java教程]ajax 发送json 后台接收 遍历保存进数据库 0 2017-09-25 15:00:23 前台怎么拿参数的我就不管了我也不会 反正用这个ajax没错 ajax 代码   一定要写 ...

  5. Java微信公众平台开发(三)--接收消息的分类及实体的创建

    转自:http://www.cuiyongzhi.com/post/41.html 前面一篇有说道应用服务器和腾讯服务器是通过消息进行通讯的,并简单介绍了微信端post的消息类型,这里我们将建立消息实 ...

  6. C# 企业微信:开启消息接受接收消息推送消息

    前言:微信吧!接触的人都会100%各种踩坑,就算同样东西去年做过,今年来一样踩坑,因为太多你稍微不记得一点点的细节就能让你研究N久.为此,我要把这个过程详细的记录下来. 一.开启消息接受 1.拿到企业 ...

  7. RabbitMQ与SpringMVC集成并实现发送消息和接收消息(持久化)方案一

    RabbitMQ是用于应用程序之间或者程序的不同组件之间的消息通信,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量,也就是生产-消费模型,一端往消息队列中 ...

  8. 浅析 postMessage 方法介绍、如何接收数据(监听message事件及其属性介绍)、使用postMessage的安全注意事项、具体使用方式(父子页面如何互发消息、接收消息)

    postMessage 是 html5 引入的API,postMessage()方法允许来自不同源的脚本采用异步方式进行有效的通信,可以实现跨文本文档.多窗口.跨域消息传递,多用于窗口间数据通信,这也 ...

  9. java实现邮件发送_基于JavaMail的Java实现简单邮件发送功能

    电子邮件的应用非常广泛,例如在某网站注册了一个账户,自动发送一封欢迎邮件,通过邮件找回密码,自动批量发送活动信息等.但这些应用不可能和我们自己平时发邮件一样,先打开浏览器,登录邮箱,创建邮件再发送.本 ...

  10. kafka为什么用java重写,kafka怎么发布订阅 怎么在java中实现

    匿名用户 1级 2017-03-28 回答 这是我们项目中用到的代码 public class ProducerService { private static Logger log = Logger ...

最新文章

  1. Java实现文件的预览
  2. 谷歌最新视频抠图术:影子烟雾都能抠,添加水印更顺滑,UP主剪辑利器 | 开源...
  3. flink on yarn部分源码解析 (FLIP-6 new mode)
  4. PHP对象的内存模型
  5. 一切转型始于数据和模型 | 2020 MATLAB EXPO 中国线上用户大会:即将上线
  6. 数据库系统实训——实验五——存储过程
  7. mysql中的rman备份与恢复_RMAN备份与恢复实践(转)
  8. 计算机408重点知识及其他(面试)
  9. JSP入门必须了解的知识详解
  10. ENVI监督分类错误:分离度为0.00000解决办法
  11. 50个最新漂亮的国外网站模板下载
  12. cve_2019_0708_bluekeep复现采坑
  13. 阿里开源软件替换指南 1
  14. Ellisys Bluetooth Vanguard - 软件
  15. 想做AR/VR相关创新项目,有什么好方向?要怎么做?
  16. 广义表的概念及存储表示
  17. c语言绝对值——abs和fabs
  18. 从决策树学习谈到贝叶斯分类算法、EM、HMM
  19. dell刷sn_戴尔笔记本怎么查询sn码
  20. 【科普篇】云存储与传统存储

热门文章

  1. win10+64位 安装Theano并实现GPU加速
  2. mysql like in 组合_mysql like in 组合 黄小柔junior分手原因
  3. android python .xlsx_python读写xlsx
  4. python开发技术文档范文_常用python编程模板汇总
  5. Hyperledger fabric v2.3 通道channel 翻译
  6. linux拨号上网的命令,CentOS 6.4 电信ADSL拨号上网网络配置
  7. 阿里云ddns解决动态IP问题
  8. 51nod1712 区间求和
  9. 阶段1 语言基础+高级_1-3-Java语言高级_08-JDK8新特性_第2节 Stream流式思想概述_3_流式思想概述...
  10. Xml解析作业与Xml建模andXml建模作业