您的位置:首页 > 编程语言 > Java开发

java分布式事务JTA

2018-02-28 11:07 330 查看
什么是事务处理

事务是计算机应用中不可或缺的组件模型,它保证了用户操作的原子性 ( Atomicity )、一致性 ( Consistency )、隔离性 ( Isolation ) 和持久性 ( Durabilily )。关于事务最经典的示例莫过于信用卡转账:将用户 A 账户中的 500 元人民币转移到用户 B 的账户中,其操作流程如下

1. 将 A 账户中的金额减少 500

2. 将 B 账户中的金额增加 500

这两个操作必须保正 ACID 的事务属性:即要么全部成功,要么全部失败;假若没有事务保障,用户的账号金额将可能发生问题:

假如第一步操作成功而第二步失败,那么用户 A 账户中的金额将就减少 500 元而用户 B 的账号却没有任何增加(不翼而飞);同样如果第一步出错 而第二步成功,那么用户 A 的账户金额不变而用户 B 的账号将增加 500 元(凭空而生)。上述任何一种错误都会产生严重的数据不一致问题,事务的缺失对于一个稳定的生产系统是不可接受的。

Java事务API(JTA:Java Transaction API)和它的同胞Java事务服务(JTS:Java Transaction Service),为J2EE平台提供了分布式事务服务(distributed transaction)。

一个分布式事务(distributed transaction)包括一个事务管理器(transaction manager)和一个或多个资源管理器(resource manager)。

一个资源管理器(resource manager)是任意类型的持久化数据存储。

事务管理器(transaction manager)承担着所有事务参与单元者的相互通讯的责任。

JTA事务比JDBC事务更强大。一个JTA事务可以有多个参与者,而一个JDBC事务则被限定在一个单一的数据库连接。

分布式事务(Distributed Transaction)包括事务管理器(Transaction Manager)和一个或多个支持 XA 协议的资源管理器 ( Resource Manager )。我们可以将资源管理器看做任意类型的持久化数据存储;事务管理器承担着所有事务参与单元的协调与控制。

XA协议由Tuxedo首先提出的,并交给X/Open组织,作为资源管理器(数据库)与事务管理器的接口标准。目前,Oracle、Informix、DB2和Sybase等各大数据库厂家都提供对XA的支持。XA协议采用两阶段提交方式来管理分布式事务。XA接口提供资源管理器与事务管理器之间进行通信的标准接口。XA协议包括两套函数,以xa_开头的及以ax_开头的。

java的分布式配置要用到Atomikos

Atomikos 是一个为Java平台提供增值服务的并且开源类事务管理器。

Atomikos分两个:一个是开源的TransactionEssentials,一个是商业的ExtremeTransactions。

TransactionEssentials的主要特征:JTA/XA 事务管理 —— 提供事务管理和连接池不需要应用服务器 —— TransactionEssentials可以在任何Java EE应用服务器中运行,也就是不依赖于任何应用服务器开源 —— TransactionEssentials是遵守Apache版本2许可的开源软件专注于JDBC/JMS —— 支持所有XA资源,但是资源池和消息监听是专供JDBC和JMS的与Spring 和 Hibernate 集成 —— 提供了描述如何与Spring和Hibernate集成的文档

一、环境

spring 2

ibatis2

AtomikosTransactionsEssentials-3.7.0 下载地址:http://www.atomikos.com/Main/InstallingTransactionsEssentials

MySQL-5.1 :数据库引擎为InnoDB,只有这样才能支持事务

JDK1.6

Oracle10

二 jar包

Atomikos jar必须包 transactions-jdbc.jar

transactions-jta.jar

transactions.jar

atomikos-util.jar

transactions-api.jar

ibatis,spring

只要符合二者集成jar组合即可,无额外jar

一下是具体代码实践:不需要修改代码,只要修改数据源的配置文件即可

pom.xml

<!--atomikos -->
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>transactions-jdbc</artifactId>
</dependency>

<dependency>
<groupId>javax.transaction</groupId>
<artifactId>jta</artifactId>
</dependency>


dataSource.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd
">
<!--oracle数据源 -->
<bean id="dataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean"
init-method="init" destroy-method="close">
<property name="uniqueResourceName" value="ds1" />
<property name="xaDataSourceClassName" value="oracle.jdbc.xa.client.OracleXADataSource" />
<property name="xaProperties">
<props>
<prop key="URL">${mall.settlement.oracle.jdbc.url}</prop>
<prop key="user">${mall.settlement.oracle.jdbc.usrename}</prop>
<prop key="password">${mall.settlement.oracle.jdbc.password}</prop>
</props>
</property>
<property name="minPoolSize" value="0" /> <!-- 连接池中保留的最小连接数 -->
<property name="maxPoolSize" value="100" /><!-- 连接池中保留的最大连接数 -->
<property name="borrowConnectionTimeout" value="1" />  <!--获取连接失败重新获等待最大时间,在这个时间内如果有可用连接,将返回 -->
<property name="maintenanceInterval" value="60" /><!--连接回收时间 -->
<property name="maxIdleTime" value="60" /> <!-- 最大空闲时间,60秒内未使用则连接被丢弃。若为0则永不丢弃。Default: 0 -->
<property name="loginTimeout" value="60" />     <!--java数据库连接池,最大可等待获取datasouce的时间 -->
<!-- set a SQL for testing connection -->
<property name="testQuery" value="${mall.settlement.oracle.query}" />
</bean>
<!-- myBatis文件 -->
<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="dataSource" />
<!-- 显式指定Mapper文件位置 -->
<property name="mapperLocations" value="classpath:/oracleMybatisMapper/**/*Mapper.xml" />
<property name="configLocation" value="classpath:mybatis-config.xml"></property>
</bean>
<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<property name="basePackage"
value="com.boss.settlement.service.mapper.oracle" />
<property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
</bean>

