发送端/生产者(带有事务):


import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class Sender {public static void main(String[] args) throws Exception{//第一步:建立connectionFactory工厂对象【需填入用户名、密码、要连接的地址】ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory("admin","1234","tcp://localhost:61616");//第二步:通过ConnectionFactory工厂对象创建一个Connection连接,并且调用Connection
的start方法开启连接【connection默认是关闭的】Connection connection = connectionFactory.createConnection();connection.start();//第三步:通过connection创建session会话(上下文环境对象),用于接收消息,参数配置1为是否启用事务,
参数配置2为签收模式【一般我们设置为自动签收】Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);//使用事务的方式进行消息的发送//   Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//使用CLIENT端签收的方式//Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);//第四步:通过session创建Destination对象,指的是一个客户端用来指定生产消息目标和消费消息来源的对象,
在PTP模式中,Destination被称作Queue即队列;在Pub/Sub模式,Destination被称作Topic即主题.
在程序中可以使用多个Queue和Topic.Destination destination = session.createQueue("first");//第五步:通过session对象创建消息的发送和接收对象(生产者和消费者)MessageProducer/MessageConsumerMessageProducer messageProducer = session.createProducer(null);//第六步:可以使用MessageProducer的setDeliveryMode方法为其设置持久化特性
和非持久化特性(DeliveryMode)//messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//第七步:使用JMS规范的TextMessage形式创建数据(通过session对象),并用MessageProducer
的send方法发送数据for (int i = 1; i <= 10; i++) {//TextMessage textMessage = session.createTextMessage("helloworld"+i);TextMessage textMessage = session.createTextMessage("我是消息内容"+i);//第一个参数: 目的地//第二个参数: 消息//第三个参数: 是否持久化//第四个参数: 优先级【0-9  0-4表示普通    5-9表示加急  默认4】//第五个参数: 消息在mq上的存放有效期【单位毫秒】//messageProducer.send(destination, textMessage,
DeliveryMode.NON_PERSISTENT, i, 1000*60*2);messageProducer.send(destination, textMessage);//TimeUnit.SECONDS.sleep(1);System.out.println("生产者:"+textMessage.getText());}//提交数据session.commit();//session.rollback();if (connection!=null) {connection.close();}}
}

