spring boot学习系列之整合Atomikos多数据源和分布式事务5


Atomikos是开源的分布式事务管理器,是JTA规范的实现,支持XA协议。现在要将Atomikos整合进springboot。

Atomikos依赖

在pom.xml添加atomikos依赖。

  1. <!– spring jdbc –>
  2. <dependency>
  3.   <groupId>org.springframework.boot</groupId>
  4.   <artifactId>spring-boot-starter-jdbc</artifactId>
  5. </dependency>
  6. <!– JTA –>
  7. <dependency>
  8.   <groupId>org.springframework.boot</groupId>
  9.   <artifactId>spring-boot-starter-jta-atomikos</artifactId>
  10. </dependency>

添加配置

修改application.properties添加多个数据源配置

  1. # 主数据源
  2. mysql.datasource.one.url = jdbc:mysql://127.0.0.1:3306/wordpress?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
  3. mysql.datasource.one.username =root
  4. mysql.datasource.one.password =123456
  5. mysql.datasource.one.minPoolSize =3
  6. mysql.datasource.one.maxPoolSize =25
  7. mysql.datasource.one.maxLifetime =20000
  8. mysql.datasource.one.borrowConnectionTimeout =30
  9. mysql.datasource.one.loginTimeout =30
  10. mysql.datasource.one.maintenanceInterval =60
  11. mysql.datasource.one.maxIdleTime =60
  12. mysql.datasource.one.testQuery =select 1
  13. # 数据源 2
  14. mysql.datasource.two.url =jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
  15. mysql.datasource.two.username =root
  16. mysql.datasource.two.password =123456
  17. mysql.datasource.two.minPoolSize =3
  18. mysql.datasource.two.maxPoolSize =25
  19. mysql.datasource.two.maxLifetime =20000
  20. mysql.datasource.two.borrowConnectionTimeout =30
  21. mysql.datasource.two.loginTimeout =30
  22. mysql.datasource.two.maintenanceInterval =60
  23. mysql.datasource.two.maxIdleTime =60
  24. mysql.datasource.two.testQuery =select 1

加载配置类

这里定义了两个数据源

定义主数据源配置类,加载参数

  1. /**
  2.  * 
  3. * @ClassName: OneDbProperties
  4. * @Description: 主数据源配置
  5. * @author wenqy
  6. * @date 2017年7月26日 下午9:56:09
  7. *
  8.  */
  9. @ConfigurationProperties(prefix = “mysql.datasource.one”)
  10. public class OneDbProperties {
  11.     private String url;
  12.     private String username;
  13.     private String password;
  14.     /** min-pool-size 最小连接数 **/
  15.     private int minPoolSize;
  16.     /** max-pool-size 最大连接数 **/
  17.     private int maxPoolSize;
  18.     /** max-lifetime 连接最大存活时间 **/
  19.     private int maxLifetime;
  20.     /** borrow-connection-timeout 获取连接失败重新获等待最大时间,在这个时间内如果有可用连接,将返回 **/
  21.     private int borrowConnectionTimeout;
  22.     /** login-timeout java数据库连接池,最大可等待获取datasouce的时间 **/
  23.     private int loginTimeout;
  24.     /** maintenance-interval 连接回收时间 **/
  25.     private int maintenanceInterval;
  26.     /** max-idle-time 最大闲置时间,超过最小连接池连接的连接将将关闭 **/
  27.     private int maxIdleTime;
  28.     /** test-query 测试SQL **/
  29.     private String testQuery;
  30. // 省略getter 和 setter 方法
  31. }

