简介
- 数据一致性问题
- 数据的并发访问,修改
- 不同请求之间的数据隔离
- 多个服务共同完成一个业务请求,保证都完成或失败
- 发生异常时的数据回滚
事务原则与实现
- A:原子性
- C:一致性
- I:隔离性
- D:持久性
spring事务机制
- 事务抽象,事务传播,事务隔离
- spring事务抽象
- PlatformTransactionManager
- TransactionDefinition
- TransactionStatus
- PlatformTransactionManager的常见实现
- DataSourceTransactionManager
- JpaTransactionManager
- JmsTransactionManager
- JtaTransactionManager
Jpa事务实例
-
pom
-
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.imooc.example</groupId> <artifactId>spring-trans-jpa</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>spring-trans-jpa</name> <description>Demo project for Spring JPA transaction</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
-
dao
-
public interface CustomerRepository extends JpaRepository<Customer, Long> { Customer findOneByUsername(String username); }
-
service
-
//注解方式 @Service public class CustomerServiceTxInAnnotation { private static final Logger LOG = LoggerFactory.getLogger(CustomerServiceTxInAnnotation.class); @Autowired private CustomerRepository customerRepository; @Transactional public Customer create(Customer customer) { LOG.info("CustomerService In Annotation create customer:{}", customer.getUsername()); if (customer.getId() != null) { throw new RuntimeException("用户已经存在"); } customer.setUsername("Annotation:" + customer.getUsername()); return customerRepository.save(customer); } } //代码方式 @Service public class CustomerServiceTxInCode { private static final Logger LOG = LoggerFactory.getLogger(CustomerServiceTxInCode.class); @Autowired private CustomerRepository customerRepository; @Autowired private PlatformTransactionManager transactionManager; public Customer create(Customer customer) { LOG.info("CustomerService In Code create customer:{}", customer.getUsername()); if (customer.getId() != null) { throw new RuntimeException("用户已经存在"); } DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setIsolationLevel(TransactionDefinition.ISOLATION_SERIALIZABLE); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); def.setTimeout(15); TransactionStatus status = transactionManager.getTransaction(def); try { customer.setUsername("Code:" + customer.getUsername()); customerRepository.save(customer); transactionManager.commit(status); return customer; } catch (Exception e) { transactionManager.rollback(status); throw e; } } }
-
web
-
@RestController @RequestMapping("/api/customer") public class CustomerResource { private static final Logger LOG = LoggerFactory.getLogger(CustomerResource.class); @Autowired private CustomerServiceTxInAnnotation customerService; @Autowired private CustomerServiceTxInCode customerServiceInCode; @Autowired private CustomerRepository customerRepository; @PostMapping("/annotation") public Customer createInAnnotation(@RequestBody Customer customer) { LOG.info("CustomerResource create in annotation create customer:{}", customer.getUsername()); return customerService.create(customer); } @PostMapping("/code") public Customer createInCode(@RequestBody Customer customer) { LOG.info("CustomerResource create in code create customer:{}", customer.getUsername()); return customerServiceInCode.create(customer); } @GetMapping("") public List<Customer> getAll() { return customerRepository.findAll(); } }
Jms事务实例
-
session原生事务
-
JmsTransactionManager事务
-
pom
-
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.imooc.example</groupId> <artifactId>spring-trans-jms</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>spring-trans-jms</name> <description>Demo project for Spring JMS transaction</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
-
config
-
@EnableJms @Configuration public class JmsConfig { private static final Logger LOG = LoggerFactory.getLogger(CustomerService.class); @Bean public JmsTemplate initJmsTemplate(ConnectionFactory connectionFactory) { LOG.debug("init jms template with converter."); JmsTemplate template = new JmsTemplate(); template.setConnectionFactory(connectionFactory); // JmsTemplate使用的connectionFactory跟JmsTransactionManager使用的必须是同一个,不能在这里封装成caching之类的。 return template; } // 这个用于设置 @JmsListener使用的containerFactory @Bean public JmsListenerContainerFactory<?> msgFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer, PlatformTransactionManager transactionManager) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setTransactionManager(transactionManager); factory.setCacheLevelName("CACHE_CONNECTION"); factory.setReceiveTimeout(10000L); configurer.configure(factory, connectionFactory); return factory; } @Bean public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) { return new JmsTransactionManager(connectionFactory); } }
-
service
-
@Service public class CustomerService { private static final Logger LOG = LoggerFactory.getLogger(CustomerService.class); @Autowired JmsTemplate jmsTemplate; @Autowired private PlatformTransactionManager transactionManager; @PostConstruct public void init() { jmsTemplate.setReceiveTimeout(3000); } @JmsListener(destination = "customer:msg:new", containerFactory = "msgFactory") public void handle(String msg) { LOG.debug("Get JMS message to from customer:{}", msg); String reply = "Replied - " + msg; jmsTemplate.convertAndSend("customer:msg:reply", reply); if (msg.contains("error")) { simulateError(); } } @JmsListener(destination = "customer:msg2:new", containerFactory = "msgFactory") public void handle2(String msg) { LOG.debug("Get JMS message2 to from customer:{}", msg); DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setTimeout(15); TransactionStatus status = transactionManager.getTransaction(def); try { String reply = "Replied-2 - " + msg; jmsTemplate.convertAndSend("customer:msg:reply", reply); if (!msg.contains("error")) { transactionManager.commit(status); } else { transactionManager.rollback(status); } } catch (Exception e) { transactionManager.rollback(status); throw e; } } private void simulateError() { throw new RuntimeException("some Data error."); } }
-
web
-
@RestController @RequestMapping("/api/customer") public class CustomerResource { @Autowired JmsTemplate jmsTemplate; @Autowired private CustomerService customerService; @PostMapping("/message1/listen") public void createMsgWithListener(@RequestParam String msg) { jmsTemplate.convertAndSend("customer:msg:new", msg); } @PostMapping("/message1/direct") public void createMsgDirect(@RequestParam String msg) { customerService.handle(msg); } @PostMapping("/message2/listen") public void createMsg2WithListener(@RequestParam String msg) { jmsTemplate.convertAndSend("customer:msg2:new", msg); } @PostMapping("/message2/direct") public void createMsg2Direct(@RequestParam String msg) { customerService.handle2(msg); } @GetMapping("/message") public String getMsg() { Object reply = jmsTemplate.receiveAndConvert("customer:msg:reply"); return String.valueOf(reply); } }
Jta分布式事务实例
- 本地事务
- 外部全局事务
- 外部事务管理器提供事务管理
- 通过spring事务接口,调用外部管理器
- 使用JNDI等方式获取外部事务管理器的实例
- 外部事务管理器一般由应用服务器提供,如Jboss等
- 外部事务管理器提供JTA事务管理
- JTA事务管理器可以管理多个数据资源
- 通过2阶段提交实现多数据源事务
- JTA事务管理的弊端
- 两阶段提交
- 事务实践太长,锁数据的实践太长
- 低性能,低吞吐量
- 不使用JTA实现多数据源的事务管理
- spring事务同步机制
- 多个数据源上实现近似事务一致性
- 高性能,高吞吐量
单数据源
- 使用spring JTA事务管理
- Atomikos外部事务管理器提供JTA事务管理
-
使用一个数据库-单数据源
-
pom
-
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.imooc.example</groupId> <artifactId>spring-trans-jta</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>spring-trans-jta</name> <description>Demo project for Spring JTA transaction</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> </dependency> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
-
dao
-
public interface CustomerRepository extends JpaRepository<Customer, Long> { Customer findOneByUsername(String username); }
-
service
-
//注解方式 @Service public class CustomerServiceTxInAnnotation { private static final Logger LOG = LoggerFactory.getLogger(CustomerServiceTxInAnnotation.class); @Autowired private CustomerRepository customerRepository; @Transactional public Customer create(Customer customer) { LOG.info("CustomerService In Annotation create customer:{}", customer.getUsername()); if (customer.getId() != null) { throw new RuntimeException("用户已经存在"); } customer.setUsername("Annotation:" + customer.getUsername()); return customerRepository.save(customer); } } //代码方式 @Service public class CustomerServiceTxInCode { private static final Logger LOG = LoggerFactory.getLogger(CustomerServiceTxInCode.class); @Autowired private CustomerRepository customerRepository; @Autowired private PlatformTransactionManager transactionManager; public Customer create(Customer customer) { LOG.info("CustomerService In Code create customer:{}", customer.getUsername()); if (customer.getId() != null) { throw new RuntimeException("用户已经存在"); } DefaultTransactionDefinition def = new DefaultTransactionDefinition(); // def.setIsolationLevel(TransactionDefinition.ISOLATION_SERIALIZABLE); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); def.setTimeout(15); TransactionStatus status = transactionManager.getTransaction(def); try { customer.setUsername("Code:" + customer.getUsername()); customerRepository.save(customer); transactionManager.commit(status); return customer; } catch (Exception e) { transactionManager.rollback(status); throw e; } } }
-
web
-
@RestController @RequestMapping("/api/customer") public class CustomerResource { private static final Logger LOG = LoggerFactory.getLogger(CustomerResource.class); @Autowired private CustomerServiceTxInAnnotation customerService; @Autowired private CustomerServiceTxInCode customerServiceInCode; @Autowired private CustomerRepository customerRepository; @PostMapping("/annotation") public Customer createInAnnotation(@RequestBody Customer customer) { LOG.info("CustomerResource create in annotation create customer:{}", customer.getUsername()); return customerService.create(customer); } @PostMapping("/code") public Customer createInCode(@RequestBody Customer customer) { LOG.info("CustomerResource create in code create customer:{}", customer.getUsername()); return customerServiceInCode.create(customer); } @GetMapping("") public List<Customer> getAll() { return customerRepository.findAll(); } }
多数据源
-
多个数据源:DB,MQ
-
pom
-
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> </dependency> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
-
service
-
//注解方式 @Service public class CustomerServiceTxInAnnotation { private static final Logger LOG = LoggerFactory.getLogger(CustomerServiceTxInAnnotation.class); @Autowired private CustomerRepository customerRepository; @Autowired private JmsTemplate jmsTemplate; @Transactional public Customer create(Customer customer) { LOG.info("CustomerService In Annotation create customer:{}", customer.getUsername()); if (customer.getId() != null) { throw new RuntimeException("用户已经存在"); } customer.setUsername("Annotation:" + customer.getUsername()); jmsTemplate.convertAndSend("customer:msg:reply", customer.getUsername() + " created."); return customerRepository.save(customer); } @Transactional @JmsListener(destination = "customer:msg:new") public Customer createByListener(String name) { LOG.info("CustomerService In Annotation by Listener create customer:{}", name); Customer customer = new Customer(); customer.setUsername("Annotation:" + name); customer.setRole("USER"); customer.setPassword("111111"); jmsTemplate.convertAndSend("customer:msg:reply", customer.getUsername() + " created."); return customerRepository.save(customer); } } //代码方式 @Service public class CustomerServiceTxInCode { private static final Logger LOG = LoggerFactory.getLogger(CustomerServiceTxInCode.class); @Autowired private CustomerRepository customerRepository; @Autowired private PlatformTransactionManager transactionManager; @Autowired private JmsTemplate jmsTemplate; public Customer create(Customer customer) { LOG.info("CustomerService In Code create customer:{}", customer.getUsername()); if (customer.getId() != null) { throw new RuntimeException("用户已经存在"); } DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); def.setTimeout(15); TransactionStatus status = transactionManager.getTransaction(def); try { customer.setUsername("Code:" + customer.getUsername()); customerRepository.save(customer); jmsTemplate.convertAndSend("customer:msg:reply", customer.getUsername() + " created."); transactionManager.commit(status); return customer; } catch (Exception e) { transactionManager.rollback(status); throw e; } } @JmsListener(destination = "customer:msg2:new") public void createByListener(String name) { LOG.info("CustomerService In Code by Listener create customer:{}", name); Customer customer = new Customer(); customer.setUsername("Code:" + name); customer.setRole("USER"); customer.setPassword("111111"); DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); def.setTimeout(15); TransactionStatus status = transactionManager.getTransaction(def); try { customerRepository.save(customer); jmsTemplate.convertAndSend("customer:msg:reply", customer.getUsername() + " created."); transactionManager.commit(status); } catch (Exception e) { transactionManager.rollback(status); throw e; } } }
-
web
-
@RestController @RequestMapping("/api/customer") public class CustomerResource { @Autowired JmsTemplate jmsTemplate; @Autowired private CustomerServiceTxInAnnotation customerService; @Autowired private CustomerServiceTxInCode customerServiceInCode; @Autowired private CustomerRepository customerRepository; @PostMapping("/annotation") public Customer createInAnnotation(@RequestBody Customer customer) { return customerService.create(customer); } @PostMapping("/code") public Customer createInCode(@RequestBody Customer customer) { return customerServiceInCode.create(customer); } @Transactional @PostMapping("/message/annotation") public void createMsgWithListener(@RequestParam String userName) { jmsTemplate.convertAndSend("customer:msg:new", userName); } @Transactional @PostMapping("/message/code") public void createMsgDirect(@RequestParam String userName) { jmsTemplate.convertAndSend("customer:msg2:new", userName); } @GetMapping("") public List<Customer> getAll() { return customerRepository.findAll(); } }
模式和技术
- JTA实现多服务的分布式事务
- 不使用JTA,消息驱动的分布式事务
- 如何选择
- 强一致性事务:JTA性能最差,只适用于单个服务内
- 弱,最终一致性事务:最大努力一次提交,链式事务(设计相应的错误处理机制)
- MQ-DB:最大努力一次提交+重试
- 多个DB:链式事务管理
- 多个数据源:链式事务,或其他事务同步方式
- 实现模式
- 消息驱动模式:Message Driven
- 事件溯源模式:Event Sourcing
- TCC模式:Try-Confirm-Cancel
- 幂等性
- 幂等操作:任意多次执行所产生的影响,与一次执行的影响相同
- 方法的幂等性:使用同样的参数调用一个方法多次,与调用一次结果相同
- 接口的幂等性:接口被重复调用,结果一致
- 微服务接口的幂等性
- 重要性:经常需要通过重试实现分布式事务的最终一致性
- get方法不会对系统产生副作用,具有幂等性
- post,put,delete方法的实现需要满足幂等性
- 分布式系统唯一性id:guid
- 分布式系统的全局唯一标识
- uuid:生成唯一id的规范
- 用于唯一标识,处理重复消息
- 数据库自增序列
- uuid:唯一id标准,128位,几种版本
- mongodb的objectId:时间戳+机器id+进程id+序号
- redis的incr操作,zookeeper节点的版本号
- 使用哪种方式
- 自增id:考虑安全性,部署
- 时间有序:便于通过id判断创建时间
- 长度,是否数字类型:是否建立索引
- 分布式系统分布式对象
- redis:redission库:rlock,rmap,rqueue等对象
- zookeeper:netfilx curator库:lock,queue等对象
实例1:DatasourceTransactionManager
-
MySQL+MySQL
-
链式事务:DatasourceTransactionManager
-
不处理重试
-
pom
-
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.imooc.example</groupId> <artifactId>spring-dtx-db-db</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>spring-dtx-db-db</name> <description>Demo project for Spring distributed transaction for DB-DB</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>com.zaxxer</groupId> <artifactId>HikariCP</artifactId> <version>2.7.8</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.39</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-commons</artifactId> <version>1.13.7.RELEASE</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
-
yml
-
# 默认的Datasource配置 # spring.datasource.url = jdbc:mysql://localhost:3307/imooc_user # spring.datasource.username = root # spring.datasource.password = 123456 # spring.datasource.driverClassName=com.mysql.jdbc.Driver spring.ds_user.url = jdbc:mysql://localhost:3306/imooc_user spring.ds_user.username = root spring.ds_user.password = 123456 spring.ds_user.driver-class-name = com.mysql.jdbc.Driver spring.ds_order.url=jdbc:mysql://localhost:3307/imooc_order spring.ds_order.username=root spring.ds_order.password=123456 spring.ds_order.driver-class-name=com.mysql.jdbc.Driver
-
service
-
@Service public class CustomerService { @Autowired @Qualifier("userJdbcTemplate") private JdbcTemplate userJdbcTemplate; @Autowired @Qualifier("orderJdbcTemplate") private JdbcTemplate orderJdbcTemplate; private static final String SQL_UPDATE_DEPOSIT = "UPDATE customer SET deposit = deposit - ? where id = ?"; private static final String SQL_CREATE_ORDER = "INSERT INTO customer_order (customer_id, title, amount) VALUES (?, ?, ?)"; @Transactional public void createOrder(Order order) { userJdbcTemplate.update(SQL_UPDATE_DEPOSIT, order.getAmount(), order.getCustomerId()); if (order.getTitle().contains("error1")) { throw new RuntimeException("Error1"); } orderJdbcTemplate.update(SQL_CREATE_ORDER, order.getCustomerId(), order.getTitle(), order.getAmount()); if (order.getTitle().contains("error2")) { throw new RuntimeException("Error2"); } } public Map userInfo(Long customerId) { Map customer = userJdbcTemplate.queryForMap("SELECT * from customer where id = " + customerId); List orders = orderJdbcTemplate.queryForList("SELECT * from customer_order where customer_id = " + customerId); Map result = new HashMap(); result.put("customer", customer); result.put("orders", orders); return result; } }
-
config
-
@Configuration public class DBConfiguration { @Bean @Primary //默认使用这个DataSourceProperties @ConfigurationProperties(prefix = "spring.ds_user") public DataSourceProperties userDataSourceProperties() { return new DataSourceProperties(); } @Bean @Primary public DataSource userDataSource() { return userDataSourceProperties().initializeDataSourceBuilder().type(HikariDataSource.class).build(); } //链式事务管理器,把两个事务管理器连接起来 @Bean public PlatformTransactionManager transactionManager() { DataSourceTransactionManager userTM = new DataSourceTransactionManager(userDataSource()); PlatformTransactionManager orderTM = new DataSourceTransactionManager(orderDataSource()); ChainedTransactionManager tm = new ChainedTransactionManager(userTM, orderTM); return tm; } @Bean public JdbcTemplate userJdbcTemplate(@Qualifier("userDataSource") DataSource userDataSource) { return new JdbcTemplate(userDataSource); } @Bean @ConfigurationProperties(prefix = "spring.ds_order") public DataSourceProperties orderDataSourceProperties() { return new DataSourceProperties(); } @Bean public DataSource orderDataSource() { return orderDataSourceProperties().initializeDataSourceBuilder().type(HikariDataSource.class).build(); } @Bean public JdbcTemplate orderJdbcTemplate(@Qualifier("orderDataSource") DataSource orderDataSource) { return new JdbcTemplate(orderDataSource); } }
-
web
-
@RestController @RequestMapping("/api/customer") public class CustomerResource { @Autowired private CustomerService customerService; @PostMapping("/order") public void create(@RequestBody Order order) { customerService.createOrder(order); } @GetMapping("/{id}") public Map userInfo(@PathVariable Long id) { return customerService.userInfo(id); } }
实例2:JpaTransactionManager
-
MySQL+MySQL
-
链式事务:JpaTransactionManager
-
不处理重试
-
pom
-
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.imooc.example</groupId> <artifactId>spring-dtx-jpa-db</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>spring-dtx-jpa-db</name> <description>Demo project for Spring distributed transaction for DB-DB</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.39</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
-
service
-
@Service public class CustomerService { @Autowired private CustomerRepository customerRepository; @Autowired @Qualifier("orderJdbcTemplate") private JdbcTemplate orderJdbcTemplate; private static final String SQL_CREATE_ORDER = "INSERT INTO customer_order (customer_id, title, amount) VALUES (?, ?, ?)"; @Transactional public void createOrder(Order order) { Customer customer = customerRepository.findOne(order.getCustomerId()); customer.setDeposit(customer.getDeposit() - order.getAmount()); customerRepository.save(customer); if (order.getTitle().contains("error1")) { throw new RuntimeException("Error1"); } orderJdbcTemplate.update(SQL_CREATE_ORDER, order.getCustomerId(), order.getTitle(), order.getAmount()); if (order.getTitle().contains("error2")) { throw new RuntimeException("Error2"); } } public Map userInfo(Long customerId) { Customer customer = customerRepository.findOne(customerId); List orders = orderJdbcTemplate.queryForList("SELECT * from customer_order where customer_id = " + customerId); Map result = new HashMap(); result.put("customer", customer); result.put("orders", orders); return result; } }
-
config
-
@Configuration public class DBConfiguration { @Bean(name = "userDataSource") @Primary @ConfigurationProperties(prefix = "spring.ds_user") public DataSource userDataSource() { return DataSourceBuilder.create().build(); } @Bean(name = "orderDataSource") public DataSource orderDataSource() { return DataSourceBuilder.create().build(); } @Bean public JdbcTemplate orderJdbcTemplate(@Qualifier("orderDataSource") DataSource orderDataSource) { return new JdbcTemplate(orderDataSource); } @Bean public LocalContainerEntityManagerFactoryBean entityManagerFactory() { HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter(); vendorAdapter.setGenerateDdl(false); // we have already created tables. LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean(); factory.setJpaVendorAdapter(vendorAdapter); factory.setPackagesToScan("com.imooc.example"); factory.setDataSource(userDataSource()); return factory; } //链式事务管理器 @Bean public PlatformTransactionManager transactionManager() { JpaTransactionManager userTM = new JpaTransactionManager(); userTM.setEntityManagerFactory(entityManagerFactory().getObject()); PlatformTransactionManager orderTM = new DataSourceTransactionManager(orderDataSource()); ChainedTransactionManager tm = new ChainedTransactionManager(userTM, orderTM); return tm; } }
-
web
-
@RestController @RequestMapping("/api/customer") public class CustomerResource { @Autowired private CustomerService customerService; @PostMapping("/order") public void create(@RequestBody Order order) { customerService.createOrder(order); } @GetMapping("/{id}") public Map userInfo(@PathVariable Long id) { return customerService.userInfo(id); } }
实例3:activeMQ+MySQL
-
JMS-DB
-
activeMQ+MySQL
-
最大努力一次提交:TransactionAwareConnectionFactoryProxy
-
pom
-
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.imooc.example</groupId> <artifactId>spring-dtx-jms-db</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>spring-dtx-jms-db</name> <description>Demo project for Spring distributed transaction for DB and MQ resources.</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.39</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
-
yaml
-
spring.datasource.url = jdbc:mysql://localhost:3306/imooc_user spring.datasource.username = root spring.datasource.password = 123456 spring.datasource.driverClassName=com.mysql.jdbc.Driver spring.activemq.broker-url = tcp://localhost:61616
-
service
-
@Service public class CustomerService { private static final Logger LOG = LoggerFactory.getLogger(CustomerService.class); @Autowired private CustomerRepository customerRepository; @Autowired private JmsTemplate jmsTemplate; @Transactional @JmsListener(destination = "customer:msg:new") public void handle(String msg) { LOG.info("Get msg1:{}", msg); Customer customer = new Customer(); customer.setUsername(msg); customer.setDeposit(100); customerRepository.save(customer); if (msg.contains("error1")) { throw new RuntimeException("Error1"); } jmsTemplate.convertAndSend("customer:msg:reply", msg + " created."); if (msg.contains("error2")) { throw new RuntimeException("Error2"); } } @Transactional public Customer create(Customer customer) { LOG.info("CustomerService In Annotation create customer:{}", customer.getUsername()); if (customer.getId() != null) { throw new RuntimeException("用户已经存在"); } customer.setUsername("Annotation:" + customer.getUsername()); customer = customerRepository.save(customer); if (customer.getUsername().contains("error1")) { throw new RuntimeException("Error1"); } jmsTemplate.convertAndSend("customer:msg:reply", customer.getUsername() + " created."); if (customer.getUsername().contains("error2")) { throw new RuntimeException("Error2"); } return customer; } }
-
config
-
@Configuration public class JmsConfiguration { @Bean public JmsTemplate initJmsTemplate(ConnectionFactory connectionFactory) { JmsTemplate template = new JmsTemplate(); template.setConnectionFactory(connectionFactory); template.setSessionTransacted(true); return template; } //把JMS事务同步到JPA事务上 @Bean public ConnectionFactory connectionFactory() { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); TransactionAwareConnectionFactoryProxy proxy = new TransactionAwareConnectionFactoryProxy(); proxy.setTargetConnectionFactory(cf); proxy.setSynchedLocalTransactionAllowed(true); return proxy; } }
-
web
-
@RestController @RequestMapping("/api/customer") public class CustomerResource { @Autowired JmsTemplate jmsTemplate; @Autowired private CustomerService customerService; @Autowired private CustomerRepository customerRepository; //通过jpa创建 @PostMapping("/annotation") public Customer createInAnnotation(@RequestBody Customer customer) { return customerService.create(customer); } //通过activeMQ创建 @PostMapping("/msg") public void create(@RequestBody Customer customer) { jmsTemplate.convertAndSend("customer:msg:new", customer.getUsername()); } @GetMapping("") public List<Customer> getAll() { return customerRepository.findAll(); } @GetMapping("/msg") public String getMsg() { jmsTemplate.setReceiveTimeout(3000); Object reply = jmsTemplate.receiveAndConvert("customer:msg:reply"); return String.valueOf(reply); } }
消息驱动模型
- 消息驱动模型
- 注意
- 消息中间件需要支持事务
- 如果处理重试的消息
- 发生业务异常时回滚操作
- 系统错误的处理
- 方法1:将出错未处理的消息写到失败队列,进行相应回滚操作
- 方法2:通过定时任务检查超时订单,对未完成的订单做自动回滚
- 方法3:保存出错消息,人工处理
卖票实例
- order服务,user服务,ticket服务
- activeMQ作为消息中间件
- 错误处理:定时任务检查超时并回滚
- 幂等性:实现方法的幂等性
- 流程
- 失败场景
- 异常订单处理
- 定时处理异常订单:1未被处理完成 2未被按失败订单处理
- 解锁票,撤销交票
- 对于余额之类的重要数据,可能使用人工处理
- 实现锁票的安全性
- 利用@JmsListener设置一个消费者,不适用于多实例
- 使用事务和数据库锁的特性
- 分布式锁
register服务
-
pom
-
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.imooc.example</groupId> <artifactId>registry</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>registry</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.10.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> <spring-cloud.version>Edgware.SR2</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-security</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-eureka-server</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-hystrix-dashboard</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
-
yaml
-
server: port: 8761 spring: application: name: registry security: basic: enabled: true user: name: imooc password: 111111 eureka: client: register-with-eureka: false fetch-registry: false serviceUrl: defaultZone: http://imooc:111111@localhost:${server.port}/eureka/
-
application
-
@SpringBootApplication @EnableEurekaServer @EnableHystrixDashboard public class RegistryApplication { public static void main(String[] args) { SpringApplication.run(RegistryApplication.class, args); } }
proxy服务
-
pom
-
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.imooc.example</groupId> <artifactId>proxy</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>proxy</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.10.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> <spring-cloud.version>Edgware.SR2</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-eureka</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-zuul</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
-
yaml
-
server: port: 8888 spring: application: name: proxy eureka: client: serviceUrl: defaultZone: http://imooc:111111@localhost:8761/eureka/ #zuul: # routes: # userApi: # path: /home/** # stripPrefix: false # serviceId: user
-
application
-
@SpringBootApplication @EnableEurekaClient @EnableZuulProxy public class ProxyApplication { public static void main(String[] args) { SpringApplication.run(ProxyApplication.class, args); } }
order服务
-
pom
-
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.imooc.example</groupId> <artifactId>order</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>order</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.10.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> <spring-cloud.version>Edgware.SR2</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>com.imooc.example</groupId> <artifactId>service</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-eureka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.39</version> </dependency> <dependency> <groupId>com.fasterxml.uuid</groupId> <artifactId>java-uuid-generator</artifactId> <version>3.1.5</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
-
yaml
-
server: port: 8082 spring: application: name: order datasource: url: jdbc:mysql://localhost:3306/imooc_order username: root password: 123456 driver-class-name: com.mysql.jdbc.Driver jpa: hibernate: ddl-auto: update eureka: client: serviceUrl: defaultZone: http://imooc:111111@localhost:8761/eureka/
-
po
-
@Entity(name = "customer_order") public class Order { @Id @GeneratedValue private Long id; private String uuid; private Long customerId; private String title; private Long ticketNum; private int amount; private String status; private String reason; private ZonedDateTime createdDate; }
-
dao
-
public interface OrderRepository extends JpaRepository<Order, Long> { List<Order> findAllByCustomerId(Long customerId); List<Order> findAllByStatusAndCreatedDateBefore(String status, ZonedDateTime checkTime); Order findOneByUuid(String uuid); }
-
service
-
@Service public class OrderService { private static final Logger LOG = LoggerFactory.getLogger(OrderService.class); @Autowired private JmsTemplate jmsTemplate; @Autowired private OrderRepository orderRepository; //锁票成功,开始处理订单 @Transactional @JmsListener(destination = "order:locked", containerFactory = "msgFactory") public void handle(OrderDTO msg) { LOG.info("Get new order to create:{}", msg); if (orderRepository.findOneByUuid(msg.getUuid()) != null) { // 通过保存到数据库,来使用uuid处理重复消息 LOG.info("Msg already processed:{}", msg); } else { Order order = newOrder(msg); orderRepository.save(order); msg.setId(order.getId()); } msg.setStatus("NEW"); //订单创建完毕,开始支付,由user服务处理 jmsTemplate.convertAndSend("order:pay", msg); } private Order newOrder(OrderDTO dto) { Order order = new Order(); order.setUuid(dto.getUuid()); order.setAmount(dto.getAmount()); order.setTitle(dto.getTitle()); order.setCustomerId(dto.getCustomerId()); order.setTicketNum(dto.getTicketNum()); order.setStatus("NEW"); order.setCreatedDate(ZonedDateTime.now()); return order; } //订单处理完成,入库 @Transactional @JmsListener(destination = "order:finish", containerFactory = "msgFactory") public void handleFinish(OrderDTO msg) { LOG.info("Get finished order:{}", msg); Order order = orderRepository.findOne(msg.getId()); order.setStatus("FINISH"); orderRepository.save(order); } /** * 订单失败的几种情况: * 1. 一开始索票失败。 * 2. 扣费失败后,解锁票,然后出发 * 3. 定时任务检测到订单超时 * @param msg */ @Transactional @JmsListener(destination = "order:fail", containerFactory = "msgFactory") public void handleFailed(OrderDTO msg) { LOG.info("Get failed order:{}", msg); Order order; if (msg.getId() == null) { order = newOrder(msg); order.setReason("TICKET_LOCK_FAIL"); // 锁票失败,可能票id不对,或者已被别人买走 } else { order = orderRepository.findOne(msg.getId()); if (msg.getStatus().equals("NOT_ENOUGH_DEPOSIT")) { order.setReason("NOT_ENOUGH_DEPOSIT"); } } order.setStatus("FAIL"); orderRepository.save(order); } @Scheduled(fixedDelay = 10000L) public void checkInvalidOrder() { ZonedDateTime checkTime = ZonedDateTime.now().minusMinutes(1L); List<Order> orders = orderRepository.findAllByStatusAndCreatedDateBefore("NEW", checkTime); orders.stream().forEach(order -> { LOG.error("Order timeout:{}", order); OrderDTO dto = new OrderDTO(); dto.setId(order.getId()); dto.setTicketNum(order.getTicketNum()); dto.setUuid(order.getUuid()); dto.setAmount(order.getAmount()); dto.setTitle(order.getTitle()); dto.setCustomerId(order.getCustomerId()); dto.setStatus("TIMEOUT"); jmsTemplate.convertAndSend("order:ticket_error", dto); }); } }
-
config
-
@Configuration public class JmsConfig { @Bean public ConnectionFactory connectionFactory() { ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); TransactionAwareConnectionFactoryProxy proxy = new TransactionAwareConnectionFactoryProxy(); proxy.setTargetConnectionFactory(cf); proxy.setSynchedLocalTransactionAllowed(true); return proxy; } @Bean public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory, MessageConverter jacksonJmsMessageConverter) { JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); jmsTemplate.setMessageConverter(jacksonJmsMessageConverter); jmsTemplate.setSessionTransacted(true); return jmsTemplate; } @Bean public JmsListenerContainerFactory<?> msgFactory(ConnectionFactory cf, PlatformTransactionManager transactionManager, DefaultJmsListenerContainerFactoryConfigurer configurer) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); configurer.configure(factory, cf); factory.setReceiveTimeout(10000L); // FIXME: 使用非内置的activeMQ服务器,这个就不需要, // factory.setCacheLevelName("CACHE_CONNECTION"); factory.setTransactionManager(transactionManager); factory.setConcurrency("10"); return factory; } @Bean public MessageConverter jacksonJmsMessageConverter() { MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); converter.setTargetType(MessageType.TEXT); converter.setTypeIdPropertyName("_type"); return converter; } }
-
web
-
@RestController @RequestMapping("/api/order") public class OrderResource implements OrderCompositeService { @Autowired private OrderRepository orderRepository; @Autowired private JmsTemplate jmsTemplate; private TimeBasedGenerator uuidGenerator = Generators.timeBasedGenerator(); //入口,创建订单 @Transactional @PostMapping("") public void create(@RequestBody OrderDTO dto) { dto.setUuid(uuidGenerator.generate().toString()); //MQ发送创建订单请求,由ticket服务处理 jmsTemplate.convertAndSend("order:new", dto); } @GetMapping("/{customerId}") public List<OrderDTO> getMyOrder(@PathVariable Long customerId) { List<Order> orders = orderRepository.findAllByCustomerId(customerId); return orders.stream().map(order -> { OrderDTO dto = new OrderDTO(); dto.setId(order.getId()); dto.setStatus(order.getStatus()); dto.setTicketNum(order.getTicketNum()); dto.setAmount(order.getAmount()); dto.setCustomerId(order.getCustomerId()); dto.setTitle(order.getTitle()); dto.setUuid(order.getUuid()); return dto; }).collect(Collectors.toList()); } @GetMapping("") public List<Order> getAll() { return orderRepository.findAll(); } }
-
application
-
@SpringBootApplication @EnableDiscoveryClient public class OrderApplication { public static void main(String[] args) { SpringApplication.run(OrderApplication.class, args); } }
user服务
-
pom
-
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.imooc.example</groupId> <artifactId>user</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>user</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.10.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> <spring-cloud.version>Edgware.SR2</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-feign</artifactId> <!--<artifactId>spring-cloud-starter-openfeign</artifactId> 如果feign client出错,可能需要这个--> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-eureka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-hystrix</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.39</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.imooc.example</groupId> <artifactId>service</artifactId> <version>1.0-SNAPSHOT</version> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
-
yaml
-
server: port: 8081 spring: application: name: user datasource: url: jdbc:mysql://localhost:3306/imooc_user username: root password: 123456 driver-class-name: com.mysql.jdbc.Driver jpa: hibernate: ddl-auto: update eureka: client: serviceUrl: defaultZone: http://imooc:111111@localhost:8761/eureka/
-
po
-
@Entity(name = "customer") public class Customer { @Id @GeneratedValue private Long id; @Column(name = "user_name") private String username; private String password; private String role; private int deposit; } @Entity(name = "pay_info") public class PayInfo { @Id @GeneratedValue private Long id; @Column(name = "order_id") private Long orderId; private String status; private int amount; }
-
dao
-
public interface CustomerRepository extends JpaRepository<Customer, Long> { Customer findOneByUsername(String username); @Modifying @Query("UPDATE customer SET deposit = deposit - ?2 WHERE id = ?1") int charge(Long customerId, int amount); } public interface PayInfoRepository extends JpaRepository<PayInfo, Long> { PayInfo findOneByOrderId(Long orderId); }
-
service
-
@Service public class CustomerService { private static final Logger LOG = LoggerFactory.getLogger(CustomerService.class); @Autowired private JmsTemplate jmsTemplate; @Autowired private CustomerRepository customerRepository; @Autowired private PayInfoRepository payInfoRepository; //订单创建完毕,开始付款 @Transactional @JmsListener(destination = "order:pay", containerFactory = "msgFactory") public void handle(OrderDTO msg) { LOG.info("Get new order to pay:{}", msg); // 先检查payInfo判断重复消息。 PayInfo pay = payInfoRepository.findOneByOrderId(msg.getId()); if (pay != null) { LOG.warn("Order already paid, duplicated message."); return; } Customer customer = customerRepository.findOne(msg.getCustomerId()); if (customer.getDeposit() < msg.getAmount()) { LOG.info("No enough deposit, need amount:{}", msg.getAmount()); msg.setStatus("NOT_ENOUGH_DEPOSIT"); //余额不足 jmsTemplate.convertAndSend("order:ticket_error", msg); return; } pay = new PayInfo(); pay.setOrderId(msg.getId()); pay.setAmount(msg.getAmount()); pay.setStatus("PAID"); payInfoRepository.save(pay); // customer.setDeposit(customer.getDeposit() - msg.getAmount()); // customerRepository.save(customer); // 如果用户下了2个订单,这个handle方法不是单线程处理,或者有多个实例,又刚好这2个请求被同时处理, customerRepository.charge(msg.getCustomerId(), msg.getAmount()); msg.setStatus("PAID"); //支付完成,由ticket服务处理 jmsTemplate.convertAndSend("order:ticket_move", msg); } }
-
config
-
@Configuration public class JmsConfig { @Bean public ConnectionFactory connectionFactory() { ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); TransactionAwareConnectionFactoryProxy proxy = new TransactionAwareConnectionFactoryProxy(); proxy.setTargetConnectionFactory(cf); proxy.setSynchedLocalTransactionAllowed(true); return proxy; } @Bean public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory, MessageConverter jacksonJmsMessageConverter) { JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); jmsTemplate.setMessageConverter(jacksonJmsMessageConverter); jmsTemplate.setSessionTransacted(true); return jmsTemplate; } @Bean public JmsListenerContainerFactory<?> msgFactory(ConnectionFactory cf, PlatformTransactionManager transactionManager, DefaultJmsListenerContainerFactoryConfigurer configurer) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); configurer.configure(factory, cf); factory.setReceiveTimeout(10000L); // factory.setCacheLevelName("CACHE_CONNECTION"); factory.setTransactionManager(transactionManager); factory.setConcurrency("10"); return factory; } @Bean public MessageConverter jacksonJmsMessageConverter() { MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); converter.setTargetType(MessageType.TEXT); converter.setTypeIdPropertyName("_type"); return converter; } }
-
web
-
@RestController @RequestMapping("/api/customer") public class CustomerResource { @PostConstruct public void init() { if (customerRepository.count() > 0) { return; } Customer customer = new Customer(); customer.setUsername("imooc"); customer.setPassword("111111"); customer.setRole("User"); customer.setDeposit(1000); customerRepository.save(customer); } @Autowired private CustomerRepository customerRepository; @Autowired private OrderCompositeService orderClient; @Autowired private TicketCompositeService ticketClient; @PostMapping("") public Customer create(@RequestBody Customer customer) { return customerRepository.save(customer); } @GetMapping("") @HystrixCommand public List<Customer> getAll() { return customerRepository.findAll(); } @GetMapping("/my") @HystrixCommand public Map getMyInfo() { Customer customer = customerRepository.findOneByUsername("imooc"); List<OrderDTO> orders = orderClient.getMyOrder(customer.getId()); List<TicketDTO> tickets = ticketClient.getMyTickets(customer.getId()); Map result = new HashMap(); result.put("customer", customer); result.put("orders", orders); result.put("tickets", tickets); return result; } }
-
application
-
@SpringBootApplication @EnableDiscoveryClient @EnableHystrix @EnableFeignClients public class UserApplication { public static void main(String[] args) { SpringApplication.run(UserApplication.class, args); } }
ticket服务
-
pom
-
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.imooc.example</groupId> <artifactId>ticket</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>ticket</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.10.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> <spring-cloud.version>Edgware.SR2</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>com.imooc.example</groupId> <artifactId>service</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-eureka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.39</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
-
yaml
-
server: port: 8083 spring: application: name: ticket datasource: url: jdbc:mysql://localhost:3306/imooc_ticket username: root password: 123456 driver-class-name: com.mysql.jdbc.Driver jpa: hibernate: ddl-auto: update eureka: client: serviceUrl: defaultZone: http://imooc:111111@localhost:8761/eureka/
-
po
-
@Entity(name = "ticket") public class Ticket { @Id @GeneratedValue private Long id; private Long ticketNum; private String name; private Long lockUser; private Long owner; }
-
dao
-
public interface TicketRepository extends JpaRepository<Ticket, Long> { List<Ticket> findAllByOwner(Long owner); Ticket findOneByTicketNum(Long num); // @Modifying(clearAutomatically = true) @Modifying @Query("UPDATE ticket SET lockUser = ?1 WHERE lockUser is NULL and ticketNum = ?2") int lockTicket(Long customerId, Long ticketNum); @Modifying @Query("UPDATE ticket SET lockUser = null WHERE lockUser = ?1 and ticketNum = ?2") int unLockTicket(Long customerId, Long ticketNum); @Modifying @Query("UPDATE ticket SET owner = ?1, lockUser = null WHERE lockUser = ?1 and ticketNum = ?2") int moveTicket(Long customerId, Long ticketNum); @Modifying @Query("UPDATE ticket SET owner = NULL WHERE owner = ?1 and ticketNum = ?2") int unMoveTicket(Long customerId, Long ticketNum); @Override @Modifying(clearAutomatically = true) Ticket save(Ticket o); }
-
service
-
@Service public class TicketService { private static final Logger LOG = LoggerFactory.getLogger(TicketService.class); @Autowired private JmsTemplate jmsTemplate; @Autowired private TicketRepository ticketRepository; //接收到order服务发送的创建订单请求,开始创建订单 @Transactional @JmsListener(destination = "order:new", containerFactory = "msgFactory") public void handleTicketLock(OrderDTO msg) { LOG.info("Get new order for ticket lock:{}", msg); //执行锁票操作 int lockCount = ticketRepository.lockTicket(msg.getCustomerId(), msg.getTicketNum()); if (lockCount == 0) { //锁票失败,发送给MQ,由订单服务处理 msg.setStatus("TICKET_LOCK_FAIL"); jmsTemplate.convertAndSend("order:fail", msg); } else { //锁票成功,发送消息给MQ,由订单服务处理 msg.setStatus("TICKET_LOCKED"); jmsTemplate.convertAndSend("order:locked", msg); } } //付款完成,移交订单 @Transactional @JmsListener(destination = "order:ticket_move", containerFactory = "msgFactory") public void handleTicketMove(OrderDTO msg) { LOG.info("Get new order for ticket move:{}", msg); int moveCount = ticketRepository.moveTicket(msg.getCustomerId(), msg.getTicketNum()); if (moveCount == 0) { LOG.info("Ticket already transferred."); } msg.setStatus("TICKET_MOVED"); //订单完成,交由order服务处理 jmsTemplate.convertAndSend("order:finish", msg); } /** * 触发 error_ticket 的情况: * 1. 扣费失败,需要解锁票 * 2. 订单超时,如果存在锁票就解锁,如果已经交票就撤回 * 这时候,都已经在OrderDTO里设置了失败的原因,所以这里就不再设置原因。 * @param msg */ @Transactional @JmsListener(destination = "order:ticket_error", containerFactory = "msgFactory") public void handleError(OrderDTO msg) { LOG.info("Get order error for ticket unlock:{}", msg); int count = ticketRepository.unMoveTicket(msg.getCustomerId(), msg.getTicketNum()); // 撤销票的转移 if (count == 0) { LOG.info("Ticket already unlocked:", msg); } count = ticketRepository.unLockTicket(msg.getCustomerId(), msg.getTicketNum()); // 撤销锁票 if (count == 0) { LOG.info("Ticket already unmoved, or not moved:", msg); } jmsTemplate.convertAndSend("order:fail", msg); } @Transactional public Ticket lockTicket(OrderDTO orderDTO) { Ticket ticket = ticketRepository.findOneByTicketNum(orderDTO.getTicketNum()); ticket.setLockUser(orderDTO.getCustomerId()); ticket = ticketRepository.save(ticket); try { Thread.sleep(10 * 1000); } catch (InterruptedException e) { LOG.error(e.getMessage()); } return ticket; } @Transactional public int lockTicket2(OrderDTO orderDTO) { int updateCount = ticketRepository.lockTicket(orderDTO.getCustomerId(), orderDTO.getTicketNum()); LOG.info("Updated ticket count:{}", updateCount); try { Thread.sleep(10 * 1000); } catch (InterruptedException e) { LOG.error(e.getMessage()); } return updateCount; } @Transactional public int unLockTicket(OrderDTO orderDTO) { int updateCount = ticketRepository.unLockTicket(orderDTO.getCustomerId(), orderDTO.getTicketNum()); LOG.info("Updated ticket count:{}", updateCount); return updateCount; } }
-
config
-
@Configuration public class JmsConfig { @Bean public ConnectionFactory connectionFactory() { ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); TransactionAwareConnectionFactoryProxy proxy = new TransactionAwareConnectionFactoryProxy(); proxy.setTargetConnectionFactory(cf); proxy.setSynchedLocalTransactionAllowed(true); return proxy; } @Bean public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory, MessageConverter jacksonJmsMessageConverter) { JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); jmsTemplate.setMessageConverter(jacksonJmsMessageConverter); jmsTemplate.setSessionTransacted(true); return jmsTemplate; } @Bean public JmsListenerContainerFactory<?> msgFactory(ConnectionFactory cf, PlatformTransactionManager transactionManager, DefaultJmsListenerContainerFactoryConfigurer configurer) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); configurer.configure(factory, cf); factory.setReceiveTimeout(10000L); factory.setCacheLevelName("CACHE_CONNECTION"); factory.setTransactionManager(transactionManager); factory.setConcurrency("10"); return factory; } @Bean public MessageConverter jacksonJmsMessageConverter() { MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); converter.setTargetType(MessageType.TEXT); converter.setTypeIdPropertyName("_type"); return converter; } }
-
web
-
@RestController @RequestMapping("/api/ticket") public class TicketResource implements TicketCompositeService { @PostConstruct public void init() { if (ticketRepository.count() > 0) { return; } Ticket ticket = new Ticket(); ticket.setName("No.1"); ticket.setTicketNum(110L); ticketRepository.save(ticket); } @Autowired private TicketRepository ticketRepository; @Autowired private TicketService ticketService; @PostMapping("") public OrderDTO create(@RequestBody OrderDTO dto) { return dto; } @GetMapping("/{customerId}") public List<TicketDTO> getMyTickets(@PathVariable(name = "customerId") Long customerId) { List<Ticket> tickets = ticketRepository.findAllByOwner(customerId); return tickets.stream().map(ticket -> { TicketDTO dto = new TicketDTO(); dto.setTicketNum(ticket.getTicketNum()); dto.setId(ticket.getId()); dto.setLockUser(ticket.getLockUser()); dto.setName(ticket.getName()); dto.setOwner(ticket.getOwner()); return dto; }).collect(Collectors.toList()); } @PostMapping("/lock") public Ticket lock(@RequestBody OrderDTO dto) { return ticketService.lockTicket(dto); } @PostMapping("/lock2") public int lock2(@RequestBody OrderDTO dto) { return ticketService.lockTicket2(dto); } @GetMapping("") public List<Ticket> getAll() { return ticketRepository.findAll(); } }
-
application
-
@SpringBootApplication @EnableDiscoveryClient public class TicketApplication { public static void main(String[] args) { SpringApplication.run(TicketApplication.class, args); } }
入口
-
localhost:8888/order/api/order
-
{ "customerId":1, "title":"new_order", "amount":100, "ticketNum":100 }
事件溯源模式
- 消息驱动(msg-driven or event-driven)
- 事件不要求持久化保存
- 消息只是为了更新业务数据的状态,数据库才是一等数据
- 不要求所有的数据操作都通过消息驱动
- 事件溯源(event-sourcing)
- 事件作为一等数据保存
- 统一的事件管理器和接口,数据更新都由事件产生
- 数据库中数据的当前状态根据事件的聚合产生
- 聚合数据可以保存在数据库中,可以根据事件重新生成
- 事件溯源的优点
- 历史重现:从事件中重新生成视图数据库
- 方便的数据流处理与报告生成
- 性能
- 服务的松耦合
- 事件溯源的缺点
- 只能保证事务的最终一致性
- 设计和开发思维的转变,学习成本
- 事件结构的改变
- 扩展性:event store的分布式实现,事件的分布式处理
- 消息驱动 VS 事件溯源
- 一等数据:事件 VS 业务数据
- 事件永久保存,历史重现
- 所有数据更新都必须通过事件来产生
- event store服务承担更多的功能
- 事件溯源的数据一致性
- 一个事件只处理一个服务的数据
- 保证事件的至少一次处理,幂等性
- 业务请求的错误处理:多次重试失败,网络异常,服务不可用
- 实例
- 事件溯源和CQRS
- CQRS:命令查询职责分离
Axon框架
- 介绍
- 实现event sourcing和CQRS模式的框架
- 实现了命令,事件的分发,处理,聚合,查询,存储
- 提供标签式开发,易维护,并提供来springboot的集成
- 提供command和event
- 可扩展,可用于分布式环境,如springcloud
- 构成
- 聚合:aggregate
- 聚合的资源库:repository
- command:command bus和command handler
- event:event bus,event handler和event store
- saga:基于事件的流程管理模式
- query:执行数据查询操作的特殊command
- 可扩展性
- 分布式command
- 通过AMQP实现分布式event分发和处理
- axon处理command过程
- resource收到请求,send给commandGateway
- commandGateway执行拦截器等,再发给commandBus
- commandBus创建一个unitOfWork,关联一个事务,在其中调用commandHandler处理这个command
- commandHandler使用repository获得一个聚合对象,并聚合所有该对象的event,设置lock,然后调用处理方法
- commandHandler再触发一个event
- axon处理event过程
- commandHandler执行apply来触发一个event
- eventBus在这个event上执行拦截器等
- eventBus将这个event保存到eventstore
- eventBus调用在这个event上注册的所有处理方法(在unitOfWork中执行)
- 在eventHandler中更新聚合数据,保存视图数据库,触发其他command
- 使用axon框架的设计过程
- 领域模型设计
- 业务-command-command处理
- 数据-event-event处理
- 将数据保存到数据库:聚合数据-映射到-视图数据
- 查询-query
saga
- 事务驱动的业务流程管理模式
- 通过开始事件,结束事件,过程中的事件完成整个业务流程
- 保证在多个事件处理方法执行期间实现事务性
- startSage-sageEventHandler-endSaga
- 使用associate将不同的事件关联到同一个saga流程中
- 正常的结束业务都通过endSaga标签触发,超时使用eventScheduler,触发一个endSaga
- 一次业务流程的执行对应一个saga实例
- saga实例状态和关联的事件会保存在数据库中