更多大数据架构、实战经验,欢迎关注【大数据每日哔哔】,期待与你一起成长!

本文将介绍一下 ActiveMQ 的安装、原理和简单实战。

一、什么是消息中间件

消息中间件顾名思义实现的就是在两个系统或两个客户端之间进行消息传送

二、什么是ActiveMQ

ActiveMQ是一种开源的基于JMS(Java Message Servie)规范的一种消息中间件的实现,ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。

三、什么时候需要用ActiveMQ

ActiveMQ常被应用与系统业务的解耦,异步消息的推送,增加系统并发量,提高用户体验。例如以我在工作中的使用,在比较耗时且异步的远程开锁操作时

四、如何使用ActiveMQ

1.AcitveMQ的数据传送流程

2.ActiveMQ的两种消息传递类型

(1)点对点传输,即一个生产者对应一个消费者,生产者向broke推送数据,数据存储在broke的一个队列中,当消费者接受该条队列里的数据。

(2)基于发布/订阅模式的传输,即根据订阅话题来接收相应数据,一个生产者可向多个消费者推送数据,与MQTT协议的实现是类似的,对MQTT协议有兴趣的可跳转到https://www.cnblogs.com/xiguadadage/p/11216463.html

两种消息传递类型的不同,点对点传输消费者可以接收到在连接之前生产者所推送的数据,而基于发布/订阅模式的传输方式消费者只能接收到连接之后生产者推送的数据。

3.ActiveMQ的安装与启动

(1)官网下载对应服务器版本

(2)解压后进入apache-activemq-5.15.9/bin目录

(3)执行./activemq start启动ActiveMQ

(4)浏览器输入ActiveMQ启动的服务器ip:8161便可进入web界面,点击Manage ActiveMQ broker可以查看消息推送的状态,默认账号密码为admin,admin

(5)启动错误分析

进入/root/apache-activemq-5.15.9/data目录查看activemq.log文件,根据错误提示信息修改,例如端口号被占用等。

4.ActiveMQ的代码测试

(1)构建maven项目,引入依赖

org.apache.activemq    activemq-all    5.9.0

(2)生产者类

/** * @Description 生产者 * @Date 2019/7/20 * @Created by yqh */public class MyProducer {    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";    public static void main(String[] args) throws JMSException {        // 创建连接工厂        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);        // 创建连接        Connection connection = activeMQConnectionFactory.createConnection();        // 打开连接        connection.start();        // 创建会话        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据        Destination destination = session.createQueue("myQueue");        // 创建一个生产者        MessageProducer producer = session.createProducer(destination);        // 向队列推送10个文本消息数据        for (int i = 1 ; i <= 10 ; i++){            // 创建文本消息            TextMessage message = session.createTextMessage("第" + i + "个文本消息");            //发送消息            producer.send(message);            //在本地打印消息            System.out.println("已发送的消息:" + message.getText());        }        //关闭连接        connection.close();    }}

运行结果:

已发送的消息:第1个文本消息已发送的消息:第2个文本消息已发送的消息:第3个文本消息已发送的消息:第4个文本消息已发送的消息:第5个文本消息已发送的消息:第6个文本消息已发送的消息:第7个文本消息已发送的消息:第8个文本消息已发送的消息:第9个文本消息已发送的消息:第10个文本消息

测试查看web后台显示,有10条消息在队列中等待消费

(3)消费者类

/** * @Description 消费者类 * @Date 2019/7/20 0020 * @Created by yqh */public class MyConsumer {    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";    public static void main(String[] args) throws JMSException {        // 创建连接工厂        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);        // 创建连接        Connection connection = activeMQConnectionFactory.createConnection();        // 打开连接        connection.start();        // 创建会话        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据        Destination destination = session.createQueue("myQueue");        // 创建消费者        MessageConsumer consumer = session.createConsumer(destination);        // 创建消费的监听        consumer.setMessageListener(new MessageListener() {            public void onMessage(Message message) {                TextMessage textMessage = (TextMessage) message;                try {                    System.out.println("消费的消息:" + textMessage.getText());                } catch (JMSException e) {                    e.printStackTrace();                }            }        });    }}

测试结果:

消费的消息:第1个文本消息消费的消息:第2个文本消息消费的消息:第3个文本消息消费的消息:第4个文本消息消费的消息:第5个文本消息消费的消息:第6个文本消息消费的消息:第7个文本消息消费的消息:第8个文本消息消费的消息:第9个文本消息消费的消息:第10个文本消息

web后台显示有一个消费者处于连接状态,且已消费了10个message,而该条队列已没有message待消费了

(4)当我们运行两个消费者类,消息又是怎么被消费的呢?是两个消费者都能收到生产者生产的message,还是只有其中一个消费者能消费呢?

我们先运行两个消费者,在运行一个生产者对目标队列生产10个message,会发现有以下情况

// Consumer1控制台消费的消息:第1个文本消息消费的消息:第3个文本消息消费的消息:第5个文本消息消费的消息:第7个文本消息消费的消息:第9个文本消息
// Consumer2控制台消费的消息:第2个文本消息消费的消息:第4个文本消息消费的消息:第6个文本消息消费的消息:第8个文本消息消费的消息:第10个文本消息

即队列中的数据会平均的分给每一个消费者消费,且每一条数据只能被消费一次

(5)以上是基于队列点对点的传输类型,以下是基于发布/订阅模式传输的类型测试

/** * @Description 基于发布/订阅模式传输类型的生产者测试 * @Date 2019/7/20 0020 * @Created by yqh */public class MyProducerForTopic {    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";    public static void main(String[] args) throws JMSException {        // 创建连接工厂        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);        // 创建连接        Connection connection = activeMQConnectionFactory.createConnection();        // 打开连接        connection.start();        // 创建会话        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据        Destination destination = session.createTopic("topicTest");        // 创建一个生产者        MessageProducer producer = session.createProducer(destination);        // 向队列推送10个文本消息数据        for (int i = 1 ; i <= 10 ; i++){            // 创建文本消息            TextMessage message = session.createTextMessage("第" + i + "个文本消息");            //发送消息            producer.send(message);            //在本地打印消息            System.out.println("已发送的消息:" + message.getText());        }        //关闭连接        connection.close();    }}
/** * @Description 基于发布/订阅模式传输类型的消费者测试 * @Date 2019/7/20 0020 * @Created by yqh */public class MyConsumerForTopic {    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";    public static void main(String[] args) throws JMSException {        // 创建连接工厂        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);        // 创建连接        Connection connection = activeMQConnectionFactory.createConnection();        // 打开连接        connection.start();        // 创建会话        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据        Destination destination = session.createTopic("topicTest");        // 创建消费者        MessageConsumer consumer = session.createConsumer(destination);        // 创建消费的监听        consumer.setMessageListener(new MessageListener() {            public void onMessage(Message message) {                TextMessage textMessage = (TextMessage) message;                try {                    System.out.println("消费的消息:" + textMessage.getText());                } catch (JMSException e) {                    e.printStackTrace();                }            }        });    }}

现在如果我们先启动生产者,再启动消费者,会发现消费者是无法接收到之前生产者之前所生产的数据,只有消费者先启动,再让生产者消费才可以正常接收数据,这也是发布/订阅的主题模式与点对点的队列模式的一个明显区别。

而如果启动两个消费者,那么每一个消费者都能完整的接收到生产者生产的数据,即每一条数据都被消费了两次,这是发布/订阅的主题模式与点对点的队列模式的另一个明显区别。

更多大数据架构、实战经验,欢迎关注【大数据每日哔哔】,期待与你一起成长!

启动activemq_浅谈ActiveMQ与使用相关推荐

  1. activemq 发两条只收到一条_浅谈ActiveMQ与使用

    更多大数据架构.实战经验,欢迎关注[大数据每日哔哔],期待与你一起成长! 本文将介绍一下 ActiveMQ 的安装.原理和简单实战. 一.什么是消息中间件 消息中间件顾名思义实现的就是在两个系统或两个 ...

  2. /opt/hbase/conf 中不能启动hbase_浅谈Hbase在用户画像上的应用

    背景 公司要做C端的用户画像,方便运营人员根据标签做用户圈选,然后对这部分人群做精准广告投放,所以需要线上接口实时调用库中数据,并快速返回结果,并将结果反馈到推送平台进行推送. 目前公司的方案是全部走 ...

  3. 浅谈嵌入式MCU软件开发之S32K1xx系列MCU启动过程及重映射代码到RAM中运行方法详解

    内容提要 注:本文摘自NXP工程师胡恩伟的微信公众号"汽车电子expert成长之路",大家感兴趣可以关注一下. 引言 1. S32K1xx系列MCU启动过程详解(startup_S ...

  4. 普歌-浅谈RabbitMQ

    浅谈RabbitMQ 一.mq简介 mq(Message Queue),一种提供消息队列服务的中间件,也称消息中间件,提供消息生产.存储.消费全过程API软件系统. 二.常见的MQ ActiveMQ, ...

  5. 浅谈几种区块链网络攻击以及防御方案之拒绝服务攻击

    旧博文,搬到 csdn 原文:http://rebootcat.com/2020/04/14/network_attack_of_blockchain_ddos_attack/ 写在前面的话 自比特币 ...

  6. 浅谈几种区块链网络攻击以及防御方案之日蚀攻击

    旧博文,搬到 csdn 原文:http://rebootcat.com/2020/04/12/network_attack_of_blockchain_eclipse_attack/ 写在前面的话 自 ...

  7. 浅谈大数据中的 2PC、3PC、Paxos、Raft、ZAB

    一致性 简述 一致性,是指对每个节点一个数据的更新,整个集群都知道更新,并且是一致的.假设一个具有N个节点的分布式系统,当其满足以下条件时,我们说这个系统满足一致性: 全认同: 所有N个节点都认同一个 ...

  8. java程序的装载与检查_浅谈Java类型装载、连接与初始化

    类型装载.连接与初始化 Java虚拟机通过装载.连接和初始化一个Java类型,使该类型可以被正在运行的Java程序所使用.其中装载就是把二进制形式的Java class文件读入Java虚拟机中去;连接 ...

  9. JDK与JRE的关系和path的作用浅谈

    JDK与JRE的关系和path的作用浅谈 摘要:JDK与JRE的关系以及path的配置对于初学者是要明白的问题,那么具体的JDK与JRE的关系是什么呢?那么本文讲给你简单介绍. 标签:JDK与JRE关 ...

最新文章

  1. poj3648 2-sat 输出任意一组解
  2. MarkDown写ppt
  3. 服务至上——安擎的待“客”之道
  4. cout和printf的区别
  5. 计算机音乐新年好呀,新年好呀新年好 伴奏
  6. pon移动家庭网关有虚拟服务器吗,电信、移动、联通家庭网关对比分析
  7. Transformer结构详解
  8. 一个大数据屌丝的一天
  9. vs 输入代码时出现火花_VSV和VBV随发动机转速和进气温度怎么变化维修执照机务在线...
  10. 旷世英才遭天妒——拉马努金
  11. 如何安装百度分享按钮
  12. ES slop问题纪录
  13. 我写了一个微信排版编辑器(已开源)
  14. Field communityMapper in com.estate.service.impl.CommunityServiceImpl required a bean of type ‘
  15. 模型auc指标_auc致命缺陷模型指标
  16. Word处理控件Aspose.Words功能演示:使用C#对PDF文件进行进一步修改和转换
  17. 小米2S最新Android版本,小米2/2s如何刷Android 5.0?小米2s升级安卓5.0步骤
  18. 新一代交通控制网与智慧公路建设
  19. vue2--代码高亮
  20. Web3:打造一个公平、开发的价值互联网

热门文章

  1. 极大似然估计的直观解释-转
  2. JZOJ __Day 3:【NOIP普及模拟】求和(sum)
  3. 网盘php资料,怎么搜索百度网盘里的资料(php版)
  4. 计算机编程软文,全盲男孩自学编程,一句话打动无数网友…
  5. php 处理对象用什么,程序处理的对象是什么
  6. unity创建一个简单对象的开销_Unity下简单对象池的创建与使用
  7. 如何用苹果手机生成扫描件
  8. 简单哈弗曼树(Java)
  9. Java基础:详解HashMap在多线程下不安全
  10. 新玩法,CentOS7中LVM通过扩展逻辑卷扩展swap空间