为什么要使用消息中间件?

在这一个新时代,随着用户越来越大,产品越来越规划,对数据冥等性,数据完整性要求越来越高,传统得TCP,业务交换等交换方式,满足不了我们得业务场景。

嘿嘿 ~~~~ 首先什么叫做消息中间件?

按照逻辑理解---> 存储 处理消息得一个中间件(工具)

按照业务理解---> 特殊消息业务处理得一个系统

消息中间件能为我们解决什么问题?

首先我们想象一个场景 用户网上下单 ,A系统(订单) B系统(下单)采用传统得方式 ,tcp或者RPC调用,如果A系统调用B系统进行下单业务处理得时候 ,出现调用失败(超时等)。

可能想到这里得童靴,既然调用失败,那我就重复调用,For-each ,如果系统负载大得话,我就定时进行重复调用(调用失败得数据 存入数据库,通过定时器重复调用消费,调用成功则进行删除),确实这个是一个解决得办法,但是细想,是否增大系统得开销和系统之间得耦合消费,于是乎中间件(MQ),就是它横空出示,它来替我们处理相关问题,它值得相信.

具体它(MQ)有什么优势呢?

  1. 保证消息得一致性,顺序
  2. 做存储服务
  3. 服务解耦
  4. 解决一些复杂业务场景(延迟通知,队列等)

哈哈 如果你感兴趣得话,请容我慢慢道来。。。。。。

什么是JMS?

JMS JAVA消息服务(Java Message Service,JMS),应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。 Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

MOM?

MOM架构消息中间件,它得主要原理类似消费者设计模式,比如 A系统,B系统,而在他们之中添加MOM ,A只需要把消息放入MOM,B什么时候来MOM来取,A也不需要知道,然后B使用完通过状态,通知,回调得方式通知A就行,不仅提高了效率和并发,还降低了系统之间耦合,间接性增加系统性能。

关于JMS消息模型?

P2P(Point-to-Point)

如图:

由此图看出:

1.发送者(Sender)

2.接收者(Receiver)

3.MOM(队列)

4.每个消息发送到特定得队列,接收者从队列获取消息。队列保留消息,直到Receiver通知MOM,消费完毕在进行剔除(被消费/或超时)

特点:

1.每个消息对应一个消费者,被消费完毕,消息不在存在

2.发送者和接收者没有任何依赖,不管接收者是否运行,都不影响发送者正常运行,如果发送者没有运行,消费者进行等待

3.消费者每消费一个消息时,都需要对MOM进行通知。

应用场景:

比如服务之间指定衔接,调用成功一致性。也可以用于A发送消息于B

