一。机制
首先我们要知道一条消息的传递过程。
生产者 -> 交换机 -> 队列
我们的生产者生产消息,生产完成的消息发送到交换机,由交换机去把这个消息转发到对应的队列上。这其中我们可能在生产者 -> 交换机丢失消息,也可能在 交换机 -> 队列上丢失消息。因此我们需要引入 2 个概念。
1: 生产者到交换机的可靠保证 (confirmCallback) 确认回调机制
2: 交换机到队列的保证 (returnCallback) 返回回调机制
二。保证生产者到交换机的可靠传递
因为我们的消息都要经过路由,然后去对应的队列,所以第一条线路至关重要。我们使用 confirm 机制。这个 confirm 机制是一个异步的,也就是说我们发送一条消息之后可以继续发送下一条消息。比自带的事务好很多。
使用 confirm 机制首先需要在配置文件中开启 confirm 机制
1 2 3 4 5 6 7 8
| rabbitmq: host: localhost port: 5672 virtual-host: / username: admin password: password # 开启生产者消息确认 publisher-confirm-type: correlated
|
生产者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @GetMapping("/send/{tel}") public Result send(@PathVariable("tel") String tel) { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { if (b) { log.info("消息发送到交换机成功"); } else { log.error("消息发送到交换机失败,失败信息[{}]",s); } } });
rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_EXCHANGE,"sms",tel);
return null; }
|
这样我们的消息如果发送到交换机,就会执行消息发送到交换机成功
现在我们测试一下,我在交换机名字后面加上一个字符串,现在这个交换机是不存在的。看看会发生什么
1
| rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_EXCHANGE+"sora33","sms",tel);
|
发送到交换机失败了,执行了我们失败里的回调。这里我们就可以看出这个 confirm 机制的作用了。它是用来确保确保我们的消息是否到达了交换机。到达了执行 ack,没有到达执行 nack。我们可以在发送失败的方法里加入自己的逻辑。比如加入到发送失败的表中,或者尝试重新发送…
消息的发送确认机制讲完之后。接下来我们来看一下交换机到队列要如何保证消息的可靠性。
三。保证交换机到队列的可靠传递
使用 ReturnCallback 机制来保证。假设我现在有一个路由模式的交换机。绑定了一个队列,叫 send_sms。对应的路由键是 sms。如果我给这个交换机发送一条消息。路由键指定 smssss。肯定是找不到对应的队列。那么这个时候就会触发 ReturnCallback。
setMandatory 是用来设置如果没有找到队列,是丢弃还是执行 returnedMessage 里的方法。false 丢弃。
要使用 ReturnCallback,我们同样需要在设置中打开配置,很简单。只需要在 yml 里的 mq 下面跟一条配置就行了。打开 return 回调机制
加入下面的回调属性设置。可以和消息确认机制一起使用。2 者互不影响,直接写上去就行。
1 2 3 4 5 6 7 8 9 10
| rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int i, String context, String exchange, String routeKey) { log.error("消息[{}]未到达队列[{}],使用的路由键[{}]",message,exchange,routeKey); } });
rabbitTemplate.setMandatory(true);
|
现在我给一个不存在的路由 key 发送。交换机肯定是找不到对应的队列的 我们的交换机目前只绑定了路由为 sms 的一个队列
1
| rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_EXCHANGE,"smssss",tel);
|
可以看到,虽然消息进入了交换机,但是找不到对应的队列,执行 ReturnCallback 回调函数
生产者方面的一些机制讲完之后。接下来我们来看消费者中的消息签收机制以及如何重新发送失败的消息。
因为 rabbitMQ 默认是签收消息的。我们先把签收模式设置为手动签收 顺便配置一下我们的重发配置
1 2 3 4 5 6 7 8 9 10 11
| # 消费端设置手动签收 listener: direct: acknowledge-mode: manual simple: acknowledge-mode: manual retry: # 开启消息重发机制 enabled: true # 重试次数3 max-attempts: 3
|
生产者代码 生产者的逻辑很简单。肯定会抛异常。因为我手动设置了一个被除数异常。进入到 catch 块中。我做了一个存入 redis 的操作,将这个消息的标签值作为键。值设置为 1. 作为重试次数。存入之后 mq 会自动进行一个重发。当判断重试次数达到 3 次。直接拒绝签收。并将该消息存到数据库中的重试表。进行一个人工操作…
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| int a = 0;
@RabbitListener(queues = {RabbitConstant.SEND_SMS}) public void smsQueue(String tel, Message message, Channel channel) throws IOException { try { int c = 1/a; channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); log.info("签收成功[{}]",tel); } catch (Exception e) { Integer value = (Integer)redisUtil.get(message.getMessageProperties().getDeliveryTag() + ""); if (value == null) { redisUtil.set(message.getMessageProperties().getDeliveryTag()+"", 1); } else if (value.intValue() == 2) { log.error("消息[{}]消费失败...传递参数[{}]", message, tel); log.warn("已加入重试表..."); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); return; } else { redisUtil.set(message.getMessageProperties().getDeliveryTag() + "", ++value); } log.info("签收失败[{}]",tel); throw new RuntimeException("签收异常"); } }
|
当我们给 int c = 1/a 改为 c = 1/a++
这个时候第一次会进入 catch 块。第二次因为 a 自增。所以不会抛出异常,签收成功
四。总结
RabbitMQ 在我们工作中是常用的一个中间件,必须要对齐了如指。既然是中间件,那么势必会有消息丢失产生,还要保证消息的幂等性。本文章是一个进阶文章。RabbitMQ 基础跳转
https://soora33.github.io/posts/4222e73b.html
https://soora33.github.io/posts/414b6727.html