本课程源码地址:https://pan.baidu.com/s/1MhPDUFtqfFFfHJ5K0fMJEQ   提取码:uk5e

关于消息中间件

消息:软件对象彼此通讯的方式

中间件:介于两者之间的技术,像不同服务器之间负责数据传递,服务控制或者性能监控的组件!

常用的中间件有 消息中间件(redis,kafka等)、数据库中间件(mycat,druid等)、应用服务器中间件、PRC中间件等,中间件最大的

特点是解耦服务,让不同的服务之间可以采用某种规范进行交互

ActiveMQ简介:

Apache ActiveMQ是开源、多协议、基于Java的消息服务器。使用无处不在的AMQP协议集成多平台应用程序。使用STOMP通过websockets 在Web应用程序之间交换消息。使用MQTT管理您的IoT设备。支持您现有的JMS(1.1)基础架构及其他。ActiveMQ提供强大的功能和灵活性,以支持任何消息传递用例。

ActiveMQ下载安装(Windows 64):

ActiveMQ官网下载ActiveMQ 5的经典版本

下载后解压,打开bin目录发现有个activemq.bat文件,继续进入win64文件夹,打开这个文件夹下面的activemq.bat文件

访问http://127.0.0.1:8161/,出现如下情形,则表示ActiveMQ服务开启成功!

JMS模式下的ActiveMQ

ActiveMQ的模式

ActiveMQ 有两种模式,分别是队列模式(Queue)主题模式(Topic)

队列模式:其实就是分食模式。 比如生产方发了 10条消息到 activeMQ 服务器, 而此时有多个消费方,那么这些消费方就会瓜分这些10条消息,一条消息只会被一个消费方得到。

主题模式:就是订阅模式。 比如生产方发了10条消息,而此时有多个消费方,那么多个消费方都能得到这 10条消息,就如同订阅公众号那样

队列模式的演示

1.导入相关的pom依赖 pom.xml

        <!--jms模式必须包--><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.15.9</version></dependency><!--非常好用的工具库--><dependency> <groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>4.3.1</version></dependency><!--以下是集成Spring需额外添加的包--><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-client</artifactId><version>5.13.4</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId><version>4.3.2.RELEASE</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-test</artifactId><version>4.2.4.RELEASE</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency>

2.创建工具类,得知相关服务是否启动成功 ActiveMQUtil.java

package utils;import cn.hutool.core.util.NetUtil;
import javax.swing.*;public class ActiveMQUtil {public static void main(String[] args) {checkServer();}public static void checkServer() {if(NetUtil.isUsableLocalPort(8161)) {JOptionPane.showMessageDialog(null, "ActiveMQ 服务器未启动 ");System.exit(1);}}
}

3.创建一个队列模式的生产者 QueueProducer.java