发布/订阅(Publish/Subscribe(Pub/Sub)

如图:

由此图看出:

1.发布者(Pub)

2.订阅者(Sub)容许多个

3.发布服务器MOM(service)

客户端将消息发送到服务器。发布者将消息发送到订阅服务器,然后将这些消息传递给多个订阅者。

特点:

每个发布的消息,对应多个消费端(sub)

发布者和订阅者之间没有太多时间上依赖,往往之间依赖的只是服务(类似创建了一个主题topic),只有创建了订阅者,才能消费发布者消息,为了消费,订阅必须保存运行的状态

举例:类似Redis 发布和订阅

比如 

这个redisChat可以是一个List或者hash等结构

我们订阅频道 SUBSCRIBE  redisChat

频道上面发布消息 PUBLISH redisChat "Redis is a great caching technique"

那么订阅的客户端会收到这些消息

Redis is a great caching technique......

消费:

同步:

订阅者或接收者调用receive方法来接收消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞

异步:

订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。redis队列中的进行弹出消费,一般使用 Brpop 或者定时Brpoplpush等弹出消息进行

应用场景:

socket集群实现

业务解耦

异步处理

消息中间件之间得对比?

可以参考:https://blog.csdn.net/linsongbin1/article/details/47781187

直接从复制过来了......

RabbitMQ

使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了一个经纪人(Broker)构架,这意味着消息在发送给客户端时先在中心队列排队。对路由(Routing),负载均衡(Load balance)或者数据持久化都有很好的支持。

Redis

Key-Value的NoSQL数据库,开发维护很活跃,虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。

ZeroMQ

最快的消息队列系统,尤其针对大吞吐量的需求场景。ZMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演了这个服务角色。你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后你就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果down机,数据将会丢失。其中,Twitter的Storm中使用ZeroMQ作为数据流的传输。

ActiveMQ

Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。RabbitMQ、ZeroMQ、ActiveMQ均支持常用的多种语言客户端 C++、Java、.Net,、Python、 Php、 Ruby等

Jafka/Kafka

Kafka是Apache下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现复杂均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制来统一了在线和离线的消息处理,这一点也是本课题所研究系统所看重的。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

其他一些队列列表HornetQ、Apache Qpid、Sparrow、Starling、Kestrel、Beanstalkd、Amazon SQS就不再一一分析。

使用ActiveMQ

1.官网链接:

https://www.cnblogs.com/biehongli/p/11522793.html

2.下载文件

apache-activemq-5.15.12-bin.tar.gz

3.解压文件

tar -zxvf apache-activemq-5.15.12-bin.tar.gz

4.查看是否安装JDK

JDK必须1.7以上

如果没有安装,可以看服务器是否有jdk包  yum -y list java*

yum -y  install java.xxxxxx JDK包

5.查看端口8161端口是否开放

netstat -lntp |grep 8161
firewall-cmd --zone=public --add-port=8161/tcp --permanent
firewall-cmd --reload
然后IP加端口进行访问

6.开机自启动

!/bin/sh### BEGIN INIT INFO
# Provides:          activemq
# Required-Start:    $remote_fs $network $syslog
# Required-Stop:     $remote_fs $network $syslog
# Default-Start:     3 5
# Default-Stop:      0 1 6
# Short-Description: Starts ActiveMQ
# chkconfig: 2345 64 36
# Description:       Starts ActiveMQ Message Broker Server
### END INIT INFO
export ACTIVEMQ_HOME=/usr/local/activemq/
case $1 instart)sh $ACTIVEMQ_HOME/bin/activemq start;;stop)sh $ACTIVEMQ_HOME/bin/activemq stop;;restart)sh $ACTIVEMQ_HOME/bin/activemq restart;;
esac
exit 0
开机自启动 chkconfig activemq on
启动服务systemctl start activemq
访问地址http://ip:prot/admin/ 默认用户名密码admin/admin

好了 B话不多说直接上代码

实现点对点

// 生产者
public class ProductService {public static void main(String[] args) throws Exception {// ConnectionFactory :连接工厂,JMS 用它创建连接ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin", "tcp://192.168.0.71:61616/");// JMS 客户端到JMS Provider 的连接Connection connection = connectionFactory.createConnection();connection.start();// Session: 一个发送或接收消息的线程// true 是否支持事务, false不支持事务Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);// Destination :消息的目的地;消息发送给谁.// 获取session注意参数值my-queue是Query的名字Queue queue = session.createQueue("test-queue");// MessageProducer:createProducer 消息生产者MessageProducer producer = session.createProducer(queue);// 设置不持久化producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);// 发送一条消息for (int i = 1; i <= 5; i++) {sendMsg(session, producer, i);}session.commit();//手动关闭实时等待System.in.read();//关闭连接producer.close();session.close();connection.close();}/*** 在指定的会话上,通过指定的消息生产者发出一条消息** @param session*            消息会话* @param producer*            消息生产者*/public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {// 创建一条文本消息TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);// 通过消息生产者发出消息producer.send(message);}
}// 消费者
public class ConsumeService {public static void main(String[] args) throws Exception {
// ConnectionFactory :连接工厂,JMS 用它创建连接// ConnectionFactory :连接工厂,JMS 用它创建连接ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin", "tcp://192.168.0.71:61616/");// JMS 客户端到JMS Provider 的连接Connection connection = connectionFactory.createConnection();connection.start();// Session: 一个发送或接收消息的线程Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);// Destination :消息的目的地;消息发送给谁.// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置Destination destination = session.createQueue("my-queue");// 消费者,消息接收者 createConsumerMessageConsumer consumer = session.createConsumer(destination);while (true) {TextMessage message = (TextMessage) consumer.receive();//  message.acknowledge();  手动签收if (null != message) {System.out.println("收到消息:" + message.getText());session.commit();} elsebreak;}session.close();connection.close();}
}1.由此我们可以看出,创建连接ActiveMQ连接池
2.获取连接
3.启动连接
4.创建ActiveMQ会话对象(用来处理接收和发送消息)
connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
第一个参数
TRUE如果session带有事务,并且事务成功提交,则消息被自动签收。如果事务回滚,则消息会被再次传送。
FALSE不带事务的session的签收方式,取决于session的配置。 参考第二个参数
第二个参数的作用
参考:https://docs.oracle.com/javaee/7/api/javax/jms/Session.html
AUTO_ACKNOWLEDGE 自动确认会话 当会话已成功从调用返回receive时,或会话已被
调用以处理消息的消息侦听器成功返回时,会话将自动确认客户端对消息的接收。
CLIENT_ACKNOWLEDGE 客户端通过调用消息的acknowledge方法来确认已使用的消息
DUPS_OK_ACKNOWLEDGE 此确认模式指示会话延迟确认消息的传递。
SESSION_TRANSACTED 该值可以作为参数传递给对象createSession(int sessionMode) 上的方法,
Connection以指定会话应使用本地事务。

代码运行成功如图:

