百味皆苦 java后端开发攻城狮

MQ保证消息不丢失

2020-08-09
百味皆苦
MQ

业务流程

“发送确认”模式:即生产者通过MQ发送消息后,MQ需要将“已发送成功/失败”反馈给生产者,告知生产者消息已投递成功,此方式可确保消息正确地发送至RabbitMQ

“消费确认”模式:即消费者监听到MQ中队列的消息并执行完对应的业务逻辑后,需要发送“消息已被成功监听、消费”反馈给MQ,此方式可保证接收方正确接收并消费了消息,消费成功后消息将从队列中移除

“避免消息重复投递”:生产者在生产消息时,MQ内部会针对每条消息生成一个MsgId,该标识可以作为去重的依据(消息投递失败并重传),避免重复的消息进入队列

“消息消费时保证幂等性”:这一点可以利用业务本身的特性来实现,即每个业务实体一般都会有一个唯一的ID,就像数据库表中唯一的主键一样,在监听消费处理时根据ID作为去重的依据

“持久化”:将队列、交换机、消息都设置为持久化模式,确保消息在投递、发送期间出现MQ服务宕机后重启恢复过来时消息依旧存在

“消息消费重试机制”:指的是消费者在监听、消费、处理消息的过程中出现了异常,导致业务逻辑没有处理成功,此时可以开启“消息重入队列”机制,设置消息重入队列N次 进行 重试消费

“消息投递补偿机制”:指的是消息在生产、投递期间出现“投递失败”,也就是“发送不成功”的情况,此时可以将其加入到DB中,并开启定时任务,拉取那些投递不成功的消息,重新投递入队列,如此一来便可以保证消息不丢失且准备被投递

流程图

发送邮件案例

消息日志表

首先,需要在数据库中创建一数据库表msg_log,用于记录RabbitMQ消息的投递以及消费过程

