Atomikos是开源的分布式事务管理器,是JTA规范的实现,支持XA协议。现在要将Atomikos整合进springboot。
Atomikos依赖
在pom.xml添加atomikos依赖。
<!-- Spring JDBC -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!-- JTA (Atomikos) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
添加配置
修改application.properties添加多个数据源配置
# Primary data source
mysql.datasource.one.url=jdbc:mysql://127.0.0.1:3306/wordpress?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
mysql.datasource.one.username=root
mysql.datasource.one.password=123456
mysql.datasource.one.minPoolSize=3
mysql.datasource.one.maxPoolSize=25
mysql.datasource.one.maxLifetime=20000
mysql.datasource.one.borrowConnectionTimeout=30
mysql.datasource.one.loginTimeout=30
mysql.datasource.one.maintenanceInterval=60
mysql.datasource.one.maxIdleTime=60
mysql.datasource.one.testQuery=select 1
# Data source 2
mysql.datasource.two.url=jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
mysql.datasource.two.username=root
mysql.datasource.two.password=123456
mysql.datasource.two.minPoolSize=3
mysql.datasource.two.maxPoolSize=25
mysql.datasource.two.maxLifetime=20000
mysql.datasource.two.borrowConnectionTimeout=30
mysql.datasource.two.loginTimeout=30
mysql.datasource.two.maintenanceInterval=60
mysql.datasource.two.maxIdleTime=60
mysql.datasource.two.testQuery=select 1
加载配置类
这里定义了两个数据源
定义主数据源配置类,加载参数
- /**
- *
- * @ClassName: OneDbProperties
- * @Description: 主数据源配置
- * @author wenqy
- * @date 2017年7月26日 下午9:56:09
- *
- */
- @ConfigurationProperties(prefix = “mysql.datasource.one”)
- public class OneDbProperties {
- private String url;
- private String username;
- private String password;
- /** min-pool-size 最小连接数 **/
- private int minPoolSize;
- /** max-pool-size 最大连接数 **/
- private int maxPoolSize;
- /** max-lifetime 连接最大存活时间 **/
- private int maxLifetime;
- /** borrow-connection-timeout 获取连接失败重新获等待最大时间,在这个时间内如果有可用连接,将返回 **/
- private int borrowConnectionTimeout;
- /** login-timeout java数据库连接池,最大可等待获取datasouce的时间 **/
- private int loginTimeout;
- /** maintenance-interval 连接回收时间 **/
- private int maintenanceInterval;
- /** max-idle-time 最大闲置时间,超过最小连接池连接的连接将将关闭 **/
- private int maxIdleTime;
- /** test-query 测试SQL **/
- private String testQuery;
- // 省略getter 和 setter 方法
- }
定义其他数据源配置类,加载参数
- /**
- *
- * @ClassName: OneDbProperties
- * @Description: 数据源配置
- * @author wenqy
- * @date 2017年7月26日 下午9:56:09
- *
- */
- @ConfigurationProperties(prefix = “mysql.datasource.two”)
- public class TwoDbProperties {
- private String url;
- private String username;
- private String password;
- /** min-pool-size 最小连接数 **/
- private int minPoolSize;
- /** max-pool-size 最大连接数 **/
- private int maxPoolSize;
- /** max-lifetime 连接最大存活时间 **/
- private int maxLifetime;
- /** borrow-connection-timeout 获取连接失败重新获等待最大时间,在这个时间内如果有可用连接,将返回 **/
- private int borrowConnectionTimeout;
- /** login-timeout java数据库连接池,最大可等待获取datasouce的时间 **/
- private int loginTimeout;
- /** maintenance-interval 连接回收时间 **/
- private int maintenanceInterval;
- /** max-idle-time 最大闲置时间,超过最小连接池连接的连接将将关闭 **/
- private int maxIdleTime;
- /** test-query 测试SQL **/
- private String testQuery;
- // 省略getter 和 setter 方法
- }
主数据源加载, 注入Atomikos的数据源Bean
- @Configuration
- @MapperScan(basePackages = “com.wenqy.mapper.one”, sqlSessionTemplateRef = “oneSqlSessionTemplate”)
- public class OneDbConfig {
- /**
- * @throws SQLException
- * @Title: oneDataSource
- * @Description: 主数据源
- * @param @param oneDbProperties
- * @return DataSource 数据源
- * @throws
- */
- @Primary
- @Bean(name = “oneDataSource”)
- public DataSource oneDataSource(OneDbProperties oneDbProperties) throws SQLException {
- MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
- mysqlXADataSource.setUrl(oneDbProperties.getUrl());
- mysqlXADataSource.setUser(oneDbProperties.getUsername());
- mysqlXADataSource.setPassword(oneDbProperties.getPassword());
- AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
- xaDataSource.setXaDataSource(mysqlXADataSource);
- xaDataSource.setUniqueResourceName(“oneDataSource”);
- xaDataSource.setMinPoolSize(oneDbProperties.getMinPoolSize());
- xaDataSource.setMaxPoolSize(oneDbProperties.getMaxPoolSize());
- xaDataSource.setMaxLifetime(oneDbProperties.getMaxLifetime());
- xaDataSource.setBorrowConnectionTimeout(oneDbProperties.getBorrowConnectionTimeout());
- xaDataSource.setLoginTimeout(oneDbProperties.getLoginTimeout());
- xaDataSource.setMaintenanceInterval(oneDbProperties.getMaintenanceInterval());
- xaDataSource.setMaxIdleTime(oneDbProperties.getMaxIdleTime());
- xaDataSource.setTestQuery(oneDbProperties.getTestQuery());
- return xaDataSource;
- }
- @Bean(name = “oneSqlSessionFactory”)
- public SqlSessionFactory oneSqlSessionFactory(@Qualifier(“oneDataSource”) DataSource dataSource)
- throws Exception {
- SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
- bean.setDataSource(dataSource);
- bean.setMapperLocations(
- new PathMatchingResourcePatternResolver().getResources(“classpath:/mapper/config/one/*.xml”));
- return bean.getObject();
- }
- @Bean(name = “oneSqlSessionTemplate”)
- public SqlSessionTemplate oneSqlSessionTemplate(
- @Qualifier(“oneSqlSessionFactory”) SqlSessionFactory sqlSessionFactory) throws Exception {
- return new SqlSessionTemplate(sqlSessionFactory);
- }
- }
其他数据源加载,注入Atomikos的数据源Bean
- @Configuration
- @MapperScan(basePackages = “com.wenqy.mapper.two”, sqlSessionTemplateRef = “twoSqlSessionTemplate”)
- public class TwoDbConfig {
- @Bean(name = “twoDataSource”)
- public DataSource twoDataSource(TwoDbProperties twoDbProperties) throws SQLException {
- MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
- mysqlXaDataSource.setUrl(twoDbProperties.getUrl());
- mysqlXaDataSource.setPassword(twoDbProperties.getPassword());
- mysqlXaDataSource.setUser(twoDbProperties.getUsername());
- mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
- AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
- xaDataSource.setXaDataSource(mysqlXaDataSource);
- xaDataSource.setUniqueResourceName(“twoDataSource”);
- xaDataSource.setMinPoolSize(twoDbProperties.getMinPoolSize());
- xaDataSource.setMaxPoolSize(twoDbProperties.getMaxPoolSize());
- xaDataSource.setMaxLifetime(twoDbProperties.getMaxLifetime());
- xaDataSource.setBorrowConnectionTimeout(twoDbProperties.getBorrowConnectionTimeout());
- xaDataSource.setLoginTimeout(twoDbProperties.getLoginTimeout());
- xaDataSource.setMaintenanceInterval(twoDbProperties.getMaintenanceInterval());
- xaDataSource.setMaxIdleTime(twoDbProperties.getMaxIdleTime());
- xaDataSource.setTestQuery(twoDbProperties.getTestQuery());
- return xaDataSource;
- }
- @Bean(name = “twoSqlSessionFactory”)
- public SqlSessionFactory twoSqlSessionFactory(@Qualifier(“twoDataSource”) DataSource dataSource)
- throws Exception {
- SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
- bean.setDataSource(dataSource);
- bean.setMapperLocations(
- new PathMatchingResourcePatternResolver().getResources(“classpath:/mapper/config/two/*.xml”));
- return bean.getObject();
- }
- @Bean(name = “twoSqlSessionTemplate”)
- public SqlSessionTemplate twoSqlSessionTemplate(
- @Qualifier(“twoSqlSessionFactory”) SqlSessionFactory sqlSessionFactory) throws Exception {
- return new SqlSessionTemplate(sqlSessionFactory);
- }
- }
事务配置
注入Atomikos自己的事务管理器
- /**
- *
- * @ClassName: TransactionManagerConfig
- * @Description: 事务管理配置
- * @author
- *
- */
- @Configuration
- @EnableTransactionManagement
- public class TransactionManagerConfig {
- /**
- * 自定义事务
- * MyBatis自动参与到spring事务管理中,无需额外配置,
- * 只要org.mybatis.spring.SqlSessionFactoryBean
- * 引用的数据源与DataSourceTransactionManager引用的数据源一致即可,否则事务管理会不起作用。
- */
- @Bean(name = “userTransaction”)
- public UserTransaction userTransaction() throws Throwable {
- UserTransactionImp userTransactionImp = new UserTransactionImp();
- userTransactionImp.setTransactionTimeout(10000);
- return userTransactionImp;
- }
- @Bean(name = “atomikosTransactionManager”, initMethod = “init”, destroyMethod = “close”)
- public TransactionManager atomikosTransactionManager() throws Throwable {
- UserTransactionManager userTransactionManager = new UserTransactionManager();
- userTransactionManager.setForceShutdown(false);
- return userTransactionManager;
- }
- @Bean(name = “transactionManager”)
- @DependsOn({ “userTransaction”, “atomikosTransactionManager” })
- public PlatformTransactionManager transactionManager() throws Throwable {
- UserTransaction userTransaction = userTransaction();
- JtaTransactionManager manager = new JtaTransactionManager(userTransaction, atomikosTransactionManager());
- return manager;
- }
- }
单元测试
编写测试类,测试服务。
- @RunWith(SpringJUnit4ClassRunner.class)
- @SpringBootTest
- public class TestMutiDataSource {
- @Autowired
- private UserService userService;
- @Test
- public void testInsertData() throws Exception {
- userService.insertData();
- }
- }
Service实现类,其中抛出异常的代码先注释掉
- @Transactional(value=“transactionManager”)
- public void insertData() throws Exception {
- User user = new User();
- user.setId(new Random().nextInt());
- user.setName(“wenqy”);
- userMapper.saveUser( user );
- // if (true) {
- // throw new RuntimeException(“insert failure”);
- // }
- Person person = new Person();
- person.setId(new Random().nextInt());
- person.setEmail(“wen.qy@qq.com”);
- personMapper.savePerson( person );
- // if (true) {
- // throw new RuntimeException(“insert failure”);
- // }
- }
结果两表都插入数据
去掉第一个注释块,抛出运行时异常,执行测试,两表都没有插入数据,正确。恢复第一个注释块,去掉第二个注释块,抛出运行时异常,执行测试,两表同样没有插入数据,正确。把异常换成受检异常(如 SQLException)后,发现两表都插入了数据,事务没有回滚,错误。
- @Transactional(value=“transactionManager”)
- public void insertData() throws Exception {
- User user = new User();
- user.setId(new Random().nextInt());
- user.setName(“wenqy”);
- userMapper.saveUser( user );
- // if (true) {
- // throw new RuntimeException(“insert failure”);
- // }
- Person person = new Person();
- person.setId(new Random().nextInt());
- person.setEmail(“wen.qy@qq.com”);
- personMapper.savePerson( person );
- if (true) {
- throw new SQLException(“insert failure”);
- }
- }
指定事务需要回滚所抛出的异常,这里是Exception
- @Transactional(value=“transactionManager”,rollbackFor=Exception.class)
- public void insertDataRollbackForException() throws Exception {
- User user = new User();
- user.setId(new Random().nextInt());
- user.setName(“wenqy”);
- userMapper.saveUser( user );
- // if (true) {
- // throw new RuntimeException(“insert failure”);
- // }
- Person person = new Person();
- person.setId(new Random().nextInt());
- person.setEmail(“wen.qy@qq.com”);
- personMapper.savePerson( person );
- if (true) {
- throw new SQLException(“insert failure”);
- }
- }
然后继续测试,执行测试,两表没插入数据,说明事务回滚正确
结论
默认配置下,Spring只有在抛出运行时异常,即RuntimeException及其子类(Error也会导致事务回滚)时,才回滚该事务;受检异常需要通过rollbackFor显式指定才会回滚。Atomikos分布式事务和Spring声明式事务一样,对业务透明,是非侵入式的。Atomikos支持与Spring的无缝衔接。Spring默认的事务隔离级别沿用数据库的默认隔离级别。另外,需要数据库引擎本身支持事务,例如MySQL不能使用MyISAM引擎。
参考:
atomikos官网:https://www.atomikos.com/
发表评论