定义其他数据源配置类,加载参数

  1. /**
  2.  * 
  3. * @ClassName: OneDbProperties
  4. * @Description: 数据源配置
  5. * @author wenqy
  6. * @date 2017年7月26日 下午9:56:09
  7. *
  8.  */
  9. @ConfigurationProperties(prefix = “mysql.datasource.two”)
  10. public class TwoDbProperties {
  11.     private String url;
  12.     private String username;
  13.     private String password;
  14.     /** min-pool-size 最小连接数 **/
  15.     private int minPoolSize;
  16.     /** max-pool-size 最大连接数 **/
  17.     private int maxPoolSize;
  18.     /** max-lifetime 连接最大存活时间 **/
  19.     private int maxLifetime;
  20.     /** borrow-connection-timeout 获取连接失败重新获等待最大时间,在这个时间内如果有可用连接,将返回 **/
  21.     private int borrowConnectionTimeout;
  22.     /** login-timeout java数据库连接池,最大可等待获取datasouce的时间 **/
  23.     private int loginTimeout;
  24.     /** maintenance-interval 连接回收时间 **/
  25.     private int maintenanceInterval;
  26.     /** max-idle-time 最大闲置时间,超过最小连接池连接的连接将将关闭 **/
  27.     private int maxIdleTime;
  28.     /** test-query 测试SQL **/
  29.     private String testQuery;
  30. // 省略getter 和 setter 方法
  31. }

