业务流程
“发送确认”模式:即生产者通过MQ发送消息后,MQ需要将“已发送成功/失败”反馈给生产者,告知生产者消息已投递成功,此方式可确保消息正确地发送至RabbitMQ
“消费确认”模式:即消费者监听到MQ中队列的消息并执行完对应的业务逻辑后,需要发送“消息已被成功监听、消费”反馈给MQ,此方式可保证接收方正确接收并消费了消息,消费成功后消息将从队列中移除
“避免消息重复投递”:生产者在生产消息时,MQ内部会针对每条消息生成一个MsgId,该标识可以作为去重的依据(消息投递失败并重传),避免重复的消息进入队列
“消息消费时保证幂等性”:这一点可以利用业务本身的特性来实现,即每个业务实体一般都会有一个唯一的ID,就像数据库表中唯一的主键一样,在监听消费处理时根据ID作为去重的依据
“持久化”:将队列、交换机、消息都设置为持久化模式,确保消息在投递、发送期间出现MQ服务宕机后重启恢复过来时消息依旧存在
“消息消费重试机制”:指的是消费者在监听、消费、处理消息的过程中出现了异常,导致业务逻辑没有处理成功,此时可以开启“消息重入队列”机制,设置消息重入队列N次 进行 重试消费
“消息投递补偿机制”:指的是消息在生产、投递期间出现“投递失败”,也就是“发送不成功”的情况,此时可以将其加入到DB中,并开启定时任务,拉取那些投递不成功的消息,重新投递入队列,如此一来便可以保证消息不丢失且准备被投递
流程图
发送邮件案例
消息日志表
首先,需要在数据库中创建一数据库表msg_log,用于记录RabbitMQ消息的投递以及消费过程
CREATE TABLE `msg_log` (
`msg_id` varchar(155) NOT NULL DEFAULT '' COMMENT '消息唯一标识',
`msg` text COMMENT '消息体, json格式化',
`exchange` varchar(255) NOT NULL DEFAULT '' COMMENT '交换机',
`routing_key` varchar(255) NOT NULL DEFAULT '' COMMENT '路由键',
`status` int(11) NOT NULL DEFAULT '0' COMMENT '状态: 0投递中 1投递成功 2投递失败 3已消费',
`try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
`next_try_time` datetime DEFAULT NULL COMMENT '下一次重试时间',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_time` datetime DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`msg_id`),
UNIQUE KEY `unq_msg_id` (`msg_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息投递日志';
控制器
创建用于发送邮件的控制器MailController
@RestController
@RequestMapping("mail")
public class MailController {
@Autowired
private MailService mailService;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Environment env;
@Autowired
private MsgLogMapper msgLogMapper;
private static final Snowflake snowflake=new Snowflake(3,2);
//发送邮件
@RequestMapping(value = "send/v2",method = RequestMethod.POST)
public BaseResponse sendMailV2(@RequestBody @Validated MailDto dto, BindingResult result){
String checkRes=ValidatorUtil.checkResult(result);
if (StringUtils.isNotBlank(checkRes)){
return new BaseResponse(StatusCode.InvalidParams.getCode(),checkRes);
}
BaseResponse response=new BaseResponse(StatusCode.Success);
try {
MsgLog entity=new MsgLog();
//雪花算法生成唯一ID
String msgId=snowflake.nextIdStr();
entity.setMsgId(msgId);
entity.setExchange(env.getProperty("mq.email.v2.exchange"));
entity.setRoutingKey(env.getProperty("mq.email.v2.routing.key"));
entity.setStatus(Constant.MsgStatus.Sending.getStatus());
entity.setTryCount(0);
entity.setCreateTime(DateTime.now().toDate());
dto.setMsgId(msgId);
final String msg=new Gson().toJson(dto);
entity.setMsg(msg);
//1:在发送消息之前先创建并入库,并设置其状态为 “投递中”
msgLogMapper.insertSelective(entity);
//2:RabbitMQ 执行发送逻辑,发送消息
rabbitTemplate.convertAndSend(entity.getExchange(), entity.getRoutingKey(),msg,new CorrelationData(entity.getMsgId()));
}catch (Exception e){
response=new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());
}
return response;
}
}
MQ配置
为了能让RabbitMQ真正发送消息,我们需要在RabbitmqConfig配置类中创建相应的交换机、路由以及队列绑定;除此之外,需要在自定义注入RabbitTemplate Bean组件中加入“发送确认”的处理逻辑,即监听 “消息是否真的投递成功”;
同时,也需要在自定义配置“监听器容器工厂” Bean组件中加入“消息确认消费模式”,在这里,我们采用的是“手动确认消费”模式
//通用化 Rabbitmq 配置
@Configuration
public class RabbitmqConfig {
private final static Logger log = LoggerFactory.getLogger(RabbitmqConfig.class);
@Autowired
private Environment env;
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
@Autowired
private MsgLogMapper msgLogMapper;
//多实例消费者 – 设定确认消费模式为手动确认
@Bean(name = "multiListenerContainer")
public SimpleRabbitListenerContainerFactory multiListenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factoryConfigurer.configure(factory,connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//确认消费模式
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.concurrency",int.class));
factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.max-concurrency",int.class));
factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.simple.prefetch",int.class));
return factory;
}
//消息发送操作组件自定义注入配置:设置 "生产需要确认"
@Bean
public RabbitTemplate rabbitTemplate(){
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息发送成功:消息唯一标识correlationData({}),消息确认结果:ack({}),失败原因:cause({})",correlationData,ack,cause);
//发送确认结果
if (correlationData!=null && StringUtils.isNotBlank(correlationData.getId())){
if (ack) {
//消息投递成功
msgLogMapper.updateMsgSendStatus(correlationData.getId(), Constant.MsgStatus.SendSuccess.getStatus());
} else {
//消息投递失败
msgLogMapper.updateMsgSendStatus(correlationData.getId(), Constant.MsgStatus.SendFail.getStatus());
}
}
}
});
// 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
// 启动消息失败返回,比如路由不到队列时触发回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.warn("消息丢失:交换器exchange({}),路由route({}),响应结果replyCode({}),响应信息replyText({}),消息message:{}",exchange,routingKey,replyCode,replyText,message);
}
});
return rabbitTemplate;
}
//构建发送邮件消息模型-队列、交换机做持久化,消息到时候也设置持久化,将意味着MQ在发送消息的整个过程,将会被记录到持久化日志文件中
//可靠性投递
@Bean
public Queue emailQueueV2(){
return new Queue(env.getProperty("mq.email.v2.queue"),true);
}
@Bean
public DirectExchange emailExchangeV2(){
return new DirectExchange(env.getProperty("mq.email.v2.exchange"),true,false);
}
@Bean
public Binding emailBindingV2(){
return BindingBuilder.bind(emailQueueV2()).to(emailExchangeV2()).with(env.getProperty("mq.email.v2.routing.key"));
}
}
消息监听器
创建用于“监听消费消息”的消费者MqListener,其监听消费处理消息的逻辑为:获取并解析出消息体的内容,根据MsgId查询消息当前的发送状态,如果处于“未投递”或者“投递失败”且“重试次数还没超过最大次数”时,则获取该消息体的真实内容,并将其交给MailService执行“发送邮件”的逻辑
@Component
public class MqListener {
private static final Logger log= LoggerFactory.getLogger(MqListener.class);
@Autowired
private MailService mailService;
@Autowired
private MsgLogMapper msgLogMapper;
//监听消费发送邮件消息-v2
@RabbitListener(queues = {"${mq.email.v2.queue}"},containerFactory = "multiListenerContainer")
public void consumeMailV2(Message message, Channel channel) throws Exception{
log.info("---监听消费发送邮件消息-v2---开始处理中...");
Long deliverTag=message.getMessageProperties().getDeliveryTag();
Integer tryCount=0;
String msgId="";
try {
MailDto dto=new Gson().fromJson(new String(message.getBody()), MailDto.class);
msgId=dto.getMsgId();
log.info("---监听到消息-v2:{}",dto);
//maxTry=msgLogMapper.selectMaxTry(dto.getMsgId());
/*此处可以加上一层分布式锁,保证超高并发时处理消息的原理操作 ~ 读者自行实现,有问题可以随时交流*/
//防止重复投递 - 保证幂等性
MsgLog msgLog=msgLogMapper.selectByPrimaryKey(dto.getMsgId());
if (msgLog!=null && !Constant.MsgStatus.ConsumeSuccess.getStatus().equals(msgLog.getStatus()) && msgLog.getTryCount()<Constant.MsgTryCount){
tryCount=msgLog.getTryCount();
//核心业务逻辑【消息处理~发送邮件(同步;异步)】
boolean res=mailService.sendSimpleEmail(msgId,dto.getSubject(),dto.getContent(),dto.getTos());
if (res){
//确认消费
channel.basicAck(deliverTag,false);
return;
}
}
}catch (Exception e){
e.printStackTrace();
//出现异常时可以走重试机制 ~ 重试次数为3次,每次间隔8s(自己灵活设定即可)
if (tryCount<Constant.MsgTryCount){
Thread.sleep(8000);
channel.basicNack(deliverTag,false,true);
msgLogMapper.updateMaxTry(msgId);
return;
}
}
//当走到这一步时代表消息已被消费(status=3)或者 重试次数达到上限 等情况-但不管是哪种情况,都需要将消息从队列中移除,防止下次项目重启时重新监听消费
channel.basicAck(deliverTag,false);
}
}
发送邮件业务类
mailService的sendSimpleEmail方法则主要用于执行真正的“发送邮件”逻辑,同时将最终的发送结果(业务逻辑处理结果)及时更新至数据库表“消息的发送状态”中
@Service
public class MailService {
private static final Logger log= LoggerFactory.getLogger(MailService.class);
@Autowired
private Environment env;
@Autowired
private JavaMailSender mailSender;
@Autowired
private MsgLogMapper msgLogMapper;
//TODO:发送简单的邮件消息
//@Async("threadPoolTaskExecutor")
public Boolean sendSimpleEmail(final String msgId,final String subject,final String content,final String ... tos) throws Exception{
Boolean res=true;
try {
SimpleMailMessage message=new SimpleMailMessage();
message.setSubject(subject);
message.setText(content);
message.setTo(tos);
message.setFrom(env.getProperty("mail.send.from"));
mailSender.send(message);
//int i=1/0;
log.info("----发送简单的邮件消息完毕--->");
}catch (Exception e){
log.error("--发送简单的邮件消息,发生异常:",e.fillInStackTrace());
res=false;
throw e;
}finally {
this.updateMsgSendStatus(msgId,res);
}
return res;
}
//TODO:更新消息处理的结果
private void updateMsgSendStatus(final String msgId,Boolean res){
if (StringUtils.isNotBlank(msgId)){
if (res){
msgLogMapper.updateMsgManageSuccess(msgId, Constant.MsgStatus.ConsumeSuccess.getStatus());
}else{
msgLogMapper.updateMsgManageSuccess(msgId, Constant.MsgStatus.ConsumeFail.getStatus());
}
}
}
}
定时任务补偿投递
开启一定时任务,拉取投递失败的消息重新进行投递
@Component
public class MqScheduler {
private static final Logger log= LoggerFactory.getLogger(MqScheduler.class);
@Autowired
private MsgLogMapper msgLogMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
//定时任务重新拉取并投递
@Scheduled(cron = "0/10 * * * * ?")
public void reSendMsg(){
try {
String status= Constant.MsgStatus.Sending.getStatus()+","+Constant.MsgStatus.SendFail.getStatus();
List<MsgLog> msgLogs=msgLogMapper.selectSendFailMsg(status);
if (msgLogs!=null && !msgLogs.isEmpty()){
msgLogs.forEach(msgLog -> {
if (StringUtils.isNotBlank(msgLog.getMsg()) && StringUtils.isNotBlank(msgLog.getExchange())
&& StringUtils.isNotBlank(msgLog.getRoutingKey())){
//发送消息
rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(),msgLog.getMsg(),
new CorrelationData(msgLog.getMsgId()));
log.info("----已成功重新投递消息,消息id={}",msgLog.getMsgId());
}
});
}
}catch (Exception e){
log.error("定时任务重新拉取投递失败的消息~重新进行投递~发生异常:",e.fillInStackTrace());
}
}
}
测试
1:正常投递、正常消费的情况;
{
"subject":"咿呀咿呀哟",
"content":"http://www.mark-to-win.com/tutorial/51081.html",
"tos":"baiweijieku@qq.com"
}
观察IDEA控制台的输出信息,会发现消息已经成功插入数据库,并投递成功,稍等一两秒后可以看到消息已经被监听消费
稍等片刻,会发现邮件已经成功,与此同时,数据库中该条消息的状态已变更为“消费成功”
打开自己的邮箱,会发现也已经成功收到了该邮件信息
2:投递失败,定时任务定时拉取重新投递的情况
为了演示“消息投递失败/不成功”的情况,我们可以将Controller中的sendMailV2()方法里的“发送MQ消息”的逻辑注释起来
这将意味着,前端Postman触发请求调用这个接口时,插入数据库中的消息的状态为“投递中”,即 status=0 的情况;除此之外,还可以偷偷地调整 消息对应的“交换机”或者“路由”的值,这样一来,RabbitmqConfig中RabbitTemplate方法将会触发“消息投递失败 404”的错误,也就是 status=2 的情况
下面我们就以第一种为主吧,将发送消息的代码逻辑注释起来,然后再次在Postman发起请求,此时会发现数据库表插入了一条 status=0 的消息
与此同时,我们的定时器因为cron设定时间为 10s 来一次轮询,因此稍等片刻后,会发现该条消息重新被拉取出来重新进入投递,最终成功走到了最后了
3:消息确认消费失败进入重试的情况
这种情况只有在“监听消费处理消息的过程出现异常” 才能触发“消息重入队列”机制,因此,我们只需要在真正处理消息的业务逻辑:“发送邮件”中嵌入一段 int i=1/0 的代码即可
而我们都知道,在执行这段代码之后,sendSimpleEmail() 势必会抛出异常,之后会更新数据库表中该消息的状态,即“消费失败”,之后便会触发外层MqListener的comsumeMailV2方法的catch逻辑
之后,消息便会“重入队列”,当然啦,每次重入队列时,时间间隔是8s(这个按照实际情况设定即可,这种写法还是比较粗糙~), 数据库表该消息的“最大重试次数”将会加1,直到超过 最大的重试次数(这里是 3次),便不会再进入队列了,而是会从队列中永久删除;于此同时,IDEA控制台会报出相应的异常信息,最终数据库表中该条消息的try_count也达到了最大重试次数:3次