百味皆苦 java后端开发攻城狮

MK-分布式事务实践

2021-09-05
百味皆苦

简介

  • 数据一致性问题
    • 数据的并发访问,修改
    • 不同请求之间的数据隔离
    • 多个服务共同完成一个业务请求,保证都完成或失败
    • 发生异常时的数据回滚

事务原则与实现

  • A:原子性
  • C:一致性
  • I:隔离性
  • D:持久性

spring事务机制

  • 事务抽象,事务传播,事务隔离
  • spring事务抽象
    • PlatformTransactionManager
    • TransactionDefinition
    • TransactionStatus
  • PlatformTransactionManager的常见实现
    • DataSourceTransactionManager
    • JpaTransactionManager
    • JmsTransactionManager
    • JtaTransactionManager

Jpa事务实例

  • pom

  • <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
      
    	<groupId>com.imooc.example</groupId>
    	<artifactId>spring-trans-jpa</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
      
    	<name>spring-trans-jpa</name>
    	<description>Demo project for Spring JPA transaction</description>
      
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.9.RELEASE</version>
    		<relativePath/> <!-- lookup parent from repository -->
    	</parent>
      
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
    	</properties>
      
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-data-jpa</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>com.h2database</groupId>
    			<artifactId>h2</artifactId>
    			<scope>runtime</scope>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
      
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    	</dependencies>
      
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
      
      
    </project>
      
    
  • dao

  • public interface CustomerRepository extends JpaRepository<Customer, Long> {
        Customer findOneByUsername(String username);
    }
    
  • service

  • //注解方式
    @Service
    public class CustomerServiceTxInAnnotation {
      
        private static final Logger LOG = LoggerFactory.getLogger(CustomerServiceTxInAnnotation.class);
      
        @Autowired
        private CustomerRepository customerRepository;
      
        @Transactional
        public Customer create(Customer customer) {
            LOG.info("CustomerService In Annotation create customer:{}", customer.getUsername());
            if (customer.getId() != null) {
                throw new RuntimeException("用户已经存在");
            }
            customer.setUsername("Annotation:" + customer.getUsername());
            return customerRepository.save(customer);
        }
    }
      
    //代码方式
    @Service
    public class CustomerServiceTxInCode {
        private static final Logger LOG = LoggerFactory.getLogger(CustomerServiceTxInCode.class);
      
        @Autowired
        private CustomerRepository customerRepository;
        @Autowired
        private PlatformTransactionManager transactionManager;
      
        public Customer create(Customer customer) {
            LOG.info("CustomerService In Code create customer:{}", customer.getUsername());
            if (customer.getId() != null) {
                throw new RuntimeException("用户已经存在");
            }
            DefaultTransactionDefinition def = new DefaultTransactionDefinition();
            def.setIsolationLevel(TransactionDefinition.ISOLATION_SERIALIZABLE);
            def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
            def.setTimeout(15);
            TransactionStatus status = transactionManager.getTransaction(def);
            try {
                customer.setUsername("Code:" + customer.getUsername());
                customerRepository.save(customer);
                transactionManager.commit(status);
                return customer;
            } catch (Exception e) {
                transactionManager.rollback(status);
                throw e;
            }
        }
    }
    
  • web

  • @RestController
    @RequestMapping("/api/customer")
    public class CustomerResource {
        private static final Logger LOG = LoggerFactory.getLogger(CustomerResource.class);
      
        @Autowired
        private CustomerServiceTxInAnnotation customerService;
        @Autowired
        private CustomerServiceTxInCode customerServiceInCode;
        @Autowired
        private CustomerRepository customerRepository;
      
        @PostMapping("/annotation")
        public Customer createInAnnotation(@RequestBody Customer customer) {
            LOG.info("CustomerResource create in annotation create customer:{}", customer.getUsername());
            return customerService.create(customer);
        }
      
        @PostMapping("/code")
        public Customer createInCode(@RequestBody Customer customer) {
            LOG.info("CustomerResource create in code create customer:{}", customer.getUsername());
            return customerServiceInCode.create(customer);
        }
      
        @GetMapping("")
        public List<Customer> getAll() {
            return customerRepository.findAll();
        }
      
    }
    

Jms事务实例

  • session原生事务

  • img

  • JmsTransactionManager事务

  • img

  • pom

  • <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
      
    	<groupId>com.imooc.example</groupId>
    	<artifactId>spring-trans-jms</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
      
    	<name>spring-trans-jms</name>
    	<description>Demo project for Spring JMS transaction</description>
      
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.9.RELEASE</version>
    		<relativePath/> <!-- lookup parent from repository -->
    	</parent>
      
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
    	</properties>
      
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-activemq</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
      
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    	</dependencies>
      
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
      
      
    </project>
      
    
  • config

  • @EnableJms
    @Configuration
    public class JmsConfig {
        private static final Logger LOG = LoggerFactory.getLogger(CustomerService.class);
      
        @Bean
        public JmsTemplate initJmsTemplate(ConnectionFactory connectionFactory) {
            LOG.debug("init jms template with converter.");
            JmsTemplate template = new JmsTemplate();
            template.setConnectionFactory(connectionFactory); // JmsTemplate使用的connectionFactory跟JmsTransactionManager使用的必须是同一个,不能在这里封装成caching之类的。
            return template;
        }
      
        // 这个用于设置 @JmsListener使用的containerFactory
        @Bean
        public JmsListenerContainerFactory<?> msgFactory(ConnectionFactory connectionFactory,
                                                         DefaultJmsListenerContainerFactoryConfigurer configurer,
                                                         PlatformTransactionManager transactionManager) {
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            factory.setTransactionManager(transactionManager);
            factory.setCacheLevelName("CACHE_CONNECTION");
            factory.setReceiveTimeout(10000L);
            configurer.configure(factory, connectionFactory);
            return factory;
        }
      
        @Bean
        public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
            return new JmsTransactionManager(connectionFactory);
        }
      
    }
    
  • service

  • @Service
    public class CustomerService {
      
        private static final Logger LOG = LoggerFactory.getLogger(CustomerService.class);
      
        @Autowired
        JmsTemplate jmsTemplate;
        @Autowired
        private PlatformTransactionManager transactionManager;
      
        @PostConstruct
        public void init() {
            jmsTemplate.setReceiveTimeout(3000);
        }
      
        @JmsListener(destination = "customer:msg:new", containerFactory = "msgFactory")
        public void handle(String msg) {
            LOG.debug("Get JMS message to from customer:{}", msg);
            String reply = "Replied - " + msg;
            jmsTemplate.convertAndSend("customer:msg:reply", reply);
            if (msg.contains("error")) {
                simulateError();
            }
        }
      
        @JmsListener(destination = "customer:msg2:new", containerFactory = "msgFactory")
        public void handle2(String msg) {
            LOG.debug("Get JMS message2 to from customer:{}", msg);
            DefaultTransactionDefinition def = new DefaultTransactionDefinition();
            def.setTimeout(15);
            TransactionStatus status = transactionManager.getTransaction(def);
            try {
                String reply = "Replied-2 - " + msg;
                jmsTemplate.convertAndSend("customer:msg:reply", reply);
                if (!msg.contains("error")) {
                    transactionManager.commit(status);
                } else {
                    transactionManager.rollback(status);
                }
            } catch (Exception e) {
                transactionManager.rollback(status);
                throw e;
            }
        }
      
        private void simulateError() {
            throw new RuntimeException("some Data error.");
        }
    }
    
  • web

  • @RestController
    @RequestMapping("/api/customer")
    public class CustomerResource {
      
        @Autowired
        JmsTemplate jmsTemplate;
        @Autowired
        private CustomerService customerService;
      
        @PostMapping("/message1/listen")
        public void createMsgWithListener(@RequestParam String msg) {
            jmsTemplate.convertAndSend("customer:msg:new", msg);
        }
        @PostMapping("/message1/direct")
        public void createMsgDirect(@RequestParam String msg) {
            customerService.handle(msg);
        }
      
        @PostMapping("/message2/listen")
        public void createMsg2WithListener(@RequestParam String msg) {
            jmsTemplate.convertAndSend("customer:msg2:new", msg);
        }
        @PostMapping("/message2/direct")
        public void createMsg2Direct(@RequestParam String msg) {
            customerService.handle2(msg);
        }
      
        @GetMapping("/message")
        public String getMsg() {
            Object reply = jmsTemplate.receiveAndConvert("customer:msg:reply");
            return String.valueOf(reply);
        }
    }
    

