HornetQ之JMS2.0 (实例讲解)
前言:
在2013年4月终于迎来了新的JMS规范-JMS2.0,这是第一次对JMS规范进行更新从2002年发布的JMS1.1版本.我们也许会认为JMS这么久以来从来没更新是否是因为已经停止发展或者被废弃不用. 但是,如果你从另外一个叫角度来分析, JMS这个规范存在很多不同的实现版本来看, 就充分说明JMS是一个非常成功的API 规范。
在JMS2.0规范中,主要包括两方面的重大改进。其一是:更方便的使用API, 再则是:引入了许多新的消息特性。JMS2.0是JAVAEE7平台的一部分,它不但可以被用于到JAVAEE Web或者EJB应用程序重也可以被用于J2SE环境中.
接下来我们就来看看新特性和更方便的API.
简单API:
为了下面的讲解做准备,首先创建一个queue和一个topic在hornetq-jms.xml中
<queue name="test1"> <entry name="queue/test1"/> </queue> <topic name="test2"> <entry name="topic/test2"/> </topic >
合并Connection和session
在老的API中要生产/消费一个消息必须经过这么几步,获取connectionFactory->获取queue/topic->创建Connection->创建session->生产/消费消息. 在JMS2.0中提供了更简单的API,将创建connection和创建session合并成了一个对象JMSContext,对应的MessageProducer/MessageConsume分别用JMSProcedure/JMSConsumer替代.对参数封装上也相对于JMS1.1更易于理解,在老版本中JMS的事务和ACK模式都是在创建session时显示声明的, 并且两个参数很容易误导开发者:Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);第一个参数代表是启用事务,第二个参数代表ACK模式。如果启用事务第二参数将被忽略但是第二个参数又是必须的,这点就容易误导后来的程序维护工程师.在JMS2.0API中对这个特性进行了整改通过builder模式。
Context cnx = new InitialContext(); ConnectionFactory cf = (ConnectionFactory) cnx.lookup("/ConnectionFactory"); Queue queue = (Queue) cnx.lookup("/queue/test1"); try(JMSContext jmsContext = cf.createContext(JMSContext.AUTO_ACKNOWLEDGE);) { jmsContext.createProducer() .setDeliveryDelay(1000) .setDeliveryMode(DeliveryMode.NON_PERSISTENT) .send(queue, "content1"); System.out.println("Continue.............."); } catch (JMSRuntimeException jmse) { jmse.printStackTrace(); }
直接在创建JMSContext的时声明事务模式和ACK模式,两者选其一。
public static final int AUTO_ACKNOWLEDGE = 1; public static final int CLIENT_ACKNOWLEDGE = 2; public static final int DUPS_OK_ACKNOWLEDGE = 3; public static final int SESSION_TRANSACTED = 0;
利用JDK7的closeable接口实现自动关闭,观察上面例子我们并没有类似于JMS1.1API那样去关闭Connection关闭session,什么时候关闭的呢?JMSContext利用了JDK1.7的新特性它继承至AutoCloseable接口,当块代码结束的时候自动调用对象的close()方法进行连接关闭.
更方便进行异步消费消息,在JMS1.1中需要手动调用connection.start()方法去开启消费进程。在新的API中默认自动就启动了, 不需要显示的启动。
Context cnx; JMSContext jmsContext =null; try { cnx = new InitialContext(); ConnectionFactory cf = (ConnectionFactory) cnx .lookup("/ConnectionFactory"); Queue queue = (Queue) cnx.lookup("/queue/test1"); jmsContext = cf.createContext(); // jmsContext.setAutoStart(true); JMSConsumer jmsConsumer = jmsContext.createConsumer(queue); jmsConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { System.out.println(Message.class.getName()); } }); CountDownLatch latch = new CountDownLatch(1); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } catch (NamingException e) { e.printStackTrace(); } catch (JMSRuntimeException jmse) { jmse.printStackTrace(); }finally{ if(jmsContext!=null){ jmsContext.close(); } }
消息payload自动解析通过泛型,在老版本中需要进行手动的类型转换来获取到真实的消息内容.第一个步骤需要将Message转化为对应的TextMessage/ByteMessage/MapMessage/StreamMessage/ObjectMessage. 在新的API中除了StreamMessage不能自动检索外, 其他类型的message都可以通过方便的API直接获取到消息内容不必经过多次的强制类型转化。
Context cnx = null; ConnectionFactory cf = null; Queue queue = null; try { cnx = new InitialContext(); cf = (ConnectionFactory) cnx.lookup("/ConnectionFactory"); queue = (Queue) cnx.lookup("/queue/test1"); } catch (Exception e) { } try (JMSContext jmsContext = cf .createContext(JMSContext.AUTO_ACKNOWLEDGE);) { // Delay 3 seconds to devlivery .setDeliveryDelay(3000) jmsContext.createProducer() .setDeliveryDelay(DeliveryMode.PERSISTENT) .send(queue, "content@").send(queue, "content@1") .send(queue, "content@2"); System.out.println("Start receive.............."); JMSConsumer jmsConsumer = jmsContext.createConsumer(queue); String msgBody = jmsConsumer.receiveBody(String.class); System.out.println(msgBody); Message msg1 = jmsConsumer.receive(); System.out.println(msg1.getBody(String.class)); Message msg2 = jmsConsumer.receive(); System.out.println(msg2.getBody(String.class)); } catch (JMSRuntimeException jmse) { jmse.printStackTrace(); } catch (JMSException e) { e.printStackTrace(); }
新特性:
允许多个订阅者在同一个Topic Subscription,想象现在消息的吞吐量很大, 在老的版本中我们一个消息只能绑定到一个订阅者。但是消息两很大希望能做到负载均衡类似于Apache。JMS2.0提供这个功能,支持在多个虚拟机中共享一个消息。新建四个订阅者在四个(两组)不同的VM中.
第一组:
Client1-S1:
Context cnx; try { cnx = new InitialContext(); final ConnectionFactory cf = (ConnectionFactory) cnx .lookup("/ConnectionFactory"); final Topic topic = (Topic) cnx.lookup("/topic/test2"); // Invalid concurrent session usage. Sessions are not supposed to be // used by more than one thread concurrently. new Thread() { public void run() { JMSContext jmsContext = cf.createContext(); JMSConsumer jmsConsumer = jmsContext.createSharedConsumer( topic, "S1"); while (true) { Message msg = jmsConsumer.receive(); System.out.println("Client1-S1:"+Thread.currentThread().getId() + ":" + msg); } }; }.start(); } catch (NamingException e) { e.printStackTrace(); } catch (JMSRuntimeException jmse) { jmse.printStackTrace(); }
Client2-S1:
Context cnx; try { cnx = new InitialContext(); final ConnectionFactory cf = (ConnectionFactory) cnx .lookup("/ConnectionFactory"); final Topic topic = (Topic) cnx.lookup("/topic/test2"); // Invalid concurrent session usage. Sessions are not supposed to be // used by more than one thread concurrently. new Thread() { public void run() { JMSContext jmsContext = cf.createContext("guest", "guest"); JMSConsumer jmsConsumer = jmsContext.createSharedConsumer( topic, "S1"); while (true) { Message msg = jmsConsumer.receive(); System.out.println("Client2-S1:"+Thread.currentThread().getId() + ":" + msg); } }; }.start(); } catch (NamingException e) { e.printStackTrace(); } catch (JMSRuntimeException jmse) { jmse.printStackTrace(); }
第二组:
Client3-S2:
Context cnx; try { cnx = new InitialContext(); final ConnectionFactory cf = (ConnectionFactory) cnx .lookup("/ConnectionFactory"); final Topic topic = (Topic) cnx.lookup("/topic/test2"); // Invalid concurrent session usage. Sessions are not supposed to be // used by more than one thread concurrently. new Thread() { public void run() { JMSContext jmsContext = cf.createContext("guest", "guest"); JMSConsumer jmsConsumer = jmsContext.createSharedConsumer( topic, "S2"); while (true) { Message msg = jmsConsumer.receive(); System.out.println("Client3-S2:"+Thread.currentThread().getId() + ":" + msg); } }; }.start(); } catch (NamingException e) { e.printStackTrace(); } catch (JMSRuntimeException jmse) { jmse.printStackTrace(); }
Client4-S2:
Context cnx; try { cnx = new InitialContext(); final ConnectionFactory cf = (ConnectionFactory) cnx .lookup("/ConnectionFactory"); final Topic topic = (Topic) cnx.lookup("/topic/test2"); // Invalid concurrent session usage. Sessions are not supposed to be // used by more than one thread concurrently. new Thread() { public void run() { JMSContext jmsContext = cf.createContext(); JMSConsumer jmsConsumer = jmsContext.createSharedConsumer( topic, "S2"); while (true) { Message msg = jmsConsumer.receive(); System.out.println("Client4-S2:"+Thread.currentThread().getId() + ":" + msg); } }; }.start(); } catch (NamingException e) { e.printStackTrace(); } catch (JMSRuntimeException jmse) { jmse.printStackTrace(); }
分两次发送消息到同一个topic观察输出结果在四个VM控制台.
第一次发送消息:
Client4-S2:12:HornetQMessage[ID:5f856d00-bff1-11e3-86fb-af8e1cdeda67]:PERSISTENT
Client1-S1:12:HornetQMessage[ID:4b696a47-bff1-11e3-8b74-d32c9a2e8eec]:PERSISTENT
第二次发送消息:
Client3-S2:12:HornetQMessage[ID:4b696a47-bff1-11e3-8b74-d32c9a2e8eec]:PERSISTENT
Client2-S1:12:HornetQMessage[ID:5f856d00-bff1-11e3-86fb-af8e1cdeda67]:PERSISTENT延迟递送消息,假象有这么一种业务模型,当前有两个数据库环境生产库PROD和一个数据仓库DWS,PROD数据会实时的复制到数据仓库。 现在外部程序插入一条数据进入PROD 并且同时生成一个消息到消息服务器,消息服务器的消费者会立刻消费这个消息并且和DWS数据进行整合,由于数据库的复制可能会因为CPU或者LOCK等原因有一定的也延时,此时就可能数据还没有到DWS数据库此时消费者就不能完成数据整合操作。一种方案就是重复消费, 这样就有一点开销对于消费者来说。 我们希望一次就能够保证消费成功。那么我们就可以受用延迟递送消息到消费者。假设数据复制到DWS最大延时是1h,我们可以设置消息的延迟递送时间为1h就能够保证消费者只需要一次消费消息就可以达到数据整合目的。 设置延迟递送类似于JMS1.1也非常简单.
Context cnx = new InitialContext(); ConnectionFactory cf = (ConnectionFactory) cnx.lookup("/ConnectionFactory"); Queue queue = (Queue) cnx.lookup("/queue/test1"); try(JMSContext jmsContext = cf.createContext(JMSContext.AUTO_ACKNOWLEDGE);) { jmsContext.createProducer() .setDeliveryDelay(10000) .setDeliveryMode(DeliveryMode.NON_PERSISTENT) .send(queue, "content1"); System.out.println("Continue.............."); } catch (JMSRuntimeException jmse) { jmse.printStackTrace(); }
经过笔者测试:Hornetq延迟递送特性,必须是消费者和生产者在不同的虚拟机。实际情况也会这么用。
异步发送消息.类似于异步消费,同样提供更为简便的API进行异步发送。 HornetQ似乎对这点支持不太好,有异步发送的效果。
private void asyncSendSimplified(ConnectionFactory connectionFactory,Queue queue) throws Exception { // send a message asynchronously try (JMSContext context = connectionFactory.createContext();){ CountDownLatch latch = new CountDownLatch(1); MyCompletionListener myCompletionListener = new MyCompletionListener(latch); context.createProducer().setAsync(myCompletionListener).send(queue,"Hello world"); System.out.println("Message sent, now waiting for reply"); // at this point we can do something else before waiting for the reply // this is not shown here latch.await(); if (myCompletionListener.getException()==null){ System.out.println("Reply received from server"); } else { throw myCompletionListener.getException(); } } }
JMSXDeliveryCount,这个是Message的一个属性,实际上在Jms1.1已经存在了,标示这个消息被重复递送的次数.但是这个在JMS1.1是可选的并不是所有的供应商都支持这个属性,但是在JMS2.0这个属性被强制的设置了。 我们可以通过这个属性来判断消息被重复递送了多少次来决定是否丢弃这个消息或者做进一步处理,或者交给DeadLetter.
try { System.out.println(message.getIntProperty("JMSXDeliveryCount")); } catch (JMSException e) { }
参考博客:
http://www.oracle.com/technetwork/articles/java/jms2messaging-1954190.html
http://www.oracle.com/technetwork/articles/java/jms20-1947669.html
转载于:https://blog.51cto.com/ganludong/1393076
HornetQ之JMS2.0 (实例讲解)相关推荐
- tensorfow2.0实例讲解1-衣服分类
更新时间:2020-10-6 # import lib import tensorflow as tf from tensorflow import keras from tensorflow.ker ...
- php yii应用运维,Windows运维之Windows下用cmd命令实例讲解yii2.0在php命令行中运行的步骤...
本文主要向大家介绍了Windows运维之Windows下用cmd命令实例讲解yii2.0在php命令行中运行的步骤,通过具体的内容向大家展现,希望对大家学习Windows运维有所帮助. Yii中的资源 ...
- java实现页面高效刷新_selenium高效应对Web页面元素刷新的实例讲解
当我们在页面上进行selenium.type()或者selenium.click()操作的时候,往往需要需要等待一个元素的出现,对于一般的网页,当我们进入一个新页面的时候,往往会使用selenium. ...
- 手摸手教你数据可视化!(附实例讲解)
↑↑↑关注后"星标"Datawhale 每日干货 & 每月组队学习,不错过 Datawhale干货 作者:CrescentAI,华南理工大学,Datawhale优秀学习者 ...
- 【Python基础】手把手教你数据可视化!(附实例讲解)
点击上方"小白学视觉",选择加"星标"或"置顶" 重磅干货,第一时间送达 作者:CrescentAI,华南理工大学,Datawhale优秀学 ...
- php脚本函数,PHP执行系统命令函数实例讲解
命令注入 命令注入(Command Injection),对一些函数的参数没有做过滤或过滤不严导致的,可以执行系统或者应用指令(CMD命令或者 bash 命令)的一种注入攻击手段. 常见的执行系统命令 ...
- 实例讲解Linux系统中硬链接与软链接的创建
导读 Linux链接分两种,一种被称为硬链接(Hard Link),另一种被称为符号链接(Symbolic Link).默认情况下,ln命令产生硬链接.硬链接与软链接的区别从根本上要从Inode节点说 ...
- python 安卓模拟点击_python模拟点击在ios中实现的实例讲解
我们都知道因为操作系统的不同,很多游戏区分为安卓和苹果两个版本.那么之前学会python模拟点击的小伙伴开始担心,如果手机是ios版本那还能使用吗?这个问题小编进行了测试,小伙伴们完全不用忧虑ios版 ...
- 继承redis spring_实例讲解Springboot以Repository方式整合Redis
1 简介 Redis是高性能的NoSQL数据库,经常作为缓存流行于各大互联网架构中.本文将介绍如何在Springboot中整合Spring Data Redis,使用Repository的方式操作. ...
最新文章
- 首页列表显示全部问答,完成问答详情页布局。
- MyBatis—insert语句返回主键和selectKey标签
- JAVA基础——Switch条件语句
- pandas中dropna函数_快速解释如何使用pandas的inplace参数
- 数据分析和作图用python好还是R!python真有那么神吗?
- OnPaint()函数的作用原理
- ValueError: Found array with dim 4. Estimator expected和ValueError: Expected 2D array, got 1D array i
- (二)C语言数据类型(2)
- 智利可以使用支付宝嘛?_智利的水电和输电规划使用开源地理空间工具
- Java:月份的中英文转换
- 程序员真实写真:35岁前成功的12条黄金法则 (转)
- 在python语言中、写文件的操作是_Python语言之详解文件操作
- git常用使用命令个人总结
- gsp计算机设施设备表格,GSP计算机系统内审表.doc
- jQuery实现动态添加删除表格的行
- 【Love2d从青铜到王者】第十篇Love2d之类和类的继承(Classes And Inheritance)
- KFC肯德基带给孩子的危害(转)
- 官方示例(十):网页开发3D粒子系统实现降雨效果 ThingJS
- Spring/Boot/Cloud系列知识(2)— — 代理模式
- c语言 取余 % 和除法 / 的应用技巧 (在取位数方面的)