本文主要针对,Java端对Kafka消息队列的生产和消费。Kafka的安装部署,请看查看相关文章。

笔者最近所用的是Spring mvc,监听文件路径,然后将读取到的文件内容发送到消息队列中。由另外系统去消费消息。

当然消息队列作为消息交换机,本系统既有生产消息也有消费消息。不做详述。

生成者代码相对简单很多。

package com.dhc.test.kafka;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.Logger;import java.util.Properties;public class ProducerHandler {private final KafkaProducer<String, String> producer;private static Logger logger = Logger.getLogger(DataInManager.class.getName());public ProducerHandler(String topic,String message) {Properties props = new Properties();props.put("bootstrap.servers”,"127.0.0.1:9092");props.put("acks", "all");props.put("retries", "0");props.put("batch.size", "16384");props.put("linger.ms", "1");props.put("buffer.memory", "33554432");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");producer = new KafkaProducer<String, String>(props);//生成消息ProducerRecord record = new ProducerRecord(topic,message);//发送消息producer.send(record);logger.info("【kafka】向Kafka的TOPIC【" + topic + "】中发送消息");logger.info("【kafka】消息内容:" + message);logger.info("【kafka】推送成功");}
}

消费者代码

package com.dhc.test.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class ConsumerHandler {static Logger logger = Logger.getLogger(DataInManager.class.getName());private final KafkaConsumer<String, String> consumer;private ExecutorService executors;public ConsumerHandler(List<String> topics) {Properties props = new Properties();props.put("bootstrap.servers", "127.0.0.1:9092");props.put("group.id", "test");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(topics);execute(1);}public void execute(int workerNum) {executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,new ArrayBlockingQueue(1000), new ThreadPoolExecutor.CallerRunsPolicy());Thread t = new Thread(new Runnable(){//启动一个子线程来监听kafka消息public void run(){while (true) {ConsumerRecords<String, String> records = consumer.poll(200);for (final ConsumerRecord record : records) {logger.info("【Kafka】监听到kafka的TOPIC【" + record.topic() + "】的消息");logger.info("【Kafka】消息内容:" + record.value());executors.submit(new ConsumerWorker(record));}}}});t.start();}public void shutdown() {if (consumer != null) {consumer.close();}if (executors != null) {executors.shutdown();}try {if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {logger.info("【Kafka】Timeout.... Ignore for this case ");}} catch (InterruptedException ignored) {logger.info("【Kafka】Other thread interrupted this shutdown, ignore for this case.");Thread.currentThread().interrupt();}}
}
package com.dhc.test.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.Logger;public class ConsumerWorker implements Runnable {private ConsumerRecord<String, String> consumerRecord;public ConsumerWorker(ConsumerRecord record) {this.consumerRecord = record;}private static Logger logger = Logger.getLogger(DataInManager.class.getName());public void run() {// consumer接收消息后,这里可以写针对收到的消息的业务处理System.out.println(consumerRecord.value());}
}

main方法启动

package com.dhc.test;import com.dhc.test.kafka.ConsumerHandler;import java.util.ArrayList;
import java.util.List;
import java.util.Properties;public class Start {public static void main(String[] args) throws Exception {// 启动Kafka consumer监视List<String> topics = new ArrayList<String>();// 监听的消息通道topics.add("test");new ConsumerHandler(topics);}
}

谢谢关注!

Java+Kafka消息队列相关推荐

  1. SpringBoot集成Kafka消息队列

    1.说明 Spring可以方便的集成使用 Kafka消息队列 , 只需要引入依赖包spring-kafka, 注意版本兼容问题, 本文详细介绍SpringBoot集成Kafka的方法, 以及生产者和消 ...

  2. Java面试——消息队列

    一.消息队列的使用场景 ☞ 以下介绍消息队列在实际应用常用的使用场景.异步处理.应用解耦.流量削锋和消息通讯四个场景. [1]异步处理:场景说明:用户注册后,需要发注册邮件和注册短信.   引入消息队 ...

  3. 19 kafka消息队列

    文章目录 19 kafka消息队列 一.kafka介绍 1.消息队列基本介绍 2.常用的消息队列介绍 3.消息队列的应用场景 4.消息队列的两种模式 5.kafka的基本介绍 6.kafka的架构介绍 ...

  4. java kafka消息的发送与接收

    java kafka消息的发送与接收 消息队列在java EE级开发是很常用到的工具之一,在众多消息队列当中,active mq与kafka相对比较受开发者的喜爱,那么kafka是怎样实现消息的发送与 ...

  5. Flink使用KafkaSource从Kafka消息队列中读取数据

    Flink使用KafkaSource从Kafka消息队列中读取数据 使用KafkaSource从Kafka消息队列中读取数据 1.KafkaSource创建的DataStream是一个并行的DataS ...

  6. kafka消息队列使用场景

    一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,Rabbit ...

  7. 【无废话】SpringBoot集成Kafka消息队列

    0.前言 本人整理收藏了22年多家公司面试知识点整理 ,以及各种Java核心知识点免费分享给大家,我认为对面试与学习来说是非常有用的,想要资料的话请点白嫖这份答案←戳我** 1.说明 Spring可以 ...

  8. Kafka 消息队列的使用

    本篇概要: 1. 消息队列相关概念: 2. Kafka 消息队列: 3. 安装 Kafka 服务: 4. 安装PHP的 Kafka 扩展 rdkafka: 5. 编写 Kafka 的生产者方法: 6. ...

  9. Kafka消息队列简介

    Kafka消息队列简介 1 基本概念 Broker Kafka集群包含一个或多个服务器,这种服务器被称为broker Topic 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic. ...

最新文章

  1. python动态心形代码-Python数学方程式画心型图案源码示例
  2. Web云笔记--CSS
  3. 动态规划之——最长公共子序列(nyoj36)
  4. android订阅管理,RXJAVA取消订阅封装-kotlin-Android
  5. 会动的图解 | 既然IP层会分片,为什么TCP层也还要分段?
  6. java用二维数组编写地图_[Java] Java二维数组写一个超级简单的扫雷游戏,适合新手...
  7. Apex弹窗闪退报错问题解决方案清晰讲解(系统软件层面解决,已亲测可行)
  8. 老男孩Linux架构师实战课程14期教程
  9. Matlab:实现自定义圆孔阵列远场衍射仿真
  10. 局域网联机游戏找不到服务器,国庆想局域网联机,除了“吃鸡”,这些Steam游戏别错过...
  11. PCB学习笔记——0201 0402 0603 0805 1206焊盘封装尺寸
  12. 学计算机网络布线图片,从业必看!直观的弱电各子系统图!
  13. 实现Promise的resolve/reject/then/all/race/finally/catch方法
  14. 淘宝客接入PHP(一)
  15. 有1234四个数字java_用java程序编写,1234这四个数进行排列组合,
  16. 新技能Get! 手把手教你接入CG Kit
  17. 增量式分级判别回归树(IHDR)|翻译与笔记
  18. 用Qt画圣诞树——要画就画最丑的圣诞树
  19. oracle创建表空间工具,使用sqlplus命令行工具为oracle创建用户和表空间
  20. SetForegroundWindow、SetActiveWindow、SetFocus 如何将一个某个窗口提到最顶层

热门文章

  1. 低压电缆载流量估算一览表
  2. 零基础Vlog教程!
  3. EXCEL__班表班次次数自动更新
  4. 【腾讯优测月刊】安卓主流自动化测试框架详解与实践
  5. C# NHibernate处理多帐套问题
  6. 模电学习笔记(十二)——跨阻放大器
  7. 博科系统怎么在电脑里装服务器,博科交換机安装和维护手册.doc
  8. 12月22总结--生活要稳住
  9. Linux Centos7 Nginx的安装与配置、反向代理、负载均衡、https配置
  10. Wellner 自适应阈值二值化算法