Jta分布式事务实例

  • 本地事务
  • img
  • 外部全局事务
    • 外部事务管理器提供事务管理
    • 通过spring事务接口,调用外部管理器
    • 使用JNDI等方式获取外部事务管理器的实例
    • 外部事务管理器一般由应用服务器提供,如Jboss等
    • 外部事务管理器提供JTA事务管理
    • JTA事务管理器可以管理多个数据资源
    • 通过2阶段提交实现多数据源事务
  • image-20210907221517324
  • JTA事务管理的弊端
    • 两阶段提交
    • 事务实践太长,锁数据的实践太长
    • 低性能,低吞吐量
  • 不使用JTA实现多数据源的事务管理
    • spring事务同步机制
    • 多个数据源上实现近似事务一致性
    • 高性能,高吞吐量
单数据源
  • 使用spring JTA事务管理
  • Atomikos外部事务管理器提供JTA事务管理
  • 使用一个数据库-单数据源

  • pom

  • <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
      
    	<groupId>com.imooc.example</groupId>
    	<artifactId>spring-trans-jta</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
      
    	<name>spring-trans-jta</name>
    	<description>Demo project for Spring JTA transaction</description>
      
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.9.RELEASE</version>
    		<relativePath/> <!-- lookup parent from repository -->
    	</parent>
      
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
    	</properties>
      
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-data-jpa</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-jta-atomikos</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>com.h2database</groupId>
    			<artifactId>h2</artifactId>
    			<scope>runtime</scope>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
      
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    	</dependencies>
      
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
      
      
    </project>
      
    
  • dao

  • public interface CustomerRepository extends JpaRepository<Customer, Long> {
        Customer findOneByUsername(String username);
    }
    
  • service

  • //注解方式
    @Service
    public class CustomerServiceTxInAnnotation {
      
        private static final Logger LOG = LoggerFactory.getLogger(CustomerServiceTxInAnnotation.class);
      
        @Autowired
        private CustomerRepository customerRepository;
      
        @Transactional
        public Customer create(Customer customer) {
            LOG.info("CustomerService In Annotation create customer:{}", customer.getUsername());
            if (customer.getId() != null) {
                throw new RuntimeException("用户已经存在");
            }
            customer.setUsername("Annotation:" + customer.getUsername());
            return customerRepository.save(customer);
        }
    }
      
      
    //代码方式
    @Service
    public class CustomerServiceTxInCode {
      
        private static final Logger LOG = LoggerFactory.getLogger(CustomerServiceTxInCode.class);
      
        @Autowired
        private CustomerRepository customerRepository;
        @Autowired
        private PlatformTransactionManager transactionManager;
      
        public Customer create(Customer customer) {
            LOG.info("CustomerService In Code create customer:{}", customer.getUsername());
            if (customer.getId() != null) {
                throw new RuntimeException("用户已经存在");
            }
            DefaultTransactionDefinition def = new DefaultTransactionDefinition();
    //        def.setIsolationLevel(TransactionDefinition.ISOLATION_SERIALIZABLE);
            def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
            def.setTimeout(15);
            TransactionStatus status = transactionManager.getTransaction(def);
            try {
                customer.setUsername("Code:" + customer.getUsername());
                customerRepository.save(customer);
                transactionManager.commit(status);
                return customer;
            } catch (Exception e) {
                transactionManager.rollback(status);
                throw e;
            }
        }
    }
    
  • web

  • @RestController
    @RequestMapping("/api/customer")
    public class CustomerResource {
        private static final Logger LOG = LoggerFactory.getLogger(CustomerResource.class);
      
        @Autowired
        private CustomerServiceTxInAnnotation customerService;
        @Autowired
        private CustomerServiceTxInCode customerServiceInCode;
        @Autowired
        private CustomerRepository customerRepository;
      
        @PostMapping("/annotation")
        public Customer createInAnnotation(@RequestBody Customer customer) {
            LOG.info("CustomerResource create in annotation create customer:{}", customer.getUsername());
            return customerService.create(customer);
        }
      
        @PostMapping("/code")
        public Customer createInCode(@RequestBody Customer customer) {
            LOG.info("CustomerResource create in code create customer:{}", customer.getUsername());
            return customerServiceInCode.create(customer);
        }
      
        @GetMapping("")
        public List<Customer> getAll() {
            return customerRepository.findAll();
        }
      
    }
    
多数据源
  • 多个数据源:DB,MQ

  • pom

  • <dependencies>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-activemq</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-data-jpa</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-jta-atomikos</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>com.h2database</groupId>
    			<artifactId>h2</artifactId>
    			<scope>runtime</scope>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
      
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    	</dependencies>
    
  • service

  • //注解方式
    @Service
    public class CustomerServiceTxInAnnotation {
      
        private static final Logger LOG = LoggerFactory.getLogger(CustomerServiceTxInAnnotation.class);
      
        @Autowired
        private CustomerRepository customerRepository;
        @Autowired
        private JmsTemplate jmsTemplate;
      
        @Transactional
        public Customer create(Customer customer) {
            LOG.info("CustomerService In Annotation create customer:{}", customer.getUsername());
            if (customer.getId() != null) {
                throw new RuntimeException("用户已经存在");
            }
            customer.setUsername("Annotation:" + customer.getUsername());
            jmsTemplate.convertAndSend("customer:msg:reply", customer.getUsername() + " created.");
            return customerRepository.save(customer);
        }
      
        @Transactional
        @JmsListener(destination = "customer:msg:new")
        public Customer createByListener(String name) {
            LOG.info("CustomerService In Annotation by Listener create customer:{}", name);
            Customer customer = new Customer();
            customer.setUsername("Annotation:" + name);
            customer.setRole("USER");
            customer.setPassword("111111");
      
            jmsTemplate.convertAndSend("customer:msg:reply", customer.getUsername() + " created.");
            return customerRepository.save(customer);
        }
      
    }
      
      
    //代码方式
    @Service
    public class CustomerServiceTxInCode {
      
        private static final Logger LOG = LoggerFactory.getLogger(CustomerServiceTxInCode.class);
      
        @Autowired
        private CustomerRepository customerRepository;
        @Autowired
        private PlatformTransactionManager transactionManager;
        @Autowired
        private JmsTemplate jmsTemplate;
      
        public Customer create(Customer customer) {
            LOG.info("CustomerService In Code create customer:{}", customer.getUsername());
            if (customer.getId() != null) {
                throw new RuntimeException("用户已经存在");
            }
            DefaultTransactionDefinition def = new DefaultTransactionDefinition();
            def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
            def.setTimeout(15);
            TransactionStatus status = transactionManager.getTransaction(def);
            try {
                customer.setUsername("Code:" + customer.getUsername());
                customerRepository.save(customer);
                jmsTemplate.convertAndSend("customer:msg:reply", customer.getUsername() + " created.");
                transactionManager.commit(status);
                return customer;
            } catch (Exception e) {
                transactionManager.rollback(status);
                throw e;
            }
        }
      
        @JmsListener(destination = "customer:msg2:new")
        public void createByListener(String name) {
            LOG.info("CustomerService In Code by Listener create customer:{}", name);
            Customer customer = new Customer();
            customer.setUsername("Code:" + name);
            customer.setRole("USER");
            customer.setPassword("111111");
      
            DefaultTransactionDefinition def = new DefaultTransactionDefinition();
            def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
            def.setTimeout(15);
            TransactionStatus status = transactionManager.getTransaction(def);
            try {
                customerRepository.save(customer);
                jmsTemplate.convertAndSend("customer:msg:reply", customer.getUsername() + " created.");
                transactionManager.commit(status);
            } catch (Exception e) {
                transactionManager.rollback(status);
                throw e;
            }
        }
    }
    
  • web

  • @RestController
    @RequestMapping("/api/customer")
    public class CustomerResource {
      
        @Autowired
        JmsTemplate jmsTemplate;
      
        @Autowired
        private CustomerServiceTxInAnnotation customerService;
        @Autowired
        private CustomerServiceTxInCode customerServiceInCode;
        @Autowired
        private CustomerRepository customerRepository;
      
        @PostMapping("/annotation")
        public Customer createInAnnotation(@RequestBody Customer customer) {
            return customerService.create(customer);
        }
      
        @PostMapping("/code")
        public Customer createInCode(@RequestBody Customer customer) {
            return customerServiceInCode.create(customer);
        }
      
        @Transactional
        @PostMapping("/message/annotation")
        public void createMsgWithListener(@RequestParam String userName) {
            jmsTemplate.convertAndSend("customer:msg:new", userName);
        }
      
        @Transactional
        @PostMapping("/message/code")
        public void createMsgDirect(@RequestParam String userName) {
            jmsTemplate.convertAndSend("customer:msg2:new", userName);
        }
      
        @GetMapping("")
        public List<Customer> getAll() {
            return customerRepository.findAll();
        }
      
    }
    