接收端/消费者:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class Receiver {  public static void main(String[] args) {  // ConnectionFactory :连接工厂,JMS 用它创建连接  ConnectionFactory connectionFactory;  // Connection :JMS 客户端到JMS Provider 的连接  Connection connection = null;  // Session: 一个发送或接收消息的线程  Session session;  // Destination :消息的目的地;消息发送给谁.  Destination destination;  // 消费者,消息接收者  MessageConsumer consumer;  connectionFactory = new ActiveMQConnectionFactory(  "admin",  "1234", "tcp:/localhost:61616");  try {  // 构造从工厂得到连接对象  connection = connectionFactory.createConnection();  // 启动  connection.start();  // 获取操作连接  session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置  destination = session.createQueue("first");  consumer = session.createConsumer(destination);  while (true) {  // 设置接收者接收消息的时间,为了便于测试,这里谁定为100s  //TextMessage message = (TextMessage) consumer.receive(100000);  TextMessage message = (TextMessage) consumer.receive(); if (null != message) {  //System.out.println("收到消息" + message.getText());  System.out.println("消费数据:" + message.getText());} else {  break;  }  }  } catch (Exception e) {  e.printStackTrace();  } finally {  try {  if (null != connection)  connection.close();  } catch (Throwable ignore) {  }  }  }
} 

生产者(没有事务手动签收):


import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class Sender {public static void main(String[] args) throws Exception{//第一步:建立connectionFactory工厂对象【需填入用户名、密码、要连接的地址】ActiveMQConnectionFactory connectionFactory
= new ActiveMQConnectionFactory("admin","1234","tcp://localhost:61616");//第二步:通过ConnectionFactory工厂对象创建一个Connection连接,并且调用Connection的start方法
开启连接【connection默认是关闭的】Connection connection = connectionFactory.createConnection();connection.start();//第三步:通过connection创建session会话(上下文环境对象),用于接收消息,参数配置1为是否启用事务,参数配置2为签收模式【一般我们设置为自动签收】Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);//使用事务的方式进行消息的发送//  Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//使用CLIENT端签收的方式//Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);//第四步:通过session创建Destination对象,指的是一个客户端用来指定生产消息目标和消费消息来源的对象,在PTP模式中,Destination被称作Queue即队列;
在Pub/Sub模式,Destination被称作Topic即主题.在程序中可以使用多个Queue和Topic.Destination destination = session.createQueue("first");//第五步:通过session对象创建消息的发送和接收对象(生产者和消费者)MessageProducer/MessageConsumerMessageProducer messageProducer = session.createProducer(null);//第六步:可以使用MessageProducer的setDeliveryMode方法为其设置持久化特性和非持久化特性(DeliveryMode)//messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//第七步:使用JMS规范的TextMessage形式创建数据(通过session对象),并用MessageProducer的send方法发送数据for (int i = 1; i <= 20; i++) {//TextMessage textMessage = session.createTextMessage("helloworld"+i);TextMessage textMessage = session.createTextMessage("我是消息内容"+i);//第一个参数: 目的地//第二个参数: 消息//第三个参数: 是否持久化//第四个参数: 优先级【0-9  0-4表示普通    5-9表示加急  默认4】//第五个参数: 消息在mq上的存放有效期【单位毫秒】//messageProducer.send(destination, textMessage, DeliveryMode.NON_PERSISTENT, i, 1000*60*2);messageProducer.send(destination, textMessage);//TimeUnit.SECONDS.sleep(1);System.out.println("生产者:"+textMessage.getText());}//提交数据//session.commit();//session.rollback();if (connection!=null) {connection.close();}}
}

消费端(没有事务手动签收):


import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class Receiver {  public static void main(String[] args) {  // ConnectionFactory :连接工厂,JMS 用它创建连接  ConnectionFactory connectionFactory;  // Connection :JMS 客户端到JMS Provider 的连接  Connection connection = null;  // Session: 一个发送或接收消息的线程  Session session;  // Destination :消息的目的地;消息发送给谁.  Destination destination;  // 消费者,消息接收者  MessageConsumer consumer;  connectionFactory = new ActiveMQConnectionFactory(  "admin",  "1234", "tcp://localhost:61616");  try {  // 构造从工厂得到连接对象  connection = connectionFactory.createConnection();  // 启动  connection.start();  // 获取操作连接  session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);  // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置  destination = session.createQueue("first");  consumer = session.createConsumer(destination);  while (true) {  // 设置接收者接收消息的时间,为了便于测试,这里谁定为100s  //TextMessage message = (TextMessage) consumer.receive(100000);  TextMessage message = (TextMessage) consumer.receive(); if (null != message) {  //System.out.println("收到消息" + message.getText());  System.out.println("消费数据:" + message.getText());message.acknowledge();} else {  break;  }  }  } catch (Exception e) {  e.printStackTrace();  } finally {  try {  if (null != connection)  connection.close();  } catch (Throwable ignore) {  }  }  }
} 

ActiveMQ 事务消息 手工签收相关推荐

  1. ActiveMQ的消息重发策略和DLQ处理

    2019独角兽企业重金招聘Python工程师标准>>> ActiveMQ的消息重发策略和DLQ处理 博客分类: MQ 在以下三种情况中,ActiveMQ消息会被重发给客户端/消费者: ...

  2. JMS学习九 ActiveMQ的消息持久化到Mysql数据库

    1.将连接Mysql数据库驱动包,放到ActiveMQ的lib目录下 2,修改ActiveMQ的conf目录下的active.xml文件,修改数据持久化的方式 2.1  修改原来的kshadb的持久化 ...

  3. 事务消息大揭秘!RocketMQ、Kafka、Pulsar全方位对比

    导语 | 事务是一个程序执行单元,里面的所有操作要么全部执行成功,要么全部执行失败.RocketMQ.Kafka和Pulsar都是当今业界应用十分广泛的开源消息队列(MQ)组件,笔者在工作中遇到关于M ...

  4. 分布式事务——消息最终一致性方案

    前言 分布式事务一直是服务化拆分后一个绕不开的话题,原来在单体应用中执行的多个逻辑操作,现在被拆分成了多个服务之间的远程调用.虽然服务化为我们的系统带来了水平伸缩的能力,然而随之而来挑战就是分布式事务 ...

  5. 浅谈 RocketMQ、Kafka、Pulsar 的事务消息

    作者:ruoyuliu刘若愚,腾讯 WXG 后台开发工程师 导语 事务是一个程序执行单元,里面的所有操作要么全部执行成功,要么全部执行失败.RocketMQ.Kafka 和 Pulsar 都是当今业界 ...

  6. Apache RocketMQ 正式开源分布式事务消息

    摘要: 近日,Apache RocketMQ 社区正式发布4.3版本.此次发布不仅包括提升性能,减少内存使用等原有特性增强,还修复了部分社区提出的若干问题,更重要的是该版本开源了社区最为关心的分布式事 ...

  7. 消息中间件学习总结(15)——Apache RocketMQ 正式开源分布式事务消息

    近日,Apache RocketMQ 社区正式发布4.3版本.此次发布不仅包括提升性能,减少内存使用等原有特性增强,还修复了部分社区提出的若干问题,更重要的是该版本开源了社区最为关心的分布式事务消息, ...

  8. RocketMQ事务消息

    文章目录 事务消息 RocketMQ事务流程概要 RocketMQ事务流程关键 事务消息的使用约束 分布式事务场景分析 场景案例 RocketMQ事务消息设计分析 消费事务消息 事务消息 Rocket ...

  9. 深度剖析如何实现事务消息

    来自:咖啡拿铁 1.背景 分布式事务一直是一个老生常谈的一个话题,在我的公众号下面下面已经写过很多篇分布式事务相关的文章了,但是依旧没有将其完全剖析.在之前的文章中我也多次提到我们可以使用消息队列来实 ...

最新文章

  1. PTA基础编程题目集-6-7 统计某类完全平方数
  2. 02NSString 转换 UTF8
  3. boost::lambda::is_instance_of_1用法的测试程序
  4. Kettle使用_22 维度更新 缓慢变化维 拉链表
  5. 用计算机计算电力系统故障,用计算机计算电力系统故障的方法.ppt
  6. 升级! Facebook 模型全部迁移至 PyTorch 框架
  7. 在stackoverflow上使用markdown
  8. 【Python之旅】第四篇(四):基于面向对象的模拟人生游戏类
  9. Frequentist 观点和 Bayesian 观点
  10. 合并基因表达水平(merge gene expression levels, FPKM)
  11. 【加密工具】2019年网络安全加密工具排行,好用的计算机加密软件推荐
  12. 抖音超火的罗马时钟html代码,最近抖音上挺火的圆形文字时钟
  13. 瑞能实业IPO被终止:年营收4.47亿 曾拟募资3.76亿
  14. 华为往事(十八)--CC08 STP:华为抢占制高点
  15. 基于数组判断字符串是否是回文
  16. Android Room数据库使用
  17. 《北京DRGs系统的研究与应用》学习笔记
  18. 计算机输入法在桌面显示不出来的,电脑开机无法正常显示桌面只能看到输入法如何解决...
  19. 嵌入式开发——结构体指针作为参数传递变量的值不正确
  20. word标尺灰色_如何在Microsoft Word中使用标尺

热门文章

  1. 算法导论-装配线调度问题
  2. tomcat压力测试、优化
  3. 你的微博也被盗赞?试试HSTS强制HTTPS加密
  4. 开机故障中的MBR引导故障的排查
  5. you do not have permission
  6. oracle 动态注册和静态注册
  7. 微软企业级加解密解决方案MBAM架构
  8. 每天一个linux命令(58):telnet命令
  9. 254. Factor Combinations
  10. 教你50招提升ASP.NET性能(十五):解决性能问题时不要低估UI的价值