DTP模型和XA规范

X/Open Distributed Transaction Processing (DTP) Model (X/Open XA), 是X/Open,即现在的open group,是一个独立的组织,主要负责制定各种行业技术标准,提出的分布式事务处理规范,已经成为事实上的事务模型组件的行为标准。

X/Open主要提供了以下参考文档:

DTP模型

DTP模型的主要流程图如下:

taskImg

在图上可以清楚的看到构成DTP模型的4个基本元素:

  • 应用程序(Application Program ,简称AP):用于定义事务边界(即定义事务的开始和结束),并且在事务边界内对资源进行操作。

  • 资源管理器(Resource Manager,简称RM):如数据库、文件系统等,并提供访问资源的方式。

  • 事务管理器(Transaction Manager ,简称TM):负责分配事务唯一标识,监控事务的执行进度,并负责事务的提交、回滚等。

  • 通信资源管理器(Communication Resource Manager,简称CRM):控制一个TM域(TM domain)内或者跨TM域的分布式应用之间的通信。

上述元素在多节点的分布式环境下,在DTP本地模型实例中,由AP、RMs和TM组成,不需要其他元素。AP、RM和TM之间,彼此都需要进行交互,如下图所示:

taskImg

这张图中(1)表示AP-RM的交互接口,(2)表示AP-TM的交互接口,(3)表示RM-TM的交互接口。

XA规范

XA就是DTP模型中的事务管理器(TM)与数据库(RM)之间的接口规范(即接口函数),事务管理器(TM)用它来通知数据库(RM)事务的开始、结束以及提交、回滚等。

XA接口函数由数据库厂商提供。通常情况下,TM与RM通过XA 接口规范,使用二阶段提交协议来完成一个全局事务,XA规范的基础是二阶段提交协议(2pc),目前几乎所有的主流数据库都对XA规范提供了支持,例如Oracle、DB2、SQL Server、MySQL、PostgreSQL。

XA的大致流程如下图所示:

taskImg

但是XA也存在一些缺点:

  • 由于属于强一致性事务规范,所以性能比较低(阻塞、响应时间增加、死锁);
  • 依赖于独立的J2EE中间件,Weblogic、Jboss,后期轻量级的Atomikos、JOTM;
  • 不是所有资源(RM)都支持XA协议;

JTA(Java Transaction API)

即Java的事务API,基于XA实现,也就是RM需要支持XA,所以也有JTA(XA)的说法,JTA仅定义了接口,这些接口如下:

  • XAResource :XAResource接口是对实现了X/Open CAE规范的资源管理器 (Resource Manager,数据库就是典型的资源管理器) 的抽象,它由资源适配器 (Resource Apdater) 提供实现。XAResource是支持事务控制的核心。
  • Transaction:Transaction接口是一个事务实例的抽象,通过它可以控制事务内多个资源的提交或者回滚。二阶段提交过程也是由Transaction接口的实现者来完成的。
  • TransactionManager:托管模式 (managed mode) 下,TransactionManager接口是被应用服务器调用,以控制事务的边界的。
  • UserTransaction:非托管模式 (non-managed mode) 下,应用程序可以通过UserTransaction接口控制事务的边界

taskImg

taskImg

taskImg

目下JTA的实现有几种形式:

  • J2EE容器提供的JTA实现(Weblogic、JBoss );
  • JOTM(Java Open Transaction Manager)、Atomikos、Bitronix等可独立于J2EE容器的环境下第三方中间件实现JTA;

Atomikos介绍

atomikos是一个产品,核心功能是提供事务管理器功能,即xa模型或jta中的TM。

Atomikos公司官方网址为:https://www.atomikos.com/。其旗下最著名的产品就是事务管理器。产品分两个版本:

  • TransactionEssentials:开源的免费产品

  • ExtremeTransactions:商业版,需要收费。

这两个产品的关系如下图所示:

taskImg

TransactionEssentials

  1. 实现了JTA/XA规范中的事务管理器(Transaction Manager)应该实现的相关接口,比如:
  • UserTransaction实现是com.atomikos.icatch.jta.UserTransactionImp,用户只需要直接操作这个类
  • TransactionManager实现是com.atomikos.icatch.jta.UserTransactionManager
  • Transaction实现是com.atomikos.icatch.jta.TransactionImp
  1. 针对实现了JDBC规范中规定的实现了XADataSource接口的数据库连接池,以及实现了JMS规范的MQ客户端提供一层封装。

在JTA规范中,提到过XADataSource、XAConnection等接口应该由资源管理器RM来实现,而Atomikos的作用是一个事务管理器TM,并不需要提供对应的实现。而Atomikos对XADataSource进行封装,只是为了方便与事务管理器整合。封装XADataSource的实现类为AtomikosDataSourceBean。

典型的XADataSource实现包括:

  1. mysql官方提供的com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
  2. 阿里巴巴开源的druid连接池,对应的实现类为com.alibaba.druid.pool.xa.DruidXADataSource
  3. PostgreSql官方提供的org.postgresql.xa.PGXADataSource

而其他一些常用的数据库连接池,如dbcp、dbcp2或者c3p0,目前貌似尚未提供XADataSource接口的实现。如果提供给AtomikosDataSourceBean一个没有实现XADataSource接口的数据源,如c3p0的ComboPooledDataSource,则会抛出类似以下异常:

1
2
3
com.atomikos.jdbc.AtomikosSQLException: 
The class com.mchange.v2.c3p0.ComboPooledDataSource specified by property xaDataSourceClassName does not implement the required interface javax.jdbc.XADataSource
Please make sure the spelling is correct, and check your JDBC driver vendor documentation.

ExtremeTransactions

ExtremeTransactions在TransactionEssentials的基础上额外提供了以下功能:

  • 支持TCC柔性事务
  • 支持通过RMI、IIOP、SOAP这些远程过程调用技术,进行事务传播。

Atomikos原理

分布式事务创建

这个和 Spring 传统事务相比来说,Spring 的事务通过的是 JpaTransactionManager 进行管理,而这套逻辑采用的是 JtaTransactionManager 进行管理。通过调用 Atomikos 框架实现相关逻辑。

这个的核心就是 CompositeTransaction 对象,这个对象基本就可以说是一个分布式事务,它是从一个 map 中进行获取,这个 map 的 key 是当前线程,value 就是 CompositeTransaction 对象,如果 map 集合中没有的话,就会进行创建,并将其加入到 map 集合中。这里要注意的是,这个 map 是 hashMap , 所以在获取的时候,加入了 synchronized 机制。

1
2
3
4
5
6
7
8
9
10
11

private CompositeTransactionImp getCurrentTx() {
Thread thread = Thread.currentThread();
synchronized(this.threadtotxmap_) {
// 获取 CompositeTransactionImp 对象
Stack txs = (Stack)this.threadtotxmap_.get(thread);
return txs == null ? null : (CompositeTransactionImp)txs.peek();
}
}


这样,获取到线程对应的 CompositeTransactionImp 对象,我们的 SQL 都是由这个线程去进行执行的,这个线程对应的 CompositeTransactionImp 也就代表了执行 SQL 所在的全部库,多个库的事务统一由 CompositeTransactionImp 对象进行管理,同时会为这个事务生成一个事务 id。

taskImg

Atomikos 包装 DataSource

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

@Bean(name="masterXADataSource")
@ConfigurationProperties(prefix="spring.datasource.master")
public DruidXADataSource masterXADataSource(){
return new DruidXADataSource();
}

@Bean(name="slaveXADataSource")
@ConfigurationProperties(prefix="spring.datasource.slave")
public DruidXADataSource slaveXADataSource(){
return new DruidXADataSource();
}


@Bean
@Primary
public DynamicDataSource dynamicDataSource(DruidXADataSource masterXADataSource,
DruidXADataSource slaveXADataSource) {
DataSource masterDataSource = getDataSource(masterXADataSource,Constants.MASTER_DATA_SOURCE_NAME);
DataSource slaveDataSource = getDataSource(slaveXADataSource,Constants.SLAVE_DATA_SOURCE_NAME);
return new DynamicDataSource(masterDataSource,slaveDataSource);
}

private DataSource getDataSource(DruidXADataSource dataSource,String dataSourceKey){
AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
atomikosDataSourceBean.setUniqueResourceName(dataSourceKey);
atomikosDataSourceBean.setXaDataSource(dataSource);
atomikosDataSourceBean.setPoolSize(5);
atomikosDataSourceBean.setTestQuery("SELECT 1");
atomikosDataSourceBean.setMaintenanceInterval(2);
return atomikosDataSourceBean;
}


我们通过配置文件设置链接池的各种参数之后,会通过创建 DruidXADataSource 对象,获取 Connection 链接,这里我们将链接交给了 AtomikosDataSourceBean ,由 AtomikosDataSourceBean 进行了一定的包装。

核心类就是 AtomikosDataSourceBean 的父类 AbstractDataSourceBean ,通过 getConnection() 去获取链接对象,主要就是通过 connectionPool 链接池去获取对象,这个链接池是通过 init() 方法进行初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

public Connection getConnection ( HeuristicMessage msg ) throws SQLException
{
if ( LOGGER.isInfoEnabled() ) LOGGER.logInfo ( this + ": getConnection ( " + msg + " )..." );
Connection connection = null;

// 初始化连接池
init();

try {
connection = (Connection) connectionPool.borrowConnection ( msg );

} catch (CreateConnectionException ex) {
throwAtomikosSQLException("Failed to grow the connection pool", ex);
} catch (PoolExhaustedException e) {
throwAtomikosSQLException ("Connection pool exhausted - try increasing 'maxPoolSize' and/or 'borrowConnectionTimeout' on the DataSourceBean.");
} catch (ConnectionPoolException e) {
throwAtomikosSQLException("Error borrowing connection", e );
}
if ( LOGGER.isDebugEnabled() ) LOGGER.logDebug ( this + ": returning " + connection );
return connection;
}



protected com.atomikos.datasource.pool.ConnectionFactory doInit() throws Exception
{
... ...
// 这里的 xaDataSource 就是我们创建好的 DruidXADataSource
xaDataSource = (XADataSource) driver;
xaDataSource.setLoginTimeout ( getLoginTimeout() );
xaDataSource.setLogWriter ( getLogWriter() );
PropertyUtils.setProperties(xaDataSource, xaProperties );




JdbcTransactionalResource tr = new JdbcTransactionalResource(getUniqueResourceName() , xaDataSource);

// 这就是最后创建好的 ConnectionFactory
com.atomikos.datasource.pool.ConnectionFactory cf = new com.atomikos.jdbc.AtomikosXAConnectionFactory(xaDataSource, tr, this);
Configuration.addResource ( tr );

return cf;
}


将 DruidXADataSource 创建好链接池之后,就是获取到链接,对链接进行包装了,主要就是创建了一个动态代理对象,AtomikosConnectionProxy 这个对象的 invoke() 方法,就是所有逻辑的实现,这里只是包装,具体逻辑实现,稍后再讲。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

public static Reapable newInstance ( Connection c , SessionHandleState sessionHandleState , HeuristicMessage hmsg )
{
Reapable ret = null;
AtomikosConnectionProxy proxy = new AtomikosConnectionProxy(c, sessionHandleState , hmsg );
Set<Class> interfaces = PropertyUtils.getAllImplementedInterfaces ( c.getClass() );
interfaces.add ( Reapable.class );
//see case 24532
interfaces.add ( DynamicProxy.class );
Class[] interfaceClasses = ( Class[] ) interfaces.toArray ( new Class[0] );

Set<Class> minimumSetOfInterfaces = new HashSet<Class>();
minimumSetOfInterfaces.add ( Reapable.class );
minimumSetOfInterfaces.add ( DynamicProxy.class );
minimumSetOfInterfaces.add ( java.sql.Connection.class );
Class[] minimumSetOfInterfaceClasses = ( Class[] ) minimumSetOfInterfaces.toArray( new Class[0] );

List<ClassLoader> classLoaders = new ArrayList<ClassLoader>();
classLoaders.add ( Thread.currentThread().getContextClassLoader() );
classLoaders.add ( c.getClass().getClassLoader() );
classLoaders.add ( AtomikosConnectionProxy.class.getClassLoader() );


ret = ( Reapable ) ClassLoadingHelper.newProxyInstance ( classLoaders , minimumSetOfInterfaceClasses , interfaceClasses , proxy );

return ret;
}


atomikos封装数据源的流程如下图所示:

taskImg

Start 指令和 SQL 准备

其实 Mybatis 框架要执行的时候,会通过 mapper 组件,通过链接池获取到链接对象,对 SQL 语句进行执行,在这里的话,就是获取到经过包装之后的 AtomikosConnectionsProxy 动态代理对象,核心的逻辑都是由这个动态代理对象的 invoke() 方法来完成 。

这里也是有 XAConnection ,XAResource 这些概念,通过 之前包装的 ConnectionFactory 获取到 CompositeTransactionImp 对象,这个对象就是全局的事务对象 。

当我们要执行 SQL 的时候,会被动态代理对象拦截,通过Druid 链接池的信息,构建好 XAConnection ,XAResouce 对象,这里 Atomikos 框架对 XAResource 进行了封装(XAResourceTransaction),XAResourceTransaction 这个代表的就是分布式事务的一个子事务,这个库的 SQL 第一次执行的时候,就会创建这样的一个子事务并将其加入到 CompositeTransactionImp 全局事务中,然后通过这个子事务的 XAResource 开始 START 指令。

后续的话,就是会去执行 prepareStatement 命令,这个就是去执行我们要执行的 SQL 语句,这个就是通过调用 jdbc 原生的进行执行。

这样也就完成了 START 指令 和 SQL 语句的准备。

taskImg

End 指令

end 指令说到底也是通过子事务的 XAResource 对象进行发送的,基本和上面一样,最终就是 xaresource_.end(xid_,flag)

终止分布式事务

这里面逻辑实在是太深了,大体就记一下,主要就还是通过 xaresource.commit 执行 commit , 且后续会通过 全局事务 CompositeTransaction 获取到所有的子事务,对子事务进行遍历,每个子事务都执行 prepare() 方法,返回一个执行结果,到这里为止,2PC 的第一阶段(Prepare)就完事了。

主要就是在终止分布式事务中,执行 prepare 等待返回结果,根据结果判断是执行 commit 还是 rollback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

protected void terminate ( boolean commit ) throws HeurRollbackException,
HeurMixedException, SysException, java.lang.SecurityException,
HeurCommitException, HeurHazardException, RollbackException,
IllegalStateException

{
synchronized ( fsm_ ) {
if ( commit ) {
if ( participants_.size () <= 1 ) {
commit ( true );
} else {

// 这里就是去执行 prepare() 方法
int prepareResult = prepare ();

// make sure to only do commit if NOT read only
if ( prepareResult != Participant.READ_ONLY )
commit ( false ); // 最终根据判断执行提交或者回滚
}
} else {
rollback ();
}
}
}

如果是 commit 的话,就是对分布式中所有的子事务进行遍历,通过 propagator_.submitPropagationMessage ( cm ); 去触发整个 commit 的过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

Enumeration<Participant> enumm = participants.elements ();
while ( enumm.hasMoreElements () ) {
Participant p = enumm.nextElement ();
if ( !readOnlyTable_.containsKey ( p ) ) {
CommitMessage cm = new CommitMessage ( p, commitresult,
onePhase );

// if onephase: set cascadelist anyway, because if the
// participant is a REMOTE one, then it might have
// multiple participants that are not visible here!

if ( onePhase && cascadeList_ != null ) { // null for OTS
Integer sibnum = (Integer) cascadeList_.get ( p );
if ( sibnum != null ) // null for local participant!
p.setGlobalSiblingCount ( sibnum.intValue () );
p.setCascadeList ( cascadeList_ );
}
//
propagator_.submitPropagationMessage ( cm );
}java
} // while

最后的 commit 还是通过每个 RM 对应的 XAResource 去执行 commit 操作,进行事务提交,
xaresource_.commit ( xid_, onePhase );

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

try {
// refresh xaresource for MQSeries: seems to close XAResource after suspend???
testOrRefreshXAResourceFor2PC ();

... ...

// 这就是提交核心
xaresource_.commit ( xid_, onePhase );

} catch ( XAException xaerr ) {
String msg = interpretErrorCode ( resourcename_ , "commit" , xid_ , xaerr.errorCode );
LOGGER.logWarning ( msg , xaerr );
}


如果结果不正常的话,是不会走 commit 的,会进入 rollback 逻辑,进行整体所有子库的回滚操作