模式和技术

  • JTA实现多服务的分布式事务
  • img
  • 不使用JTA,消息驱动的分布式事务
  • img
  • 如何选择
    • 强一致性事务:JTA性能最差,只适用于单个服务内
    • 弱,最终一致性事务:最大努力一次提交,链式事务(设计相应的错误处理机制)
    • MQ-DB:最大努力一次提交+重试
    • 多个DB:链式事务管理
    • 多个数据源:链式事务,或其他事务同步方式
  • 实现模式
    • 消息驱动模式:Message Driven
    • 事件溯源模式:Event Sourcing
    • TCC模式:Try-Confirm-Cancel
  • 幂等性
    • 幂等操作:任意多次执行所产生的影响,与一次执行的影响相同
    • 方法的幂等性:使用同样的参数调用一个方法多次,与调用一次结果相同
    • 接口的幂等性:接口被重复调用,结果一致
  • 微服务接口的幂等性
    • 重要性:经常需要通过重试实现分布式事务的最终一致性
    • get方法不会对系统产生副作用,具有幂等性
    • post,put,delete方法的实现需要满足幂等性

image-20210908222708151

  • 分布式系统唯一性id:guid
    • 分布式系统的全局唯一标识
    • uuid:生成唯一id的规范
    • 用于唯一标识,处理重复消息
    • 数据库自增序列
    • uuid:唯一id标准,128位,几种版本
    • mongodb的objectId:时间戳+机器id+进程id+序号
    • redis的incr操作,zookeeper节点的版本号
  • 使用哪种方式
    • 自增id:考虑安全性,部署
    • 时间有序:便于通过id判断创建时间
    • 长度,是否数字类型:是否建立索引
  • 分布式系统分布式对象
    • redis:redission库:rlock,rmap,rqueue等对象
    • zookeeper:netfilx curator库:lock,queue等对象

实例1:DatasourceTransactionManager

  • MySQL+MySQL

  • 链式事务:DatasourceTransactionManager

  • 不处理重试

  • pom

  • <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
      
    	<groupId>com.imooc.example</groupId>
    	<artifactId>spring-dtx-db-db</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
      
    	<name>spring-dtx-db-db</name>
    	<description>Demo project for Spring distributed transaction for DB-DB</description>
      
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.9.RELEASE</version>
    		<relativePath/> <!-- lookup parent from repository -->
    	</parent>
      
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
    	</properties>
      
    	<dependencies>
      
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-jdbc</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>com.zaxxer</groupId>
    			<artifactId>HikariCP</artifactId>
    			<version>2.7.8</version>
    		</dependency>
    		<dependency>
    			<groupId>mysql</groupId>
    			<artifactId>mysql-connector-java</artifactId>
    			<version>5.1.39</version>
    		</dependency>
      
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
            <dependency>
                <groupId>org.springframework.data</groupId>
                <artifactId>spring-data-commons</artifactId>
                <version>1.13.7.RELEASE</version>
            </dependency>
        </dependencies>
      
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
      
      
    </project>
      
    
  • yml

  •   
    # 默认的Datasource配置
    # spring.datasource.url = jdbc:mysql://localhost:3307/imooc_user
    # spring.datasource.username = root
    # spring.datasource.password = 123456
    # spring.datasource.driverClassName=com.mysql.jdbc.Driver
      
    spring.ds_user.url = jdbc:mysql://localhost:3306/imooc_user
    spring.ds_user.username = root
    spring.ds_user.password = 123456
    spring.ds_user.driver-class-name = com.mysql.jdbc.Driver
      
    spring.ds_order.url=jdbc:mysql://localhost:3307/imooc_order
    spring.ds_order.username=root
    spring.ds_order.password=123456
    spring.ds_order.driver-class-name=com.mysql.jdbc.Driver
    
  • service

  • @Service
    public class CustomerService {
      
        @Autowired
        @Qualifier("userJdbcTemplate")
        private JdbcTemplate userJdbcTemplate;
      
        @Autowired
        @Qualifier("orderJdbcTemplate")
        private JdbcTemplate orderJdbcTemplate;
      
        private static final String SQL_UPDATE_DEPOSIT = "UPDATE customer SET deposit = deposit - ? where id = ?";
        private static final String SQL_CREATE_ORDER = "INSERT INTO customer_order (customer_id, title, amount) VALUES (?, ?, ?)";
      
        @Transactional
        public void createOrder(Order order) {
            userJdbcTemplate.update(SQL_UPDATE_DEPOSIT, order.getAmount(), order.getCustomerId());
            if (order.getTitle().contains("error1")) {
                throw new RuntimeException("Error1");
            }
            orderJdbcTemplate.update(SQL_CREATE_ORDER, order.getCustomerId(), order.getTitle(), order.getAmount());
            if (order.getTitle().contains("error2")) {
                throw new RuntimeException("Error2");
            }
        }
      
      
        public Map userInfo(Long customerId) {
            Map customer = userJdbcTemplate.queryForMap("SELECT * from customer where id = " + customerId);
            List orders = orderJdbcTemplate.queryForList("SELECT * from customer_order where customer_id = " + customerId);
      
            Map result = new HashMap();
            result.put("customer", customer);
            result.put("orders", orders);
            return result;
        }
      
    }
      
    
  • config

  • @Configuration
    public class DBConfiguration {
      
        @Bean
        @Primary //默认使用这个DataSourceProperties
        @ConfigurationProperties(prefix = "spring.ds_user")
        public DataSourceProperties userDataSourceProperties() {
            return new DataSourceProperties();
        }
      
        @Bean
        @Primary
        public DataSource userDataSource() {
            return userDataSourceProperties().initializeDataSourceBuilder().type(HikariDataSource.class).build();
        }
      
        //链式事务管理器,把两个事务管理器连接起来
        @Bean
        public PlatformTransactionManager transactionManager() {
            DataSourceTransactionManager userTM = new DataSourceTransactionManager(userDataSource());
            PlatformTransactionManager orderTM = new DataSourceTransactionManager(orderDataSource());
            ChainedTransactionManager tm = new ChainedTransactionManager(userTM, orderTM);
            return tm;
        }
      
        @Bean
        public JdbcTemplate userJdbcTemplate(@Qualifier("userDataSource") DataSource userDataSource) {
            return new JdbcTemplate(userDataSource);
        }
      
        @Bean
        @ConfigurationProperties(prefix = "spring.ds_order")
        public DataSourceProperties orderDataSourceProperties() {
            return new DataSourceProperties();
        }
      
        @Bean
        public DataSource orderDataSource() {
            return orderDataSourceProperties().initializeDataSourceBuilder().type(HikariDataSource.class).build();
        }
      
        @Bean
        public JdbcTemplate orderJdbcTemplate(@Qualifier("orderDataSource") DataSource orderDataSource) {
            return new JdbcTemplate(orderDataSource);
        }
    }
    
  • web

  • @RestController
    @RequestMapping("/api/customer")
    public class CustomerResource {
      
        @Autowired
        private CustomerService customerService;
      
        @PostMapping("/order")
        public void create(@RequestBody Order order) {
            customerService.createOrder(order);
        }
      
        @GetMapping("/{id}")
        public Map userInfo(@PathVariable Long id) {
            return customerService.userInfo(id);
        }
      
    }
      
    