 队列名称 待处理消息 消费人数 已入队消息数量 一出对消息数量

实现订阅/发布

发布
public class Pro {//频道top1, 或者说生成到topicsprivate static final String TOPIC = "top1";public static void main(String[] args) throws Exception {// ConnectionFactory :连接工厂,JMS 用它创建连接// ConnectionFactory :连接工厂,JMS 用它创建连接ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin", "tcp://192.168.0.71:61616/");// JMS 客户端到JMS Provider 的连接Connection connection = connectionFactory.createConnection();connection.start();// Session: 一个发送或接收消息的线程Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);// 创建生产者对象MessageProducer  producer = session.createProducer(null);// 进行持久话producer.setDeliveryMode(DeliveryMode.PERSISTENT);//生成宝贝for (int i = 1; i <= 5; i++) {System.out.println("我是消息" + i);TextMessage textMessage = session.createTextMessage("我是消息" + i);// 想topic 发送消息Destination destination = session.createTopic(TOPIC);producer.send(destination, textMessage);}session.close();connection.close();}
}订阅,创建topic
public class Sub {private static final String TOPIC = "top1";public static void main(String[] args) throws Exception {// ConnectionFactory :连接工厂,JMS 用它创建连接ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin", "tcp://192.168.0.71:61616/");// JMS 客户端到JMS Provider 的连接Connection connection = connectionFactory.createConnection();connection.start();// Session: 一个发送或接收消息的线程// true 是否支持事务, false不支持事务Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//// 创建一个队列Topic topic = session.createTopic(TOPIC);MessageConsumer consumer = session.createConsumer(topic);// 发送一条消息while (true) {TextMessage textMessage = (TextMessage) consumer.receive();if (textMessage != null) {System.out.println("接受到消息:" + textMessage.getText());
//                 textMessage.acknowledge();// 手动签收
//                 session.commit();} else {break;}}session.commit();//手动关闭实时等待System.in.read();//关闭连接session.close();connection.close();}}

如图

集成SpringBoot

导入POM
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--消息队列连接池-->
<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.15.0</version>
</dependency><dependency><groupId>org.messaginghub</groupId><artifactId>pooled-jms</artifactId><version>1.0.3</version>
</dependency><!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-core -->
<dependency><groupId>ch.qos.logback</groupId><artifactId>logback-core</artifactId><version>1.2.3</version>
</dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId>
</dependency>
2。初始化配置类
package com.persist.sneer.mon.activemq.config;import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.jms.Queue;
import javax.jms.Topic;/*** @author dengxiao* @version 1.0* @className ActiveMQBean* @description TODO* @date 2020/5/2116:06**/
@Configuration
public class ActiveMQBean {@Value("${queque.police}")private String policeQueue;@Value("${queque.edu}")private String eduQueue;@Value("${topic.pbc}")private String pbcTopic;@Value("${topic.veh}")private String vehTopic;@Beanpublic Queue policeQueue() {return new ActiveMQQueue(policeQueue);}@Beanpublic Queue eduQueue() {return new ActiveMQQueue(eduQueue);}@Beanpublic Topic pbcTopic() {return new ActiveMQTopic(pbcTopic);}@Beanpublic Topic vehTopic() {return new ActiveMQTopic(vehTopic);}
}
3.创建ActiveMQConfig连接池对象
@Configuration
@Slf4j
public class ActiveMQConfig {@Beanpublic ActiveMQConnectionFactory connectionFactory() {return new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.0.71:61616");}@Bean public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();bean.setPubSubDomain(true);bean.setConnectionFactory(connectionFactory);log.info("注入的connectionFactory>>>>>"+connectionFactory.getUserName());return bean;}@Beanpublic JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();bean.setConnectionFactory(connectionFactory);bean.setPubSubDomain(false);return bean;}/*@Beanpublic JmsMessagingTemplate jmsMessagingTemplate(ActiveMQConnectionFactory connectionFactory){return new JmsMessagingTemplate(connectionFactory);}*/}
4.创建消费者对象
@Slf4j
@Controller
public class ConsumerObj {@JmsListener(destination = "${queque.police}",containerFactory="jmsListenerContainerQueue")@SendTo("out.queue")public String receiveQueue(String text) {log.info("police消费者1接受到的消息");log.info(text);return "sample.queue接受的消息>>>>>" + text;}@JmsListener(destination = "${queque.police}",containerFactory="jmsListenerContainerQueue")public void receiveQueue2(String text){log.info("police消费者2接受到的消息");log.info(text);}@JmsListener(destination = "out.queue",containerFactory="jmsListenerContainerQueue")public void receiveOutQueue(String text) {log.info("out.Queue接受到的信息" + text);}@JmsListener(destination = "${topic.pbc}",containerFactory="jmsListenerContainerTopic")public void receiveTopic(String text){log.info("pbc1消费者接受的信息");log.info(text);}@JmsListener(destination = "${topic.pbc}",containerFactory="jmsListenerContainerTopic")public void receiveTopic2(String text){log.info("pbc2消费者接受的信息"+text);}@JmsListener(destination = "${topic.veh}",containerFactory="jmsListenerContainerTopic")public void receiveTopicUser(Message m){log.info("接受到的对象信息message>>>>"+m);
//        ObjectMessage m1 = (ObjectMessage) m;
//        m1.getObject();    获取序列化对象log.info("消费者接受的用户对象信息"+m);}
}
5 创建发布者
/*** @author* @version 1.0* @className ProducerObj   生成者* @description TODO* @date 2020/5/2116:07**/
@Component
public class ProducerObj implements CommandLineRunner {@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;@Autowiredprivate Queue policeQueue;@Autowiredprivate Topic pbcTopic;@Autowiredprivate Topic vehTopic;@Overridepublic void run(String... args) throws Exception {for(int i=0;i<1;i++){send("Sample message"+i);}sendTopic("topic send");}public void send(String msg) {this.jmsMessagingTemplate.convertAndSend(this.policeQueue, msg);System.out.println("Message was sent to the policeQueue");}public void sendTopic(String msg){System.out.println("消费者发送topic消息");this.jmsMessagingTemplate.convertAndSend(this.pbcTopic,msg);System.out.println("Message was sent to the pbcTopic");}
}分别实现了P2P和发布/订阅功能

如图:

后续有时间学习并更新代码到github实现集群点对点,发布订阅,流量削峰等实现!!

来自互联网中一位落魄的搬砖人的自述:

虚怀若谷,卑谦前行 欧里给

一篇简单的散文了解消息中间件相关推荐