<bean id="dataSourceDB2" class="com.atomikos.jdbc.AtomikosDataSourceBean"
init-method="init" destroy-method="close">
<property name="uniqueResourceName" value="ds2" />
<property name="xaDataSourceClassName" value="com.ibm.db2.jcc.DB2XADataSource" />
<property name="xaProperties">
<props>
<prop key="serverName">${mall.settlement.db2.serverName}</prop>
<prop key="portNumber">${mall.settlement.db2.portNumber}</prop>
<prop key="databaseName">${mall.settlement.db2.databaseName}</prop>
<prop key="user">${mall.settlement.db2.jdbc.usrename}</prop>
<prop key="password">${mall.settlement.db2.jdbc.password}</prop>
<prop key="driverType">4</prop>
</props>
</property>
<property name="minPoolSize" value="0" /> <!-- 连接池中保留的最小连接数 -->
<property name="maxPoolSize" value="100" /><!-- 连接池中保留的最大连接数 -->
<property name="borrowConnectionTimeout" value="1" />  <!--获取连接失败重新获等待最大时间,在这个时间内如果有可用连接,将返回 -->
<property name="maintenanceInterval" value="60" /><!--连接回收时间 -->
<property name="maxIdleTime" value="60" /> <!-- 最大空闲时间,60秒内未使用则连接被丢弃。若为0则永不丢弃。Default: 0 -->
<property name="loginTimeout" value="60" />     <!--java数据库连接池,最大可等待获取datasouce的时间 -->
<!-- set a SQL for testing connection -->
<property name="testQuery" value="${mall.settlement.db2.query}" />
</bean>

<!-- myBatis文件 -->
<bean id="sqlSessionFactoryDB2" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="dataSourceDB2" />
<!-- 显式指定Mapper文件位置 -->
<property name="mapperLocations" value="classpath:/db2MybaitsMapper/**/*Mapper.xml" />
<property name="configLocation" value="classpath:mybatis-config.xml"></property>
</bean>

<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<property name="basePackage"
value="com.boss.settlement.service.mapper.db2" />
<property name="sqlSessionFactoryBeanName" value="sqlSessionFactoryDB2" />
</bean>

<bean id="transactionManager"
class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager">
<bean class="com.atomikos.icatch.jta.UserTransactionManager"
init-method="init" destroy-method="close">
<property name="forceShutdown" value="true" />
</bean>
</property>
<property name="userTransaction">
<bean class="com.atomikos.icatch.jta.UserTransactionImp" />
</property>
<property name="allowCustomIsolationLevels" value="true" />
</bean>

<tx:annotation-driven />
</beans>


java代码:

@Transactional
@Override
@SuppressWarnings("unchecked")
public Map<String, Object> fundTransferJTA(String loginName, FundTransfer transfer, String channelType) {
Map<String, Object> resultMap = new HashMap<String, Object>();
try {
// 转账审核通过
transfer.setCheckDate(new Date());
transfer.setCheckStatus(CheckStatusEnum.CHECK_STATUS_Y.getKey());
transfer.setCheckUser(loginName);
fundTransferMapper.updateFundTransfer(transfer);
// 划转资金
Map<String, Object> fundMidMap = new HashMap<String, Object>();
StringBuffer str = new StringBuffer();
str.append("TRANSFER_ID=" + transfer.getTransferId());
fundMidMap.put("in_para", str.toString());
fundMidProcedureMapper.pdFundTransfer(fundMidMap);
PdFundMidCrtRollDto pdFundMidCrtRollDto = ((List<PdFundMidCrtRollDto>) fundMidMap.get("out_pay_result")).get(0);
String batchNoStr = pdFundMidCrtRollDto.getBatchNo();
if (StringUtils.isBlank(batchNoStr)) {
resultMap.put("msg", "划入货款/划出货款返回批次为空");
resultMap.put("isSuccess", false);
return resultMap;
}
Map<String, Object> payMap = payChgCalcJTA(batchNoStr, channelType);
if (!(boolean) payMap.get("isSuccess")) {
// 手动回滚
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
resultMap.put("msg", payMap.get("msg"));
resultMap.put("isSuccess", false);
return resultMap;
}
}
catch (Exception e) {
e.printStackTrace();
// 手动回滚
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
resultMap.put("msg", ErrorMessageUtil.getErrorMessage(e));
resultMap.put("isSuccess", false);
return resultMap;
}
resultMap.put("isSuccess", true);
return resultMap;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: