1、概念:SpringBoot 整合消息服务2、具体内容对于异步消息组件在实际的应用之中会有两类:· JMS:代表作就是 ActiveMQ,但是其性能不高,因为其是用 java 程序实现的;· AMQP:直接利用协议实现的消息组件,其大众代表作:RabbitMQ,高性能代表作:Kafka。2.1、SpringBoot 整合 ActiveMQ1、 如果要想在项目之中去使用 ActiveMQ 组件,则应该为项目添加依赖支持库,修改 pom.xml 配置文件:<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId>
</dependency>2、 修改 application.properties 配置文件进行 activemq 的配置;spring.jms.pub-sub-domain=false
spring.activemq.user=admin
spring.activemq.password=1234
spring.activemq.broker-url=tcp://59.115.158.145:616163、 随后定义一个消息的消费者,消费者主要是进行一个监听控制,在 SpringBoot 里面可以直接利用
注解@JmsListener进行监听:package com.microboot.consumer;import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;@Service
public class MessageConsumerService {@JmsListener(destination="study.msg.queue")public void receiveMessage(String text) {    // 进行消息接收处理System.err.println("【*** 接收消息 ***】" + text);}
}
4、 随后建立消息的发送者服务,一般而言如果进行消息的发送往往会准备出一个业务接口来:package com.microboot.producer;public interface IMessageProducerService {public void sendMessage(String msg) ;
}
5、 随后建立一个配置程序类,定义 ActiveMQ 的消息发送模版处理类:package com.microboot.config;import javax.jms.Queue;import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;@Configuration
@EnableJms
public class ActiveMQConfig {@Beanpublic Queue queue() {return new ActiveMQQueue("study.msg.queue");}
}
6、 创建消息发送的子类实现消息发送处理:package com.microboot.producer;import javax.annotation.Resource;
import javax.jms.Queue;import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;@Service
public class MessageProducerServiceImpl implements IMessageProducerService {@Resourceprivate JmsMessagingTemplate jmsMessagingTemplate;@Resourceprivate Queue queue;@Overridepublic void sendMessage(String msg) {this.jmsMessagingTemplate.convertAndSend(this.queue, msg);}}
7、 编写测试类来观察消息的处理:package com.microboot.test;import javax.annotation.Resource;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;import com.microboot.StartSpringBootMain;
import com.microboot.producer.IMessageProducerService;@SpringBootTest(classes = StartSpringBootMain.class)
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
public class TestActiveMQ {@Resourceprivate IMessageProducerService messageProducer;@Testpublic void testSend() throws Exception {for (int x = 0; x < 10; x++) {this.messageProducer.sendMessage("study - " + x);}}
}基于 SpringBoot 配置的 JMS 的组件访问整体的处理十分简单
2.2、SpringBoot 整合 RabbitMQ如果要进行 RabbitMQ 整合的时候一定要注意以下几个概念:交换空间、虚拟主机、队列信息。本次为了方便起见将项目分为 两个:RabbitMQ-Consumer、RabbitMQ-Producer。1、 【两个项目】将 rabbitmq 的依赖支持包拷贝到项目之中;<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>2、 【microboot-rabbitmq-producer、microboot-rabbitmq-consumer】修改 application.properties配置文件,追加 rabbitmq 的相关配置项:spring.rabbitmq.addresses=127.0.0.1
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
3、 【microboot-rabbitmq-producer】建立一个消息的发送接口:package cn.study.microboot.producer;public interface IMessageProducerService {public void sendMessage(String msg) ;
}
4、 【microboot-rabbitmq-producer】为了可以正常使用 RabbitMQ 进行消息处理,你还需要做一个消息生产配置类;package cn.study.microboot.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ProducerConfig {public static final String EXCHANGE = "study.microboot.exchange"; // 交换空间名称public static final String ROUTINGKEY = "study.microboot.routingkey"; // 设置路由keypublic static final String QUEUE_NAME = "study.microboot.queue"; // 队列名称@Beanpublic Binding bindingExchangeQueue(DirectExchange exchange,Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY) ;}@Beanpublic DirectExchange getDirectExchange() { // 使用直连的模式return new DirectExchange(EXCHANGE, true, true);}@Beanpublic Queue queue() { // 要创建的队列信息return new Queue(QUEUE_NAME);}
}
5、 【microboot-rabbitmq-producer】创建消息服务的实现子类:package cn.study.microboot.producer.impl;import javax.annotation.Resource;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;import cn.study.microboot.config.ProducerConfig;
import cn.study.microboot.producer.IMessageProducerService;@Service
public class MessageProducerServiceImpl implements IMessageProducerService {@Resourceprivate RabbitTemplate rabbitTemplate;@Overridepublic void sendMessage(String msg) {this.rabbitTemplate.convertAndSend(ProducerConfig.EXCHANGE,ProducerConfig.ROUTINGKEY, msg);}}
6、 【microboot-rabbitmq-consumer】依然需要做一个消费者的配置程序类,而这个程序类里面主要的目的依然是设置交换空间、 路由 KEY 等信息。package cn.study.microboot.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConsumerConfig {public static final String EXCHANGE = "study.microboot.exchange"; // 交换空间名称public static final String ROUTINGKEY = "study.microboot.routingkey"; // 设置路由keypublic static final String QUEUE_NAME = "study.microboot.queue"; // 队列名称@Beanpublic Queue queue() { // 要创建的队列信息return new Queue(QUEUE_NAME);}@Beanpublic DirectExchange getDirectExchange() { // 使用直连的模式return new DirectExchange(EXCHANGE, true, true);}@Beanpublic Binding bindingExchangeQueue(DirectExchange exchange,Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY) ;}
}
7、 【microboot-rabbitmq-consumer】实现监听处理类:package cn.study.microboot.consumer;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;@Service
public class MessageConsumerService {@RabbitListener(queues="study.microboot.queue")public void receiveMessage(String text) {    // 进行消息接收处理System.err.println("【*** 接收消息 ***】" + text);}
}
9、 【microboot-rabbitmq-consumer】编写消息接收测试类,这里面不需要编写代码,只需要做一个休眠即可:package cn.study.microboot.test;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;import cn.study.microboot.StartSpringBootMain;@SpringBootTest(classes = StartSpringBootMain.class)
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
public class AppTest {@Testpublic void testStart() throws Exception {Thread.sleep(Long.MAX_VALUE);}
}整体进行项目开发之中整合的处理步骤还是简单,但是千万要注意,由于是第一次整合处理,所以将生产者与消费者的配置 类分开了,实际上这两个类的作用是完全一样的。
2.3、SpringBoot 整合 KafkaKafka 是现在最好的开源消息组件,其仿照 AMQP 协议操作,而且处理的性能也是最高的。本次使用已经配置好的 Kafka 服 务器,而且这台服务器上使用了 kerberos 认证,所以应该首先准备好一个 jass 配置文件:1、 定义“kafka_client_jaas.conf”配置文件:KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="bob"password="bob-pwd";
}; 
2、 为了方便进行项目的观察, 本次依然准备出了两个项目:生产者( microboot-kafka-producer )、 消 费 者 (microboot-kafka-consumer),随后为这两个项目添加 kafka 配置支持:<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
3、 【micorboot-kafka-consumer】修改 application.yml 配置文件,进行 kafka 配置项编写:server:port: 80
spring:messages:basename: i18n/Messages,i18n/Pageskafka:bootstrap-servers:- kafka-single:9095template:default-topic: mldn-microbootconsumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: group-1properties:sasl.mechanism: PLAINsecurity.protocol: SASL_PLAINTEXT
4、 【micorboot-kafka-consumer】建立一个 Kafka 的消息的消费程序类:package cn.study.microboot.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class MessageConsumerService {@KafkaListener(topics = {"study-microboot"})public void receiveMessage(ConsumerRecord<String, String> record) { // 进行消息接收处理System.err.println("【*** 接收消息 ***】key = " + record.key() + "、value = "+ record.value());}
}
5、 【micorboot-kafka-consumer】随后还需要修改 SpringBoot 的启动程序类,追加 kerberos 配置:package cn.study.microboot;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication // 启动SpringBoot程序,而后自带子包扫描
public class StartSpringBootMain {static {System.setProperty("java.security.auth.login.config","d:/kafka_client_jaas.conf"); // 表示系统环境属性}public static void main(String[] args) throws Exception {SpringApplication.run(StartSpringBootMain.class, args);}
}
6、 【microboot-kafka-producer】修改 application.yml 配置文件:server:port: 80
spring:messages:basename: i18n/Messages,i18n/Pageskafka:bootstrap-servers:- kafka-single:9095template:default-topic: mldn-microbootproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:sasl.mechanism: PLAINsecurity.protocol: SASL_PLAINTEXT
7、 【microboot-kafka-producer】定义消息发送的服务接口:package cn.study.microboot.producer;public interface IMessageProducerService {public void sendMessage(String msg) ;
}package cn.study.microboot.service.impl;import javax.annotation.Resource;import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;import cn.study.microboot.service.IMessageProducerService;@Service
public class MessageProducerServiceImpl implements IMessageProducerService {@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;@Overridepublic void send(String msg) {this.kafkaTemplate.sendDefault("study-key", msg);}}
8、 【microboot-kafka-producer】修改程序启动类:package cn.mldn.microboot;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication    // 启动SpringBoot程序,而后自带子包扫描
public class StartSpringBootMain {static {System.setProperty("java.security.auth.login.config","d:/kafka_client_jaas.conf"); // 表示系统环境属性}public static void main(String[] args) throws Exception {SpringApplication.run(StartSpringBootMain.class, args);}
}
9、 【microboot-kafka-producer】编写消息发送的程序类:package cn.study.microboot;import javax.annotation.Resource;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;import cn.study.microboot.service.IMessageProducerService;
@SpringBootTest(classes = StartSpringBootMain.class)
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
public class TestMessageService {@Resourceprivate IMessageProducerService messageService;@Testpublic void testStart() throws Exception {for (int x = 0; x < 100; x++) {this.messageService.send("study - " + x);}}
}

SpringBoot整合 ActiveMQ、SpringBoot整合RabbitMQ、SpringBoot整合Kafka相关推荐

  1. SpringBoot SpringBoot 开发实用篇 5 整合第三方技术 5.21 SpringBoot 整合 ActiveMQ

    SpringBoot [黑马程序员SpringBoot2全套视频教程,springboot零基础到项目实战(spring boot2完整版)] SpringBoot 开发实用篇 文章目录 Spring ...

  2. SpringBoot整合activeMQ消息队列手动签收(Session.CLIENT_ACKNOWLEDGE)为什么失效啊?

    今天在家隔离办公,不太忙,然后就琢磨起来消息队列activeMQ的消息事务来解决分布式事务,但是奈何在SpringBoot整合activeMQ时,其消费者手动签收消息时出现了问题-->当acti ...

  3. activeMQ基础学习和SpringBoot整合activeMQ案例

    昨天仔细研究了activeMQ消息队列,也遇到了些坑,昨天晚上也写了篇文章记录坑的内容,其实上篇文章(SpringBoot整合activeMQ消息队列手动签收(Session.CLIENT_ACKNO ...

  4. springboot整合ActiveMQ(点对点和发布订阅)

    springboot整合ActiveMQ(点对点和发布订阅) ActiveMQ是什么,为什么使用MQ 是基于 Java 中的 JMS 消息服务规范实现的一个消息中间件. 1.系统解耦 采用中间件之后, ...

  5. Springboot 整合Websocket+Stomp协议+RabbitMQ做消息代理 实例教程

    前言 如果你还没有了解过websocket,关于整合websocket的简单入门使用,可以先看看我这篇: <SpringBoot 整合WebSocket 简单实战案例> https://b ...

  6. 详细的springboot整合activeMq安装与使用(上)

    最近在学习activeMQ消息中间件,特此记录一下,方便以后使用. 如果有不严谨的地方,欢迎大家提出,共同进步呀. 本章,会先讲解activeMq的基本介绍.安装.和更改用户名.端口号.下节会仔细讲解 ...

  7. springboot整合activemq加入会签,自动重发机制,持久化

    消费者客户端成功接收一条消息的标志是:这条消息被签收. 消费者客户端成功接收一条消息一般包括三个阶段:          1.消费者接收消息,也即从MessageConsumer的receive方法返 ...

  8. 32位数据源中没有mysql_[SpringBoot实战]快速配置多数据源(整合MyBatis)

    前言 由于业务需求,需要同时在SpringBoot中配置两套数据源(连接两个数据库),要求能做到service层在调用各数据库表的mapper时能够自动切换数据源,也就是mapper自动访问正确的数据 ...

  9. SpringBoot进阶教程 | 第四篇:整合Mybatis实现多数据源

    这篇文章主要介绍,通过Spring Boot整合Mybatis后如何实现在一个工程中实现多数据源.同时可实现读写分离. 准备工作 环境: windows jdk 8 maven 3.0 IDEA 创建 ...

  10. 2.6 springBoot学习笔记(2.6)——- 整合mybatisPlus

    更多文章 更多系列文章在个人网站 springBoot学习系列笔记文章 springBoot学习笔记(1)-- 搭建springBoot项目 文章目录 更多文章 springBoot学习系列笔记文章 ...

最新文章

  1. docker搭建redis
  2. 【机器学习】朴素贝叶斯(Naive Bayes)
  3. 宇视硬盘录像机onvif_视频监控系统中强大的录像机,兼容不同品牌,看看有哪些监控厂家...
  4. Spring学习笔记之AOP配置篇(一) 注解配置
  5. 华云网际:虚拟机+廉价服务器 一体机就这么简单
  6. python神经网络案例——CNN卷积神经网络实现mnist手写体识别
  7. nfine mysql_全开源版NFine快速开发框架C#源码
  8. 软考信息系统项目管理师真题及答案之案例分析
  9. Excel常用功能(持续更新)
  10. Iptables-外网地址及端口映射到内网地址及端口
  11. 【PCL】—欧式聚类算法详解
  12. [转] 使用CPN Tools工具做简单的登录模型()
  13. 投机之殇——解说史上最大CPU漏洞
  14. 如何在一张ppt中插入多张图片并能依次播放
  15. GBase 8c 全文检索-表检索
  16. 哈夫曼树(Huffman Tree)及哈夫曼编码(Huffman Coding)
  17. 微信小程序支付 php后台
  18. NSSCTF刷题wp——Crypto入门
  19. android 加载动画素材,八种APP启动界面的Loading进度条设计动效方案
  20. 服务器管理器角色添加失败

热门文章

  1. 自动化运帷之分布式监控 zabbix 使用
  2. [转载] Linux性能测试 strace命令
  3. zigbee 频谱波段
  4. 错误提示之:已超过传入消息(65536)的最大消息大小配额。若要增加配额,请使用相应绑定元素上的 MaxReceivedMessageSize 属性...
  5. (原创)Linux下的磁盘碎片整理(2)
  6. 【Java线程】线程协作实现多对多聊天
  7. top进阶命令htop运用
  8. Android App内部自动更新Library的使用(转载)
  9. 【规范】前端编码规范——css 规范
  10. C#中的三种委托方式:Func委托,Action委托,Predicate委托