  1. iOS开发UI篇—简单介绍静态单元格的使用

    iOS开发UI篇-简单介绍静态单元格的使用 一.实现效果与说明 说明:观察上面的展示效果,可以发现整个界面是由一个tableview来展示的,上面的数据都是固定的,且几乎不会改变. 要完成上面的效果, ...

  2. java生成sm4算法的对称密钥_技术分享丨这是一篇简单的小科普——什么是对称加密算法?(下)...

    原标题:技术分享丨这是一篇简单的小科普--什么是对称加密算法?(下) 大家好~我是贾正经,又到了干货满满的技术分享趴啦~ 上期我们讲解了对称加密算法的小知识,并介绍了国密算法中SM4算法的原理. 本期 ...

  3. numpy基础篇-简单入门教程4

    numpy基础篇-简单入门教程4 np.set_printoptions(precision=3),只显示小数点后三位 np.random.seed(100)rand_arr = np.random. ...

  4. Android四大组件之Activity(第一篇-简单使用)

    Android四大组件之Activity(第一篇-简单使用) 前言 一.Activity是什么? 二.如何使用 1.继承 2.重写onCreate() 总结 文章目录 前言 一.Activity是什么 ...

  5. 计算机完成一篇文稿制作步骤,PPT制作一篇简单的演示文稿的详细步骤

