在上一篇《ActiveMQ入门系列一:认识并安装ActiveMQ(Windows下)》中,大致介绍了ActiveMQ和一些概念,并下载、安装、启动他,还访问了他的控制台页面。

这篇,就用代码实例说下如何实现消息的生产和消费。

一、理论基础

同RabbitMQ一样,ActiveMQ中也是有两种模式:

  • 点对点模式(Point to Point,简写为PTP)
  • 发布/订阅模式(Publish & Subscribe,简写为Pub & Sub)

通过上一篇我们知道了制造消息的应用叫生产者(Producer),生产者在生产了消息后会发送消息到目的地(Destination),到达消费和处理消息的应用(也就是消费者Consumer)。这里的两种模式就通过对应不同的消息目的地(Destination)来实现,PTP对应Queue(队列)、Pub&Sub对应Topic(主题)。

今天就详细介绍下PTP和Queue,下一篇介绍Pub & Sub和Topic。

在PTP模式的示意图:

  • 消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。
  • 消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。
  • Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费、其它的则不能消费此消息了。
  • 当消费者不存在时,消息会一直保存,直到有消费消费。

在PTP中,代码实现有两种方式:消费者主动消费和消费者监听消费,下面就分别说下。

二、消费者主动消费

主动消费是最基本也是最简单的消费方式,先上代码:

  1. 创建maven工程并引入依赖

    <dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-core</artifactId><version>5.7.0</version></dependency>

  2. 实现生产者
    package com.sam.ptp;import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    /*** @author JAVA开发老菜鸟**/
    public class Producer {public static final  String QUEUE_NAME = "ptp-demo";//队列名public void producer(String message) throws JMSException {ConnectionFactory factory = null;Connection connection = null;Session session = null;MessageProducer producer = null;try {/*** 1.创建连接工厂* 创建工厂,构造方法有三个参数:分别是用户名、密码、连接地址* 无参构造:有默认的连接地址,localhost* 一个参数:无验证模式,无用户的认证* 三个参数:有认证和连接地址,我这里使用三个参数的构造方法*/factory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");/*** 2.创建连接,有两个方法(我这里使用无参数的)* 无参数* 有参数:用户名、密码;*/connection = factory.createConnection();/*** 3.启动连接* 生产者可以不用调用start()方法启动,因为在发送消息的时候回进行检查* 如果未启动连接,会自动启动。* 如果有特殊配置,需要配置完成后再启动连接*/connection.start();/*** 4.用连接创建会话* 有两个参数:是否需要事务、消息确认机制* 如果支持事务,对于生产者来说第二个参数就无效了,这个时候第二个参数建议传入Session.SESSION_TRANSACTED* 如果不支持事务,第二个参数有效且必须传递** AUTO_ACKNOWLEDGE:自动确认,消息处理后自动确认(商业开发不推荐)* CLIENT_ACKNOWLEDGE:客户端手动确认,消费者处理后必须手动确认* DUPS_OK_ACKNOWLEDGE:有副本的客户端手动确认,消息可以多次处理(不建议)*/session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);/*** 5.用会话创建目的地(队列)、生产者、消息* 队列名是队列的唯一标记* 创建生产者的时候可以指定目的地,也可以在发送消息的时候再指定*/Destination destination = session.createQueue(QUEUE_NAME);producer = session.createProducer(destination);TextMessage textMessage = session.createTextMessage(message);/*** 6.生产者发送消息到目的地*/producer.send(textMessage);System.out.println("消息发送成功");} catch(Exception ex){throw ex;} finally {/*** 7.释放资源*/if(producer != null){producer.close();}if(session != null){session.close();}if(connection != null){connection.close();}}}public static void main(String[] args){Producer producer = new Producer();try{producer.producer("hello, activemq");} catch (Exception ex){ex.printStackTrace();}}
    }

  3. 实现消费者
    package com.sam.ptp;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/*** @author JAVA开发老菜鸟** 主动消费*/
    public class Consumer {public String consumer() throws JMSException {ConnectionFactory factory = null;Connection connection = null;Session session = null;MessageConsumer consumer = null;try {factory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");connection = factory.createConnection();/*** 消费者必须启动连接,否则无法消费*/connection.start();session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Destination destination = session.createQueue(Producer.QUEUE_NAME);consumer = session.createConsumer(destination);/*** 获取队列消息*/Message message = consumer.receive();String text = ((TextMessage) message).getText();return text;} catch(Exception ex){throw ex;} finally {/*** 7.释放资源*/if(consumer != null){consumer.close();}if(session != null){session.close();}if(connection != null){connection.close();}}}public static void main(String[] args){Consumer consumer = new Consumer();try{String message = consumer.consumer();System.out.println("消息消费成功:" + message);} catch (Exception ex){ex.printStackTrace();}}
    }

好,这样代码就写好了,我们来测试下。

1.先运行生产者,我发现报错了。。。

好吧,原来是我这次没有启动ActiveMQ,被自己蠢哭了。。。

启动ActiveMQ之后,再运行生产者,成功了。

去看下控制台页面的变化,队列里面多了个“ptp-demo”队列,这个就是我们生产者代码里面的队列名,并且能看到该队列的基本情况:

从左到右依次为,有待消费的消息1条、消费者0个、已经发送的消息1条、已经消费的消息0条

2.接下来运行消费者,成功

再去看下控制台页面,发现队列信息变了,从左到右依次为:有待消费的消息0条、消费者0个、已经发送的消息1条、已经消费的消息1条

也就是说,消息真的被消费了!

代码写完了,也按照预期执行完了,我们现在再回过头来分析下消费者的代码,会发现他在consumer.receive()之后不会再消费其他消息了,即便后面再有消息被生产出来也不会再消费。也就是说只能在运行后消费一次消息,这个就是主动消费。

如果想要循环消费多次产生的消息的话,怎么办呢?请用下面的监听消费

三、消费者监听消费

还是先上代码,代码结构同主动消费类似,有细微差别,具体代码不贴了,可以到我的GitHub或码云上获取源码

  1. 首先为了区分,我把队列名改了

    public static final  String QUEUE_NAME = "ptp-listener-demo";//队列名

  2. 生产者和消费者的消息确认方式都改成了客户端手动确认,不再自动确认,手动确认有个好处就是可以防止消息没有被正常消费而丢失,这个同RabbitMQ机制一样
    session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

  3. 生产者生产消息的时候,为了方便我改成了一次性发送10条
    /*** 6.创建消息并且生产者发送消息到目的地*/for(int num = 0; num < 10; num++){TextMessage textMessage = session.createTextMessage(message + num);producer.send(textMessage);System.out.println("消息发送成功"+textMessage.getText());}

  4. 关键点来了,在消费者上加了一个监听器
     /*** 注册监听器,队列中的消息变化会自动触发监听器,接收并自动处理消息** 监听器一旦注册,永久有效,一直到程序关闭* 监听器可以注册多个,相当于集群* activemq自动轮询多个监听器,实现并行处理*/consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {try {//需要手动确认消息
                            message.acknowledge();TextMessage om = (TextMessage) message;String data = om.getText();System.out.println(data);} catch (JMSException e) {e.printStackTrace();}}});

执行生产者:

执行消费者,消息全部被消费了:

再执行2遍生产者,消息同样都被消费了。

控制台页面多了个队列,由于监听中的消费者没有关闭,因此这里能看到消费者数量为1,我执行了三遍生产者,因此消息有30条。

还没完,继续...

我们这次先启动2个消费者,然后启动生产者

两个生产者分别消费了消息0,2,4,6,8和1,3,5,7,9

也就是说两个消费者都监听到了消息,并且activemq自动轮询两个监听器发送消息。

好,到这里,ActiveMQ的点对点模式就介绍完了。下一篇介绍发布订阅模式,敬请期待

转载于:https://www.cnblogs.com/sam-uncle/p/10988930.html

ActiveMQ入门系列二:入门代码实例(点对点模式)相关推荐

  1. Quantopian 入门系列二 - 流水线 (下)

    本文含 8225 字,28 图表截屏 建议阅读 42 分钟 本贴接着上贴[Quantopian 入门系列二 - 流水线 (上)]的内容,讨论下面目录的 5- 8 节: 简介 因子 筛选器 分类器 掩码 ...

  2. 机器学习入门系列二(关键词:多变量(非)线性回归,批处理,特征缩放,正规方程

    机器学习入门系列二(关键词:多变量(非)线性回归,批处理,特征缩放,正规方程) 目录(?)[+] 一多变量的线性回归 二批处理 三特征缩放 四正规方程 五多变量非线性回归 一.多变量的线性回归 在#机 ...

  3. C语言速看,C语言高速入门系列(二)

    C语言高速入门系列(二) -----转载请注明出处coder-pig 本节引言: 在前面一节中我们对C语言进行了初步的了解,学会了使用IDE进行代码的编写,编译执行! 在这一节中我们会对C语言的基本的 ...

  4. Reflex WMS入门系列二十五:将叉车纳入系统进行管理

    Reflex WMS入门系列二十五:将叉车纳入系统进行管理 据笔者所知,SAP WM 模块里是不对仓库里常用的叉车等仓库管理工具进行管理的.笔者发现,Reflex WMS系统则会在很多仓库部门日常操作 ...

  5. Reflex WMS入门系列二十二:物料库存报表

    Reflex WMS入门系列二十二:物料库存报表 在Reflex WMS系统上,我们可以通过物料号查询它的HD列表,或者IPG列表.通过在其HD/IPG信息得知其库存数据.当然还可以通过如下方式直接获 ...

  6. 小猪的C语言快速入门系列(二)

    小猪的C语言快速入门系列(二) 标签: C语言 本节引言 在上一节中,对于C语言有了一个初步的了解,学会了如何使用IDE来进行 代码编写,编译和运行.而这一节,我们会对C语言的基本语法进行学习, C语 ...

  7. Reflex WMS入门系列二十三:几个库存相关的报表

    Reflex WMS入门系列二十三:几个库存相关的报表 Reflex WMS系统作为一个主流的仓库管理软件系统,自然需要对仓库里的库存有多个角度的报表功能.比如常见的slow-moving, agin ...

  8. Reflex WMS入门系列二十六:合并托盘

    Reflex WMS入门系列二十六:合并托盘 仓库管理业务实践中,对于仓库里的库存,将几个零托合并成一个托,也是比较常见的作业.Reflex WMS系统自然要能支持这种合并托盘(Merge HDs)的 ...

  9. Reflex WMS入门系列二十八:空白标签打印

    Reflex WMS入门系列二十八:空白标签打印 贴在托盘上的标签,因托盘上的货物的移动,使用等缘故可能会导致标签丢失.在很多场景下又需要扫描托盘标签,所以Reflex WMS系统提供了打印空白标签的 ...

最新文章

  1. 【redis】redis持久化
  2. SVM中的线性分类器
  3. SwiftUI之从前端视角看SwiftUI语言
  4. 安装nvm管理多版本nodejs
  5. java注解字段类型相同_《java基础学习之——重复注解》
  6. ubuntu ifconfig_Ubuntu 设置固定 IP 最简单的方法!
  7. ASP.NET学习笔记之操作过滤器
  8. java程序怎么都不是一个_java运行的流程-怎么运行java程序编了一个程序不知道怎么运行郁闷啊后缀文件名是 爱问知识人...
  9. 【图的有向路径检查】程序员面试金典——4.2有向路径检查
  10. python实战项目
  11. 微信自动发消息机器人实现方法
  12. cJSON遍历Json数据的key
  13. matlab斯奈尔定律,斯奈尔定律和Zoeppritz方程
  14. Verilog乘法的实现——Xilinx Multiplier IP研究(1)
  15. Python基础入门第二课--Python编辑器的选择
  16. 小飞鱼平台介绍——小飞鱼开发者服务平台业务介绍
  17. 青少年计算机编程经验,青少年学习计算机编程前景
  18. 1.01.21盒子模型,浮动,定位
  19. 如何使用分布是缓存Hazelcast
  20. [SMOJ2095]Bug2

热门文章

  1. 欧盟数据保护新规则 75%云应用没准备好
  2. javascript 给关键字加链接
  3. UITableViewCell 取消选中的蓝色背景
  4. 安装Exchange2003时出0XC1037AE6错误的解决方法.
  5. linux脚本调试-bashdb安装及调试
  6. 分布式大数据sql查询引擎Presto初识
  7. 机器学习知识点(二十一)特征选择之岭回归和LASSO
  8. 索引语法——创建索引 || 查看索引 || 删除索引 || ALTER命令 索引设计原则
  9. Python 爬虫进阶篇-利用beautifulsoup库爬取网页文章内容实战演示
  10. CTFshow 文件包含 web88