这次打算说一下 rabbitmq 的延迟队列。

延迟队列,名字中有个队列,队列是先进先出的。所以说延迟队列是一个有方向性的。

其次,延迟队列和普通队列最大的区别就是,普通队列里的消息是希望自己早点被取出来消费。而延迟队列中的消息都是由时间来控制的。也就是说,他们进入队列的时候,就已经被安排何时被取出了

rabbitmq 实现延迟队列主要有种方式。

第一种是使用普通队列和死信队列来模拟实现延迟的效果。大致上是将消息放入一个没有被监听的队列上,设置 TTL (一条消息的最大存活时间) 为延迟的时间,时间到了没有被消费,直接成为私信。监听私信队列来进行操作。

第二种是使用 rabbitmq 官方提供的 delayed 插件来真正实现延迟队列。本文对第二种进行详解

应用场景

  • 订单超时支付取消订单
  • 用户发起退款卖家 3 天不处理自动退款
  • 预约抢购活动,活动开始前 10 分钟短信通知用户

安装延迟插件

默认交换机是有 4 种模式的

img点击并拖拽以移动

现在我们去安装延迟插件

https://www.rabbitmq.com/community-plugins.html

前往官网去下载延迟插件

img点击并拖拽以移动img点击并拖拽以移动下载完成之后,上传到我们的服务器。使用下面的命令将插件复制到 mq 容器的 plugins 目录下

1
docker cp rabbitmq_delayed_message_exchange-3.10.2.ez rabbitmq:/plugins

点击并拖拽以移动img

容器日志提示已经启用 delayed 插件

点击并拖拽以移动img

点击并拖拽以移动交换机新增一个延迟模式

img点击并拖拽以移动编辑

配置类

直接上代码,首先是我们的配置类。初始化一个队列和一个延迟交换机 (这里我交换机模式用的是路由模式)。将队列绑定到交换机上并设置路由 Key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Component
public class RabbitmqDelayedConfig {

/**
* 初始化延迟交换机
* @return
*/
@Bean
public CustomExchange delayedExchangeInit() {
Map<String, Object> args = new HashMap<>();
// 设置类型,可以为fanout、direct、topic
args.put("x-delayed-type", "direct");
// 第一个参数是延迟交换机名字,第二个是交换机类型,第三个设置持久化,第四个设置自动删除,第五个放参数
return new CustomExchange("delayed_exchange","x-delayed-message", true,false,args);
}

/**
* 初始化短信队列
* @return
*/
@Bean
public Queue delayedSmsQueueInit() {
return new Queue("sms_delayed_queue", true);
}


/**
* 短信队列绑定到交换机
* @param delayedSmsQueueInit
* @param customExchange
* @return
*/
@Bean
public Binding delayedBindingSmsQueue(Queue delayedSmsQueueInit, CustomExchange customExchange) {
// 延迟队列绑定延迟交换机并设置RoutingKey为sms
return BindingBuilder.bind(delayedSmsQueueInit).to(customExchange).with("sms").noargs();
}
}

点击并拖拽以移动

生产者

将消息转发到”delayed_exchange” 交换机上路由键为”sms” 的队列中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component
@Log4j2
public class Producer {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 延迟模式
* @param msg 消息
* @param time 延迟时间
*/
public void delayedTest(String msg,Integer time) {
// 第一个参数是延迟交换机名称,第二个是Routingkey,第三个是消息主题,第四个是X,并设置延迟时间,单位 是毫秒
rabbitTemplate.convertAndSend("delayed_exchange","sms",msg,a -> {
a.getMessageProperties().setDelay(time);
return a;
});
log.info("延迟模式默认交换机已发出消息");
}

}

点击并拖拽以移动

消费者

监听指定的队列。一旦队列中有消息则立刻取出进行消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
@Log4j2
public class ConsumerDelayed {

/**
* 延迟模式短信消费者
* @param message
*/
@RabbitListener(queues = {"sms_delayed_queue"})
public void getSmsConsumer(String message) {
log.info(new Date().toLocaleString() + "延迟模式短信消费者接收到信息:" + message);
}

}

点击并拖拽以移动img点击并拖拽以移动img点击并拖拽以移动 发出消息并且 10S 后延迟队列对消息进行消费,延迟队列实现成功