实例2:JpaTransactionManager

  • MySQL+MySQL

  • 链式事务:JpaTransactionManager

  • 不处理重试

  • pom

  • <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
      
    	<groupId>com.imooc.example</groupId>
    	<artifactId>spring-dtx-jpa-db</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
      
    	<name>spring-dtx-jpa-db</name>
    	<description>Demo project for Spring distributed transaction for DB-DB</description>
      
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.9.RELEASE</version>
    		<relativePath/> <!-- lookup parent from repository -->
    	</parent>
      
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
    	</properties>
      
    	<dependencies>
      
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-data-jpa</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>mysql</groupId>
    			<artifactId>mysql-connector-java</artifactId>
    			<version>5.1.39</version>
    		</dependency>
      
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    	</dependencies>
      
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
      
      
    </project>
      
    
  • service

  • @Service
    public class CustomerService {
      
        @Autowired
        private CustomerRepository customerRepository;
      
        @Autowired
        @Qualifier("orderJdbcTemplate")
        private JdbcTemplate orderJdbcTemplate;
      
        private static final String SQL_CREATE_ORDER = "INSERT INTO customer_order (customer_id, title, amount) VALUES (?, ?, ?)";
      
        @Transactional
        public void createOrder(Order order) {
            Customer customer = customerRepository.findOne(order.getCustomerId());
            customer.setDeposit(customer.getDeposit() - order.getAmount());
            customerRepository.save(customer);
            if (order.getTitle().contains("error1")) {
                throw new RuntimeException("Error1");
            }
            orderJdbcTemplate.update(SQL_CREATE_ORDER, order.getCustomerId(), order.getTitle(), order.getAmount());
      
            if (order.getTitle().contains("error2")) {
                throw new RuntimeException("Error2");
            }
        }
      
      
        public Map userInfo(Long customerId) {
            Customer customer = customerRepository.findOne(customerId);
            List orders = orderJdbcTemplate.queryForList("SELECT * from customer_order where customer_id = " + customerId);
      
            Map result = new HashMap();
            result.put("customer", customer);
            result.put("orders", orders);
            return result;
        }
      
    }
    
  • config

  • @Configuration
    public class DBConfiguration {
      
        @Bean(name = "userDataSource")
        @Primary
        @ConfigurationProperties(prefix = "spring.ds_user")
        public DataSource userDataSource() {
            return DataSourceBuilder.create().build();
        }
      
        @Bean(name = "orderDataSource")
        public DataSource orderDataSource() {
            return DataSourceBuilder.create().build();
        }
      
        @Bean
        public JdbcTemplate orderJdbcTemplate(@Qualifier("orderDataSource") DataSource orderDataSource) {
            return new JdbcTemplate(orderDataSource);
        }
      
        @Bean
        public LocalContainerEntityManagerFactoryBean entityManagerFactory() {
            HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
            vendorAdapter.setGenerateDdl(false); // we have already created tables.
      
            LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean();
            factory.setJpaVendorAdapter(vendorAdapter);
            factory.setPackagesToScan("com.imooc.example");
            factory.setDataSource(userDataSource());
            return factory;
        }
      
        //链式事务管理器
        @Bean
        public PlatformTransactionManager transactionManager() {
            JpaTransactionManager userTM = new JpaTransactionManager();
            userTM.setEntityManagerFactory(entityManagerFactory().getObject());
            PlatformTransactionManager orderTM = new DataSourceTransactionManager(orderDataSource());
            ChainedTransactionManager tm = new ChainedTransactionManager(userTM, orderTM);
            return tm;
        }
    }
    
  • web

  • @RestController
    @RequestMapping("/api/customer")
    public class CustomerResource {
      
        @Autowired
        private CustomerService customerService;
      
        @PostMapping("/order")
        public void create(@RequestBody Order order) {
            customerService.createOrder(order);
        }
      
        @GetMapping("/{id}")
        public Map userInfo(@PathVariable Long id) {
            return customerService.userInfo(id);
        }
      
    }
    

实例3:activeMQ+MySQL

  • JMS-DB

  • activeMQ+MySQL

  • 最大努力一次提交:TransactionAwareConnectionFactoryProxy

  • pom

  • <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
      
    	<groupId>com.imooc.example</groupId>
    	<artifactId>spring-dtx-jms-db</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
      
    	<name>spring-dtx-jms-db</name>
    	<description>Demo project for Spring distributed transaction for DB and MQ resources.</description>
      
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.9.RELEASE</version>
    		<relativePath/> <!-- lookup parent from repository -->
    	</parent>
      
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
    	</properties>
      
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-activemq</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-data-jpa</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>mysql</groupId>
    			<artifactId>mysql-connector-java</artifactId>
    			<version>5.1.39</version>
    		</dependency>
      
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    	</dependencies>
      
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
      
      
    </project>
      
    
  • yaml

  •   
    spring.datasource.url = jdbc:mysql://localhost:3306/imooc_user
    spring.datasource.username = root
    spring.datasource.password = 123456
    spring.datasource.driverClassName=com.mysql.jdbc.Driver
      
    spring.activemq.broker-url = tcp://localhost:61616
    
  • service

  • @Service
    public class CustomerService {
      
        private static final Logger LOG = LoggerFactory.getLogger(CustomerService.class);
      
        @Autowired
        private CustomerRepository customerRepository;
        @Autowired
        private JmsTemplate jmsTemplate;
      
        @Transactional
        @JmsListener(destination = "customer:msg:new")
        public void handle(String msg) {
            LOG.info("Get msg1:{}", msg);
            Customer customer = new Customer();
            customer.setUsername(msg);
            customer.setDeposit(100);
            customerRepository.save(customer);
            if (msg.contains("error1")) {
                throw new RuntimeException("Error1");
            }
      
            jmsTemplate.convertAndSend("customer:msg:reply", msg + " created.");
            if (msg.contains("error2")) {
                throw new RuntimeException("Error2");
            }
        }
      
        @Transactional
        public Customer create(Customer customer) {
            LOG.info("CustomerService In Annotation create customer:{}", customer.getUsername());
            if (customer.getId() != null) {
                throw new RuntimeException("用户已经存在");
            }
            customer.setUsername("Annotation:" + customer.getUsername());
            customer = customerRepository.save(customer);
            if (customer.getUsername().contains("error1")) {
                throw new RuntimeException("Error1");
            }
            jmsTemplate.convertAndSend("customer:msg:reply", customer.getUsername() + " created.");
            if (customer.getUsername().contains("error2")) {
                throw new RuntimeException("Error2");
            }
      
            return customer;
        }
      
    }
    
  • config

  • @Configuration
    public class JmsConfiguration {
      
        @Bean
        public JmsTemplate initJmsTemplate(ConnectionFactory connectionFactory) {
            JmsTemplate template = new JmsTemplate();
            template.setConnectionFactory(connectionFactory);
            template.setSessionTransacted(true);
            return template;
        }
      
        //把JMS事务同步到JPA事务上
        @Bean
        public ConnectionFactory connectionFactory() {
            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
            TransactionAwareConnectionFactoryProxy proxy = new TransactionAwareConnectionFactoryProxy();
            proxy.setTargetConnectionFactory(cf);
            proxy.setSynchedLocalTransactionAllowed(true);
            return proxy;
        }
      
    }
    
  • web

  • @RestController
    @RequestMapping("/api/customer")
    public class CustomerResource {
      
        @Autowired
        JmsTemplate jmsTemplate;
      
        @Autowired
        private CustomerService customerService;
      
        @Autowired
        private CustomerRepository customerRepository;
      
        //通过jpa创建
        @PostMapping("/annotation")
        public Customer createInAnnotation(@RequestBody Customer customer) {
            return customerService.create(customer);
        }
      
        //通过activeMQ创建
        @PostMapping("/msg")
        public void create(@RequestBody Customer customer) {
            jmsTemplate.convertAndSend("customer:msg:new", customer.getUsername());
        }
      
      
        @GetMapping("")
        public List<Customer> getAll() {
            return customerRepository.findAll();
        }
      
        @GetMapping("/msg")
        public String getMsg() {
            jmsTemplate.setReceiveTimeout(3000);
            Object reply = jmsTemplate.receiveAndConvert("customer:msg:reply");
            return String.valueOf(reply);
        }
      
    }
    

消息驱动模型

  • 消息驱动模型
  • img
  • 注意
    • 消息中间件需要支持事务
    • 如果处理重试的消息
    • 发生业务异常时回滚操作
  • 系统错误的处理
    • 方法1:将出错未处理的消息写到失败队列,进行相应回滚操作
    • 方法2:通过定时任务检查超时订单,对未完成的订单做自动回滚
    • 方法3:保存出错消息,人工处理

卖票实例

  • order服务,user服务,ticket服务
  • activeMQ作为消息中间件
  • 错误处理:定时任务检查超时并回滚
  • 幂等性:实现方法的幂等性
  • img
  • 流程
  • image-20210909225053250
  • img
  • 失败场景
  • image-20210909225307575
  • 异常订单处理
    • 定时处理异常订单:1未被处理完成 2未被按失败订单处理
    • 解锁票,撤销交票
    • 对于余额之类的重要数据,可能使用人工处理
  • 实现锁票的安全性
    • 利用@JmsListener设置一个消费者,不适用于多实例
    • 使用事务和数据库锁的特性
    • 分布式锁

