文章目录

  • **第一章 消息中间件概述**
    • 1. 消息中间件的好处
    • 2. 什么是消息中间件
    • 3. 什么是JMS(规范)
    • 4. 什么是AMQP(协议)
    • 5. 几个常用消息中间对比
  • **第二章 初始JMS**
    • **2.1 JSM相关概念**
    • **2.2 队列模式**
    • **2.3 主题模式**
    • **2.4 JSM编码接口**
  • **第三章 ActiveMQ的使用**
    • 3.1 activeMQ在Windows平台上的安装
    • 3.2 ActiveMQ的队列模式
    • 3.3 ActiveMQ的主题模式(发布/订阅)
    • **3.4 spring集成JMS连接ActiveMQ**
      • **3.4.1 几个相关类**
      • **3.4.2 消息队列模式与spring集成**
      • 3.4.3 主题模式与spring的集成
  • **第四章 ActiveMQ集群**
    • **4.1 集群方式**
    • **4.2 客户端配置**
      • 4.2.1. ActiveMQ失效转移(failover):
    • **4.3 Broker Cluster集群配置**
    • **4.4 Master/Slave集群配置**
    • **4.5 Broker clusters和Master Slave对比**
    • **4.6 高可用且负载均衡的集群方案**
  • 第五章 消息中间件如何传对象

第一章 消息中间件概述

1. 消息中间件的好处

解耦、异步、横向扩展、安全可靠、顺序保证

2. 什么是消息中间件

发送和接收数据,利用高效可靠的异步消息传递机制集成分布式系统

3. 什么是JMS(规范)

Java消息服务(Java Message Service),是一个Java平台中面向消息中间件的API

4. 什么是AMQP(协议)

AMQP(advanced message queuing protocol),是一个提供统一消息服务的应用层标准协议。
此协议不受客户端和中间件的不同产品和不同开发语言的限制。

5. 几个常用消息中间对比

. ActiveMQ RabbitMQ Kafka
优点 遵循JMS规范,安装方便 继承Erlang天生的并发性,最初用于金融行业,稳定性和安全性有保障 依赖zk,可动态扩展节点,高性能、高吞吐量、无限扩容、消息可指定追溯
缺点 有可能会丢失消息。现在的重心在下一代产品apolle上,所以5.x的产品不怎么维护了 Erlang语言难度较大,不支持动态扩展 严格的顺序机制,不支持消息优先级,不支持标准的消息协议,不利于平台迁移
支持协议 AMQP,OpenWire,Stomp,XMPP AMQP
应用 适合中小企业,不适合好千个队列的应用 适合对稳定性要求高的企业级应用 应用在大数据日志处理或对实时性、可靠性(少量数据丢失)要求较低的场景应用

第二章 初始JMS

2.1 JSM相关概念

  1. 提供者: 实现JMS规范的消息中间件服务器
  2. 客户端:发送或接收消息的应用程序
  3. 生产者/发布者: 创建并发送消息的客户端
  4. 消费者/订阅者:接收并处理消息的客户端
  5. 消息:应用程序之间传递的数据内容
  6. 消息模式:在客户端之间传递消息的方式,JMS中定义了主题和队列两种模式

2.2 队列模式

1. 特性
客户端包括生产者和消费者
队列中的消息只能被一个消息费者消息
消费者可以随时消费队列中的消息

2. 队列模型示意图

2.3 主题模式

1. 特性
客户端包括发布者和订阅者
主题中的消息被所有订阅者消息
消费者不能消费订阅之前就发送到主题中的消息

2. 主题模型示意图

2.4 JSM编码接口

ConnectionFactory 用于创建连接到消息中间件的连接工厂
Connection 代表了应用程序和消息服务器之间的通信链路
Destination 指消息发布和接收的地点,包括队列或主题
Session 表示一个单线程的上下文,用于发送和接收消息
MessageProducer 由会话创建,用于发送消息到目标
MessageConsumer 由会话创建,用于接收发送到目标的消息
Message 是在消费者和生产者之间传送的对象, 消息头,一组消息属性,一个消息体