    最近不少伙伴咨询PPT怎样制作一篇简单的演示文稿的操作,今天小编就带来了PPT制作一篇简单的演示文稿的详细步骤,感兴趣的小伙伴一起来看看吧! PPT制作一篇简单的演示文稿的详细步骤 1.在单击此处添加 ...

  6. PwnTheBox(web篇)简单题

    PwnTheBox(web篇)简单题 文章结构生成 # //*[@id="app"]/div/section/main/div[1]/div/div[6]/div/div/div/ ...

  7. 系列课程 ElasticSearch 之第 3 篇 —— 简单认识 Kibana 操作 ElasticSearch,ElasticSearch 的版本控制

    接上一篇博客继续讲解 Kibana 如何操作 ElasticSearch. 首先我们认识 Kibana 1.管理后台:http://127.0.0.1:5601/ 2.锁定左边的菜单栏 不然点击一个就 ...

  8. android 网络篇简单介绍

    1 简介 本文简单介绍android 开发中常用的webview .url. volley. json解析等网络工具.由于篇幅问题,这里只做简单介绍并不做详解. 2 WebView的用法 2.1 简单 ...

  9. 实战篇-简单多语言的实现

    对于 秋色园 的多语言的实现,很多人都问了一下是怎么实现的,这里,给网友简单介绍一下. 实现多语言,通常有以下方式: 1:使用系统的资源文件进行翻译 2:读取外部文件进行翻译 3:利用google等外 ...

最新文章

  1. 虫洞协议和闪电网络如何混为一谈?
  2. Exp9 Web安全基础
  3. 【转载】如何在归档后启用归档信息系统
  4. django 的ORM
  5. win7中VS2010中安装CSS3.0问题解决方法
  6. linux创建进程fork函数和vfork函数
  7. 计算机无法找到实达打印机,实达打印机使用方法教程
  8. asp.net如何获取客户端真实IP地址
  9. jQuery学习之三---工具
  10. Python程序员薪资 你不知道的事
  11. centos6使用docker部署zookeeper
  12. CCF201512试题
  13. linux中用at命令5分钟后执行,我使用过的Linux命令之at - 在指定时间执行一次任务...
  14. java 矩阵题目_java练习本(20190611)
  15. LIO-SAM探秘之文章索引
  16. RPC框架dubbo架构原理及使用说明
  17. Linux salt
  18. IIS 6.0 不能处理未知的 MIME 类
  19. 有限元法分析工程实际问题的一般过程
  20. PPC手机新手教程,使用方法,疑难解答,最强大的PPC手机使用手则

热门文章

  1. 微信启动界面:张小龙的情怀和马化腾的爱好
  2. 鼠友题库每日百题(六)
  3. pip install 库名 -i https://pypi.tuna.tsinghua.edu.cn/simple
  4. 刘强东卸任京东集团CEO 徐雷接任
  5. Kymeta加入美国陆军装甲旅战斗队试点项目
  6. java文件tree目录_java 遍历目录,操作文件 tree命令
  7. 汉字简体与繁体互相转换
  8. c语言NCR编码转换,NCR编码转换成字符
  9. 心田花开:成长路上千万不要弄丢了父母
  10. 前端开发体系建设日记