1
2
3
4
5
6
7
8
9
10
11
12

Enumeration enumm = participants.elements ();
while ( enumm.hasMoreElements () ) {
Participant p = (Participant) enumm.nextElement ();
if ( !readOnlyTable_.containsKey ( p ) ) {
RollbackMessage rm = new RollbackMessage ( p,
rollbackresult, indoubt );
// 发送回滚消息
propagator_.submitPropagationMessage ( rm );
}
}

rollback 逻辑和 commit 基本是一致的,也是通过 全局事务获取到全局事务中所有的子事务,对子事务进行遍历,通过 propagator_.submitPropagationMessage ( rm ); 发送回滚执行,最终还是通过子事务对应的 XAResource 去执行 rollback 操作 :xaresource_.rollback ( xid_ );

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

try {
if ( state_.equals ( TxState.ACTIVE ) ) { // first suspend xid
suspend ();
}

// refresh xaresource for MQSeries: seems to close XAResource after suspend???
testOrRefreshXAResourceFor2PC ();
if(LOGGER.isInfoEnabled()){
LOGGER.logInfo("XAResource.rollback ( " + xidToHexString + ")"
+ "on resource " + resourcename_
+ " represented by XAResource instance "+ xaresource_);
}

xaresource_.rollback ( xid_ );

} catch ( ResourceException resErr ) {
// failure of suspend
errors.push ( resErr );
throw new SysException ( "Error in rollback: " + resErr.getMessage(), errors );

}

流程总结

taskImg

SpringBoot集成Atomikos

Maven配置

首先引入atomikos的springboot版本的依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>

atomikos配置文件

Atomikos在启动后,默认会从以下几个位置读取配置文件,这里直接贴出atomikos源码进行说明:

具体的加载逻辑,位于com.atomikos.icatch.provider.imp.AssemblerImp类中的initializeProperties方法,其中定义了配置加载的顺序逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

@Override
public ConfigProperties initializeProperties() {

//读取classpath下的默认配置transactions-defaults.properties
Properties defaults = new Properties();
loadPropertiesFromClasspath(defaults, DEFAULT_PROPERTIES_FILE_NAME);


//读取classpath下,transactions.properties配置,覆盖transactions-defaults.properties中相同key的值
Properties transactionsProperties = new Properties(defaults);
loadPropertiesFromClasspath(transactionsProperties, TRANSACTIONS_PROPERTIES_FILE_NAME);


//读取classpath下,jta.properties,覆盖transactions-defaults.properties、transactions.properties中相同key的值
Properties jtaProperties = new Properties(transactionsProperties);
loadPropertiesFromClasspath(jtaProperties, JTA_PROPERTIES_FILE_NAME);


//读取通过java -Dcom.atomikos.icatch.file方式指定的自定义配置文件路径,覆盖之前的同名配置
Properties customProperties = new Properties(jtaProperties);
loadPropertiesFromCustomFilePath(customProperties);


//最终构造一个ConfigProperties对象,来表示实际要使用的配置
Properties finalProperties = new Properties(customProperties);
return new ConfigProperties(finalProperties);
}


从源码中,很清楚就可以配置文件加载顺序和优先级:

transactions-defaults.properties < transactions.properties < jta.properties < 自定义配置文件路径

并且优先级高的配置会覆盖之前同名key的配置

其中transactions-defaults.properties是atomikos自带的默认配置,位于transactions-4.0.6.jar中.

taskImg

以下是4.0.6中 transactions-default.properties中配置内容,归类后如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

