DTP模型和XA规范 X/Open Distributed Transaction Processing (DTP) Model (X/Open XA), 是X/Open,即现在的open group,是一个独立的组织,主要负责制定各种行业技术标准,提出的分布式事务处理规范,已经成为事实上的事务模型组件的行为标准。
X/Open主要提供了以下参考文档:
DTP模型 DTP模型的主要流程图如下:
在图上可以清楚的看到构成DTP模型的4个基本元素:
应用程序(Application Program ,简称AP):用于定义事务边界(即定义事务的开始和结束),并且在事务边界内对资源进行操作。
资源管理器(Resource Manager,简称RM):如数据库、文件系统等,并提供访问资源的方式。
事务管理器(Transaction Manager ,简称TM):负责分配事务唯一标识,监控事务的执行进度,并负责事务的提交、回滚等。
通信资源管理器(Communication Resource Manager,简称CRM):控制一个TM域(TM domain)内或者跨TM域的分布式应用之间的通信。
上述元素在多节点的分布式环境下,在DTP本地模型实例中,由AP、RMs和TM组成,不需要其他元素。AP、RM和TM之间,彼此都需要进行交互,如下图所示:
这张图中(1)表示AP-RM的交互接口,(2)表示AP-TM的交互接口,(3)表示RM-TM的交互接口。
XA规范 XA就是DTP模型中的事务管理器(TM)与数据库(RM)之间的接口规范(即接口函数),事务管理器(TM)用它来通知数据库(RM)事务的开始、结束以及提交、回滚等。
XA接口函数由数据库厂商提供。通常情况下,TM与RM通过XA 接口规范,使用二阶段提交协议 来完成一个全局事务,XA规范的基础是二阶段提交协议 (2pc),目前几乎所有的主流数据库都对XA规范提供了支持,例如Oracle、DB2、SQL Server、MySQL、PostgreSQL。
XA的大致流程如下图所示:
但是XA也存在一些缺点:
由于属于强一致性事务规范,所以性能比较低(阻塞、响应时间增加、死锁);
依赖于独立的J2EE中间件,Weblogic、Jboss,后期轻量级的Atomikos、JOTM;
不是所有资源(RM)都支持XA协议;
JTA(Java Transaction API) 即Java的事务API,基于XA实现,也就是RM需要支持XA,所以也有JTA(XA)的说法,JTA仅定义了接口,这些接口如下:
XAResource :XAResource接口是对实现了X/Open CAE规范的资源管理器 (Resource Manager,数据库就是典型的资源管理器) 的抽象,它由资源适配器 (Resource Apdater) 提供实现。XAResource是支持事务控制的核心。
Transaction:Transaction接口是一个事务实例的抽象,通过它可以控制事务内多个资源的提交或者回滚。二阶段提交过程也是由Transaction接口的实现者来完成的。
TransactionManager:托管模式 (managed mode) 下,TransactionManager接口是被应用服务器调用,以控制事务的边界的。
UserTransaction:非托管模式 (non-managed mode) 下,应用程序可以通过UserTransaction接口控制事务的边界
目下JTA的实现有几种形式:
J2EE容器提供的JTA实现(Weblogic、JBoss );
JOTM(Java Open Transaction Manager)、Atomikos、Bitronix等可独立于J2EE容器的环境下第三方中间件实现JTA;
Atomikos介绍 atomikos是一个产品,核心功能是提供事务管理器功能,即xa模型或jta中的TM。
Atomikos公司官方网址为:https://www.atomikos.com/。其旗下最著名的产品就是事务管理器。产品分两个版本:
这两个产品的关系如下图所示:
TransactionEssentials :
实现了JTA/XA规范中的事务管理器(Transaction Manager)应该实现的相关接口 ,比如:
UserTransaction实现是com.atomikos.icatch.jta.UserTransactionImp,用户只需要直接操作这个类
TransactionManager实现是com.atomikos.icatch.jta.UserTransactionManager
Transaction实现是com.atomikos.icatch.jta.TransactionImp
针对实现了JDBC规范中规定的实现了XADataSource接口的数据库连接池,以及实现了JMS规范的MQ客户端提供一层封装。
在JTA规范中,提到过XADataSource、XAConnection等接口应该由资源管理器RM来实现,而Atomikos的作用是一个事务管理器TM,并不需要提供对应的实现。而Atomikos对XADataSource进行封装,只是为了方便与事务管理器整合。封装XADataSource的实现类为AtomikosDataSourceBean。
典型的XADataSource实现包括:
mysql官方提供的com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
阿里巴巴开源的druid连接池,对应的实现类为com.alibaba.druid.pool.xa.DruidXADataSource
PostgreSql官方提供的org.postgresql.xa.PGXADataSource
而其他一些常用的数据库连接池,如dbcp、dbcp2或者c3p0,目前貌似尚未提供XADataSource接口的实现。如果提供给AtomikosDataSourceBean一个没有实现XADataSource接口的数据源,如c3p0的ComboPooledDataSource,则会抛出类似以下异常:
1 2 3 com.atomikos.jdbc.AtomikosSQLException: The class com .mchange .v2 .c3p0 .ComboPooledDataSource specified by property xaDataSourceClassName does not implement the required interface javax .jdbc .XADataSource Please make sure the spelling is correct , and check your JDBC driver vendor documentation .
ExtremeTransactions :
ExtremeTransactions在TransactionEssentials的基础上额外提供了以下功能:
支持TCC柔性事务
支持通过RMI、IIOP、SOAP这些远程过程调用技术,进行事务传播。
Atomikos原理 分布式事务创建 这个和 Spring 传统事务相比来说,Spring 的事务通过的是 JpaTransactionManager 进行管理,而这套逻辑采用的是 JtaTransactionManager 进行管理。通过调用 Atomikos 框架实现相关逻辑。
这个的核心就是 CompositeTransaction 对象,这个对象基本就可以说是一个分布式事务,它是从一个 map 中进行获取,这个 map 的 key 是当前线程,value 就是 CompositeTransaction 对象,如果 map 集合中没有的话,就会进行创建,并将其加入到 map 集合中。这里要注意的是,这个 map 是 hashMap , 所以在获取的时候,加入了 synchronized 机制。
1 2 3 4 5 6 7 8 9 10 11 private CompositeTransactionImp getCurrentTx () { Thread thread = Thread.currentThread(); synchronized (this .threadtotxmap_) { Stack txs = (Stack)this .threadtotxmap_.get(thread); return txs == null ? null : (CompositeTransactionImp)txs.peek(); } }
这样,获取到线程对应的 CompositeTransactionImp 对象,我们的 SQL 都是由这个线程去进行执行的,这个线程对应的 CompositeTransactionImp 也就代表了执行 SQL 所在的全部库,多个库的事务统一由 CompositeTransactionImp 对象进行管理,同时会为这个事务生成一个事务 id。
Atomikos 包装 DataSource 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @Bean(name="masterXADataSource") @ConfigurationProperties(prefix="spring.datasource.master") public DruidXADataSource masterXADataSource () { return new DruidXADataSource(); } @Bean(name="slaveXADataSource") @ConfigurationProperties(prefix="spring.datasource.slave") public DruidXADataSource slaveXADataSource () { return new DruidXADataSource(); } @Bean @Primary public DynamicDataSource dynamicDataSource (DruidXADataSource masterXADataSource, DruidXADataSource slaveXADataSource) { DataSource masterDataSource = getDataSource(masterXADataSource,Constants.MASTER_DATA_SOURCE_NAME); DataSource slaveDataSource = getDataSource(slaveXADataSource,Constants.SLAVE_DATA_SOURCE_NAME); return new DynamicDataSource(masterDataSource,slaveDataSource); } private DataSource getDataSource (DruidXADataSource dataSource,String dataSourceKey) { AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean(); atomikosDataSourceBean.setUniqueResourceName(dataSourceKey); atomikosDataSourceBean.setXaDataSource(dataSource); atomikosDataSourceBean.setPoolSize(5 ); atomikosDataSourceBean.setTestQuery("SELECT 1" ); atomikosDataSourceBean.setMaintenanceInterval(2 ); return atomikosDataSourceBean; }
我们通过配置文件设置链接池的各种参数之后,会通过创建 DruidXADataSource 对象,获取 Connection 链接,这里我们将链接交给了 AtomikosDataSourceBean ,由 AtomikosDataSourceBean 进行了一定的包装。
核心类就是 AtomikosDataSourceBean 的父类 AbstractDataSourceBean ,通过 getConnection() 去获取链接对象,主要就是通过 connectionPool 链接池去获取对象,这个链接池是通过 init() 方法进行初始化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public Connection getConnection ( HeuristicMessage msg ) throws SQLException { if ( LOGGER.isInfoEnabled() ) LOGGER.logInfo ( this + ": getConnection ( " + msg + " )..." ); Connection connection = null ; init(); try { connection = (Connection) connectionPool.borrowConnection ( msg ); } catch (CreateConnectionException ex) { throwAtomikosSQLException("Failed to grow the connection pool" , ex); } catch (PoolExhaustedException e) { throwAtomikosSQLException ("Connection pool exhausted - try increasing 'maxPoolSize' and/or 'borrowConnectionTimeout' on the DataSourceBean." ); } catch (ConnectionPoolException e) { throwAtomikosSQLException("Error borrowing connection" , e ); } if ( LOGGER.isDebugEnabled() ) LOGGER.logDebug ( this + ": returning " + connection ); return connection; } protected com.atomikos.datasource.pool.ConnectionFactory doInit () throws Exception { ... ... xaDataSource = (XADataSource) driver; xaDataSource.setLoginTimeout ( getLoginTimeout() ); xaDataSource.setLogWriter ( getLogWriter() ); PropertyUtils.setProperties(xaDataSource, xaProperties ); JdbcTransactionalResource tr = new JdbcTransactionalResource(getUniqueResourceName() , xaDataSource); com.atomikos.datasource.pool.ConnectionFactory cf = new com.atomikos.jdbc.AtomikosXAConnectionFactory(xaDataSource, tr, this ); Configuration.addResource ( tr ); return cf; }
将 DruidXADataSource 创建好链接池之后,就是获取到链接,对链接进行包装了,主要就是创建了一个动态代理对象,AtomikosConnectionProxy 这个对象的 invoke() 方法,就是所有逻辑的实现,这里只是包装,具体逻辑实现,稍后再讲。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public static Reapable newInstance ( Connection c , SessionHandleState sessionHandleState , HeuristicMessage hmsg ) { Reapable ret = null ; AtomikosConnectionProxy proxy = new AtomikosConnectionProxy(c, sessionHandleState , hmsg ); Set<Class> interfaces = PropertyUtils.getAllImplementedInterfaces ( c.getClass() ); interfaces.add ( Reapable.class ); interfaces.add ( DynamicProxy.class ); Class[] interfaceClasses = ( Class[] ) interfaces.toArray ( new Class[0 ] ); Set<Class> minimumSetOfInterfaces = new HashSet<Class>(); minimumSetOfInterfaces.add ( Reapable.class ); minimumSetOfInterfaces.add ( DynamicProxy.class ); minimumSetOfInterfaces.add ( java.sql.Connection.class ); Class[] minimumSetOfInterfaceClasses = ( Class[] ) minimumSetOfInterfaces.toArray( new Class[0 ] ); List<ClassLoader> classLoaders = new ArrayList<ClassLoader>(); classLoaders.add ( Thread.currentThread().getContextClassLoader() ); classLoaders.add ( c.getClass().getClassLoader() ); classLoaders.add ( AtomikosConnectionProxy.class.getClassLoader() ); ret = ( Reapable ) ClassLoadingHelper.newProxyInstance ( classLoaders , minimumSetOfInterfaceClasses , interfaceClasses , proxy ); return ret; }
atomikos封装数据源的流程如下图所示:
Start 指令和 SQL 准备 其实 Mybatis 框架要执行的时候,会通过 mapper 组件,通过链接池获取到链接对象,对 SQL 语句进行执行,在这里的话,就是获取到经过包装之后的 AtomikosConnectionsProxy 动态代理对象,核心的逻辑都是由这个动态代理对象的 invoke() 方法来完成 。
这里也是有 XAConnection ,XAResource 这些概念,通过 之前包装的 ConnectionFactory 获取到 CompositeTransactionImp 对象,这个对象就是全局的事务对象 。
当我们要执行 SQL 的时候,会被动态代理对象拦截,通过Druid 链接池的信息,构建好 XAConnection ,XAResouce 对象,这里 Atomikos 框架对 XAResource 进行了封装(XAResourceTransaction),XAResourceTransaction 这个代表的就是分布式事务的一个子事务,这个库的 SQL 第一次执行的时候,就会创建这样的一个子事务并将其加入到 CompositeTransactionImp 全局事务中,然后通过这个子事务的 XAResource 开始 START 指令。
后续的话,就是会去执行 prepareStatement 命令,这个就是去执行我们要执行的 SQL 语句,这个就是通过调用 jdbc 原生的进行执行。
这样也就完成了 START 指令 和 SQL 语句的准备。
End 指令 end 指令说到底也是通过子事务的 XAResource 对象进行发送的,基本和上面一样,最终就是 xaresource_.end(xid_,flag)
终止分布式事务 这里面逻辑实在是太深了,大体就记一下,主要就还是通过 xaresource.commit 执行 commit , 且后续会通过 全局事务 CompositeTransaction 获取到所有的子事务,对子事务进行遍历,每个子事务都执行 prepare() 方法,返回一个执行结果,到这里为止,2PC 的第一阶段(Prepare)就完事了。
主要就是在终止分布式事务中,执行 prepare 等待返回结果,根据结果判断是执行 commit 还是 rollback
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 protected void terminate ( boolean commit ) throws HeurRollbackException, HeurMixedException, SysException, java.lang.SecurityException, HeurCommitException, HeurHazardException, RollbackException, IllegalStateException { synchronized ( fsm_ ) { if ( commit ) { if ( participants_.size () <= 1 ) { commit ( true ); } else { int prepareResult = prepare (); if ( prepareResult != Participant.READ_ONLY ) commit ( false ); } } else { rollback (); } } }
如果是 commit 的话,就是对分布式中所有的子事务进行遍历,通过 propagator_.submitPropagationMessage ( cm ); 去触发整个 commit 的过程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 Enumeration<Participant> enumm = participants.elements (); while ( enumm.hasMoreElements () ) { Participant p = enumm.nextElement (); if ( !readOnlyTable_.containsKey ( p ) ) { CommitMessage cm = new CommitMessage ( p, commitresult, onePhase ); if ( onePhase && cascadeList_ != null ) { Integer sibnum = (Integer) cascadeList_.get ( p ); if ( sibnum != null ) p.setGlobalSiblingCount ( sibnum.intValue () ); p.setCascadeList ( cascadeList_ ); } propagator_.submitPropagationMessage ( cm ); }java }
最后的 commit 还是通过每个 RM 对应的 XAResource 去执行 commit 操作,进行事务提交, xaresource_.commit ( xid_, onePhase );
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 try { testOrRefreshXAResourceFor2PC (); ... ... xaresource_.commit ( xid_, onePhase ); } catch ( XAException xaerr ) { String msg = interpretErrorCode ( resourcename_ , "commit" , xid_ , xaerr.errorCode ); LOGGER.logWarning ( msg , xaerr ); }
如果结果不正常的话,是不会走 commit 的,会进入 rollback 逻辑,进行整体所有子库的回滚操作
1 2 3 4 5 6 7 8 9 10 11 12 Enumeration enumm = participants.elements (); while ( enumm.hasMoreElements () ) { Participant p = (Participant) enumm.nextElement (); if ( !readOnlyTable_.containsKey ( p ) ) { RollbackMessage rm = new RollbackMessage ( p, rollbackresult, indoubt ); propagator_.submitPropagationMessage ( rm ); } }
rollback 逻辑和 commit 基本是一致的,也是通过 全局事务获取到全局事务中所有的子事务,对子事务进行遍历,通过 propagator_.submitPropagationMessage ( rm ); 发送回滚执行,最终还是通过子事务对应的 XAResource 去执行 rollback 操作 :xaresource_.rollback ( xid_ );
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 try { if ( state_.equals ( TxState.ACTIVE ) ) { suspend (); } testOrRefreshXAResourceFor2PC (); if (LOGGER.isInfoEnabled()){ LOGGER.logInfo("XAResource.rollback ( " + xidToHexString + ")" + "on resource " + resourcename_ + " represented by XAResource instance " + xaresource_); } xaresource_.rollback ( xid_ ); } catch ( ResourceException resErr ) { errors.push ( resErr ); throw new SysException ( "Error in rollback: " + resErr.getMessage(), errors ); }
流程总结
SpringBoot集成Atomikos Maven配置 首先引入atomikos的springboot版本的依赖
1 2 3 4 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-jta-atomikos</artifactId > </dependency >
atomikos配置文件 Atomikos在启动后,默认会从以下几个位置读取配置文件,这里直接贴出atomikos源码进行说明:
具体的加载逻辑,位于com.atomikos.icatch.provider.imp.AssemblerImp类中的initializeProperties方法,其中定义了配置加载的顺序逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Override public ConfigProperties initializeProperties () { Properties defaults = new Properties(); loadPropertiesFromClasspath(defaults, DEFAULT_PROPERTIES_FILE_NAME); Properties transactionsProperties = new Properties(defaults); loadPropertiesFromClasspath(transactionsProperties, TRANSACTIONS_PROPERTIES_FILE_NAME); Properties jtaProperties = new Properties(transactionsProperties); loadPropertiesFromClasspath(jtaProperties, JTA_PROPERTIES_FILE_NAME); Properties customProperties = new Properties(jtaProperties); loadPropertiesFromCustomFilePath(customProperties); Properties finalProperties = new Properties(customProperties); return new ConfigProperties(finalProperties); }
从源码中,很清楚就可以配置文件加载顺序和优先级:
transactions-defaults.properties < transactions.properties < jta.properties < 自定义配置文件路径
并且优先级高的配置会覆盖之前同名key的配置
其中transactions-defaults.properties是atomikos自带的默认配置,位于transactions-4.0.6.jar中.
以下是4.0.6中 transactions-default.properties中配置内容,归类后如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 =============================================================== ============ 事务管理器(TM)配置参数 ============== =============================================================== com.atomikos.icatch.enable_logging =true com.atomikos.icatch.automatic_resource_registration =true com.atomikos.icatch.default_jta_timeout =10000 com.atomikos.icatch.max_timeout =300000 com.atomikos.icatch.threaded_2pc =false com.atomikos.icatch.max_actives =50 com.atomikos.icatch.allow_subtransactions =true com.atomikos.icatch.serial_jta_transactions =true com.atomikos.icatch.force_shutdown_on_vm_exit =false com.atomikos.icatch.default_max_wait_time_on_shutdown =9223372036854775807 =============================================================== ========= 事务日志(Transaction logs)记录配置 ======= =============================================================== com.atomikos.icatch.log_base_dir =./ com.atomikos.icatch.log_base_name =tmlog com.atomikos.icatch.checkpoint_interval =500 =============================================================== ========= 事务日志恢复(Recovery)配置 ============= =============================================================== com.atomikos.icatch.forget_orphaned_log_entries_delay =86400000 com.atomikos.icatch.recovery_delay =${com.atomikos.icatch.default_jta_timeout} com.atomikos.icatch.oltp_max_retries =5 com.atomikos.icatch.oltp_retry_interval =10000 =============================================================== ===================== 其他 ====================== =============================================================== java.naming.factory.initial =com.sun.jndi.rmi.registry.RegistryContextFactory com.atomikos.icatch.client_demarcation =false java.naming.provider.url =rmi://localhost:1099 com.atomikos.icatch.rmi_export_class =none com.atomikos.icatch.trust_client_tm =false
当我们想对默认的配置进行修改时,可以在classpath下新建一个jta.properties,覆盖同名的配置项即可。
关于不同版本配置的差异,请参考官方文档
配置数据源 由于项目使用了mybatis以及多数据源,并且会有动态切换数据源,所以使用了自定义的DynamicDataSource和继承了SpringManagedTransactionFactory的事务工厂,以及实现了org.apache.ibatis.transaction.Transaction的自定义事务类,通过aop的方式,来动态切换数据源和事务
初始化的主数据源配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 @Configuration public class DataSourceConfig { @Bean(name="masterXADataSource") @ConfigurationProperties(prefix="spring.datasource.master") public DruidXADataSource masterXADataSource () { return new DruidXADataSource(); } @Bean(name="slaveXADataSource") @ConfigurationProperties(prefix="spring.datasource.slave") public DruidXADataSource slaveXADataSource () { return new DruidXADataSource(); } @Bean @Primary public DynamicDataSource dynamicDataSource (DruidXADataSource masterXADataSource, DruidXADataSource slaveXADataSource) { DataSource masterDataSource = getDataSource(masterXADataSource,Constants.MASTER_DATA_SOURCE_NAME); DataSource slaveDataSource = getDataSource(slaveXADataSource,Constants.SLAVE_DATA_SOURCE_NAME); return new DynamicDataSource(masterDataSource,slaveDataSource); } private DataSource getDataSource (DruidXADataSource dataSource,String dataSourceKey) { AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean(); atomikosDataSourceBean.setUniqueResourceName(dataSourceKey); atomikosDataSourceBean.setXaDataSource(dataSource); atomikosDataSourceBean.setPoolSize(5 ); atomikosDataSourceBean.setTestQuery("SELECT 1" ); atomikosDataSourceBean.setMaintenanceInterval(2 ); return atomikosDataSourceBean; } @Bean public SqlSessionFactory sqlSessionFactory (DynamicDataSource dynamicDataSource) throws Exception { SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean(); sqlSessionFactoryBean.setDataSource(dynamicDataSource); sqlSessionFactoryBean.setVfs(SpringBootVFS.class); sqlSessionFactoryBean.setConfigLocation(new PathMatchingResourcePatternResolver().getResource("classpath:mybatis-config.xml" )); sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/*.xml" )); sqlSessionFactoryBean.setTransactionFactory(new DynamicDataSourceTransactionFactory()); return sqlSessionFactoryBean.getObject(); } @Bean public SqlSessionTemplate sqlSessionTemplate (SqlSessionFactory sqlSessionFactory) { return new SqlSessionTemplate(sqlSessionFactory); } }
非第一时间加载的动态多数据源配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 @Component public class GlobalConfigHolder { @Resource private DataBaseInfoService dataBaseInfoService; @Resource private DynamicDataSource dynamicDataSource; private static final Logger logger = LoggerFactory.getLogger(GlobalConfigHolder.class); private final Map<Integer,String> groupMasterDataSourceMap = new HashMap<>(); private final Map<Integer,String> groupSlaveDataSourceMap = new HashMap<>(); @Bean("groupMasterDataSourceMap") public Map<Integer,String> getGroupMasterDataSourceMap () { return groupMasterDataSourceMap; } @Bean("groupSlaveDataSourceMap") public Map<Integer,String> getGroupSlaveDataSourceMap () { return groupSlaveDataSourceMap; } @PostConstruct private void init () { loadDatabaseInfos(); } private void loadDatabaseInfos () { logger.info("开始加载动态数据源!" ); List<DatabaseInfo> dbList = dataBaseInfoService.getDataBaseInfoList(); dbList.forEach(this ::processDatabaseInfo); logger.info("动态数据源已加载完毕!" ); } private void processDatabaseInfo (DatabaseInfo db) { processDynamicDataSourceMap(db); processGroupDataSourceMapByDb(db); } private void processDynamicDataSourceMap (DatabaseInfo db) { DruidXADataSource xaDs = buildDruidDataSource(db); DataSource ds = getXADataSource(xaDs,db.getName()); dynamicDataSource.addDataSource(db.getName(),ds); } private void processGroupDataSourceMapByDb (DatabaseInfo db) { Integer[] groupIds = db.getGroupIds(); if (groupIds==null || groupIds.length==0 ) return ; if (db.getMaster()) Arrays.stream(groupIds).forEach(groupId->groupMasterDataSourceMap.put(groupId,db.getName())); if (!db.getMaster()) Arrays.stream(groupIds).forEach(groupId->groupSlaveDataSourceMap.put(groupId,db.getName())); } private DruidXADataSource buildDruidDataSource (DatabaseInfo db) { String url = "jdbc:postgresql://" + db.getHost() + ":" + db.getPort() + "/" + db.getDatabaseName() + "?zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8" ; DruidXADataSource ds = DataSourceBuilder.create() .type(com.alibaba.druid.pool.xa.DruidXADataSource.class) .driverClassName(Constants.PG_DRIVER_NAME) .url(url) .username(db.getUsername()) .password(db.getPassword()) .build(); ds.setInitialSize(10 ); ds.setMinIdle(10 ); ds.setMaxActive(100 ); ds.setMaxWait(30000 ); ds.setTimeBetweenEvictionRunsMillis(30000 ); ds.setMinEvictableIdleTimeMillis(300000 ); ds.setValidationQuery("SELECT 1" ); ds.setTestWhileIdle(true ); ds.setTestOnBorrow(false ); ds.setTestOnReturn(true ); ds.setPoolPreparedStatements(false ); ds.setMaxPoolPreparedStatementPerConnectionSize(0 ); return ds; } private DataSource getXADataSource (DruidXADataSource ds , String key) { AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean(); xaDataSource.setUniqueResourceName(key); xaDataSource.setXaDataSource(ds); xaDataSource.setPoolSize(5 ); xaDataSource.setTestQuery("SELECT 1" ); xaDataSource.setMaintenanceInterval(2 ); try { xaDataSource.afterPropertiesSet(); } catch (Exception e) { throw new RuntimeException(e); } return xaDataSource; } }
注册XA事务管理器 在atomikos中,已经针对JTA的事务规范,进行了接口的实现,回顾这里
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Bean public JtaTransactionManager jtaTransactionManager () throws SystemException { JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(); jtaTransactionManager.setTransactionManager(userTransactionManager()); jtaTransactionManager.setUserTransaction(userTransactionImp()); jtaTransactionManager.setAllowCustomIsolationLevels(true ); return jtaTransactionManager; } private UserTransactionManager userTransactionManager () { UserTransactionManager manager = new UserTransactionManager(); manager.setForceShutdown(false ); return manager; } private UserTransactionImp userTransactionImp () throws SystemException { UserTransactionImp imp = new UserTransactionImp(); imp.setTransactionTimeout(30 ); return imp; }
@Transactional声明式事务 1 2 3 4 5 6 7 8 9 10 11 12 @Transactional(rollbackFor = Exception.class) public Integer addProperty (Property property) { property.setLeaf(true ); Integer id = add(property); property.setId(id); Integer result = addGroupProperty(property); boolean closeResult = closeParentPropertyLeaf(property.getGroupId(),property.getParentId()); if (result>0 &&closeResult) return id; throw new RuntimeException("add property failed" ); }
需要注意的是,在注册JtaTransactionManager之后,@transactional注解会默认使用JtaTransactionManager全局事务,而不再使用传统的本地事务
编程式事务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 public class AtomikosExample { public static void main (String[] args) { AtomikosDataSourceBean ds1 = createAtomikosDataSourceBean("test2022" ); AtomikosDataSourceBean ds2 = createAtomikosDataSourceBean("test2021" ); Connection conn1 = null ; Connection conn2 = null ; PreparedStatement ps1 = null ; PreparedStatement ps2 = null ; UserTransaction userTransaction = new UserTransactionImp(); try { userTransaction.begin(); conn1 = ds1.getConnection(); ps1 = conn1.prepareStatement("INSERT into user(name) VALUES (?)" , Statement.RETURN_GENERATED_KEYS); ps1.setString(1 , "zhangsan" ); ps1.executeUpdate(); ResultSet generatedKeys = ps1.getGeneratedKeys(); int userId = -1 ; while (generatedKeys.next()) { userId = generatedKeys.getInt(1 ); } conn2 = ds2.getConnection(); ps2 = conn2.prepareStatement("INSERT into account(user_id,money) VALUES (?,?)" ); ps2.setInt(1 , userId); ps2.setDouble(2 , 10000000 ); ps2.executeUpdate(); userTransaction.commit(); } catch (Exception e) { try { System.out.println("有异常,所有数据源都回滚" ); userTransaction.rollback(); } catch (SystemException e1) { e1.printStackTrace(); } } finally { try { ps1.close(); ps2.close(); conn1.close(); conn2.close(); ds1.close(); ds2.close(); } catch (Exception ignore) { } } } }
Atomikos优点
开源,免费,轻量级
兼容标准的JTA API
保证数据的强一致性
Atomikos缺点
匮乏的日志记录和异常处理
不支持 one-phase 提交优化
对资源的锁定会造成阻塞,效率比较低