这次打算说一下 rabbitmq 的延迟队列。
延迟队列,名字中有个队列,队列是先进先出的。所以说延迟队列是一个有方向性的。
其次,延迟队列和普通队列最大的区别就是,普通队列里的消息是希望自己早点被取出来消费。而延迟队列中的消息都是由时间来控制的。也就是说,他们进入队列的时候,就已经被安排何时被取出了
rabbitmq 实现延迟队列主要有种方式。
第一种是使用普通队列和死信队列来模拟实现延迟的效果。大致上是将消息放入一个没有被监听的队列上,设置 TTL (一条消息的最大存活时间) 为延迟的时间,时间到了没有被消费,直接成为私信。监听私信队列来进行操作。
第二种是使用 rabbitmq 官方提供的 delayed 插件来真正实现延迟队列。本文对第二种进行详解
应用场景
- 订单超时支付取消订单
- 用户发起退款卖家 3 天不处理自动退款
- 预约抢购活动,活动开始前 10 分钟短信通知用户
安装延迟插件
默认交换机是有 4 种模式的
现在我们去安装延迟插件
https://www.rabbitmq.com/community-plugins.html
前往官网去下载延迟插件
下载完成之后,上传到我们的服务器。使用下面的命令将插件复制到 mq 容器的 plugins 目录下
1
| docker cp rabbitmq_delayed_message_exchange-3.10.2.ez rabbitmq:/plugins
|
容器日志提示已经启用 delayed 插件
交换机新增一个延迟模式
编辑
配置类
直接上代码,首先是我们的配置类。初始化一个队列和一个延迟交换机 (这里我交换机模式用的是路由模式)。将队列绑定到交换机上并设置路由 Key
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| @Component public class RabbitmqDelayedConfig {
@Bean public CustomExchange delayedExchangeInit() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange("delayed_exchange","x-delayed-message", true,false,args); }
@Bean public Queue delayedSmsQueueInit() { return new Queue("sms_delayed_queue", true); }
@Bean public Binding delayedBindingSmsQueue(Queue delayedSmsQueueInit, CustomExchange customExchange) { return BindingBuilder.bind(delayedSmsQueueInit).to(customExchange).with("sms").noargs(); } }
|
生产者
将消息转发到”delayed_exchange” 交换机上路由键为”sms” 的队列中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Component @Log4j2 public class Producer {
@Autowired private RabbitTemplate rabbitTemplate;
public void delayedTest(String msg,Integer time) { rabbitTemplate.convertAndSend("delayed_exchange","sms",msg,a -> { a.getMessageProperties().setDelay(time); return a; }); log.info("延迟模式默认交换机已发出消息"); }
}
|
消费者
监听指定的队列。一旦队列中有消息则立刻取出进行消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Component @Log4j2 public class ConsumerDelayed {
@RabbitListener(queues = {"sms_delayed_queue"}) public void getSmsConsumer(String message) { log.info(new Date().toLocaleString() + "延迟模式短信消费者接收到信息:" + message); }
}
|
发出消息并且 10S 后延迟队列对消息进行消费,延迟队列实现成功