===============================================================
============ 事务管理器(TM)配置参数 ==============
===============================================================
#指定是否启动磁盘日志,默认为true。在生产环境下一定要保证为true,否则数据的完整性无法保证
com.atomikos.icatch.enable_logging=true
#JTA/XA资源是否应该自动注册
com.atomikos.icatch.automatic_resource_registration=true
#JTA事务的默认超时时间,默认为10000ms
com.atomikos.icatch.default_jta_timeout=10000
#事务的最大超时时间,默认为300000ms。这表示事务超时时间由 UserTransaction.setTransactionTimeout()较大者决定。4.x版本之后,指定为0的话则表示不设置超时时间
com.atomikos.icatch.max_timeout=300000
#指定在两阶段提交时,是否使用不同的线程(意味着并行)。3.7版本之后默认为false,更早的版本默认为true。如果为false,则提交将按照事务中访问资源的顺序进行。
com.atomikos.icatch.threaded_2pc=false
#指定最多可以同时运行的事务数量,默认值为50,负数表示没有数量限制。在调用 UserTransaction.begin()方法时,可能会抛出一个”Max number of active transactions reached”异常信息,表示超出最大事务数限制
com.atomikos.icatch.max_actives=50
#是否支持subtransaction,默认为true
com.atomikos.icatch.allow_subtransactions=true
#指定在可能的情况下,否应该join 子事务(subtransactions),默认值为true。如果设置为false,对于有关联的不同subtransactions,不会调用XAResource.start(TM_JOIN)
com.atomikos.icatch.serial_jta_transactions=true
#指定JVM关闭时是否强制(force)关闭事务管理器,默认为false
com.atomikos.icatch.force_shutdown_on_vm_exit=false
#在正常关闭(no-force)的情况下,应该等待事务执行完成的时间,默认为Long.MAX_VALUE
com.atomikos.icatch.default_max_wait_time_on_shutdown=9223372036854775807

===============================================================
========= 事务日志(Transaction logs)记录配置 =======
===============================================================
#事务日志目录,默认为./。
com.atomikos.icatch.log_base_dir=./
#事务日志文件前缀,默认为tmlog。事务日志存储在文件中,文件名包含一个数字后缀,日志文件以.log为扩展名,如tmlog1.log。遇到checkpoint时,新的事务日志文件会被创建,数字增加。
com.atomikos.icatch.log_base_name=tmlog
#指定两次checkpoint的时间间隔,默认为500
com.atomikos.icatch.checkpoint_interval=500

===============================================================
========= 事务日志恢复(Recovery)配置 =============
===============================================================
#指定在多长时间后可以清空无法恢复的事务日志(orphaned),默认86400000ms
com.atomikos.icatch.forget_orphaned_log_entries_delay=86400000
#指定两次恢复扫描之间的延迟时间。默认值为与com.atomikos.icatch.default_jta_timeout相同
com.atomikos.icatch.recovery_delay=${com.atomikos.icatch.default_jta_timeout}
#提交失败时,再抛出一个异常之前,最多可以重试几次,默认值为5
com.atomikos.icatch.oltp_max_retries=5
#提交失败时,每次重试的时间间隔,默认10000ms
com.atomikos.icatch.oltp_retry_interval=10000

===============================================================
===================== 其他 ======================
===============================================================
java.naming.factory.initial=com.sun.jndi.rmi.registry.RegistryContextFactory
com.atomikos.icatch.client_demarcation=false
java.naming.provider.url=rmi://localhost:1099
com.atomikos.icatch.rmi_export_class=none
com.atomikos.icatch.trust_client_tm=false


当我们想对默认的配置进行修改时,可以在classpath下新建一个jta.properties,覆盖同名的配置项即可。

关于不同版本配置的差异,请参考官方文档

配置数据源

由于项目使用了mybatis以及多数据源,并且会有动态切换数据源,所以使用了自定义的DynamicDataSource和继承了SpringManagedTransactionFactory的事务工厂,以及实现了org.apache.ibatis.transaction.Transaction的自定义事务类,通过aop的方式,来动态切换数据源和事务

初始化的主数据源配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

