Java事务之八——分布式事务(Spring+JTA+Atomikos+Hibernate+JMS)
在本系列先前的文章中,我们主要讲解了JDBC对本地事务的处理,本篇文章将讲到一个分布式事务的例子。
请通过以下方式下载github源代码:
git clone https://github.com/davenkin/jta-atomikos-hibernate-activemq.git
本地事务和分布式事务的区别在于:本地事务只用于处理单一数据源事务(比如单个数据库),分布式事务可以处理多种异构的数据源,比如某个业务操作中同时包含了JDBC和JMS或者某个操作需要访问多个不同的数据库。
Java通过JTA完成分布式事务,JTA本身只是一种规范,不同的应用服务器都包含有自己的实现(比如JbossJTA),同时还存在独立于应用服务器的单独JTA实现,比如本篇中要讲到的Atomikos。对于JTA的原理,这里不细讲,读者可以通过这篇文章了解相关知识。
在本篇文章中,我们将实现以下一个应用场景:你在网上购物,下了订单之后,订单数据将保存在系统的数据库中,同时为了安排物流,订单信息将以消息(Message)的方式发送到物流部门以便送货。
以上操作同时设计到数据库操作和JMS消息发送,为了使整个操作成为一个原子操作,我们只能选择分布式事务。我们首先设计一个service层,定义OrderService接口:
packagedavenkin;public interfaceOrderService {public voidmakeOrder(Order order);
}
为了简单起见,我们设计一个非常简单的领域对象Order:
@XmlRootElement(name = "Order")
@XmlAccessorType(XmlAccessType.FIELD)public classOrder {@XmlElement(name= "Id",required = true)private longid;@XmlElement(name= "ItemName",required = true)privateString itemName;@XmlElement(name= "Price",required = true)private doubleprice;@XmlElement(name= "BuyerName",required = true)privateString buyerName;@XmlElement(name= "MailAddress",required = true)privateString mailAddress;publicOrder() {}
为了采用JAXB对Order对象进行Marshal和Unmarshal,我们在Order类中加入了JAXB相关的Annotation。 我们将使用Hibernate来完成数据持久化,然后使用Spring提供的JmsTemplate将Order转成xml后以TextMessage的形式发送到物流部门的ORDER.QUEUE中。
(一)准备数据库
为了方便,我们将采用Spring提供的embedded数据库,默认情况下Spring采用HSQL作为后台数据库,虽然在本例中我们将采用HSQL的非XA的DataSource,但是通过Atomikos包装之后依然可以参与分布式事务。
SQL脚本包含在createDB.sql文件中:
CREATE TABLEUSER_ORDER(
IDINT NOT NULL,
ITEM_NAMEVARCHAR (100) NOT NULL UNIQUE,
PRICEDOUBLE NOT NULL,
BUYER_NAMECHAR (32) NOT NULL,
MAIL_ADDRESSVARCHAR(500) NOT NULL,PRIMARY KEY(ID)
);
在Spring中配置DataSource如下:
<jdbc:embedded-databaseid="dataSource"><jdbc:scriptlocation="classpath:createDB.sql"/></jdbc:embedded-database>
(二)启动ActiveMQ
我们将采用embedded的ActiveMQ,在测试之前启动ActiveMQ提供的BrokerService,在测试执行完之后关闭BrokerService。
@BeforeClasspublic static void startEmbeddedActiveMq() throwsException {broker= newBrokerService();broker.addConnector("tcp://localhost:61616");broker.start();}@AfterClasspublic static void stopEmbeddedActiveMq() throwsException {broker.stop();}
(三)实现OrderService
创建一个DefaultOrderService,该类实现了OrderService接口,并维护一个JmsTemplate和一个Hibernate的SessionFactory实例变量,分别用于Message的发送和数据库处理。
packagedavenkin;importorg.hibernate.SessionFactory;importorg.hibernate.classic.Session;importorg.springframework.beans.factory.annotation.Required;importorg.springframework.jms.core.JmsTemplate;importorg.springframework.transaction.annotation.Transactional;public class DefaultOrderService implementsOrderService{privateJmsTemplate jmsTemplate;privateSessionFactory sessionFactory;@Override@Transactionalpublic voidmakeOrder(Order order) {Session session=sessionFactory.getCurrentSession();session.save(order);jmsTemplate.convertAndSend(order);}@Requiredpublic voidsetJmsTemplate(JmsTemplate jmsTemplate) {this.jmsTemplate =jmsTemplate;}@Requiredpublic voidsetSessionFactory(SessionFactory sessionFactory) {this.sessionFactory =sessionFactory;}
}
(四)创建Order的Mapping配置文件
<?xml version="1.0"?>
<!DOCTYPE hibernate-mapping PUBLIC "-//Hibernate/Hibernate Mapping DTD 3.0//EN""http://hibernate.sourceforge.net/hibernate-mapping-3.0.dtd"><hibernate-mapping><classname="davenkin.Order"table="USER_ORDER"><idname="id"type="long"><columnname="ID" /><generatorclass="increment" /></id><propertyname="itemName"type="string"><columnname="ITEM_NAME" /></property><propertyname="price"type="double"><columnname="PRICE"/></property><propertyname="buyerName"type="string"><columnname="BUYER_NAME"/></property><propertyname="mailAddress"type="string"><columnname="MAIL_ADDRESS"/></property></class>
</hibernate-mapping>
(五)配置Atomikos事务
在Spring的IoC容器中,我们需要配置由Atomikos提供的UserTransaction和TransactionManager,然后再配置Spring的JtaTransactionManager:
<beanid="userTransactionService"class="com.atomikos.icatch.config.UserTransactionServiceImp"init-method="init"destroy-method="shutdownForce"><constructor-arg><props><propkey="com.atomikos.icatch.service">com.atomikos.icatch.standalone.UserTransactionServiceFactory</prop></props></constructor-arg></bean><beanid="atomikosTransactionManager"class="com.atomikos.icatch.jta.UserTransactionManager"init-method="init"destroy-method="close"depends-on="userTransactionService"><propertyname="forceShutdown"value="false" /></bean><beanid="atomikosUserTransaction"class="com.atomikos.icatch.jta.UserTransactionImp"depends-on="userTransactionService"><propertyname="transactionTimeout"value="300" /></bean><beanid="jtaTransactionManager"class="org.springframework.transaction.jta.JtaTransactionManager"depends-on="userTransactionService"><propertyname="transactionManager"ref="atomikosTransactionManager" /><propertyname="userTransaction"ref="atomikosUserTransaction" /></bean><tx:annotation-driventransaction-manager="jtaTransactionManager" />
(六)配置JMS
对于JMS,为了能使ActiveMQ加入到分布式事务中,我们需要配置ActiveMQXAConnectionFactory,而不是ActiveMQConnectionFactory,然后再配置JmsTemplate,此外还需要配置MessageConvertor在Order对象和XML之间互转。
<beanid="jmsXaConnectionFactory"class="org.apache.activemq.ActiveMQXAConnectionFactory"><propertyname="brokerURL"value="tcp://localhost:61616" /></bean><beanid="amqConnectionFactory"class="com.atomikos.jms.AtomikosConnectionFactoryBean"init-method="init"><propertyname="uniqueResourceName"value="XAactiveMQ" /><propertyname="xaConnectionFactory"ref="jmsXaConnectionFactory" /><propertyname="poolSize"value="5"/></bean><beanid="jmsTemplate"class="org.springframework.jms.core.JmsTemplate"><propertyname="connectionFactory"ref="amqConnectionFactory"/><propertyname="receiveTimeout"value="2000" /><propertyname="defaultDestination"ref="orderQueue"/><propertyname="sessionTransacted"value="true" /><propertyname="messageConverter"ref="oxmMessageConverter"/></bean><beanid="orderQueue"class="org.apache.activemq.command.ActiveMQQueue"><constructor-argvalue="ORDER.QUEUE"/></bean><beanid="oxmMessageConverter"class="org.springframework.jms.support.converter.MarshallingMessageConverter"><propertyname="marshaller"ref="marshaller"/><propertyname="unmarshaller"ref="marshaller"/></bean><oxm:jaxb2-marshallerid="marshaller"><oxm:class-to-be-boundname="davenkin.Order"/></oxm:jaxb2-marshaller>
(七)测试
在测试中,我们首先通过(二)中的方法启动ActiveMQ,再调用DefaultOrderService,最后对数据库和QUEUE进行验证:
@Testpublic voidmakeOrder(){orderService.makeOrder(createOrder());JdbcTemplate jdbcTemplate= newJdbcTemplate(dataSource);assertEquals(1, jdbcTemplate.queryForInt("SELECT COUNT(*) FROM USER_ORDER"));String dbItemName= jdbcTemplate.queryForObject("SELECT ITEM_NAME FROM USER_ORDER", String.class);String messageItemName=((Order) jmsTemplate.receiveAndConvert()).getItemName();assertEquals(dbItemName, messageItemName);}@Test(expected= IllegalArgumentException.class)public voidfailToMakeOrder(){orderService.makeOrder(null);JdbcTemplate jdbcTemplate= newJdbcTemplate(dataSource);assertEquals(0, jdbcTemplate.queryForInt("SELECT COUNT(*) FROM USER_ORDER"));assertNull(jmsTemplate.receiveAndConvert());}
转载于:https://www.cnblogs.com/davenkin/archive/2013/03/19/java-tranaction-8.html
Java事务之八——分布式事务(Spring+JTA+Atomikos+Hibernate+JMS)相关推荐
- java jta 例子_Java事务处理全解析(八)——分布式事务入门例子(Spring+JTA+Atomikos+Hibernate+JMS)...
在本系列先前的文章中,我们主要讲解了JDBC对本地事务的处理,本篇文章将讲到一个分布式事务的例子. 请通过以下方式下载github源代码: 本地事务和分布式事务的区别在于:本地事务只用于处理单一数据源 ...
- Java 的开源分布式事务解决方案框架 fescar
fescar 全称为:Fast & Easy Commit And Rollback.它是一个完全基于 Java 的分布式事务解决框架.fescar 拥有很高的性能,并且使用非常的方便! FE ...
- 什么是事务? 事务的隔离级别和事务运行的模式分别是什么?spring 事务和分布式事务实现方式有哪些?
目录 什么是事务? 事务的隔离级别: 事务运行的模式: spring 事务实现方式: 分布式事务实现方式: 什么是事务? 百度百科中解释:指作为单个逻辑工作单元执行的一系列操作,此操作是对数据库的操作 ...
- 18c分布式事务 oracle_分布式事务的现象及理解
分布式事务 背景 在微服务环境下,因为会根据不同的业务会拆分成不同的服务,比如会员服务.订单服务.商品服务等,让专业的人做专业的事情,每个服务都有自己独立的数据库,并且是独立运行,互不影响.但是每个服 ...
- 尚硅谷谷粒商城第六天 本地事务、分布式事务及seata
1. 本地事务 商品新增功能非常复杂,商品管理微服务在service层中调用保存spu和sku相关的方法,为了保证数据的一致性,必然会使用事务. 在JavaEE企业级开发的应用领域,为了保证数据的完整 ...
- 【网站架构】一招搞定90%的分布式事务,实打实介绍数据库事务、分布式事务的工作原理应用场景
大家好,欢迎来到停止重构的频道.本期,我们来聊一下数据库事务以及分布式事务. 大家都在强调事务的重要性,而分布式事务也说是微服务必备的.但又说事务会影响性能,分布式事务更是很复杂的东西.使得大家都很迷 ...
- springcloud分布式事务_Springcloud 分布式事务集成Naco Seata
前言:分布式系统架构中,最最费劲的是分布式事务,分布式事务解决方案网上大致分为两种 消息一致性 基于TCC分布式事务 不管基于那种解决方案,都是对侵入的代码植入,以大量的代码或者消息来作为代价,来实现 ...
- 分布式事务:分布式事务原理概述
1.什么是分布式事务 分布式事务就是指事务的资源分别位于不同的分布式系统的不同节点之上的事务: 2.分布式事务产生的原因 2.1.数据库分库分表 在单库单表场景下,当业务数据量达到单库单表的极限时,就 ...
- 分布式事务:本地事务与分布式事务
分布式事务:本地事务与分布式事务 我们编写的扣减库存与保存订单是在两个服务中存在的.因为扣减库存后出现某些原因导致整一个下单流程出现错误,此刻会出现这样的状况:保存订单的本地事务(加了@Transac ...
最新文章
- 也许每个农村出来的码农都有个田园梦
- 计算机健康教育应用的意义,健康教育路径计算机模块的建立与应用 (3)
- Image-to-Image Translation with conditional Adversarial Networks ---- Pix-2-Pix
- Spring Boot中Thymeleaf的初步使用
- 求图形学基本算法好书推荐?
- jmeter java性能_jmeter之自定义java请求性能测试
- 单片机快速将库函数版代码移植为寄存器代码方法
- dnf维护服务器 安图妮15天,DNF带3次暴走安图恩被封15天 我这数据哪异常了?
- php中ini_get,关于ini_get php手册的例子?
- 【BZOJ-1097】旅游景点atr SPFA + 状压DP
- Jenkins配置ansible
- clickhouse性能优化实践
- 关于【缓存穿透、缓存击穿、缓存雪崩、热点数据失效】解决方案
- matlab的GUI滤波器设计,基于Matlab GUI的模拟带通滤波器的设计
- MAVEN实战 整理 笔记
- 令牌环算法_一环(令牌)将它们全部统治
- 关于kali的pycharm创建快捷方式
- 计算机的内存储器应用范围,计算机的内存储器可与cpu什么交换信息
- 深度学习、目标检测相关博客链接
- PS制作火焰效果文字的方法步骤教程
热门文章
- 基于Python实现相关分析案例
- R包实践:lubridate 处理时间数据
- ctr z撤回反向_Ctrl+Z 的反快捷键是什么
- java语言程序设计考题_《JAVA语言程序设计》期末考试试题及答案6(应考必备题库)...
- 汇编学习--7.9--寄存器
- python字典键值可以是元组吗_python – 为同一个字典值创建可交换元组键...
- linux 邮件服务器 并给外网发送邮件,Linux下判断公网IP是否改变,并发送邮件通知...
- 浏览器接收响应消息并显示内容
- c++:ISO C++ forbids declaration of ‘xxx’ with no type
- UEBA能够检测的七大类安全风险