package queue;import org.apache.activemq.ActiveMQConnectionFactory;
import utils.ActiveMQUtil;
import javax.jms.*;//队列模式的生产者示例
public class QueueProducer {//服务地址private static final String url = "tcp://127.0.0.1:61616";//这次发送的消息名称private static final String topicName = "queue_style";public static void main(String[] args) throws JMSException {//0.先判断端口是否启动了ActiveMQ服务器ActiveMQUtil.checkServer();//1.创建ConnectionFactory并绑定地址ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);//2.创建ConnectionConnection connection = connectionFactory.createConnection();//3.启动链接connection.start();//4.创建会话Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5.创建目标(队列类型)Destination destination = session.createQueue(topicName);//6.创建一个生产者MessageProducer messageProducer = session.createProducer(destination);for (int i = 0; i < 100; i++) {//7.创建消息TextMessage textMessage = session.createTextMessage("队列消息-"+i);//8.发送消息messageProducer.send(textMessage);System.out.println("发送:"+textMessage.toString());}//9.关闭链接connection.close();}}

4.创建一个队列模式的消费者 QueueConsumer.java

package queue;import cn.hutool.core.util.RandomUtil;
import org.apache.activemq.ActiveMQConnectionFactory;
import utils.ActiveMQUtil;
import javax.jms.*;public class QueueConsumer {//服务地址,端口默认61616private static final String url = "tcp://127.0.0.1:61616";//这次消费的消息名称private static final String topicName = "queue_style";//消费者有可能是多个,为了区分不同的消费者,为其创建随机名称private static final String consumerName = "consumer-" + RandomUtil.randomString(5);public static void main(String[] args) throws JMSException {//0. 先判断端口是否启动了 Active MQ 服务器ActiveMQUtil.checkServer();System.out.printf("%s 消费者启动了 %n", consumerName);//1.创建ConnectiongFactory,绑定地址ConnectionFactory factory = new ActiveMQConnectionFactory(url);//2.创建ConnectionConnection connection = factory.createConnection();//3.启动连接connection.start();//4.创建会话Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5.创建一个目标 (主题类型)Destination destination = session.createQueue(topicName);//6.创建一个消费者MessageConsumer consumer = session.createConsumer(destination);//7.创建一个监听器consumer.setMessageListener(new MessageListener() {public void onMessage(Message arg0) {// TODO Auto-generated method stubTextMessage textMessage = (TextMessage) arg0;try {System.out.println(consumerName + " 接收消息:" + textMessage.getText());} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();}}});//8. 因为不知道什么时候有,所以没法主动关闭,就不关闭了,一直处于监听状态//connection.close();}
}

5.启动顺序:队列模式,生产者投放的消息排成队,等待消费者来消费,所以开启顺序可以随意先开哪个都可以

注意消费者需要处于监听状态,随时等待消费,所以连接不能关闭。

以先执行消费者的main方法,后执行生产者的main方法为例:

主题模式的演示

ActiveMQ的主题模式就是每个订阅了的消费者,都可以获取所有的消息,而不像队列模式那样要争抢。

假设有两个消费者在等候消费,这个时候他们监听的同一生产者发送了100条消息,那么主题模式下,两个消费者各自都能收到100消息,而不像队列模式那样需要"争抢"

执行前面的1、2步(做过了就不用做了)

3.创建主体模式的生产者 TopicProducer.java

package topic;import org.apache.activemq.ActiveMQConnectionFactory;
import utils.ActiveMQUtil;import javax.jms.*;public class TopicProducer {//服务地址,端口默认61616private static final String url="tcp://127.0.0.1:61616";//这次发送的消息名称private static final String topicName="topic_style";public static void main(String[] args) throws JMSException {//0. 先判断端口是否启动了  Active MQ 服务器ActiveMQUtil.checkServer();//1.创建ConnectiongFactory,绑定地址ConnectionFactory factory=new ActiveMQConnectionFactory(url);//2.创建ConnectionConnection connection= factory.createConnection();//3.启动连接connection.start();//4.创建会话Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5.创建一个目标 (主题类型)Destination destination=session.createTopic(topicName);//6.创建一个生产者MessageProducer producer=session.createProducer(destination);for (int i = 0; i < 100; i++) {//7.创建消息TextMessage textMessage=session.createTextMessage("主题消息-"+i);//8.发送消息producer.send(textMessage);System.out.println("发送:"+textMessage.getText());}//9. 关闭连接connection.close();}
}

4.创建主题模式的消费者 TopicConsumer.java

package topic;import cn.hutool.core.util.RandomUtil;
import org.apache.activemq.ActiveMQConnectionFactory;
import utils.ActiveMQUtil;import javax.jms.*;public class TopicConsumer {//服务地址,端口默认61616private static final String url="tcp://127.0.0.1:61616";//这次消费的消息名称private static final String topicName="topic_style";//消费者有可能是多个,为了区分不同的消费者,为其创建随机名称private static final String consumerName="consumer-" + RandomUtil.randomString(5);public static void main(String[] args) throws JMSException {//0. 先判断端口是否启动了 Active MQ 服务器ActiveMQUtil.checkServer();System.out.printf("%s 消费者启动了。 %n", consumerName);//1.创建ConnectiongFactory,绑定地址ConnectionFactory factory=new ActiveMQConnectionFactory(url);//2.创建ConnectionConnection connection= factory.createConnection();//3.启动连接connection.start();//4.创建会话Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5.创建一个目标 (主题类型)Destination destination=session.createTopic(topicName);//6.创建一个消费者MessageConsumer consumer=session.createConsumer(destination);//7.创建一个监听器consumer.setMessageListener(new MessageListener() {public void onMessage(Message arg0) {TextMessage textMessage=(TextMessage)arg0;try {System.out.println(consumerName +" 接收消息:"+textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}});//8. 因为不知道什么时候有,所以没法主动关闭,就不关闭了,一直处于监听状态//connection.close();}
}

5.启动顺序:对于主题模式而言, 消费者要先启动。 如果在生产者生产完成之后,消费者再启动,是看不到生产者之前发送的消息的

Spring集成ActiveMQ

前面是JMS模式下使用ActiveMQ,下面来采用Spring模式使用ActiveMQ

执行前面的1、2步(做过了就不用做了)

3.创建spring-jms.xml (文件名非强制要求,Idea工具需要添加Spring的Module指定这个文件,Maven项目这个放在src/main/resources目录下)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:p="http://www.springframework.org/schema/p"xmlns:context="http://www.springframework.org/schema/context"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"><context:component-scan base-package="spring"></context:component-scan><!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--><bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://127.0.0.1:61616"/></bean><!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --><bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"><!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --><property name="targetConnectionFactory" ref="targetConnectionFactory"/></bean><!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 --><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --><property name="connectionFactory" ref="connectionFactory"/></bean><!--这个是队列目的地, ActiveMQQueue 就表示队列模式。 如果要用主题模式就改成 ActiveMQTopic就行了 --><bean id="textDestination" class="org.apache.activemq.command.ActiveMQTopic"><constructor-arg value="topic_style"/></bean><!-- 我的监听类 --><bean id="myMessageListener" class="spring.listener.MyMessageListener"></bean><!-- 消息监听容器,会伴随spring的启动 --><bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory" /><property name="destination" ref="textDestination" /><property name="messageListener" ref="myMessageListener" /></bean></beans>

3.创建生产者类 Producer.java

package spring;import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;@Component
public class Producer {@Autowiredprivate JmsTemplate jmsTemplate;@Autowiredprivate Destination textDestination;public void sendTextMessage(final String text){jmsTemplate.send(textDestination, new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage(text);}});}
}