register服务

  • pom

  • <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
      
    	<groupId>com.imooc.example</groupId>
    	<artifactId>registry</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
      
    	<name>registry</name>
    	<description>Demo project for Spring Boot</description>
      
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.10.RELEASE</version>
    		<relativePath/> <!-- lookup parent from repository -->
    	</parent>
      
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
    		<spring-cloud.version>Edgware.SR2</spring-cloud.version>
    	</properties>
      
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-security</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-eureka-server</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-hystrix-dashboard</artifactId>
    		</dependency>
      
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    	</dependencies>
      
    	<dependencyManagement>
    		<dependencies>
    			<dependency>
    				<groupId>org.springframework.cloud</groupId>
    				<artifactId>spring-cloud-dependencies</artifactId>
    				<version>${spring-cloud.version}</version>
    				<type>pom</type>
    				<scope>import</scope>
    			</dependency>
    		</dependencies>
    	</dependencyManagement>
      
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
      
      
    </project>
      
    
  • yaml

  • server:
      port: 8761
    spring:
      application:
        name: registry
    security:
      basic:
        enabled: true
      user:
        name: imooc
        password: 111111
    eureka:
      client:
        register-with-eureka: false
        fetch-registry: false
        serviceUrl:
          defaultZone: http://imooc:111111@localhost:${server.port}/eureka/
    
  • application

  • @SpringBootApplication
    @EnableEurekaServer
    @EnableHystrixDashboard
    public class RegistryApplication {
      
    	public static void main(String[] args) {
    		SpringApplication.run(RegistryApplication.class, args);
    	}
    }
    

proxy服务

  • pom

  • <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
      
    	<groupId>com.imooc.example</groupId>
    	<artifactId>proxy</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
      
    	<name>proxy</name>
    	<description>Demo project for Spring Boot</description>
      
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.10.RELEASE</version>
    		<relativePath/> <!-- lookup parent from repository -->
    	</parent>
      
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
    		<spring-cloud.version>Edgware.SR2</spring-cloud.version>
    	</properties>
      
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-eureka</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-zuul</artifactId>
    		</dependency>
      
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    	</dependencies>
      
    	<dependencyManagement>
    		<dependencies>
    			<dependency>
    				<groupId>org.springframework.cloud</groupId>
    				<artifactId>spring-cloud-dependencies</artifactId>
    				<version>${spring-cloud.version}</version>
    				<type>pom</type>
    				<scope>import</scope>
    			</dependency>
    		</dependencies>
    	</dependencyManagement>
      
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
      
      
    </project>
      
    
  • yaml

  • server:
      port: 8888
    spring:
      application:
        name: proxy
      
    eureka:
      client:
        serviceUrl:
          defaultZone: http://imooc:111111@localhost:8761/eureka/
      
    #zuul:
    #    routes:
    #        userApi:
    #            path: /home/**
    #            stripPrefix: false
    #            serviceId: user
    
  • application

  • @SpringBootApplication
    @EnableEurekaClient
    @EnableZuulProxy
    public class ProxyApplication {
      
    	public static void main(String[] args) {
    		SpringApplication.run(ProxyApplication.class, args);
    	}
    }
    

order服务

  • pom

  • <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
      
    	<groupId>com.imooc.example</groupId>
    	<artifactId>order</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
      
    	<name>order</name>
    	<description>Demo project for Spring Boot</description>
      
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.10.RELEASE</version>
    		<relativePath/> <!-- lookup parent from repository -->
    	</parent>
      
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
    		<spring-cloud.version>Edgware.SR2</spring-cloud.version>
    	</properties>
      
    	<dependencies>
    		<dependency>
    			<groupId>com.imooc.example</groupId>
    			<artifactId>service</artifactId>
    			<version>1.0-SNAPSHOT</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-eureka</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-activemq</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-data-jpa</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>mysql</groupId>
    			<artifactId>mysql-connector-java</artifactId>
    			<version>5.1.39</version>
    		</dependency>
    		<dependency>
    			<groupId>com.fasterxml.uuid</groupId>
    			<artifactId>java-uuid-generator</artifactId>
    			<version>3.1.5</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
      
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    	</dependencies>
      
    	<dependencyManagement>
    		<dependencies>
    			<dependency>
    				<groupId>org.springframework.cloud</groupId>
    				<artifactId>spring-cloud-dependencies</artifactId>
    				<version>${spring-cloud.version}</version>
    				<type>pom</type>
    				<scope>import</scope>
    			</dependency>
    		</dependencies>
    	</dependencyManagement>
      
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
      
      
    </project>
      
    
  • yaml

  • server:
      port: 8082
    spring:
      application:
        name: order
      datasource:
        url: jdbc:mysql://localhost:3306/imooc_order
        username: root
        password: 123456
        driver-class-name: com.mysql.jdbc.Driver
      jpa:
        hibernate:
          ddl-auto: update
      
    eureka:
      client:
        serviceUrl:
          defaultZone: http://imooc:111111@localhost:8761/eureka/
    
  • po

  • @Entity(name = "customer_order")
    public class Order {
      
        @Id
        @GeneratedValue
        private Long id;
      
        private String uuid;
      
        private Long customerId;
      
        private String title;
      
        private Long ticketNum;
      
        private int amount;
      
        private String status;
      
        private String reason;
      
        private ZonedDateTime createdDate;
    }
    
  • dao

  • public interface OrderRepository extends JpaRepository<Order, Long> {
      
        List<Order> findAllByCustomerId(Long customerId);
      
        List<Order> findAllByStatusAndCreatedDateBefore(String status, ZonedDateTime checkTime);
      
        Order findOneByUuid(String uuid);
    }
    
  • service

  • @Service
    public class OrderService {
      
        private static final Logger LOG = LoggerFactory.getLogger(OrderService.class);
      
        @Autowired
        private JmsTemplate jmsTemplate;
        @Autowired
        private OrderRepository orderRepository;
      
        //锁票成功,开始处理订单
        @Transactional
        @JmsListener(destination = "order:locked", containerFactory = "msgFactory")
        public void handle(OrderDTO msg) {
            LOG.info("Get new order to create:{}", msg);
            if (orderRepository.findOneByUuid(msg.getUuid()) != null) { // 通过保存到数据库,来使用uuid处理重复消息
                LOG.info("Msg already processed:{}", msg);
            } else {
                Order order = newOrder(msg);
                orderRepository.save(order);
                msg.setId(order.getId());
            }
            msg.setStatus("NEW");
            //订单创建完毕,开始支付,由user服务处理
            jmsTemplate.convertAndSend("order:pay", msg);
        }
      
        private Order newOrder(OrderDTO dto) {
            Order order = new Order();
            order.setUuid(dto.getUuid());
            order.setAmount(dto.getAmount());
            order.setTitle(dto.getTitle());
            order.setCustomerId(dto.getCustomerId());
            order.setTicketNum(dto.getTicketNum());
            order.setStatus("NEW");
            order.setCreatedDate(ZonedDateTime.now());
            return order;
        }
      
        //订单处理完成,入库
        @Transactional
        @JmsListener(destination = "order:finish", containerFactory = "msgFactory")
        public void handleFinish(OrderDTO msg) {
            LOG.info("Get finished order:{}", msg);
            Order order = orderRepository.findOne(msg.getId());
            order.setStatus("FINISH");
            orderRepository.save(order);
        }
      
        /**
         * 订单失败的几种情况:
         * 1. 一开始索票失败。
         * 2. 扣费失败后,解锁票,然后出发
         * 3. 定时任务检测到订单超时
         * @param msg
         */
        @Transactional
        @JmsListener(destination = "order:fail", containerFactory = "msgFactory")
        public void handleFailed(OrderDTO msg) {
            LOG.info("Get failed order:{}", msg);
            Order order;
            if (msg.getId() == null) {
                order = newOrder(msg);
                order.setReason("TICKET_LOCK_FAIL"); // 锁票失败,可能票id不对,或者已被别人买走
            } else {
                order = orderRepository.findOne(msg.getId());
                if (msg.getStatus().equals("NOT_ENOUGH_DEPOSIT")) {
                    order.setReason("NOT_ENOUGH_DEPOSIT");
                }
            }
            order.setStatus("FAIL");
            orderRepository.save(order);
        }
      
        @Scheduled(fixedDelay = 10000L)
        public void checkInvalidOrder() {
            ZonedDateTime checkTime = ZonedDateTime.now().minusMinutes(1L);
            List<Order> orders = orderRepository.findAllByStatusAndCreatedDateBefore("NEW", checkTime);
            orders.stream().forEach(order -> {
                LOG.error("Order timeout:{}", order);
                OrderDTO dto = new OrderDTO();
                dto.setId(order.getId());
                dto.setTicketNum(order.getTicketNum());
                dto.setUuid(order.getUuid());
                dto.setAmount(order.getAmount());
                dto.setTitle(order.getTitle());
                dto.setCustomerId(order.getCustomerId());
                dto.setStatus("TIMEOUT");
                jmsTemplate.convertAndSend("order:ticket_error", dto);
            });
        }
    }
    
  • config

  • @Configuration
    public class JmsConfig {
      
        @Bean
        public ConnectionFactory connectionFactory() {
            ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
            TransactionAwareConnectionFactoryProxy proxy = new TransactionAwareConnectionFactoryProxy();
            proxy.setTargetConnectionFactory(cf);
            proxy.setSynchedLocalTransactionAllowed(true);
            return proxy;
        }
      
        @Bean
        public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory, MessageConverter jacksonJmsMessageConverter) {
            JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
            jmsTemplate.setMessageConverter(jacksonJmsMessageConverter);
            jmsTemplate.setSessionTransacted(true);
            return jmsTemplate;
        }
      
        @Bean
        public JmsListenerContainerFactory<?> msgFactory(ConnectionFactory cf,
                                                         PlatformTransactionManager transactionManager,
                                                         DefaultJmsListenerContainerFactoryConfigurer configurer) {
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            configurer.configure(factory, cf);
            factory.setReceiveTimeout(10000L);
            // FIXME: 使用非内置的activeMQ服务器,这个就不需要,
    //        factory.setCacheLevelName("CACHE_CONNECTION");
            factory.setTransactionManager(transactionManager);
            factory.setConcurrency("10");
            return factory;
        }
      
        @Bean
        public MessageConverter jacksonJmsMessageConverter() {
            MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
            converter.setTargetType(MessageType.TEXT);
            converter.setTypeIdPropertyName("_type");
            return converter;
        }
    }
    
  • web

  • @RestController
    @RequestMapping("/api/order")
    public class OrderResource implements OrderCompositeService {
      
        @Autowired
        private OrderRepository orderRepository;
        @Autowired
        private JmsTemplate jmsTemplate;
      
        private TimeBasedGenerator uuidGenerator = Generators.timeBasedGenerator();
      
        //入口,创建订单
        @Transactional
        @PostMapping("")
        public void create(@RequestBody OrderDTO dto) {
            dto.setUuid(uuidGenerator.generate().toString());
            //MQ发送创建订单请求,由ticket服务处理
            jmsTemplate.convertAndSend("order:new", dto);
        }
      
        @GetMapping("/{customerId}")
        public List<OrderDTO> getMyOrder(@PathVariable Long customerId) {
            List<Order> orders = orderRepository.findAllByCustomerId(customerId);
            return orders.stream().map(order -> {
                OrderDTO dto = new OrderDTO();
                dto.setId(order.getId());
                dto.setStatus(order.getStatus());
                dto.setTicketNum(order.getTicketNum());
                dto.setAmount(order.getAmount());
                dto.setCustomerId(order.getCustomerId());
                dto.setTitle(order.getTitle());
                dto.setUuid(order.getUuid());
                return dto;
            }).collect(Collectors.toList());
        }
      
        @GetMapping("")
        public List<Order> getAll() {
            return orderRepository.findAll();
        }
      
      
    }
    
  • application

  • @SpringBootApplication
    @EnableDiscoveryClient
    public class OrderApplication {
      
    	public static void main(String[] args) {
    		SpringApplication.run(OrderApplication.class, args);
    	}
    }
    