@Configuration
public class DataSourceConfig {

@Bean(name="masterXADataSource")
@ConfigurationProperties(prefix="spring.datasource.master")
public DruidXADataSource masterXADataSource(){
return new DruidXADataSource();
}

@Bean(name="slaveXADataSource")
@ConfigurationProperties(prefix="spring.datasource.slave")
public DruidXADataSource slaveXADataSource(){
return new DruidXADataSource();
}


@Bean
@Primary
public DynamicDataSource dynamicDataSource(DruidXADataSource masterXADataSource,
DruidXADataSource slaveXADataSource) {
DataSource masterDataSource = getDataSource(masterXADataSource,Constants.MASTER_DATA_SOURCE_NAME);
DataSource slaveDataSource = getDataSource(slaveXADataSource,Constants.SLAVE_DATA_SOURCE_NAME);
return new DynamicDataSource(masterDataSource,slaveDataSource);
}

private DataSource getDataSource(DruidXADataSource dataSource,String dataSourceKey){
AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
atomikosDataSourceBean.setUniqueResourceName(dataSourceKey);
atomikosDataSourceBean.setXaDataSource(dataSource);
atomikosDataSourceBean.setPoolSize(5);
atomikosDataSourceBean.setTestQuery("SELECT 1");
atomikosDataSourceBean.setMaintenanceInterval(2);
return atomikosDataSourceBean;
}




@Bean
public SqlSessionFactory sqlSessionFactory(DynamicDataSource dynamicDataSource) throws Exception{
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dynamicDataSource);
sqlSessionFactoryBean.setVfs(SpringBootVFS.class);
sqlSessionFactoryBean.setConfigLocation(new PathMatchingResourcePatternResolver().getResource("classpath:mybatis-config.xml"));
sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/*.xml"));
sqlSessionFactoryBean.setTransactionFactory(new DynamicDataSourceTransactionFactory());
return sqlSessionFactoryBean.getObject();
}

@Bean
public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory){
return new SqlSessionTemplate(sqlSessionFactory);
}


}


非第一时间加载的动态多数据源配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128


@Component
public class GlobalConfigHolder {

@Resource
private DataBaseInfoService dataBaseInfoService;

@Resource
private DynamicDataSource dynamicDataSource;

private static final Logger logger = LoggerFactory.getLogger(GlobalConfigHolder.class);



private final Map<Integer,String> groupMasterDataSourceMap = new HashMap<>();

private final Map<Integer,String> groupSlaveDataSourceMap = new HashMap<>();



@Bean("groupMasterDataSourceMap")
public Map<Integer,String> getGroupMasterDataSourceMap(){
return groupMasterDataSourceMap;
}

@Bean("groupSlaveDataSourceMap")
public Map<Integer,String> getGroupSlaveDataSourceMap(){
return groupSlaveDataSourceMap;
}




@PostConstruct
private void init(){

loadDatabaseInfos();
}




private void loadDatabaseInfos(){
logger.info("开始加载动态数据源!");
List<DatabaseInfo> dbList = dataBaseInfoService.getDataBaseInfoList();
dbList.forEach(this::processDatabaseInfo);
logger.info("动态数据源已加载完毕!");
}


private void processDatabaseInfo(DatabaseInfo db){
processDynamicDataSourceMap(db);
processGroupDataSourceMapByDb(db);
}


private void processDynamicDataSourceMap(DatabaseInfo db){
DruidXADataSource xaDs = buildDruidDataSource(db);
DataSource ds = getXADataSource(xaDs,db.getName());
dynamicDataSource.addDataSource(db.getName(),ds);
}


private void processGroupDataSourceMapByDb(DatabaseInfo db){
Integer[] groupIds = db.getGroupIds();
if (groupIds==null || groupIds.length==0) return;
if (db.getMaster())
Arrays.stream(groupIds).forEach(groupId->groupMasterDataSourceMap.put(groupId,db.getName()));
if (!db.getMaster())
Arrays.stream(groupIds).forEach(groupId->groupSlaveDataSourceMap.put(groupId,db.getName()));
}



/**
* 根据数据库信息生成数据源连接池
* @param db 数据库信息实体
* @return 数据源连接池
*/
private DruidXADataSource buildDruidDataSource(DatabaseInfo db) {
String url = "jdbc:postgresql://" + db.getHost() + ":" + db.getPort() + "/" + db.getDatabaseName() + "?zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8";
DruidXADataSource ds = DataSourceBuilder.create()
.type(com.alibaba.druid.pool.xa.DruidXADataSource.class)
.driverClassName(Constants.PG_DRIVER_NAME)
.url(url)
.username(db.getUsername())
.password(db.getPassword())
.build();
ds.setInitialSize(10);
ds.setMinIdle(10);
ds.setMaxActive(100);

ds.setMaxWait(30000);
ds.setTimeBetweenEvictionRunsMillis(30000);
ds.setMinEvictableIdleTimeMillis(300000);
ds.setValidationQuery("SELECT 1");
ds.setTestWhileIdle(true);
ds.setTestOnBorrow(false);
ds.setTestOnReturn(true);
ds.setPoolPreparedStatements(false);
ds.setMaxPoolPreparedStatementPerConnectionSize(0);
return ds;
}


private DataSource getXADataSource(DruidXADataSource ds , String key){
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setUniqueResourceName(key);
xaDataSource.setXaDataSource(ds);
xaDataSource.setPoolSize(5);
xaDataSource.setTestQuery("SELECT 1");
xaDataSource.setMaintenanceInterval(2);

try {
xaDataSource.afterPropertiesSet();
} catch (Exception e) {
throw new RuntimeException(e);
}
return xaDataSource;
}



}



