最近的项目中用到了mq,之前自己一直在码农一样的照葫芦画瓢。最近几天研究了下,把自己所有看下来的文档和了解总结一下。

一. 认识JMS

1.概述

对于JMS,百度百科,是这样介绍的:JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

简短来说,JMS是一种与厂商无关的 API,用来访问消息收发系统消息。它类似于JDBC(Java Database Connectivity),提供了应用程序之间异步通信的功能。

JMS1.0是jsr 194里规定的规范(关于jsr规范,请点击)。目前最新的规范是JSR 343,JMS2.0。

好了,说了这么多,其实只是在说,JMS只是sun公司为了统一厂商的接口规范,而定义出的一组api接口。

2. JMS体系结构

描述如下:

  • JMS提供者(JMS的实现者,比如activemq jbossmq等)
  • JMS客户(使用提供者发送消息的程序或对象,例如在12306中,负责发送一条购票消息到处理队列中,用来解决购票高峰问题,那么,发送消息到队列的程序和从队列获取消息的程序都叫做客户)
  • JMS生产者,JMS消费者(生产者及负责创建并发送消息的客户,消费者是负责接收并处理消息的客户)
  • JMS消息(在JMS客户之间传递数据的对象)
  • JMS队列(一个容纳那些被发送的等待阅读的消息的区域)
  • JMS主题(一种支持发送消息给多个订阅者的机制)

3. JMS对象模型

  • 连接工厂(connectionfactory)客户端使用JNDI查找连接工厂,然后利用连接工厂创建一个JMS连接。
  • JMS连接 表示JMS客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。
  • JMS会话 session 标识JMS客户端和服务端的会话状态。会话建立在JMS连接上,标识客户与服务器之间的一个会话进程。
  • JMS目的 Destinatio 又称为消息队列,是实际的消息源
  • 生产者和消费者
  • 消息类型,分为队列类型(优先先进先出)以及订阅类型

二. ActiveMQ

1. ActiveMQ的安装

  1. 从官网下载安装包,http://activemq.apache.org/download.html
  2. 赋予运行权限 chmod +x,windows可以忽略此步
  3. 运行 ./active start | stop

启动后,activeMQ会占用两个端口,一个是负责接收发送消息的tcp端口:61616,一个是基于web负责用户界面化管理的端口:8161。这两个端口可以在conf下面的xml中找到。http服务器使用了jettry。
这里有个问题是启动mq后,很长时间管理界面才可以显示出来。

2. 用Java访问ActiveMQ

先附上Bean代码:

public class MqBean implements Serializable{ private Integer age; private String name; public Integer getAge() { return age; } public void setAge(Integer age) { this.age = age; } public String getName() { return name; } public void setName(String name) { this.name = name; } }

2.1 队列消息的发送:

public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection; Session session; Destination destination; MessageProducer producer; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616"); try { connection = connectionFactory.createConnection(); connection.start(); //第一个参数是是否是事务型消息,设置为true,第二个参数无效 //第二个参数是 //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。异常也会确认消息,应该是在执行之前确认的 //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会删除消息。可以在失败的 //时候不确认消息,不确认的话不会移出队列,一直存在,下次启动继续接受。接收消息的连接不断开,其他的消费者也不会接受(正常情况下队列模式不存在其他消费者) //DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。 //待测试 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); destination = session.createQueue("test-queue"); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //优先级不能影响先进先出。。。那这个用处究竟是什么呢呢呢呢 MqBean bean = new MqBean(); bean.setAge(13); for(int i=0;i<100;i++){ bean.setName("小黄"+i); producer.send(session.createObjectMessage(bean)); } producer.close(); System.out.println("呵呵"); } catch (JMSException e) { e.printStackTrace(); } }

注:在上面的代码中,确认模式有三种,里面的DUPS_OK_ACKNOWLEDGE和AUTO_ACKNOWLEDGE一直没明白有什么区别。因为无法测试。不过大概也明白了一些。其实主要是MQ处理消息的流程决定的:

  1. 消息从生成方客户端传送到消息服务器。
  2. 消息服务器读取消息。
  3. 消息被放置到持久性存储器当中(出于可靠性的考虑)。
  4. 消息服务器确认收到消息(出于可靠性的考虑)。
  5. 消息服务器确定消息的路由。
  6. 消息服务器写出消息。
  7. 消息从消息服务器传送到使用方客户端。
  8. 使用方客户端确认收到消息(出于可靠性的考虑)。
  9. 消息服务器处理客户端确认(出于可靠性的考虑)。
  10. 消息服务器确定已经处理客户端确认。

这些步骤是连续的,所以任何步骤都可能成为消息从生成方客户端到使用方客户端的传送过程的瓶颈。这些步骤中的大多数都取决于消息传送系统的物理特征:网络带宽、计算机处理速度和消息服务器体系结构等等。但是,有一些步骤还取决于消息传送应用程序的特征和该应用程序要求的可靠性级别。
其实就是基于可靠性还是性能的选择.

2.2 队列消息的接收:

public static void main(String[] args) { ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // 消费者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616"); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 //这个最好还是有事务 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 destination = session.createQueue("test-queue"); consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { MqBean bean = (MqBean) ((ObjectMessage)message).getObject(); System.out.println(bean); if (null != message) { System.out.println("收到消息" + bean.getName()); } } catch (Exception e) { // TODO: handle exception } } }); } catch (Exception e) { e.printStackTrace(); } }

注:对于队列来说,比较简单的优化策略,应该就是队列分载了。由于每个消费者都是单线程的,所以可以设置多个消费者来提高速度。
大家可以复制个消费者自己测试下,在消费者中添加sleep测试下效果。

2.3 订阅消息的发送

public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection; Session session; Destination destination; MessageProducer producer; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616"); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); destination = session.createTopic("test-topic"); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //优先级不能影响先进先出。。。那这个用处究竟是什么呢呢呢呢 MqBean bean = new MqBean(); bean.setAge(13); for(int i=0;i<100;i++){ Thread.sleep(1000); bean.setName("小黄"+i); producer.send(session.createObjectMessage(bean)); } producer.close(); System.out.println("呵呵"); } catch (Exception e) { e.printStackTrace(); } }

2.4 订阅消息的接收

public static void main(String[] args) { ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // 消费者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616"); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 //这个最好还是有事务 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 destination = session.createQueue("test-queue"); consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { MqBean bean = (MqBean) ((ObjectMessage)message).getObject(); System.out.println(bean); if (null != message) { System.out.println("收到消息" + bean.getName()); } } catch (Exception e) { // TODO: handle exception } } }); } catch (Exception e) { e.printStackTrace(); } }

以上的消息发送后,如果没有接收到,可以登录自己的MQ管理页面:http://192.168.3.159:8161/admin/ ,默认帐号密码都是admin,查看队列中的消息

Number Of Pending Messages 等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数
Messages Enqueued 进入队列的消息 进入队列的总数量,包括出队列的。 这个数量只增不减
Messages Dequeued 出了队列的消息 可以理解为是消费这消费掉的数量

JMS(Java消息服务)与消息队列ActiveMQ基本使用(一)相关推荐

  1. java jms 消息服务_Java消息服务JMS详解

    Java消息服务(JMS Java Message Services)提供了点对点模式(Point-to-Point Queue)和发布-订阅模式(Publish-Subscribe Topics). ...

  2. .Net Core with 微服务 - 可靠消息最终一致性分布式事务

    前面我们讲了分布式事务的2PC.3PCTCC 的原理.这些事务其实都在尽力的模拟数据库的事务,我们可以简单的认为他们是一个同步行的事务.特别是 2PC,3PC 他们完全利用数据库的事务能力,在一阶段开 ...

  3. JMS(Java消息服务)(Activemq简单介绍)

    是什么? JMS(java消息服务)是规范,它定义了一些规则,一些接口.具体实现由各种做这个产品的厂家或开源组织来实现. 为什么? 在JMS还没有诞生前,每个企业都会有自己的一套内部消息系统,比如项目 ...

  4. ActiveMQ学习总结(5)——Java消息服务JMS详解

    JMS: Java消息服务(Java Message Service) JMS是用于访问企业消息系统的开发商中立的API.企业消息系统可以协助应用软件通过网络进行消息交互. JMS的编程过程很简单,概 ...

  5. JMS(Java消息服务)入门教程

    什么是Java消息服务 Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准消息协议和消息服务提供了一组通用接口,包括创建.发送.读取消息等,用于支持JAVA应用程序开发.在J2EE中 ...

  6. JMS (Java消息服务) 入门教程

    转载自:https://www.cnblogs.com/chenpi/p/5559349.html 什么是Java消息服务 Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准消息协议 ...

  7. JAVA消息服务JMS规范及原理详解

    一.简介 JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进 ...

  8. java 消息队列服务_ActiveMQ 消息队列服务

    1 ActiveMQ简介 1.1 ActiveMQ是什么 ActiveMQ是一个消息队列应用服务器(推送服务器).支持JMS规范. 1.1.1 JMS概述 全称:Java Message Servic ...

  9. java jms activemq_JMS-ActiveMQ与Java消息服务

    JMS:Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间或分布式系统中发送消息,进行异步通. ...

最新文章

  1. Cesium学习笔记(九):导入3D模型(obj转gltf)
  2. Linux C编程--进程间通信(IPC)5--System V IPC 机制1--消息队列
  3. django 学习笔记
  4. java nio的使用
  5. java导出excel并压缩
  6. 安卓学习笔记---Activity
  7. linux安装g++编译器_Ubuntu Desktop下配置Rosetta安装教程
  8. 视频编辑利器,不喜欢就框除!开源视频物体移除软件video object removal
  9. 《诛仙Ⅰ》票房破3亿 QQ阅读《诛仙》小说全平台收入增长11.7倍
  10. python自定义一个可迭代类_python进阶之自定义可迭代的类
  11. awr报告 解读_AWR报告分析解读
  12. 网站里的MG电子是一个服务器吗,魔兽世界中的品质生活,从MG电子游艺开始!...
  13. 多核cpu的特殊中断
  14. 名称不存在或不是目录_大数据从入门到深入:LINUX 04 文档目录管理
  15. datatable自定义搜索和导出按钮并解决在后端分页无法导出全部数据的问题
  16. R语言处理数据——画图时加大标题
  17. 新sniffer pro 4.75 sp5下载
  18. Lightroom Classic CC 2019 for Mac永久破解激活方法(含lr cc 2019破解补丁)
  19. C语言基础-#include<stdio.h>
  20. html设置手写输入,手写登陆页面

热门文章

  1. 拉普拉斯时域卷积定理_如何证明频域卷积定理
  2. php5.0相等,关于php:3个相等
  3. java6个人抽奖抽三个人,基于Java的抽奖逻辑
  4. java是值传递还是引用传递_Java 到底是值传递还是引用传递?
  5. 抓包红色_抓包三部曲 WebSocket 协议原理抓包分析
  6. php hmac sha256签名,HMAC-SHA256签名错误?
  7. vfp程序转换为c语言程序软件,c语言程序设计及vfp程序设计试题.doc
  8. mysql中ak替换键_数据库:唯一性约束_alternate key(替换键) mySQL Oracle 数据库 ak 唯一性约束...
  9. java dos窗口小工具下载,maxdos 9 3-maxdos工具箱 v9.3 官方版
  10. HDFS学习 Java连接hadoop