Java+Kafka消息队列
本文主要针对,Java端对Kafka消息队列的生产和消费。Kafka的安装部署,请看查看相关文章。
笔者最近所用的是Spring mvc,监听文件路径,然后将读取到的文件内容发送到消息队列中。由另外系统去消费消息。
当然消息队列作为消息交换机,本系统既有生产消息也有消费消息。不做详述。
生成者代码相对简单很多。
package com.dhc.test.kafka;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.Logger;import java.util.Properties;public class ProducerHandler {private final KafkaProducer<String, String> producer;private static Logger logger = Logger.getLogger(DataInManager.class.getName());public ProducerHandler(String topic,String message) {Properties props = new Properties();props.put("bootstrap.servers”,"127.0.0.1:9092");props.put("acks", "all");props.put("retries", "0");props.put("batch.size", "16384");props.put("linger.ms", "1");props.put("buffer.memory", "33554432");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");producer = new KafkaProducer<String, String>(props);//生成消息ProducerRecord record = new ProducerRecord(topic,message);//发送消息producer.send(record);logger.info("【kafka】向Kafka的TOPIC【" + topic + "】中发送消息");logger.info("【kafka】消息内容:" + message);logger.info("【kafka】推送成功");}
}
消费者代码
package com.dhc.test.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class ConsumerHandler {static Logger logger = Logger.getLogger(DataInManager.class.getName());private final KafkaConsumer<String, String> consumer;private ExecutorService executors;public ConsumerHandler(List<String> topics) {Properties props = new Properties();props.put("bootstrap.servers", "127.0.0.1:9092");props.put("group.id", "test");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(topics);execute(1);}public void execute(int workerNum) {executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,new ArrayBlockingQueue(1000), new ThreadPoolExecutor.CallerRunsPolicy());Thread t = new Thread(new Runnable(){//启动一个子线程来监听kafka消息public void run(){while (true) {ConsumerRecords<String, String> records = consumer.poll(200);for (final ConsumerRecord record : records) {logger.info("【Kafka】监听到kafka的TOPIC【" + record.topic() + "】的消息");logger.info("【Kafka】消息内容:" + record.value());executors.submit(new ConsumerWorker(record));}}}});t.start();}public void shutdown() {if (consumer != null) {consumer.close();}if (executors != null) {executors.shutdown();}try {if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {logger.info("【Kafka】Timeout.... Ignore for this case ");}} catch (InterruptedException ignored) {logger.info("【Kafka】Other thread interrupted this shutdown, ignore for this case.");Thread.currentThread().interrupt();}}
}
package com.dhc.test.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.Logger;public class ConsumerWorker implements Runnable {private ConsumerRecord<String, String> consumerRecord;public ConsumerWorker(ConsumerRecord record) {this.consumerRecord = record;}private static Logger logger = Logger.getLogger(DataInManager.class.getName());public void run() {// consumer接收消息后,这里可以写针对收到的消息的业务处理System.out.println(consumerRecord.value());}
}
main方法启动
package com.dhc.test;import com.dhc.test.kafka.ConsumerHandler;import java.util.ArrayList;
import java.util.List;
import java.util.Properties;public class Start {public static void main(String[] args) throws Exception {// 启动Kafka consumer监视List<String> topics = new ArrayList<String>();// 监听的消息通道topics.add("test");new ConsumerHandler(topics);}
}
谢谢关注!
Java+Kafka消息队列相关推荐
- SpringBoot集成Kafka消息队列
1.说明 Spring可以方便的集成使用 Kafka消息队列 , 只需要引入依赖包spring-kafka, 注意版本兼容问题, 本文详细介绍SpringBoot集成Kafka的方法, 以及生产者和消 ...
- Java面试——消息队列
一.消息队列的使用场景 ☞ 以下介绍消息队列在实际应用常用的使用场景.异步处理.应用解耦.流量削锋和消息通讯四个场景. [1]异步处理:场景说明:用户注册后,需要发注册邮件和注册短信. 引入消息队 ...
- 19 kafka消息队列
文章目录 19 kafka消息队列 一.kafka介绍 1.消息队列基本介绍 2.常用的消息队列介绍 3.消息队列的应用场景 4.消息队列的两种模式 5.kafka的基本介绍 6.kafka的架构介绍 ...
- java kafka消息的发送与接收
java kafka消息的发送与接收 消息队列在java EE级开发是很常用到的工具之一,在众多消息队列当中,active mq与kafka相对比较受开发者的喜爱,那么kafka是怎样实现消息的发送与 ...
- Flink使用KafkaSource从Kafka消息队列中读取数据
Flink使用KafkaSource从Kafka消息队列中读取数据 使用KafkaSource从Kafka消息队列中读取数据 1.KafkaSource创建的DataStream是一个并行的DataS ...
- kafka消息队列使用场景
一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,Rabbit ...
- 【无废话】SpringBoot集成Kafka消息队列
0.前言 本人整理收藏了22年多家公司面试知识点整理 ,以及各种Java核心知识点免费分享给大家,我认为对面试与学习来说是非常有用的,想要资料的话请点白嫖这份答案←戳我** 1.说明 Spring可以 ...
- Kafka 消息队列的使用
本篇概要: 1. 消息队列相关概念: 2. Kafka 消息队列: 3. 安装 Kafka 服务: 4. 安装PHP的 Kafka 扩展 rdkafka: 5. 编写 Kafka 的生产者方法: 6. ...
- Kafka消息队列简介
Kafka消息队列简介 1 基本概念 Broker Kafka集群包含一个或多个服务器,这种服务器被称为broker Topic 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic. ...
最新文章
- python动态心形代码-Python数学方程式画心型图案源码示例
- Web云笔记--CSS
- 动态规划之——最长公共子序列(nyoj36)
- android订阅管理,RXJAVA取消订阅封装-kotlin-Android
- 会动的图解 | 既然IP层会分片,为什么TCP层也还要分段?
- java用二维数组编写地图_[Java] Java二维数组写一个超级简单的扫雷游戏,适合新手...
- Apex弹窗闪退报错问题解决方案清晰讲解(系统软件层面解决,已亲测可行)
- 老男孩Linux架构师实战课程14期教程
- Matlab:实现自定义圆孔阵列远场衍射仿真
- 局域网联机游戏找不到服务器,国庆想局域网联机,除了“吃鸡”,这些Steam游戏别错过...
- PCB学习笔记——0201 0402 0603 0805 1206焊盘封装尺寸
- 学计算机网络布线图片,从业必看!直观的弱电各子系统图!
- 实现Promise的resolve/reject/then/all/race/finally/catch方法
- 淘宝客接入PHP(一)
- 有1234四个数字java_用java程序编写,1234这四个数进行排列组合,
- 新技能Get! 手把手教你接入CG Kit
- 增量式分级判别回归树(IHDR)|翻译与笔记
- 用Qt画圣诞树——要画就画最丑的圣诞树
- oracle创建表空间工具,使用sqlplus命令行工具为oracle创建用户和表空间
- SetForegroundWindow、SetActiveWindow、SetFocus 如何将一个某个窗口提到最顶层