user服务

  • pom

  • <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
      
    	<groupId>com.imooc.example</groupId>
    	<artifactId>user</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
      
    	<name>user</name>
    	<description>Demo project for Spring Boot</description>
      
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.10.RELEASE</version>
    		<relativePath/> <!-- lookup parent from repository -->
    	</parent>
      
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
    		<spring-cloud.version>Edgware.SR2</spring-cloud.version>
    	</properties>
      
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-feign</artifactId>
    			<!--<artifactId>spring-cloud-starter-openfeign</artifactId> 如果feign client出错,可能需要这个-->
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-eureka</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-activemq</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-hystrix</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-actuator</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-data-jpa</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>mysql</groupId>
    			<artifactId>mysql-connector-java</artifactId>
    			<version>5.1.39</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
      
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    		<dependency>
    			<groupId>com.imooc.example</groupId>
    			<artifactId>service</artifactId>
    			<version>1.0-SNAPSHOT</version>
    		</dependency>
    	</dependencies>
      
    	<dependencyManagement>
    		<dependencies>
    			<dependency>
    				<groupId>org.springframework.cloud</groupId>
    				<artifactId>spring-cloud-dependencies</artifactId>
    				<version>${spring-cloud.version}</version>
    				<type>pom</type>
    				<scope>import</scope>
    			</dependency>
    		</dependencies>
    	</dependencyManagement>
      
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
      
      
    </project>
      
    
  • yaml

  • server:
      port: 8081
    spring:
      application:
        name: user
      datasource:
        url: jdbc:mysql://localhost:3306/imooc_user
        username: root
        password: 123456
        driver-class-name: com.mysql.jdbc.Driver
      jpa:
        hibernate:
          ddl-auto: update
      
    eureka:
      client:
        serviceUrl:
          defaultZone: http://imooc:111111@localhost:8761/eureka/
    
  • po

  • @Entity(name = "customer")
    public class Customer {
      
        @Id
        @GeneratedValue
        private Long id;
      
        @Column(name = "user_name")
        private String username;
      
        private String password;
      
        private String role;
      
        private int deposit;
    }
      
      
      
      
    @Entity(name = "pay_info")
    public class PayInfo {
      
        @Id
        @GeneratedValue
        private Long id;
      
        @Column(name = "order_id")
        private Long orderId;
      
        private String status;
      
        private int amount;
      
    }
    
  • dao

  • public interface CustomerRepository extends JpaRepository<Customer, Long> {
      
        Customer findOneByUsername(String username);
      
        @Modifying
        @Query("UPDATE customer SET deposit = deposit - ?2 WHERE id = ?1")
        int charge(Long customerId, int amount);
    }
      
      
      
    public interface PayInfoRepository extends JpaRepository<PayInfo, Long> {
      
        PayInfo findOneByOrderId(Long orderId);
    }
    
  • service

  • @Service
    public class CustomerService {
      
        private static final Logger LOG = LoggerFactory.getLogger(CustomerService.class);
      
        @Autowired
        private JmsTemplate jmsTemplate;
        @Autowired
        private CustomerRepository customerRepository;
        @Autowired
        private PayInfoRepository payInfoRepository;
      
        //订单创建完毕,开始付款
        @Transactional
        @JmsListener(destination = "order:pay", containerFactory = "msgFactory")
        public void handle(OrderDTO msg) {
            LOG.info("Get new order to pay:{}", msg);
            // 先检查payInfo判断重复消息。
            PayInfo pay = payInfoRepository.findOneByOrderId(msg.getId());
            if (pay != null) {
                LOG.warn("Order already paid, duplicated message.");
                return;
            }
      
            Customer customer = customerRepository.findOne(msg.getCustomerId());
            if (customer.getDeposit() < msg.getAmount()) {
                LOG.info("No enough deposit, need amount:{}", msg.getAmount());
                msg.setStatus("NOT_ENOUGH_DEPOSIT");
                //余额不足
                jmsTemplate.convertAndSend("order:ticket_error", msg);
                return;
            }
      
            pay = new PayInfo();
            pay.setOrderId(msg.getId());
            pay.setAmount(msg.getAmount());
            pay.setStatus("PAID");
            payInfoRepository.save(pay);
    //        customer.setDeposit(customer.getDeposit() - msg.getAmount());
    //        customerRepository.save(customer); // 如果用户下了2个订单,这个handle方法不是单线程处理,或者有多个实例,又刚好这2个请求被同时处理,
            customerRepository.charge(msg.getCustomerId(), msg.getAmount());
      
            msg.setStatus("PAID");
            //支付完成,由ticket服务处理
            jmsTemplate.convertAndSend("order:ticket_move", msg);
        }
    }
    
  • config

  • @Configuration
    public class JmsConfig {
      
        @Bean
        public ConnectionFactory connectionFactory() {
            ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
            TransactionAwareConnectionFactoryProxy proxy = new TransactionAwareConnectionFactoryProxy();
            proxy.setTargetConnectionFactory(cf);
            proxy.setSynchedLocalTransactionAllowed(true);
            return proxy;
        }
      
        @Bean
        public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory, MessageConverter jacksonJmsMessageConverter) {
            JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
            jmsTemplate.setMessageConverter(jacksonJmsMessageConverter);
            jmsTemplate.setSessionTransacted(true);
            return jmsTemplate;
        }
      
        @Bean
        public JmsListenerContainerFactory<?> msgFactory(ConnectionFactory cf,
                                                         PlatformTransactionManager transactionManager,
                                                         DefaultJmsListenerContainerFactoryConfigurer configurer) {
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            configurer.configure(factory, cf);
            factory.setReceiveTimeout(10000L);
    //        factory.setCacheLevelName("CACHE_CONNECTION");
            factory.setTransactionManager(transactionManager);
            factory.setConcurrency("10");
            return factory;
        }
      
        @Bean
        public MessageConverter jacksonJmsMessageConverter() {
            MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
            converter.setTargetType(MessageType.TEXT);
            converter.setTypeIdPropertyName("_type");
            return converter;
        }
    }
    
  • web

  • @RestController
    @RequestMapping("/api/customer")
    public class CustomerResource {
      
        @PostConstruct
        public void init() {
            if (customerRepository.count() > 0) {
                return;
            }
            Customer customer = new Customer();
            customer.setUsername("imooc");
            customer.setPassword("111111");
            customer.setRole("User");
            customer.setDeposit(1000);
            customerRepository.save(customer);
        }
      
        @Autowired
        private CustomerRepository customerRepository;
        @Autowired
        private OrderCompositeService orderClient;
        @Autowired
        private TicketCompositeService ticketClient;
      
        @PostMapping("")
        public Customer create(@RequestBody Customer customer) {
            return customerRepository.save(customer);
        }
      
        @GetMapping("")
        @HystrixCommand
        public List<Customer> getAll() {
            return customerRepository.findAll();
        }
      
        @GetMapping("/my")
        @HystrixCommand
        public Map getMyInfo() {
            Customer customer = customerRepository.findOneByUsername("imooc");
            List<OrderDTO> orders = orderClient.getMyOrder(customer.getId());
            List<TicketDTO> tickets = ticketClient.getMyTickets(customer.getId());
            Map result = new HashMap();
            result.put("customer", customer);
            result.put("orders", orders);
            result.put("tickets", tickets);
            return result;
        }
      
    }
    
  • application

  • @SpringBootApplication
    @EnableDiscoveryClient
    @EnableHystrix
    @EnableFeignClients
    public class UserApplication {
      
    	public static void main(String[] args) {
    		SpringApplication.run(UserApplication.class, args);
    	}
    }
    