注册XA事务管理器

在atomikos中,已经针对JTA的事务规范,进行了接口的实现,回顾这里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24


@Bean
public JtaTransactionManager jtaTransactionManager() throws SystemException {
JtaTransactionManager jtaTransactionManager = new JtaTransactionManager();
jtaTransactionManager.setTransactionManager(userTransactionManager());
jtaTransactionManager.setUserTransaction(userTransactionImp());
jtaTransactionManager.setAllowCustomIsolationLevels(true);
return jtaTransactionManager;
}


private UserTransactionManager userTransactionManager(){
UserTransactionManager manager = new UserTransactionManager();
manager.setForceShutdown(false);
return manager;
}
private UserTransactionImp userTransactionImp() throws SystemException {
UserTransactionImp imp = new UserTransactionImp();
imp.setTransactionTimeout(30);
return imp;
}


@Transactional声明式事务

1
2
3
4
5
6
7
8
9
10
11
12

@Transactional(rollbackFor = Exception.class)
public Integer addProperty(Property property){
property.setLeaf(true);
Integer id = add(property);
property.setId(id);
Integer result = addGroupProperty(property);
boolean closeResult = closeParentPropertyLeaf(property.getGroupId(),property.getParentId());
if (result>0&&closeResult) return id;
throw new RuntimeException("add property failed");
}

需要注意的是,在注册JtaTransactionManager之后,@transactional注解会默认使用JtaTransactionManager全局事务,而不再使用传统的本地事务

编程式事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64


public class AtomikosExample {

public static void main(String[] args) {
AtomikosDataSourceBean ds1 = createAtomikosDataSourceBean("test2022");
AtomikosDataSourceBean ds2 = createAtomikosDataSourceBean("test2021");

Connection conn1 = null;
Connection conn2 = null;
PreparedStatement ps1 = null;
PreparedStatement ps2 = null;

UserTransaction userTransaction = new UserTransactionImp();
try {
// 开启事务
userTransaction.begin();

// 执行db1上的sql
conn1 = ds1.getConnection();
ps1 = conn1.prepareStatement("INSERT into user(name) VALUES (?)", Statement.RETURN_GENERATED_KEYS);
ps1.setString(1, "zhangsan");
ps1.executeUpdate();
ResultSet generatedKeys = ps1.getGeneratedKeys();
int userId = -1;
while (generatedKeys.next()) {
// 获得自动生成的userId
userId = generatedKeys.getInt(1);
}

// 模拟异常 ,直接进入catch代码块,2个都不会提交
// int i=1/0;

// 执行db2上的sql
conn2 = ds2.getConnection();
ps2 = conn2.prepareStatement("INSERT into account(user_id,money) VALUES (?,?)");
ps2.setInt(1, userId);
ps2.setDouble(2, 10000000);
ps2.executeUpdate();

// 两阶段提交
userTransaction.commit();
} catch (Exception e) {
try {
System.out.println("有异常,所有数据源都回滚");
userTransaction.rollback();
} catch (SystemException e1) {
e1.printStackTrace();
}
} finally {
try {
ps1.close();
ps2.close();
conn1.close();
conn2.close();
ds1.close();
ds2.close();
} catch (Exception ignore) {
}
}
}
}


Atomikos优点

  • 开源,免费,轻量级
  • 兼容标准的JTA API
  • 保证数据的强一致性

Atomikos缺点

  • 匮乏的日志记录和异常处理
  • 不支持 one-phase 提交优化
  • 对资源的锁定会造成阻塞,效率比较低