CREATE TABLE `msg_log` (
  `msg_id` varchar(155) NOT NULL DEFAULT '' COMMENT '消息唯一标识',
  `msg` text COMMENT '消息体, json格式化',
  `exchange` varchar(255) NOT NULL DEFAULT '' COMMENT '交换机',
  `routing_key` varchar(255) NOT NULL DEFAULT '' COMMENT '路由键',
  `status` int(11) NOT NULL DEFAULT '0' COMMENT '状态: 0投递中 1投递成功 2投递失败 3已消费',
  `try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
  `next_try_time` datetime DEFAULT NULL COMMENT '下一次重试时间',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` datetime DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`msg_id`),
  UNIQUE KEY `unq_msg_id` (`msg_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息投递日志';

控制器

创建用于发送邮件的控制器MailController

@RestController
@RequestMapping("mail")
public class MailController {

    @Autowired
    private MailService mailService;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private Environment env;

    @Autowired
    private MsgLogMapper msgLogMapper;

    private static final Snowflake snowflake=new Snowflake(3,2);

    //发送邮件
    @RequestMapping(value = "send/v2",method = RequestMethod.POST)
    public BaseResponse sendMailV2(@RequestBody @Validated MailDto dto, BindingResult result){
        String checkRes=ValidatorUtil.checkResult(result);
        if (StringUtils.isNotBlank(checkRes)){
            return new BaseResponse(StatusCode.InvalidParams.getCode(),checkRes);
        }
        BaseResponse response=new BaseResponse(StatusCode.Success);
        try {
            MsgLog entity=new MsgLog();
            //雪花算法生成唯一ID
            String msgId=snowflake.nextIdStr();

            entity.setMsgId(msgId);
            entity.setExchange(env.getProperty("mq.email.v2.exchange"));
            entity.setRoutingKey(env.getProperty("mq.email.v2.routing.key"));
            entity.setStatus(Constant.MsgStatus.Sending.getStatus());
            entity.setTryCount(0);
            entity.setCreateTime(DateTime.now().toDate());

            dto.setMsgId(msgId);
            final String msg=new Gson().toJson(dto);

            entity.setMsg(msg);
            //1:在发送消息之前先创建并入库,并设置其状态为 “投递中”
            msgLogMapper.insertSelective(entity);

            //2:RabbitMQ 执行发送逻辑,发送消息
            rabbitTemplate.convertAndSend(entity.getExchange(), entity.getRoutingKey(),msg,new CorrelationData(entity.getMsgId()));

        }catch (Exception e){
            response=new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());
        }
        return response;
    }
}

MQ配置

为了能让RabbitMQ真正发送消息,我们需要在RabbitmqConfig配置类中创建相应的交换机、路由以及队列绑定;除此之外,需要在自定义注入RabbitTemplate Bean组件中加入“发送确认”的处理逻辑,即监听 “消息是否真的投递成功”;

同时,也需要在自定义配置“监听器容器工厂” Bean组件中加入“消息确认消费模式”,在这里,我们采用的是“手动确认消费”模式

//通用化 Rabbitmq 配置
@Configuration
public class RabbitmqConfig {

    private final static Logger log = LoggerFactory.getLogger(RabbitmqConfig.class);

    @Autowired
    private Environment env;
    @Autowired
    private CachingConnectionFactory connectionFactory;
    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

    @Autowired
    private MsgLogMapper msgLogMapper;

    //多实例消费者 – 设定确认消费模式为手动确认
    @Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factoryConfigurer.configure(factory,connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //确认消费模式
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
       factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.concurrency",int.class));
        factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.max-concurrency",int.class));
        factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.simple.prefetch",int.class));
        return factory;
    }

    //消息发送操作组件自定义注入配置:设置 "生产需要确认"
    @Bean
    public RabbitTemplate rabbitTemplate(){
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        // 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("消息发送成功:消息唯一标识correlationData({}),消息确认结果:ack({}),失败原因:cause({})",correlationData,ack,cause);

                //发送确认结果
                if (correlationData!=null && StringUtils.isNotBlank(correlationData.getId())){
                    if (ack) {
                        //消息投递成功
                        msgLogMapper.updateMsgSendStatus(correlationData.getId(), Constant.MsgStatus.SendSuccess.getStatus());
                    } else {
                        //消息投递失败
                        msgLogMapper.updateMsgSendStatus(correlationData.getId(), Constant.MsgStatus.SendFail.getStatus());
                    }
                }
            }
        });
        // 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
        // 启动消息失败返回,比如路由不到队列时触发回调
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.warn("消息丢失:交换器exchange({}),路由route({}),响应结果replyCode({}),响应信息replyText({}),消息message:{}",exchange,routingKey,replyCode,replyText,message);
            }
        });
        return rabbitTemplate;
    }

    //构建发送邮件消息模型-队列、交换机做持久化,消息到时候也设置持久化,将意味着MQ在发送消息的整个过程,将会被记录到持久化日志文件中
    //可靠性投递
    @Bean
    public Queue emailQueueV2(){
        return new Queue(env.getProperty("mq.email.v2.queue"),true);
    }
    @Bean
    public DirectExchange emailExchangeV2(){
        return new DirectExchange(env.getProperty("mq.email.v2.exchange"),true,false);
    }
    @Bean
    public Binding emailBindingV2(){
        return BindingBuilder.bind(emailQueueV2()).to(emailExchangeV2()).with(env.getProperty("mq.email.v2.routing.key"));
    }
}

消息监听器

创建用于“监听消费消息”的消费者MqListener,其监听消费处理消息的逻辑为:获取并解析出消息体的内容,根据MsgId查询消息当前的发送状态,如果处于“未投递”或者“投递失败”且“重试次数还没超过最大次数”时,则获取该消息体的真实内容,并将其交给MailService执行“发送邮件”的逻辑

@Component
public class MqListener {

    private static final Logger log= LoggerFactory.getLogger(MqListener.class);

    @Autowired
    private MailService mailService;

    @Autowired
    private MsgLogMapper msgLogMapper;

    //监听消费发送邮件消息-v2
    @RabbitListener(queues = {"${mq.email.v2.queue}"},containerFactory = "multiListenerContainer")
    public void consumeMailV2(Message message, Channel channel) throws Exception{
        log.info("---监听消费发送邮件消息-v2---开始处理中...");
        Long deliverTag=message.getMessageProperties().getDeliveryTag();

        Integer tryCount=0;
        String msgId="";
        try {
            MailDto dto=new Gson().fromJson(new String(message.getBody()), MailDto.class);
            msgId=dto.getMsgId();
            log.info("---监听到消息-v2:{}",dto);

            //maxTry=msgLogMapper.selectMaxTry(dto.getMsgId());

            /*此处可以加上一层分布式锁,保证超高并发时处理消息的原理操作 ~ 读者自行实现,有问题可以随时交流*/

            //防止重复投递 - 保证幂等性
            MsgLog msgLog=msgLogMapper.selectByPrimaryKey(dto.getMsgId());
            if (msgLog!=null && !Constant.MsgStatus.ConsumeSuccess.getStatus().equals(msgLog.getStatus()) && msgLog.getTryCount()<Constant.MsgTryCount){
                tryCount=msgLog.getTryCount();

                //核心业务逻辑【消息处理~发送邮件(同步;异步)】
                boolean res=mailService.sendSimpleEmail(msgId,dto.getSubject(),dto.getContent(),dto.getTos());
                if (res){
                    //确认消费
                    channel.basicAck(deliverTag,false);
                    return;
                }
            }
        }catch (Exception e){
            e.printStackTrace();

            //出现异常时可以走重试机制 ~ 重试次数为3次,每次间隔8s(自己灵活设定即可)
            if (tryCount<Constant.MsgTryCount){
                Thread.sleep(8000);

                channel.basicNack(deliverTag,false,true);
                msgLogMapper.updateMaxTry(msgId);
                return;
            }
        }

        //当走到这一步时代表消息已被消费(status=3)或者 重试次数达到上限 等情况-但不管是哪种情况,都需要将消息从队列中移除,防止下次项目重启时重新监听消费
        channel.basicAck(deliverTag,false);
    }
}

发送邮件业务类

mailService的sendSimpleEmail方法则主要用于执行真正的“发送邮件”逻辑,同时将最终的发送结果(业务逻辑处理结果)及时更新至数据库表“消息的发送状态”中

@Service
public class MailService {
    private static final Logger log= LoggerFactory.getLogger(MailService.class);

    @Autowired
    private Environment env;

    @Autowired
    private JavaMailSender mailSender;

    @Autowired
    private MsgLogMapper msgLogMapper;

    //TODO:发送简单的邮件消息
    //@Async("threadPoolTaskExecutor")
    public Boolean sendSimpleEmail(final String msgId,final String subject,final String content,final String ... tos) throws Exception{
        Boolean res=true;
        try {
            SimpleMailMessage message=new SimpleMailMessage();
            message.setSubject(subject);
            message.setText(content);
            message.setTo(tos);
            message.setFrom(env.getProperty("mail.send.from"));
            mailSender.send(message);

            //int i=1/0;

            log.info("----发送简单的邮件消息完毕--->");
        }catch (Exception e){
            log.error("--发送简单的邮件消息,发生异常:",e.fillInStackTrace());
            res=false;
            throw e;
        }finally {
            this.updateMsgSendStatus(msgId,res);
        }
        return res;
    }

    //TODO:更新消息处理的结果
    private void updateMsgSendStatus(final String msgId,Boolean res){
        if (StringUtils.isNotBlank(msgId)){
            if (res){
                msgLogMapper.updateMsgManageSuccess(msgId, Constant.MsgStatus.ConsumeSuccess.getStatus());
            }else{
                msgLogMapper.updateMsgManageSuccess(msgId, Constant.MsgStatus.ConsumeFail.getStatus());
            }
        }
    }
}

定时任务补偿投递

开启一定时任务,拉取投递失败的消息重新进行投递

@Component
public class MqScheduler {

    private static final Logger log= LoggerFactory.getLogger(MqScheduler.class);

    @Autowired
    private MsgLogMapper msgLogMapper;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //定时任务重新拉取并投递
    @Scheduled(cron = "0/10 * * * * ?")
    public void reSendMsg(){
        try {
            String status= Constant.MsgStatus.Sending.getStatus()+","+Constant.MsgStatus.SendFail.getStatus();
            List<MsgLog> msgLogs=msgLogMapper.selectSendFailMsg(status);
            if (msgLogs!=null && !msgLogs.isEmpty()){
                msgLogs.forEach(msgLog -> {
                    if (StringUtils.isNotBlank(msgLog.getMsg()) && StringUtils.isNotBlank(msgLog.getExchange())
                            && StringUtils.isNotBlank(msgLog.getRoutingKey())){
                        //发送消息
                        rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(),msgLog.getMsg(),
                                new CorrelationData(msgLog.getMsgId()));
                        log.info("----已成功重新投递消息,消息id={}",msgLog.getMsgId());
                    }
                });
            }
        }catch (Exception e){
            log.error("定时任务重新拉取投递失败的消息~重新进行投递~发生异常:",e.fillInStackTrace());
        }
    }
}

测试

1:正常投递、正常消费的情况;

{
  "subject":"咿呀咿呀哟",
  "content":"http://www.mark-to-win.com/tutorial/51081.html",
  "tos":"baiweijieku@qq.com"
}

观察IDEA控制台的输出信息,会发现消息已经成功插入数据库,并投递成功,稍等一两秒后可以看到消息已经被监听消费

稍等片刻,会发现邮件已经成功,与此同时,数据库中该条消息的状态已变更为“消费成功”

打开自己的邮箱,会发现也已经成功收到了该邮件信息

2:投递失败,定时任务定时拉取重新投递的情况

为了演示“消息投递失败/不成功”的情况,我们可以将Controller中的sendMailV2()方法里的“发送MQ消息”的逻辑注释起来

这将意味着,前端Postman触发请求调用这个接口时,插入数据库中的消息的状态为“投递中”,即 status=0 的情况;除此之外,还可以偷偷地调整 消息对应的“交换机”或者“路由”的值,这样一来,RabbitmqConfig中RabbitTemplate方法将会触发“消息投递失败 404”的错误,也就是 status=2 的情况

下面我们就以第一种为主吧,将发送消息的代码逻辑注释起来,然后再次在Postman发起请求,此时会发现数据库表插入了一条 status=0 的消息

与此同时,我们的定时器因为cron设定时间为 10s 来一次轮询,因此稍等片刻后,会发现该条消息重新被拉取出来重新进入投递,最终成功走到了最后了

3:消息确认消费失败进入重试的情况

这种情况只有在“监听消费处理消息的过程出现异常” 才能触发“消息重入队列”机制,因此,我们只需要在真正处理消息的业务逻辑:“发送邮件”中嵌入一段 int i=1/0 的代码即可

而我们都知道,在执行这段代码之后,sendSimpleEmail() 势必会抛出异常,之后会更新数据库表中该消息的状态,即“消费失败”,之后便会触发外层MqListener的comsumeMailV2方法的catch逻辑

之后,消息便会“重入队列”,当然啦,每次重入队列时,时间间隔是8s(这个按照实际情况设定即可,这种写法还是比较粗糙~), 数据库表该消息的“最大重试次数”将会加1,直到超过 最大的重试次数(这里是 3次),便不会再进入队列了,而是会从队列中永久删除;于此同时,IDEA控制台会报出相应的异常信息,最终数据库表中该条消息的try_count也达到了最大重试次数:3次


上一篇 rabbitMQ

下一篇 ft工具类

Comments

Content