ticket服务

  • pom

  • <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
      
    	<groupId>com.imooc.example</groupId>
    	<artifactId>ticket</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
      
    	<name>ticket</name>
    	<description>Demo project for Spring Boot</description>
      
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.10.RELEASE</version>
    		<relativePath/> <!-- lookup parent from repository -->
    	</parent>
      
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
    		<spring-cloud.version>Edgware.SR2</spring-cloud.version>
    	</properties>
      
    	<dependencies>
    		<dependency>
    			<groupId>com.imooc.example</groupId>
    			<artifactId>service</artifactId>
    			<version>1.0-SNAPSHOT</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-openfeign</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-eureka</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-activemq</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-data-jpa</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>mysql</groupId>
    			<artifactId>mysql-connector-java</artifactId>
    			<version>5.1.39</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
      
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    	</dependencies>
      
    	<dependencyManagement>
    		<dependencies>
    			<dependency>
    				<groupId>org.springframework.cloud</groupId>
    				<artifactId>spring-cloud-dependencies</artifactId>
    				<version>${spring-cloud.version}</version>
    				<type>pom</type>
    				<scope>import</scope>
    			</dependency>
    		</dependencies>
    	</dependencyManagement>
      
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
      
      
    </project>
      
    
  • yaml

  • server:
      port: 8083
    spring:
      application:
        name: ticket
      datasource:
        url: jdbc:mysql://localhost:3306/imooc_ticket
        username: root
        password: 123456
        driver-class-name: com.mysql.jdbc.Driver
      jpa:
        hibernate:
          ddl-auto: update
    eureka:
      client:
        serviceUrl:
          defaultZone: http://imooc:111111@localhost:8761/eureka/
    
  • po

  • @Entity(name = "ticket")
    public class Ticket {
      
        @Id
        @GeneratedValue
        private Long id;
      
        private Long ticketNum;
      
        private String name;
      
        private Long lockUser;
      
        private Long owner;
      
    }
    
  • dao

  • public interface TicketRepository extends JpaRepository<Ticket, Long> {
      
        List<Ticket> findAllByOwner(Long owner);
      
        Ticket findOneByTicketNum(Long num);
      
    //    @Modifying(clearAutomatically = true)
        @Modifying
        @Query("UPDATE ticket SET lockUser = ?1 WHERE lockUser is NULL and ticketNum = ?2")
        int lockTicket(Long customerId, Long ticketNum);
      
        @Modifying
        @Query("UPDATE ticket SET lockUser = null WHERE lockUser = ?1 and ticketNum = ?2")
        int unLockTicket(Long customerId, Long ticketNum);
      
        @Modifying
        @Query("UPDATE ticket SET owner = ?1, lockUser = null WHERE lockUser = ?1 and ticketNum = ?2")
        int moveTicket(Long customerId, Long ticketNum);
      
        @Modifying
        @Query("UPDATE ticket SET owner = NULL WHERE owner = ?1 and ticketNum = ?2")
        int unMoveTicket(Long customerId, Long ticketNum);
      
        @Override
        @Modifying(clearAutomatically = true)
        Ticket save(Ticket o);
      
    }
    
  • service

  • @Service
    public class TicketService {
      
        private static final Logger LOG = LoggerFactory.getLogger(TicketService.class);
      
        @Autowired
        private JmsTemplate jmsTemplate;
        @Autowired
        private TicketRepository ticketRepository;
      
        //接收到order服务发送的创建订单请求,开始创建订单
        @Transactional
        @JmsListener(destination = "order:new", containerFactory = "msgFactory")
        public void handleTicketLock(OrderDTO msg) {
            LOG.info("Get new order for ticket lock:{}", msg);
            //执行锁票操作
            int lockCount = ticketRepository.lockTicket(msg.getCustomerId(), msg.getTicketNum());
            if (lockCount == 0) {
                //锁票失败,发送给MQ,由订单服务处理
                msg.setStatus("TICKET_LOCK_FAIL");
                jmsTemplate.convertAndSend("order:fail", msg);
            } else {
                //锁票成功,发送消息给MQ,由订单服务处理
                msg.setStatus("TICKET_LOCKED");
                jmsTemplate.convertAndSend("order:locked", msg);
            }
        }
      
        //付款完成,移交订单
        @Transactional
        @JmsListener(destination = "order:ticket_move", containerFactory = "msgFactory")
        public void handleTicketMove(OrderDTO msg) {
            LOG.info("Get new order for ticket move:{}", msg);
            int moveCount = ticketRepository.moveTicket(msg.getCustomerId(), msg.getTicketNum());
            if (moveCount == 0) {
                LOG.info("Ticket already transferred.");
            }
            msg.setStatus("TICKET_MOVED");
            //订单完成,交由order服务处理
            jmsTemplate.convertAndSend("order:finish", msg);
        }
      
        /**
         * 触发 error_ticket 的情况:
         *  1. 扣费失败,需要解锁票
         *  2. 订单超时,如果存在锁票就解锁,如果已经交票就撤回
         *  这时候,都已经在OrderDTO里设置了失败的原因,所以这里就不再设置原因。
         * @param msg
         */
        @Transactional
        @JmsListener(destination = "order:ticket_error", containerFactory = "msgFactory")
        public void handleError(OrderDTO msg) {
            LOG.info("Get order error for ticket unlock:{}", msg);
            int count = ticketRepository.unMoveTicket(msg.getCustomerId(), msg.getTicketNum()); // 撤销票的转移
            if (count == 0) {
                LOG.info("Ticket already unlocked:", msg);
            }
            count = ticketRepository.unLockTicket(msg.getCustomerId(), msg.getTicketNum()); // 撤销锁票
            if (count == 0) {
                LOG.info("Ticket already unmoved, or not moved:", msg);
            }
            jmsTemplate.convertAndSend("order:fail", msg);
        }
      
        @Transactional
        public Ticket lockTicket(OrderDTO orderDTO) {
            Ticket ticket = ticketRepository.findOneByTicketNum(orderDTO.getTicketNum());
            ticket.setLockUser(orderDTO.getCustomerId());
            ticket = ticketRepository.save(ticket);
            try {
                Thread.sleep(10 * 1000);
            } catch (InterruptedException e) {
                LOG.error(e.getMessage());
            }
            return ticket;
        }
      
        @Transactional
        public int lockTicket2(OrderDTO orderDTO) {
            int updateCount = ticketRepository.lockTicket(orderDTO.getCustomerId(), orderDTO.getTicketNum());
            LOG.info("Updated ticket count:{}", updateCount);
            try {
                Thread.sleep(10 * 1000);
            } catch (InterruptedException e) {
                LOG.error(e.getMessage());
            }
            return updateCount;
        }
      
        @Transactional
        public int unLockTicket(OrderDTO orderDTO) {
            int updateCount = ticketRepository.unLockTicket(orderDTO.getCustomerId(), orderDTO.getTicketNum());
            LOG.info("Updated ticket count:{}", updateCount);
            return updateCount;
        }
    }
    
  • config

  • @Configuration
    public class JmsConfig {
      
        @Bean
        public ConnectionFactory connectionFactory() {
            ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
            TransactionAwareConnectionFactoryProxy proxy = new TransactionAwareConnectionFactoryProxy();
            proxy.setTargetConnectionFactory(cf);
            proxy.setSynchedLocalTransactionAllowed(true);
            return proxy;
        }
      
        @Bean
        public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory, MessageConverter jacksonJmsMessageConverter) {
            JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
            jmsTemplate.setMessageConverter(jacksonJmsMessageConverter);
            jmsTemplate.setSessionTransacted(true);
            return jmsTemplate;
        }
      
        @Bean
        public JmsListenerContainerFactory<?> msgFactory(ConnectionFactory cf,
                                                         PlatformTransactionManager transactionManager,
                                                         DefaultJmsListenerContainerFactoryConfigurer configurer) {
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            configurer.configure(factory, cf);
            factory.setReceiveTimeout(10000L);
            factory.setCacheLevelName("CACHE_CONNECTION");
            factory.setTransactionManager(transactionManager);
            factory.setConcurrency("10");
            return factory;
        }
      
        @Bean
        public MessageConverter jacksonJmsMessageConverter() {
            MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
            converter.setTargetType(MessageType.TEXT);
            converter.setTypeIdPropertyName("_type");
            return converter;
        }
    }
    
  • web

  • @RestController
    @RequestMapping("/api/ticket")
    public class TicketResource implements TicketCompositeService {
      
        @PostConstruct
        public void init() {
            if (ticketRepository.count() > 0) {
                return;
            }
            Ticket ticket = new Ticket();
            ticket.setName("No.1");
            ticket.setTicketNum(110L);
            ticketRepository.save(ticket);
        }
      
        @Autowired
        private TicketRepository ticketRepository;
        @Autowired
        private TicketService ticketService;
      
        @PostMapping("")
        public OrderDTO create(@RequestBody OrderDTO dto) {
            return dto;
        }
      
        @GetMapping("/{customerId}")
        public List<TicketDTO> getMyTickets(@PathVariable(name = "customerId") Long customerId) {
            List<Ticket> tickets = ticketRepository.findAllByOwner(customerId);
            return tickets.stream().map(ticket -> {
                TicketDTO dto = new TicketDTO();
                dto.setTicketNum(ticket.getTicketNum());
                dto.setId(ticket.getId());
                dto.setLockUser(ticket.getLockUser());
                dto.setName(ticket.getName());
                dto.setOwner(ticket.getOwner());
                return dto;
            }).collect(Collectors.toList());
        }
      
        @PostMapping("/lock")
        public Ticket lock(@RequestBody OrderDTO dto) {
            return ticketService.lockTicket(dto);
        }
        @PostMapping("/lock2")
        public int lock2(@RequestBody OrderDTO dto) {
            return ticketService.lockTicket2(dto);
        }
      
        @GetMapping("")
        public List<Ticket> getAll() {
            return ticketRepository.findAll();
        }
      
    }
    
  • application

  • @SpringBootApplication
    @EnableDiscoveryClient
    public class TicketApplication {
      
    	public static void main(String[] args) {
    		SpringApplication.run(TicketApplication.class, args);
    	}
    }
    