第三章 ActiveMQ的使用

3.1 activeMQ在Windows平台上的安装

1.下载ActiveMQ
去官方网站下载:http://activemq.apache.org/activemq-5152-release.html

2.运行ActiveMQ
解压缩apache-activemq-5.5.1-bin.zip到C盘,然后双击C:\apache-activemq-5.15.2\bin\win64\activemq.bat运行ActiveMQ程序。

启动ActiveMQ以后,登陆:http://localhost:8161/admin/,进入管理界面。
用户名与密码均为:admin

3.2 ActiveMQ的队列模式

生产者代码片

package com.queue;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;
/*** 生产者* @author Peter**/
public class Proceducer {/*** */private final static String URL = "tcp://localhost:61616";/*** */private final static String QUEUE_NAME = "queue-name";public static void main(String[] args) throws JMSException {// 1. 创建ConnectionFactoryConnectionFactory factory = new ActiveMQConnectionFactory(URL);// 2. 创建ConnectionConnection con = factory.createConnection();// 3. 启动连接con.start();// 4. 创建会话Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);// 5. 创建一个目标Destination dest = session.createQueue(QUEUE_NAME);// 6. 创建一个生产者MessageProducer pro = (MessageProducer) session.createProducer(dest);for(int i = 0; i<10;i++) {// 7. 创建消息TextMessage msg = session.createTextMessage("消息"+i); // 8. 发布消息pro.send(msg);System.out.println(msg);}// 9. 关闭连接con.close();}
}

执行上面代码后,在管理界面看到的结果是:

消费者代码片

/*** 消费者* @author Peter*/
public class Consumer {/*** 中间件地址*/private final static String URL = "tcp://localhost:61616";/*** 中间件队列名,与生产者的一致*/private final static String QUEUE_NAME = "queue-name";public static void main(String[] args) throws JMSException {// 1. 创建ConnectionFactoryConnectionFactory factory = new ActiveMQConnectionFactory(URL);// 2. 创建ConnectionConnection con = factory.createConnection();// 3. 启动连接con.start();// 4. 创建会话Session session = con.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);// 5. 创建一个目标Destination dest = session.createQueue(QUEUE_NAME);             // 6. 创建一个消费者MessageConsumer consumer = session.createConsumer(dest);// 7. 创建一个监听器consumer.setMessageListener(new MessageListener() {          @Overridepublic void onMessage(Message message) {TextMessage msg = (TextMessage) message;try {System.out.println("接收消息为:"+msg.getText());} catch (JMSException e) {e.printStackTrace();}}});// 先不关闭,不然还没接收到消息就关闭了//con.close();}
}

执行上面代码后,在管理界面的结果如下:

如果我再新建一个消费者,我们会发现,两个消费者在抢收消息,即一个消费者收到了消息,则另一个消费者就收不到该消息了。

3.3 ActiveMQ的主题模式(发布/订阅)

由于订阅者是收不到还未订阅主题之前的内容的,所以必须要先启动订阅者。

订阅者代码片:

/*** 订阅者* @author Peter**/
public class Consumer {private final static String URL = "tcp://localhost:61616";private final static String TOPIC_NAME = "topic-name";public static void main(String[] args) throws JMSException {// 1. 创建ConnectionFactoryConnectionFactory factory = new ActiveMQConnectionFactory(URL);// 2. 创建ConnectionConnection con = factory.createConnection();// 3. 启动连接con.start();// 4. 创建会话Session session = con.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);// 5. 创建一个目标【与队列模式的区别就在这里,相当于订阅了该主题】Destination dest = session.createTopic(TOPIC_NAME);          // 6. 创建一个消费者MessageConsumer consumer = session.createConsumer(dest);// 7. 创建一个监听器consumer.setMessageListener(new MessageListener() {          @Overridepublic void onMessage(Message message) {TextMessage msg = (TextMessage) message;try {System.out.println("接收消息为:"+msg.getText());} catch (JMSException e) {e.printStackTrace();}}});// 先不关闭,不然还没接收到消息就关闭了//con.close();}
}

发布者代码片

/*** 发布者* @author Peter**/
public class Proceducer {private final static String URL = "tcp://localhost:61616";private final static String TOPIC_NAME = "topic-name";public static void main(String[] args) throws JMSException {// 1. 创建ConnectionFactoryConnectionFactory factory = new ActiveMQConnectionFactory(URL);// 2. 创建ConnectionConnection con = factory.createConnection();// 3. 启动连接con.start();// 4. 创建会话Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);// 5. 创建一个目标【与队列模式的区别就在这里,相当于发布一个主题】Destination dest = session.createTopic(TOPIC_NAME);// 6. 创建一个生产者MessageProducer pro = (MessageProducer) session.createProducer(dest);for(int i = 0; i<10;i++) {// 7. 创建消息TextMessage msg = session.createTextMessage("消息"+i); // 8. 发布消息pro.send(msg);System.out.println(msg);}// 9. 关闭连接con.close();}
}

如果我们再新建一个订阅者,我们会发现两个订阅者收到的消息完全一样。

3.4 spring集成JMS连接ActiveMQ

我们下载的activeMQ压缩文件里解压后,能找到相关的jar包,但spring-jms这个可去maven仓库下载

3.4.1 几个相关类

1. ConnectionFactory 用于管理连接的连接工厂【也是连接池:管理JmsTemplate每次发送消息都会重新创建的连接、会话、productor】
实现类:
SingleConnectionFactory:每次都返回同一个连接
CachingConnectionFactory:继承了SingleConnectionFactory,并实现了缓存

2.JmsTemplate 用于发送和接收消息的模板类
由spring提供,它是线程安全类,可以在整个应用范围内应用

3.MessageListener 消息监听器
只需实现一个只接收Message参数的onMesssage方法

3.4.2 消息队列模式与spring集成

1. 发送消息的接口

public interface ProducerInter {public void sendMessage(String message);
}

2. 发送消息实现类

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;public class ProducerImpl implements ProducerInter {@AutowiredJmsTemplate jms;// 由于可能会有多个目标,所以一定要以注入bean的id区分@Resource(name="destination")Destination destination;@Overridepublic void sendMessage(String message) {jms.send(destination, new MessageCreator() {            @Overridepublic Message createMessage(Session sessioin) throws JMSException {TextMessage msg = sessioin.createTextMessage(message);System.out.println("发送消息:"+msg.getText());return msg;}});}}

3. 配置文件(producer.xml)

<?xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd  http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd  http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd  http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsdhttp://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd"default-autowire="byName" default-lazy-init="false"><!-- 开启注解 --><context:component-scan base-package="com.jms.spring"></context:component-scan><!-- ActiveMQ提供的 --><bean id="targetConnectionFactoryId" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://localhost:61616"/></bean><!-- spring提供的连接池 --><bean id="connectionFactoryId" class="org.springframework.jms.connection.SingleConnectionFactory"><property name="targetConnectionFactory" ref="targetConnectionFactoryId"/></bean><!-- 创建一个点对点的队列目标对象 --><bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="queuename"/></bean><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="connectionFactoryId"/></bean><!--  --><bean id="producerImpl" class="com.jms.spring.ProducerImpl"></bean>
</beans> 

4. 测试发送
执行之后,进入管理界面可查看结果

public class TestProducer {public static void main(String[] args) {// 从classpath下加载配置文件ApplicationContext applicationContext = new ClassPathXmlApplicationContext("producer.xml");ProducerImpl pro = (ProducerImpl) applicationContext.getBean("producerImpl");pro.sendMessage("hello world");}
}

5. 监听消息类

public class ConsumerMessageListener implements MessageListener{// 监听消息@Overridepublic void onMessage(Message message) {TextMessage msg = (TextMessage) message;try {System.out.println("收到消息:"+msg.getText());} catch (JMSException e) {e.printStackTrace();}}}

6. 接收消息的配置

<?xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd  http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd  http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd  http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsdhttp://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd"default-autowire="byName" default-lazy-init="false"><!-- 开启注解 --><context:component-scan base-package="com.jms.spring"></context:component-scan><!-- ActiveMQ提供的 --><bean id="targetConnectionFactoryId" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://localhost:61616"/></bean><!-- spring提供的连接池 --><bean id="connectionFactoryId" class="org.springframework.jms.connection.SingleConnectionFactory"><property name="targetConnectionFactory" ref="targetConnectionFactoryId"/></bean><!-- 创建一个点对点的队列目标对象 --><bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="queuename"/></bean><!-- 上面的配置与producer.xml里是一样的 --><!-- 注入消息监听器 --><bean id="consumerMessageListener" class="com.jms.spring.ConsumerMessageListener"></bean><!-- 配置消息监听容器 --><bean id="jmsContainerListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactoryId"/><property name="destination" ref="destination"/><property name="messageListener" ref="consumerMessageListener"/></bean></beans>

7. 测试消费者

public class TestConsumer {public static void main(String[] args) {new ClassPathXmlApplicationContext("consumer.xml");     }
}

3.4.3 主题模式与spring的集成

只需要将配置文件中的目标对象org.apache.activemq.command.ActiveMQQueue改成org.apache.activemq.command.ActiveMQTopic即可。需要注意的是,在主题模式下,一定要先启动消费者。

第四章 ActiveMQ集群

4.1 集群方式

客户端集群:让多个消费者消费同一个队列
Broker clusters:多个Broker之间同步消息
Master Slave(主从):实现高可用

4.2 客户端配置

4.2.1. ActiveMQ失效转移(failover):

定义:允许当其中一台消息服务器宕机时,客户端在传输层上重新连接到其它消息服务器
语法:failover:(uri1,uri2,…,uriN)?transportOptions
transportOptions参数说明
randomize 默认为true,表示在uri列表中选择uri连接时,是否采用随机策略
initialReconnectDelay 默认为10,单位毫秒,表示第一尝试重连之间等待的时间
maxReconnectionDelay 默认30000,单位毫秒,最长重连的时间间隔

4.3 Broker Cluster集群配置

1. 原理:

2. NetworkConnector(网络连接器)
网络连接器主要用于配置ActiveMQ服务器与服务器之间的网络通讯方式,用于服务器透传消息
分为静态连接器和动态连接器

3. 静态连接器:适用连接地址不多的情况

<networkConnectors><networkConnector uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)">
</networkConnectors>

4. 动态连接器

<networkConnectors><networkConnector uri="multicast://default">
</networkConnectors>
<transportConnectors><transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default">
</transportConnectors>

4.4 Master/Slave集群配置

1. Master/Slave集群方案
Share nothing storage master/slave (5.8 以后的版本删除了)
Share storage master/slave 共享存储
Replicated LevelDB Store 基于可复制的LevelDB Store

2. 共享存储集群的原理
先启动A,A就因为排他锁独占资源成为Master,此时A有外部服务能力,而B没有

如果A挂了,则B获取资源成为Master,这时所有请求都会交给B

3. 基于复制的LevelDB Store的原理
因为是基于ZooKeeper的,所以至少需要3劝服务器。zk选举A作为Master后,A就具有了外部服务能力,而B、C没有。当A获取到外部资源存储后,会通过zk将资源同步到B和C。

如果A故障,则zk会重新选举一个节点作为Master

4.5 Broker clusters和Master Slave对比

. 高可用 负载均衡
Master/Slave
Broker Cluster

4.6 高可用且负载均衡的集群方案

第五章 消息中间件如何传对象

利用Json

Java消息中间件(activeMQ)相关推荐

  1. Java消息中间件--ActiveMq,RabbitMQ,Kafka

    Java消息中间件–acitceMq,Kafuka,RubiitMq 能否用代理模式来写的,订阅,中间件 中间件, 非底层操作系统软件,非业务应用软件.不是直接给最终用户使用的,不能直接给客户带来价值 ...

  2. 消息中间件-Activemq之Broker-Cluster

    2019独角兽企业重金招聘Python工程师标准>>> 接着上一篇消息中间件-Activemq之Master-Slaver,下面看看Broker-Cluster实现负载均衡 Brok ...

  3. 消息中间件ActiveMQ、RabbitMQ、RocketMQ、ZeroMQ、Kafka如何选型

    转载自 消息中间件ActiveMQ.RabbitMQ.RocketMQ.ZeroMQ.Kafka如何选型? 最近要为公司的消息队列中间件进行选型,市面上相关的开源技术又非常多,如ActiveMQ.Ra ...

  4. Java消息中间件--JMS规范

    Java消息中间件–JMS规范

  5. 消息中间件activeMQ原理和使用

    一: 消息中间件的优势     1.使用消息中间件,降低了dubbo服务之间的耦合度,使得两个原本使用远程注入耦合了的模块可以实现松耦合,使用消息中间件进了间接通信.     2.缓冲能力,消息中间件 ...

  6. 以ActiveMQ为例JAVA消息中间件学习【4】——消息中间件实际应用场景

    前言 当前真正学习消息中间件,当前已经走到了,可以简单的使用,网上有很多那种复杂的高可用的架构,但是那些都是对于一些比较大型的项目来说的. 对于一些小型的项目可能用不到那么大的架构,于是我们需要从最简 ...

  7. python消息中间件activemq_消息中间件ActiveMQ和JMS基础

    MQ主要流程 解耦,异步,消峰 其中目的地主要为队列或者主题 队列点对点 消息的生产者 或者 这时消息的生产者名字已经出来 并且入队的数量变成了3 上述完成的也就是这部分 消息的消费者 前四步大同小异 ...

  8. 剑指offer之消息中间件ActiveMQ知识总结

    1.JMS(Java Message Service,Java消息服务) 1.1 定义 Java消息服务(Java Message Service,即JMS)应用程序接口是一个Java平台中关于面向消 ...

  9. JAVA消息中间件面试题

    前言 文章开始前,我们先了解一下什么是消息中间件? 什么是中间件? 非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件. 什么是消息中间件? 是关注 ...

最新文章

  1. 八种简易健康减肥瘦身法
  2. mysql数据库21_Mysql5.7.21安装文档
  3. 计算机毕业优秀作品展观后感,信息学院毕业设计作品展观后感
  4. c语言 元组顺序随机化,为什么关系中的元组没有先后顺序且不允许有重复元组?...
  5. koa连接mysql怎么做_koa-连接mysql数据库
  6. 数字IC设计经典书籍
  7. P3313-[SDOI2014]旅行【树链剖分,线段树】
  8. linux无盘工作站互不干扰,Linux环境下无盘工作站的架设和实现二
  9. 羡慕的泪水!显卡对决!GPU A6000和RTX 3090 性能对比
  10. css定位、position与float同时使用的情况
  11. Java多窗口编程示例
  12. 牛客网算法题目-最优乘车题解
  13. mysql 表名 下划线_我们可以在MySQL表名中加下划线吗?
  14. 米尔格拉姆连锁信实验_连锁信:使客户对个性化电子邮件感到满意
  15. 前端jquery实现图片点击放大缩小
  16. .java输出n行数字塔
  17. 从CSDN博客到出书,我的新书《SQL编程思想》是这样诞生的
  18. 2021virtualbox中Ubuntu16.04:开发环境配置,更换源
  19. CSS颜色:RGB颜色/HEX颜色/HSL颜色(网页颜色完全总结)
  20. mysql中esc是什么意思_esc按钮是什么意思

热门文章

  1. TEEC_Context和TEEC_InitializeContext介绍
  2. IDA分析shellcode导入windows结构体
  3. 安卓在线按钮设计网站
  4. Windows RDP协议 Fuzzing 漏洞挖掘研究
  5. 都21世纪20年代了,还有人问我学网络安全干嘛,你自己看吧
  6. 1.API的调用过程(3环部分)
  7. 【Prometheus Pushgateway】 推送数据踩坑
  8. Git push error: Unable to unlink old (Permission denied)
  9. 爬虫Selenium报错“cannot find Chrome binary“解决方案
  10. 三对角矩阵(带状矩阵)的压缩存储原理