activemq高级客户端选项
2019独角兽企业重金招聘Python工程师标准>>>
我们经常希望维持队列中的消息,按一定次序转发给消息者。然而当有多个JMS Session和消息消费者实例的从同一个队列中获取消息的时候,就不能保证消息顺序处理。因为消息被多个不同线程并发处理着。
在ActiveMQ4.x中可以采用Exclusive Consumer或者Exclusive Queues,避免这种情况,Broker会从消息队列中,一次发送消息给一个消息消费者来保证顺序。
A. 当在接收信息的时候有一个或者多个备份接收消息者和一个独占消息者的同时接收时候,无论两者创建先后,在接收的时候,均为独占消息者接收。
B. 当在接收信息的时候,有多个独占消费者的时候,只有一个独占消费者可以接收到消息。
C. 当有多个备份消息者和多个独占消费者的时候,当所有的独占消费者均close的时候,只有一个备份消费者接到到消息。
备注:备份消费者为不带任何参数的消费者。
12.1.1选择一个独占的message consumer
对于应用来说,那些重要的order ,或者,你需要确保这里仅仅只有一个message consumer对于queen,activemq提供了一个客户端选项来确保只有一个active message consumer来处理message
activemq meaasge broker也会在queen上选择一个consumer来处理消息,这样的好处就是允许broker来选择,即使consumer失败或者停止了,然后另外一个message consumer能够被选择成为
active的
如果你混合了标准consumer和exclusive consumer在同一个queen上 ,the activemq将会仅仅选择exclusive的其中一个consumer,,如果所有的exclusive consumer都变为inactive那么就会选择
标准的consumer,然后queen的消费将会变为正常的传输模式,
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);
12.1.2利用exclusive consumer来提供分布式锁的功能
通常你用message从外部资源来广播数据,如果你想构建一个冗余的, 即使你有一个实例阅读和广播changedate失败了【改变数据库记录,在文件里面的内容用逗号分隔】,另一个实例都将要接管,通常你依靠锁住资源【行锁或者文件锁】来确保
仅仅只有一个程序能够acess data并且广播over topic ,但是当你不想利用数据库,或者想要运行一个程序跨越一个机器(不能用分布式锁),然后你就只能用独占consumer来创建一个分布式锁
为了能够使用独占consumer来创建分布式锁,我们需要我没得producer订阅独占的queen, 如果message producer接收到queen,他就便激活了, 并且能够 订阅实时的feed和把实时数据变为jms message
this.connection = this.factory.createConnection();
this.connection.start();
this.session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination destination = this.session.createQueue(this.queueName + "?consumer.exclusive=true");
Message message = this.session.createMessage();
MessageProducer producer = this.session.createProducer(destination);
producer.send(message);
MessageConsumer consumer = this.session.createConsumer(destination);
consumer.setMessageListener(this);
在这个代码片中,我们总是send a message到queen这一步总是被外部的管理程序执行的,注意到Session.CLIENT_ACKNOWLEDGE模式来消费这个消息,尽管我们想要被通知我们是独占的consumer, 因此我们有锁,我们不想要remove,我们
不想要remove这一条消息吗, 如果我们失败了, 我们的另一个独占producer将会active
正在这个列子中我们实现了MessageListener,如果我们没有active, 我们将要call一个功能性方法start producing ,如果我们是实时应用, 这个方法将要订阅一个实时的并且转换实时的data 进入jms message
public void onMessage(Message message) {
if (message != null && this.active==false) {
this.active=true;
startProducing();
}
}
12.2 message groups
全部的message 都将要转向单一的message consumer,message也能够分组来给予单一的consumer, 一个message producer也能指定一个group,通过指定message header JMSXGroupId,
ActiveMQ将要确保全部相同的JmsxGroupID的message发送给相同的consumer
如果Activemq broker制定了consumer接受消息通过JmsxGroupID,那么他就应该close掉,然后activemq broker将要选择一个不同的message consumer来dispatch给不同的message
为了创建一个group,你需要设置JmsGroupID string property在消息上
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("group.queue");
MessageProducer producer = session.createProducer(queue);
Message message = session.createTextMessage("<foo>test</foo>");
message.setStringProperty("JMSXGroupID", "TEST_GROUP_A");
producer.send(message);
这个列子显示了message producer已经被创建了, 并且设置好textmessage 属于message group TEST_GROUP_A
message group利用正常的message consumer,因此没有额外的工作需要group来消费message, 全部的工作都被message producer来定义一个group的消息属于什么, activemqbroker选择一个
message consumer来处理全部的分组消息
activemqbroker对于group里面的每一条消息都会添加一条sequeence no,[通过JMSXGroupSeq,从1开始]
但是从consumer视角来说,你不能假定你从一个新的group里收到的第一条JMSXGroupSeq设置1, 如果一个存在的group close掉或者死掉之后, 任何消息route到这个group里的都会分配给一个新的consumer
为了帮助识别一个消息的consumer 从一个新的group里收到消息,或者一个新的group从来没有被看见过, 一个boolean 参数叫做JMSXGroupFirstForConsumer被设置了对于第一个message, 你也能够核对是否
他是为第一条message设置的【对于新组】, 你也能够核对消息是否被
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("group.queue");
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive();
String groupId = message.getStringProperty("JMSXGroupId");
if (message.getBooleanProperty("JMSXGroupFirstForConsumer")) {
// do processing for new group
}
The Activemq message代理允许 分配各种各样的消息groups跨越多个consumer,但是如果这里早已经有message等着dispatch, the message group典型的分配给第一个consumer,为了确保一个基数de
的分布式负载均衡,他可能考虑message broker等着开启更多的messgae consumer , 为了这样做, 你不得不设置destination policy在active broker 配置里面,设置好consumersBeforeDispatchStarts参数
用
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" consumersBeforeDispatchStarts="2" timeBeforeDispatchStarts="5000">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
这个配置告诉ActiveMq broker, 都应该等着两个consumer在dispatch之前, 另外我们也可以看到timeBeforeDispatchStarts参数5000ms来通知activemq broker如果两个message consumer在5s
内没有砸queen上得到消息,利用messgae group添加最小化的active broker 就每个消息group存储routing 信息而言。这是明晰的关掉message group通过发送message从activemq broker 的JMSXGroupID
设置为-1
Connection connection = new ActiveMQConnectionFactory().createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("group.queue");
MessageProducer producer = session.createProducer(queue);
Message message = session.createTextMessage("<foo>close</foo>");
message.setStringProperty("JMSXGroupID", "TEST_GROUP_A");
message.setIntProperty("JMSXGroupSeq", -1);
producer.send(message);
12.3activemqstream
Activemq stream是一个高级的特色,他允许使用activemq来作为Jave Io stream,activeMQ将要break 一个outputstream对于不同的data chunk并且send每一个chunk通过activemq作为jms message
一个相应的activemq jms inputstream应该用在consumer边重新结合data chunk
如果你用queen 作为streamd的destination,使用不止一个consumer 在queen上(或者一个独占的consumer)是很好的, 由于group的这个特色【用同样的groupid指向一个单一的consumer】,使用超过一个的producer可能会造成message排序order
的问题
利用jms的好处就是activemq 把breank stream 分为了管理的块【chunk】, 并且允许你在consumer端给合并, 因此这是允许你传输大文件用这个功能
为了证明这个用stream
//source of our large data
FileInputStream in = new FileInputStream("largetextfile.txt");
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(QUEUE_NAME);
OutputStream out = connection.createOutputStream(destination);
//now write the file on to ActiveMQ
byte[] buffer = new byte[1024];
while(true){
int bytesRead = in.read(buffer);
if (bytesRead==-1){
break;
}
out.write(buffer,0,bytesRead);
}
out.close();
在下面的这个例子中我们创建了一个ActiveMQConnection并且创建了一个inputstream利用一个queen, 注意到我们利用一个独占的consumer通过apend"?consumer.exclusive=true";
我们确保仅仅一个consumer 能够阅读到一个queen,我们read InputStream并且通过FileOutputStream来重组file在硬盘上
你也能够使用topic, 尽管这个
//destination of our large data
FileOutputStream out = new FileOutputStream("copied.txt");
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//we want be be an exclusive consumer
String exclusiveQueueName= QUEUE_NAME + "?consumer.exclusive=true";
Queue destination = session.createQueue(exclusiveQueueName);
InputStream in = connection.createInputStream(destination);
//now write the file from ActiveMQ
byte[] buffer = new byte[1024];
while(true){
int bytesRead = in.read(buffer);
if (bytesRead==-1){
break;
}
out.write(buffer,0,bytesRead);
}
out.close();
}
12.4 Blob消息
activemq引进了blob来处理large message
自己处理中转
如果自己处理文件的话,一个简单方式是使用共享或ftp、dfs等方式,先把文件发送到一个大家都可以拿到的地方,然后发送message,payload或properties中包含文件的路径信息。这样,consumer拿到文件路径后去指定的地方,按照给定的方式去获取文件数据即可。
优势:这种方式可以用来处理大数据,并且不需要client或broker在内存中持有文件数据本身,非常的节省资源。而且文件是通过额外的方式处理,跟ActiveMQ本身无关,所以符合jms协议、处理的效率也相对比较高。
劣势:需要自己处理很多文件相关的操作。
BlobMessage对文件中转的封装
幸运的是,ActiveMQ把上面繁复的文件处理工作进行了封装,屏蔽掉文件中转的整个处理过程,使得我们可以使用类似jms规范的API来简单操作文件传输。
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(destination);
BlobMessage message = session.createBlobMessage(new URL("http://some.shared.site.com"));
producer.send(message);
consumer for blob::
FileOutputStream out = new FileOutputStream("blob.txt");
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
BlobMessage blobMessage = (BlobMessage) consumer.receive();
InputStream in = blobMessage.getInputStream();
// now write the file from ActiveMQ
byte[] buffer = new byte[1024];
while (true) {
int bytesRead = in.read(buffer);
if (bytesRead == -1) {
break;
}
out.write(buffer, 0, bytesRead);
}
out.close();
}
12.5网络存活 或者代理失败后的失效转移协议
failover:(tcp://host1:61616,tcp://host2:61616,ssl://host3:61616)
activemq将会随机的选择list中的其中一个用失效转移协议,如果仅仅只有一个uri那么客户端将会隔断时间查看是否broker available,你可以利用TransportListener来监听activemq的连接
public class ClientTransportListener implements TransportListener {
protected final Logger logger = LoggerFactory.getLogger(ClientTransportListener.class);
public void onCommand(Object o) {
logger.debug("onCommand检测到服务端命令:{}", o);
}
public void onException(IOException error) {
logger.error("onException,与服务器连接发生错误......");
}
public void transportInterupted() {
logger.error("transportInterupted,与服务器连接发生中断......");
IConnector connector = new Connector();
connector.reConnect();
}
public void transportResumed() {
logger.info("transportResumed,恢复与服务器连接....");
}
}
当你想要按照顺序来启动
failover:(tcp://host1:61616,tcp://host2:61616,ssl://host3:61616)?random=false
如果个了段时间还是连不上,the failover protocol将会增加一段总量来连接activemq broker,这个叫做指数退避算法Exponential Backoff默认的useExponentialBackoff是enable
参数 默认值 含义
initialReconnectDelay 10ms, 重连之前等待的时间(ms)
backOffMultiplier 1.5 增大等待时间的系数
maxReconnectDelay 30000 重连之前等待的最大时间(ms)
failover:(tcp://host1:61616,tcp://host2:61616,ssl://host3:61616)?backOffMultiplier=2,initialReconnectDelay=1000
在maxInactivityDuration时间里没有连接上话就是invalidate
failover:(tcp://host1:61616?wireformat.maxInactivityDuration=0,tcp://host2:61616,ssl://host3:61616?wireformat.maxInactivityDuration=0)
默认的话activemq传输是持久化的,如果你使用非持久化的方式传输的话,为了防止丢失你就要使用trackMessages=true
maxCachesize
backup=true,backupPoolSize=2
updateClusterClients
rebalanceClusterClients
updateClusterClientOnRemove
updateClusterFilter
12.6在future传输message
Property name type description
AMQ_SCHEDULED_DELAY long The time in milliseconds that a message will wait before being scheduled to
be delivered by the broker
AMQ_SCHEDULED_PERIOD long The time in milliseconds to wait after the start time to wait before scheduling
the message again
AMQ_SCHEDULED_REPEAT int The number of times to repeat scheduling a message for delivery
AMQ_SCHEDULED_CRON String Use a Cron entry to set the schedule
例如,有一个消息,原定在60秒-交付你需要设置amq_scheduled_delay
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
long time = 60 * 1000;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
producer.send(message);
你可以设置一个消息,等待一个初始延迟,并重复传送10次,等待10秒之间的每一个重新交付
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
long delay = 30 * 1000;
long period = 10 * 1000;
int repeat = 9;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
producer.send(message);
你也可以使用cron调度信息,例如,如果你想要一个消息如期交付的每一个小时,你就需要设置cron入口是0 * -例如
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
producer.send(message);
cron调度优先使用消息延迟,然而,如果一个重复周期设置一个cron入门,ActiveMQ调度器将安排每次cron进入火灾的消息传递。用一个例子来解释更容易。
假设你想要一个消息,10次,一一秒的延迟之间的每一个消息-你希望这个发生每小时-你会这样做:
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1000);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9);
producer.send(message)
转载于:https://my.oschina.net/goudingcheng/blog/615239
activemq高级客户端选项相关推荐
- ActiveMQ 的客户端选项
本章重点 怎么使用独占式消费者 消息分组的威力 理解流和二进制大对象 容错传输 计划消息分发 简介 上一章我们介绍了 ActiveMQ 的代理特性,本章我们将学习 ActiveMQ 客户端的一些高级特 ...
- Maven技巧和窍门:高级Reactor选项
从Maven 2.1发行版开始,提供了新的Maven命令行选项,使您可以操纵Maven构建多模块项目的方式.这些新选项是: -rf,--resume-from 从指定项目恢复反应堆 -pl,-项目 建 ...
- 解决Visual Studio 2017隐藏“高级保存选项”命令
解决Visual Studio 2017隐藏"高级保存选项"命令 Visual Studio提供高级保存选项功能,它能指定特定代码文件的编码规范和行尾所使用的换行符.在Visual ...
- ERPLAB中文教程:高级EvenList选项
目录 本分享为脑机学习者Rose整理发表于公众号:脑机接口社区 .QQ交流群:941473018 前面我们介绍了使用ERPLAB来分析脑电数据的第一步: 安装ERPLAB并添加通道<ERPLAB ...
- VS 2017显示“高级保存选项”命令操作方法
Visual Studio提供"高级保存选项"功能,它能指定特定代码文件的编码规范和行尾所使用的换行符.在Visual Studio 2017中,该命令默认是没有显示在" ...
- tmemo 选择消除行_Divi模块,行和部分加入高级动画选项
一切元素的动画选项每个Divi模块,行和部分都带有高级动画选项,你可以使用这些选项来吸引访问者并使页面更加耀眼.Divi引入一个全新的动画系统,并将这些高级动画选项扩展到每个Divi模块,行和部分!这 ...
- 背景图层和普通图层的区别_图层样式(一)—高级混合选项
一.图层顺序 为了便于说明,首先建立例子,新建图层,用画笔随便画个圈,新建蒙版随便画一笔,然后把所有图层样式加给它.可以看到样式从上到下的顺序,这也是它们混合的图层顺序. 图层顺序 我的效果,参数不同 ...
- win10单机修复计算机在哪,win10如何进入高级修复选项
进入WinRE(Windows恢复环境)后,点击"疑难解答"即可显示"恢复电脑"."初始化电脑"等Windows恢复选项,再点击" ...
- 计算机开机f8键,开机F8键“高级启动选项”的秘密
这次主要想跟大家探讨一下Windows系统下开机时按下F8键之后系统进行的一系列操作.了解了这些操作之后,相信大家就能对很多无法正常开机的原因"有所行动"了. 引用微软官方解释: ...
最新文章
- 2年内落地34款车型,路测里程可绕地球50圈,这家自动驾驶公司正在“玩命求生”...
- 专题 15 TCP套接字编程
- 【Linux抓包工具之tcpdump】
- 从零单排学Redis【铂金一】
- HTML5学习笔记简明版(9):变化的元素和属性
- SpringBoot 自动开启事务原理
- 汇编指令处理的数据长度
- C#枚举类型的常用操作总结
- 查看linux网络带宽
- log4j记录不同的日志_Spring boot中使用log4j记录日志
- 【渝粤教育】电大中专成本会计作业 题库
- 最新收藏:8个临时邮箱平台,24小时邮箱,10分钟邮箱 ,免费在线接收邮件非常不错,推荐给有需要的人!
- 特洛伊木马与计算机病毒有什么区别,特洛伊木马Vs病毒Vs蠕虫, 有什么区别?...
- Excel如何快速方便生成随机姓名
- 我买了个5g手机,但是手机卡是4g的,能使用吗?
- 安卓微信王者荣耀野区服务器,王者荣耀安卓微信136区运筹帷幄
- 关于7Z自解压文件拆分,读取条目,复写,合并的功能
- 编写程序练习直接,间接,相对,基址变址寻址
- 如何跨域调用微信图片
- 智慧灯杆基于边缘计算网关的单灯远程控制功能
热门文章
- 用init-connect+binlog实现用户操作追踪【转】
- WIN7上VM中的LINUX如何设置上网
- 常用24个方法有效优化ASP.NET的性能
- mysql 开启远程
- python list find_一篇文章带你了解Python爬虫常用选择器
- 下面哪个字段是http请求中必须具备的_HTTP协议及其工作原理介绍
- Xamarin中Unsupported major.minor version 52.0问题解决
- mycat是什么_MYCAT学习2
- JAVA实现从上往下打印二叉树(《剑指offer》)
- 计算机运行游戏慢怎么办,电脑运行太慢了太卡了怎么办,台式电脑运行速度慢的解决方法...