入口

  • localhost:8888/order/api/order

  • {
    "customerId":1,
    "title":"new_order",
    "amount":100,
    "ticketNum":100
    }
    

事件溯源模式

  • 消息驱动(msg-driven or event-driven)
    • 事件不要求持久化保存
    • 消息只是为了更新业务数据的状态,数据库才是一等数据
    • 不要求所有的数据操作都通过消息驱动
  • 事件溯源(event-sourcing)
    • 事件作为一等数据保存
    • 统一的事件管理器和接口,数据更新都由事件产生
    • 数据库中数据的当前状态根据事件的聚合产生
    • 聚合数据可以保存在数据库中,可以根据事件重新生成
  • 事件溯源的优点
    • 历史重现:从事件中重新生成视图数据库
    • 方便的数据流处理与报告生成
    • 性能
    • 服务的松耦合
  • 事件溯源的缺点
    • 只能保证事务的最终一致性
    • 设计和开发思维的转变,学习成本
    • 事件结构的改变
    • 扩展性:event store的分布式实现,事件的分布式处理
  • 消息驱动 VS 事件溯源
    • 一等数据:事件 VS 业务数据
    • 事件永久保存,历史重现
    • 所有数据更新都必须通过事件来产生
    • event store服务承担更多的功能
  • 事件溯源的数据一致性
    • 一个事件只处理一个服务的数据
    • 保证事件的至少一次处理,幂等性
    • 业务请求的错误处理:多次重试失败,网络异常,服务不可用
  • 实例
  • img
  • img
  • 事件溯源和CQRS
    • CQRS:命令查询职责分离

Axon框架

  • 介绍
    • 实现event sourcing和CQRS模式的框架
    • 实现了命令,事件的分发,处理,聚合,查询,存储
    • 提供标签式开发,易维护,并提供来springboot的集成
    • 提供command和event
    • 可扩展,可用于分布式环境,如springcloud
  • 构成
    • 聚合:aggregate
    • 聚合的资源库:repository
    • command:command bus和command handler
    • event:event bus,event handler和event store
    • saga:基于事件的流程管理模式
    • query:执行数据查询操作的特殊command
  • 可扩展性
    • 分布式command
    • 通过AMQP实现分布式event分发和处理
  • img
  • img
  • axon处理command过程
    • resource收到请求,send给commandGateway
    • commandGateway执行拦截器等,再发给commandBus
    • commandBus创建一个unitOfWork,关联一个事务,在其中调用commandHandler处理这个command
    • commandHandler使用repository获得一个聚合对象,并聚合所有该对象的event,设置lock,然后调用处理方法
    • commandHandler再触发一个event
  • img
  • axon处理event过程
    • commandHandler执行apply来触发一个event
    • eventBus在这个event上执行拦截器等
    • eventBus将这个event保存到eventstore
    • eventBus调用在这个event上注册的所有处理方法(在unitOfWork中执行)
    • 在eventHandler中更新聚合数据,保存视图数据库,触发其他command
  • img
  • 使用axon框架的设计过程
    • 领域模型设计
    • 业务-command-command处理
    • 数据-event-event处理
    • 将数据保存到数据库:聚合数据-映射到-视图数据
    • 查询-query

saga

  • 事务驱动的业务流程管理模式
  • 通过开始事件,结束事件,过程中的事件完成整个业务流程
  • 保证在多个事件处理方法执行期间实现事务性
  • startSage-sageEventHandler-endSaga
  • 使用associate将不同的事件关联到同一个saga流程中
  • 正常的结束业务都通过endSaga标签触发,超时使用eventScheduler,触发一个endSaga
  • 一次业务流程的执行对应一个saga实例
  • saga实例状态和关联的事件会保存在数据库中

Comments

Content