ActiveMQ 事务消息 手工签收
发送端/生产者(带有事务):
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class Sender {public static void main(String[] args) throws Exception{//第一步:建立connectionFactory工厂对象【需填入用户名、密码、要连接的地址】ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory("admin","1234","tcp://localhost:61616");//第二步:通过ConnectionFactory工厂对象创建一个Connection连接,并且调用Connection
的start方法开启连接【connection默认是关闭的】Connection connection = connectionFactory.createConnection();connection.start();//第三步:通过connection创建session会话(上下文环境对象),用于接收消息,参数配置1为是否启用事务,
参数配置2为签收模式【一般我们设置为自动签收】Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);//使用事务的方式进行消息的发送// Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//使用CLIENT端签收的方式//Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);//第四步:通过session创建Destination对象,指的是一个客户端用来指定生产消息目标和消费消息来源的对象,
在PTP模式中,Destination被称作Queue即队列;在Pub/Sub模式,Destination被称作Topic即主题.
在程序中可以使用多个Queue和Topic.Destination destination = session.createQueue("first");//第五步:通过session对象创建消息的发送和接收对象(生产者和消费者)MessageProducer/MessageConsumerMessageProducer messageProducer = session.createProducer(null);//第六步:可以使用MessageProducer的setDeliveryMode方法为其设置持久化特性
和非持久化特性(DeliveryMode)//messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//第七步:使用JMS规范的TextMessage形式创建数据(通过session对象),并用MessageProducer
的send方法发送数据for (int i = 1; i <= 10; i++) {//TextMessage textMessage = session.createTextMessage("helloworld"+i);TextMessage textMessage = session.createTextMessage("我是消息内容"+i);//第一个参数: 目的地//第二个参数: 消息//第三个参数: 是否持久化//第四个参数: 优先级【0-9 0-4表示普通 5-9表示加急 默认4】//第五个参数: 消息在mq上的存放有效期【单位毫秒】//messageProducer.send(destination, textMessage,
DeliveryMode.NON_PERSISTENT, i, 1000*60*2);messageProducer.send(destination, textMessage);//TimeUnit.SECONDS.sleep(1);System.out.println("生产者:"+textMessage.getText());}//提交数据session.commit();//session.rollback();if (connection!=null) {connection.close();}}
}
接收端/消费者:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class Receiver { public static void main(String[] args) { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // 消费者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory( "admin", "1234", "tcp:/localhost:61616"); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 destination = session.createQueue("first"); consumer = session.createConsumer(destination); while (true) { // 设置接收者接收消息的时间,为了便于测试,这里谁定为100s //TextMessage message = (TextMessage) consumer.receive(100000); TextMessage message = (TextMessage) consumer.receive(); if (null != message) { //System.out.println("收到消息" + message.getText()); System.out.println("消费数据:" + message.getText());} else { break; } } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } }
}
生产者(没有事务手动签收):
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class Sender {public static void main(String[] args) throws Exception{//第一步:建立connectionFactory工厂对象【需填入用户名、密码、要连接的地址】ActiveMQConnectionFactory connectionFactory
= new ActiveMQConnectionFactory("admin","1234","tcp://localhost:61616");//第二步:通过ConnectionFactory工厂对象创建一个Connection连接,并且调用Connection的start方法
开启连接【connection默认是关闭的】Connection connection = connectionFactory.createConnection();connection.start();//第三步:通过connection创建session会话(上下文环境对象),用于接收消息,参数配置1为是否启用事务,参数配置2为签收模式【一般我们设置为自动签收】Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);//使用事务的方式进行消息的发送// Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//使用CLIENT端签收的方式//Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);//第四步:通过session创建Destination对象,指的是一个客户端用来指定生产消息目标和消费消息来源的对象,在PTP模式中,Destination被称作Queue即队列;
在Pub/Sub模式,Destination被称作Topic即主题.在程序中可以使用多个Queue和Topic.Destination destination = session.createQueue("first");//第五步:通过session对象创建消息的发送和接收对象(生产者和消费者)MessageProducer/MessageConsumerMessageProducer messageProducer = session.createProducer(null);//第六步:可以使用MessageProducer的setDeliveryMode方法为其设置持久化特性和非持久化特性(DeliveryMode)//messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//第七步:使用JMS规范的TextMessage形式创建数据(通过session对象),并用MessageProducer的send方法发送数据for (int i = 1; i <= 20; i++) {//TextMessage textMessage = session.createTextMessage("helloworld"+i);TextMessage textMessage = session.createTextMessage("我是消息内容"+i);//第一个参数: 目的地//第二个参数: 消息//第三个参数: 是否持久化//第四个参数: 优先级【0-9 0-4表示普通 5-9表示加急 默认4】//第五个参数: 消息在mq上的存放有效期【单位毫秒】//messageProducer.send(destination, textMessage, DeliveryMode.NON_PERSISTENT, i, 1000*60*2);messageProducer.send(destination, textMessage);//TimeUnit.SECONDS.sleep(1);System.out.println("生产者:"+textMessage.getText());}//提交数据//session.commit();//session.rollback();if (connection!=null) {connection.close();}}
}
消费端(没有事务手动签收):
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class Receiver { public static void main(String[] args) { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // 消费者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory( "admin", "1234", "tcp://localhost:61616"); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE); // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 destination = session.createQueue("first"); consumer = session.createConsumer(destination); while (true) { // 设置接收者接收消息的时间,为了便于测试,这里谁定为100s //TextMessage message = (TextMessage) consumer.receive(100000); TextMessage message = (TextMessage) consumer.receive(); if (null != message) { //System.out.println("收到消息" + message.getText()); System.out.println("消费数据:" + message.getText());message.acknowledge();} else { break; } } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } }
}
ActiveMQ 事务消息 手工签收相关推荐
- ActiveMQ的消息重发策略和DLQ处理
2019独角兽企业重金招聘Python工程师标准>>> ActiveMQ的消息重发策略和DLQ处理 博客分类: MQ 在以下三种情况中,ActiveMQ消息会被重发给客户端/消费者: ...
- JMS学习九 ActiveMQ的消息持久化到Mysql数据库
1.将连接Mysql数据库驱动包,放到ActiveMQ的lib目录下 2,修改ActiveMQ的conf目录下的active.xml文件,修改数据持久化的方式 2.1 修改原来的kshadb的持久化 ...
- 事务消息大揭秘!RocketMQ、Kafka、Pulsar全方位对比
导语 | 事务是一个程序执行单元,里面的所有操作要么全部执行成功,要么全部执行失败.RocketMQ.Kafka和Pulsar都是当今业界应用十分广泛的开源消息队列(MQ)组件,笔者在工作中遇到关于M ...
- 分布式事务——消息最终一致性方案
前言 分布式事务一直是服务化拆分后一个绕不开的话题,原来在单体应用中执行的多个逻辑操作,现在被拆分成了多个服务之间的远程调用.虽然服务化为我们的系统带来了水平伸缩的能力,然而随之而来挑战就是分布式事务 ...
- 浅谈 RocketMQ、Kafka、Pulsar 的事务消息
作者:ruoyuliu刘若愚,腾讯 WXG 后台开发工程师 导语 事务是一个程序执行单元,里面的所有操作要么全部执行成功,要么全部执行失败.RocketMQ.Kafka 和 Pulsar 都是当今业界 ...
- Apache RocketMQ 正式开源分布式事务消息
摘要: 近日,Apache RocketMQ 社区正式发布4.3版本.此次发布不仅包括提升性能,减少内存使用等原有特性增强,还修复了部分社区提出的若干问题,更重要的是该版本开源了社区最为关心的分布式事 ...
- 消息中间件学习总结(15)——Apache RocketMQ 正式开源分布式事务消息
近日,Apache RocketMQ 社区正式发布4.3版本.此次发布不仅包括提升性能,减少内存使用等原有特性增强,还修复了部分社区提出的若干问题,更重要的是该版本开源了社区最为关心的分布式事务消息, ...
- RocketMQ事务消息
文章目录 事务消息 RocketMQ事务流程概要 RocketMQ事务流程关键 事务消息的使用约束 分布式事务场景分析 场景案例 RocketMQ事务消息设计分析 消费事务消息 事务消息 Rocket ...
- 深度剖析如何实现事务消息
来自:咖啡拿铁 1.背景 分布式事务一直是一个老生常谈的一个话题,在我的公众号下面下面已经写过很多篇分布式事务相关的文章了,但是依旧没有将其完全剖析.在之前的文章中我也多次提到我们可以使用消息队列来实 ...
最新文章
- PTA基础编程题目集-6-7 统计某类完全平方数
- 02NSString 转换 UTF8
- boost::lambda::is_instance_of_1用法的测试程序
- Kettle使用_22 维度更新 缓慢变化维 拉链表
- 用计算机计算电力系统故障,用计算机计算电力系统故障的方法.ppt
- 升级! Facebook 模型全部迁移至 PyTorch 框架
- 在stackoverflow上使用markdown
- 【Python之旅】第四篇(四):基于面向对象的模拟人生游戏类
- Frequentist 观点和 Bayesian 观点
- 合并基因表达水平(merge gene expression levels, FPKM)
- 【加密工具】2019年网络安全加密工具排行,好用的计算机加密软件推荐
- 抖音超火的罗马时钟html代码,最近抖音上挺火的圆形文字时钟
- 瑞能实业IPO被终止:年营收4.47亿 曾拟募资3.76亿
- 华为往事(十八)--CC08 STP:华为抢占制高点
- 基于数组判断字符串是否是回文
- Android Room数据库使用
- 《北京DRGs系统的研究与应用》学习笔记
- 计算机输入法在桌面显示不出来的,电脑开机无法正常显示桌面只能看到输入法如何解决...
- 嵌入式开发——结构体指针作为参数传递变量的值不正确
- word标尺灰色_如何在Microsoft Word中使用标尺