主数据源加载, 注入Atomikos的数据源Bean

  1. @Configuration
  2. @MapperScan(basePackages = “com.wenqy.mapper.one”, sqlSessionTemplateRef = “oneSqlSessionTemplate”)
  3. public class OneDbConfig {
  4.     /**
  5.      * @throws SQLException 
  6.     * @Title: oneDataSource 
  7.     * @Description: 主数据源 
  8.     * @param @param oneDbProperties
  9.     * @return DataSource    数据源
  10.     * @throws
  11.      */
  12.     @Primary
  13.     @Bean(name = “oneDataSource”)
  14.     public DataSource oneDataSource(OneDbProperties oneDbProperties) throws SQLException {
  15.         MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
  16.         mysqlXADataSource.setUrl(oneDbProperties.getUrl());
  17.         mysqlXADataSource.setUser(oneDbProperties.getUsername());
  18.         mysqlXADataSource.setPassword(oneDbProperties.getPassword());
  19.         AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
  20.         xaDataSource.setXaDataSource(mysqlXADataSource);
  21.         xaDataSource.setUniqueResourceName(“oneDataSource”);
  22.         xaDataSource.setMinPoolSize(oneDbProperties.getMinPoolSize());
  23.         xaDataSource.setMaxPoolSize(oneDbProperties.getMaxPoolSize());
  24.         xaDataSource.setMaxLifetime(oneDbProperties.getMaxLifetime());
  25.         xaDataSource.setBorrowConnectionTimeout(oneDbProperties.getBorrowConnectionTimeout());
  26.         xaDataSource.setLoginTimeout(oneDbProperties.getLoginTimeout());
  27.         xaDataSource.setMaintenanceInterval(oneDbProperties.getMaintenanceInterval());
  28.         xaDataSource.setMaxIdleTime(oneDbProperties.getMaxIdleTime());
  29.         xaDataSource.setTestQuery(oneDbProperties.getTestQuery());
  30.         return xaDataSource;
  31.     }
  32.     @Bean(name = “oneSqlSessionFactory”)
  33.     public SqlSessionFactory oneSqlSessionFactory(@Qualifier(“oneDataSource”) DataSource dataSource)
  34.             throws Exception {
  35.         SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
  36.         bean.setDataSource(dataSource);
  37.         bean.setMapperLocations(
  38.                 new PathMatchingResourcePatternResolver().getResources(“classpath:/mapper/config/one/*.xml”));
  39.         return bean.getObject();
  40.     }
  41.     @Bean(name = “oneSqlSessionTemplate”)
  42.     public SqlSessionTemplate oneSqlSessionTemplate(
  43.             @Qualifier(“oneSqlSessionFactory”) SqlSessionFactory sqlSessionFactory) throws Exception {
  44.         return new SqlSessionTemplate(sqlSessionFactory);
  45.     }
  46. }

其他数据源加载,注入Atomikos的数据源Bean

  1. @Configuration
  2. @MapperScan(basePackages = “com.wenqy.mapper.two”, sqlSessionTemplateRef = “twoSqlSessionTemplate”)
  3. public class TwoDbConfig {
  4.     @Bean(name = “twoDataSource”)
  5.     public DataSource twoDataSource(TwoDbProperties twoDbProperties) throws SQLException {
  6.         MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
  7.         mysqlXaDataSource.setUrl(twoDbProperties.getUrl());
  8.         mysqlXaDataSource.setPassword(twoDbProperties.getPassword());
  9.         mysqlXaDataSource.setUser(twoDbProperties.getUsername());
  10.         mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
  11.         AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
  12.         xaDataSource.setXaDataSource(mysqlXaDataSource);
  13.         xaDataSource.setUniqueResourceName(“twoDataSource”);
  14.         xaDataSource.setMinPoolSize(twoDbProperties.getMinPoolSize());
  15.         xaDataSource.setMaxPoolSize(twoDbProperties.getMaxPoolSize());
  16.         xaDataSource.setMaxLifetime(twoDbProperties.getMaxLifetime());
  17.         xaDataSource.setBorrowConnectionTimeout(twoDbProperties.getBorrowConnectionTimeout());
  18.         xaDataSource.setLoginTimeout(twoDbProperties.getLoginTimeout());
  19.         xaDataSource.setMaintenanceInterval(twoDbProperties.getMaintenanceInterval());
  20.         xaDataSource.setMaxIdleTime(twoDbProperties.getMaxIdleTime());
  21.         xaDataSource.setTestQuery(twoDbProperties.getTestQuery());
  22.         return xaDataSource;
  23.     }
  24.     @Bean(name = “twoSqlSessionFactory”)
  25.     public SqlSessionFactory twoSqlSessionFactory(@Qualifier(“twoDataSource”) DataSource dataSource)
  26.             throws Exception {
  27.         SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
  28.         bean.setDataSource(dataSource);
  29.         bean.setMapperLocations(
  30.                 new PathMatchingResourcePatternResolver().getResources(“classpath:/mapper/config/two/*.xml”));
  31.         return bean.getObject();
  32.     }
  33.     @Bean(name = “twoSqlSessionTemplate”)
  34.     public SqlSessionTemplate twoSqlSessionTemplate(
  35.             @Qualifier(“twoSqlSessionFactory”) SqlSessionFactory sqlSessionFactory) throws Exception {
  36.         return new SqlSessionTemplate(sqlSessionFactory);
  37.     }
  38. }

事务配置

注入Atomikos自己的事务管理器

  1. /**
  2.  * 
  3. * @ClassName: TransactionManagerConfig
  4. * @Description: 事务管理配置
  5. * @author
  6. *
  7.  */
  8. @Configuration
  9. @EnableTransactionManagement
  10. public class TransactionManagerConfig {
  11.     /** 
  12.      * 自定义事务 
  13.      * MyBatis自动参与到spring事务管理中,无需额外配置,
  14.      * 只要org.mybatis.spring.SqlSessionFactoryBean
  15.      * 引用的数据源与DataSourceTransactionManager引用的数据源一致即可,否则事务管理会不起作用。
  16.      */
  17.     @Bean(name = “userTransaction”)
  18.     public UserTransaction userTransaction() throws Throwable {
  19.         UserTransactionImp userTransactionImp = new UserTransactionImp();
  20.         userTransactionImp.setTransactionTimeout(10000);
  21.         return userTransactionImp;
  22.     }
  23.     @Bean(name = “atomikosTransactionManager”, initMethod = “init”, destroyMethod = “close”)
  24.     public TransactionManager atomikosTransactionManager() throws Throwable {
  25.         UserTransactionManager userTransactionManager = new UserTransactionManager();
  26.         userTransactionManager.setForceShutdown(false);
  27.         return userTransactionManager;
  28.     }
  29.     @Bean(name = “transactionManager”)
  30.     @DependsOn({ “userTransaction”“atomikosTransactionManager” })
  31.     public PlatformTransactionManager transactionManager() throws Throwable {
  32.         UserTransaction userTransaction = userTransaction();
  33.         JtaTransactionManager manager = new JtaTransactionManager(userTransaction, atomikosTransactionManager());
  34.         return manager;
  35.     }
  36. }

单元测试

编写测试类,测试服务。

  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @SpringBootTest
  3. public class TestMutiDataSource {
  4.     @Autowired
  5.     private UserService userService;
  6.     @Test
  7.     public void testInsertData() throws Exception {
  8.         userService.insertData();
  9.     }
  10. }

Service实现类,注释了要抛出异常错误

  1. @Transactional(value=“transactionManager”)
  2.     public void insertData() throws Exception {
  3.         User user = new User();
  4.         user.setId(new Random().nextInt());
  5.         user.setName(“wenqy”);
  6.         userMapper.saveUser( user );
  7. //      if (true) {
  8. //          throw new RuntimeException(“insert failure”);
  9. //      }
  10.         Person person = new Person();
  11.         person.setId(new Random().nextInt());
  12.         person.setEmail(“wen.qy@qq.com”);
  13.         personMapper.savePerson( person );
  14. //      if (true) {
  15. //          throw new RuntimeException(“insert failure”);
  16. //      }
  17.     }

结果两表都插入数据

multidatasource_atomikos_correct

去掉第一个注释块,抛出运行时异常,执行测试,两表没插入数据,正确。恢复第一个注释块,去掉第一个注释块,抛出运行时异常,执行测试,两表没插入数据,正确。更换异常为检查异常,如,SQLException时,发现,两表都插入数据了,没有发生回滚,错误。

  1.     @Transactional(value=“transactionManager”)
  2.     public void insertData() throws Exception {
  3.         User user = new User();
  4.         user.setId(new Random().nextInt());
  5.         user.setName(“wenqy”);
  6.         userMapper.saveUser( user );
  7. //      if (true) {
  8. //          throw new RuntimeException(“insert failure”);
  9. //      }
  10.         Person person = new Person();
  11.         person.setId(new Random().nextInt());
  12.         person.setEmail(“wen.qy@qq.com”);
  13.         personMapper.savePerson( person );
  14.         if (true) {
  15.             throw new SQLException(“insert failure”);
  16.         }
  17.     }

指定事务需要回滚所抛出的异常,这里是Exception

  1. @Transactional(value=“transactionManager”,rollbackFor=Exception.class)
  2.     public void insertDataRollbackForException() throws Exception {
  3.         User user = new User();
  4.         user.setId(new Random().nextInt());
  5.         user.setName(“wenqy”);
  6.         userMapper.saveUser( user );
  7. //      if (true) {
  8. //          throw new RuntimeException(“insert failure”);
  9. //      }
  10.         Person person = new Person();
  11.         person.setId(new Random().nextInt());
  12.         person.setEmail(“wen.qy@qq.com”);
  13.         personMapper.savePerson( person );
  14.         if (true) {
  15.             throw new SQLException(“insert failure”);
  16.         }
  17.     }

然后继续测试,执行测试,两表没插入数据,说明事务回滚正确

multidatasource_atomikos_correct_err

结论

默认配置下,Spring只有在抛出运行时异常,即RuntimeException及其子类(Errors也会导致事务回滚)时,才回滚该事务。Atomikos分布式事务和Spring声明式事务一样,对业务透明,非侵入式的。Atomikos支持与Spring的无缝衔接。而Spring默认的事务隔离级别是数据库默认事务。需要数据库支持事务,如,MySQL不能使用MYISAM引擎。

参考:

atomikos官网:https://www.atomikos.com/

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注

12 + 4 = ?