4.创建Spring模式的生产者测试类 SpringProducer.java

package spring;import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import utils.ActiveMQUtil;@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations="classpath:spring-jms.xml")
public class SpringProducer {@Autowiredprivate Producer producer;@Beforepublic void checkServer() {ActiveMQUtil.checkServer();}@Testpublic void testSend(){for (int i = 0; i < 100; i++) {producer.sendTextMessage("消息 " + i);}}
}

5.创建监听类,用于获取新的消息  MyMessageListener.java

package spring.listener;import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;import cn.hutool.core.util.RandomUtil;public class MyMessageListener implements MessageListener {String name = "consumer-"+ RandomUtil.randomString(5);public  MyMessageListener() {System.out.println(name + " started");}public void onMessage(Message message) {TextMessage textMessage=(TextMessage)message;try {System.out.println(name+" 接收到消息:"+textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}
}

6.创建Spring模式的消费者测试类 SpringConsumer.java

package spring;
import java.io.IOException;import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import utils.ActiveMQUtil;@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations="classpath:spring-jms.xml")
public class SpringConsumer {@Beforepublic void checkServer() {ActiveMQUtil.checkServer();}@Testpublic void test(){try {//写这个是为了不让当前测试退出 因为spring的配置,MyMessageListener会自动启动System.in.read();} catch (IOException e) {e.printStackTrace();}}
}

7.启动顺序,当前是队列模式,启动顺序谁先启动都可以;主题模式则还是先启动消费者

8.切换模式,当前是队列模式,要切换为主题模式,在spring-jms.xml文件中找到如下进行修改

    <!--这个是队列目的地, ActiveMQQueue 就表示队列模式。 如果要用主题模式就将ActiveMQQueue改成ActiveMQTopic就行了 value值最好也跟着变为queue_style,容易区分 --><bean id="textDestination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="topic_style"/></bean>

至此ActiveMQ简单入门就完成了。。。

消息中间件之ActiveMQ入门相关推荐

  1. 消息中间件之ActiveMQ 入门

    1: 2: 3: 综上所述:总结为: MQ的作用定义

  2. ActiveMQ入门-amq入门

    ActiveMQ是什么 ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线. ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现,尽管JMS ...

  3. ActiveMQ入门系列二:入门代码实例(点对点模式)

    在上一篇<ActiveMQ入门系列一:认识并安装ActiveMQ(Windows下)>中,大致介绍了ActiveMQ和一些概念,并下载.安装.启动他,还访问了他的控制台页面. 这篇,就用代 ...

  4. 消息中间件 --- Kafka快速入门

    消息中间件 --- Kafka 快速入门 消息中间件:https://blog.51cto.com/u_9291927/category33 GitHub: GitHub - scorpiostudi ...

  5. 手把手教你如何玩转消息中间件(ActiveMQ)

    原 手把手教你如何玩转消息中间件(ActiveMQ) 2018年07月15日 18:07:39 Cs_hnu_scw 阅读数 6494 </div><div class=" ...

  6. ActiveMQ入门 (三) JMS消息组成

    ActiveMQ入门 (三) JMS消息组成 文章目录 ActiveMQ入门 (三) JMS消息组成 一.JMS协议组成结构 二.JMS Message消息组成 1.JMS消息头 2.JMS消息体 1 ...

  7. ActiveMQ入门教程(三) - ActiveMQ P2P版的HelloWorld

    为什么80%的码农都做不了架构师?>>>    在这篇博客,我们来写一个ActiveMQ版的HelloWorld. 其实,要想写程序的话,还是要先了解一下JMS规范里的东西. 可以参 ...

  8. ActiveMQ入门教程(一) - JMS和ActiveMQ简介

    为什么80%的码农都做不了架构师?>>>    最近,想要学习一下JMS,进而了解到了ActiveMQ,就在这里记录一下. 1.ActiveMQ简介 ActiveMQ官方网站:htt ...

  9. 简单了解消息中间件及ActiveMQ

    最近阅读文献,发现一篇介绍中间件及ActiveMQ的好文章,特此转载留存,同时也分享给大家.原文献地址:https://segmentfault.com/a/1190000014958916?utm_ ...

最新文章

  1. linux grep 例子,Linux中Grep常用的15个例子
  2. honeywell新风系统控制面板说明_如何对新风系统维护保养?有哪些方法?
  3. 如何编译Android的kernel,如何下载并编译Android 4.0内核源码Goldfish(图文)
  4. ffmpeg m4a 转pcm_使用ffmpeg解码音频文件到PCM格式
  5. postman如何使用mockserver?
  6. windows 密码过期_为什么我的密码在Windows中过期?
  7. 群晖NPV套件(NPV Server)官方下载方法
  8. 1800 Flying to the Mars 大数 最多不上升序列 简化题意
  9. linux hd4000显卡驱动,AMD Radeon HD 2000/HD 3000/HD 4000系列显卡驱动怎么样
  10. 安装mysql提示oxc000007b_重装win7系统黑屏提示oxc000007b怎么办
  11. 多元回归分析。策略生成程序和策略分析程序二合一
  12. Intouch学习笔记一
  13. 平板N710无限重启解决办法——包括恢复出厂设置的办法
  14. 关于使用华为手机进行自动化测试的一个坑
  15. IT行业里的热门技术
  16. 家家都有机器人(转载)
  17. 安卓移动办公软件_移动办公软件,是让老板喜欢还是员工喜欢?
  18. golang 引入包报错package xxx is not in GOROOT
  19. C++ 之 C++ 操作 json 文件(C++读写json文件)及jsoncpp配置详解
  20. Linux配DD虚拟带库,ubuntu安装虚拟磁带库mhvtl的方法

热门文章

  1. 90亿拿下德邦 京东物流加速“造血”
  2. 王府井上半年营收46.88亿元 同比上涨36.88%
  3. 滴滴市值超百度 达到791亿美金
  4. 京东:截至11月11日00:09 累计下单金额超2000亿元
  5. 滴滴上线特快和特惠:极端天气绝不动态加价
  6. 刘强东宣布向瑞士捐赠160万只口罩及其他大量急需医疗物资
  7. BOSS直聘:2020一季度平均招聘薪资8609元 同比增长2.8%
  8. 聚划算百亿补贴上线新iPhone SE 售价2799元
  9. 华为P40系列国行版来了:价格成最大悬念!
  10. 多点Dmall发布系统Mini OS 宣称要五年覆盖百万门店