基本介绍
rabbitmq 是一个基于 Erlang 语言开发且非常好用的一款开源的 amqp (高级消息队列)。主要的业务场景有秒杀、消息的订阅分发,抢优惠卷等高并发场景。主要的亮点有三个
三大亮点
- 解耦:一个系统调用多个模块。互相调用的关系很复杂很麻烦。如果没有消息队列,每当一个新业务接入,我们都要在主系统调用新接口。使用消息队列,我们只需要关心是否送达。服务自己订阅想要的信息即可
- 削锋:高峰时期对服务器的压力。比如下单的时候,大量的数据直接访问过来根本没时间处理,不妨先把他们存到消息队列里,让服务器不至于崩溃的同时尽可能的快速执行队列中的任务
- 异步:对于不是特别重要的一些请求。假如说有一个操作,要调用三个服务,a200ms,b300ms,c200ms,如果不使用 mq 的话,用户至少要等 700ms,使用 mq 的话,直接发送 3 条消息到 mq 里,大大减少了耗时时间,同时用户体验也上个档次
说完优点,来说说缺点
三大缺点
- 系统可用性降低:mq 也会出问题,没使用 mq 之前,a 系统调用 b 系统,b 系统调用 c 系统。这样虽然耦合高,但是可以正常工作。如果把 mq 引进来,把数据都发给 mq,让 mq 来调用 abc 三个系统,万一 mq 挂掉了。这整个业务都崩了
- 系统复杂度提高:引入 mq 需要考虑消息是否重复消费,确保消息不丢失,还要确保消息的顺序性
- 数据不一致性:如果一个数据被重复消费,破坏了幂等性,也就发生了数据的不一致性
这次来介绍一下 6 大模式以及用法。
简单模式 (点对点)
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="img"
模式特点
- 一个生产者,一个消费者
- 默认使用 direct 交换机
初始化两个队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Component public class RabbitmqSimpleConfig {
@Bean public Queue simpleSmsQueueInit() { return new Queue("sms_simple_queue", true); }
@Bean public Queue simpleEmailQueueInit() { return new Queue("email_simple_queue", true); }
}
|
声明消费者并监听刚刚创建的 2 个队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @Component @Log4j2 public class ConsumerSimple {
@RabbitListener(queues = {"sms_simple_queue"}) public void getSmsConsumer(String message) { log.info("短信消费者接收到信息:" + message); }
@RabbitListener(queues = {"email_simple_queue"}) public void getEmailConsumer(String message) { log.info("邮箱消费者接收到信息:" + message); } }
|
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="点击并拖拽以移动"
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Component @Log4j2 public class Producer {
@Autowired private RabbitTemplate rabbitTemplate;
public void simpleTest() { String code= UUID.randomUUID().toString(); rabbitTemplate.convertAndSend("sms_simple_queue",code); rabbitTemplate.convertAndSend("email_simple_queue",code); log.info("简单模式默认交换机已发出消息"); } }
|
运行测试类,发送成功
1 2 3 4 5 6 7 8 9 10 11 12
| @SpringBootTest class RabbitmqdemoApplicationTests {
@Autowired private Producer producer;
@Test void contextLoads() { producer.simpleTest(); }
}
|
控制台打印消息
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="img"
成功消费 2 条信息
工作模式
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="img"
模式特点
- 一个生产者,多个消费者
- 一条消息只能被一个 **** 消费者获取
- 默认使用 direct 交换机
假如说我们的请求量太大,队列堆积的消息太多,一个消费者忙不过来,我们可以增加一个消费者 c2. 来帮 c1 分担一些任务。默认使用轮询策略。简单的说,工作模式就是多个消费者监听一个生产者
我们在刚刚的简单模式基础上我们将消费者里,多加一个监听 sms 的消费者。使用 sms 一个队列来测试工作模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Component @Log4j2 public class ConsumerSimple {
@RabbitListener(queues = {"sms_simple_queue"}) public void getSmsConsumer(String message) { log.info("短信消费者1号接收到信息:" + message); }
@RabbitListener(queues = {"sms_simple_queue"}) public void getSmsConsumer2(String message) { log.info("短信消费者2号接收到信息:" + message); }
@RabbitListener(queues = {"email_simple_queue"}) public void getEmailConsumer(String message) { log.info("邮箱消费者接收到信息:" + message); } }
|
生产者中我们发送 50 条消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Component @Log4j2 public class Producer {
@Autowired private RabbitTemplate rabbitTemplate;
public void simpleTest() { String code= UUID.randomUUID().toString(); for (int i = 0; i < 6; i++) { rabbitTemplate.convertAndSend("sms_simple_queue",code); } log.info("简单模式默认交换机已发出消息"); }
}
|
启动测试类进行测试,由于 mq 处理太快。有 2 条信息在测试类里打印出来了。不影响我们的结果
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="img"
可以看到 6 条消息被轮询的分发到了 2 个消费者当中data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="img"
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="点击并拖拽以移动"
发布订阅模式 / 广播模式 (fanout)
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="img"
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="点击并拖拽以移动"
模式特点
- 一个生产者,多个消费者
- 核心是 fanout 交换机
- 流程为:生产者生产消息 -> 交换机收到消息并转发给对应的绑定队列 -> 队列收到消息
一个生产者发出的消息会被多个消费者获取,平时使用频率也非常高。后面的 2 种模式都是在发布订阅模式上扩展的
首先我们创建 2 个队列,分别是消息的队列和邮箱队列 创建一个 fanout 交换机,将这两个队列绑定到交换机上
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| @Component public class RabbitmqConfig {
@Bean public FanoutExchange fanoutExchangeInit() { return new FanoutExchange("fanout_exchange", true,false); }
@Bean public Queue smsQueueInit() { return new Queue("sms_queue", true); }
@Bean public Queue emailQueueInit() { return new Queue("email_queue", true); }
@Bean public Binding bindingSmsQueue(Queue smsQueueInit, FanoutExchange fanoutExchange) { return BindingBuilder.bind(smsQueueInit).to(fanoutExchange); }
@Bean public Binding bindingEmailQueue(Queue emailQueueInit, FanoutExchange fanoutExchange) { return BindingBuilder.bind(emailQueueInit).to(fanoutExchange); } }
|
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="点击并拖拽以移动"
通过控制台可以看到交换机绑定了 2 个队列
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="img"
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="点击并拖拽以移动"
生产者创建一个队列并发送一条消息。第一个参数是交换机名称,第二个 routingKey 设置为””,因为发布订阅不需要指定路由的 key。第三个参数为要发送的消息内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Component @Log4j2 public class Producer {
@Autowired private RabbitTemplate rabbitTemplate;
public void fanoutTest() { String code= UUID.randomUUID().toString(); rabbitTemplate.convertAndSend("fanout_exchange","",code); log.info("交换机已发出消息"); }
}
|
消费者使用 spring 提供好的注解分别监听 2 个队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Component @Log4j2 public class Consumer {
@RabbitListener(queues = {"sms_queue"}) public void getSmsConsumer(String message) { log.info("短信消费者接收到信息:" + message); }
@RabbitListener(queues = {"email_queue"}) public void getEmailConsumer(String message) { log.info("邮箱消费者接收到信息:" + message); } }
|
最后,使用测试类来调用生产者
1 2 3 4 5 6 7 8 9 10 11 12
| @SpringBootTest class RabbitmqdemoApplicationTests {
@Autowired private Producer producer;
@Test void contextLoads() { producer.fanoutTest(); }
}
|
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="点击并拖拽以移动"
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="img"
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="点击并拖拽以移动"
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="img"
发送成功,并且成功监听到消息
路由模式 (Direct)
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="img"
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="点击并拖拽以移动"
模式特点
- 一个生产者,多个消费者
- 使用 direct 交换机
- 队列绑定交换机的时候必须指定 RoutingKey
- 发送消息的时候必须指定 RoutingKey
- RoutingKey 可以重复
发布订阅模式是将消息转发给所有绑定的队列,而路由则会根据 RoutingKey (路由 key) 来匹配完全一致的 BindingKey 来完成转发。
这里我花了一幅图来帮助大家理解。我们有 ABC 三个队列,三个队列的 routingKey 分别为 sms,email,phone。这个时候我们的路由交换机有一条消息,这个消息指定了路由 key 为 phone。那么这条消息只有路由 key 为 phone 的队列 C 会收到。
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="img"
代码实现也非常简单。
首先是生产者,初始化一个 Direct 类型的交换机。创建 2 个队列并且绑定到交换机上。和发布订阅模式不一样的是多了一个 with,用来设置 RoutingKey。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| @Component public class RabbitmqDirectConfig {
@Bean public DirectExchange directExchangeInit() { return new DirectExchange("direct_exchange", true,false); }
@Bean public Queue directSmsQueueInit() { return new Queue("sms_direct_queue", true); }
@Bean public Queue directEmailQueueInit() { return new Queue("email_direct_queue", true); }
@Bean public Binding directBindingSmsQueue(Queue directSmsQueueInit, DirectExchange directExchange) { return BindingBuilder.bind(directSmsQueueInit).to(directExchange).with("sms"); }
@Bean public Binding directBindingEmailQueue(Queue directEmailQueueInit, DirectExchange directExchange) { return BindingBuilder.bind(directEmailQueueInit).to(directExchange).with("email"); } }
|
消费者 进行监听
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Component @Log4j2 public class ConsumerDirect {
@RabbitListener(queues = {"sms_direct_queue"}) public void getSmsConsumer(String message) { log.info("路由模式短信消费者接收到信息:" + message); }
@RabbitListener(queues = {"email_direct_queue"}) public void getEmailConsumer(String message) { log.info("路由模式邮箱消费者接收到信息:" + message); } }
|
定义测试方法,只对 RoutingKey 为 sms 的消息队列进行转发
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Component @Log4j2 public class Producer {
@Autowired private RabbitTemplate rabbitTemplate;
public void directTest() { String code= UUID.randomUUID().toString(); rabbitTemplate.convertAndSend("direct_exchange","sms",code); log.info("路由交换机已发出消息"); } }
|
启动测试类,可以看到只有 sms 队列收到了消息。email 没有反应,证明我们的路由成功了。
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="img"
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="点击并拖拽以移动"
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="img"
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="点击并拖拽以移动"
主题模式 (通配符模式)
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="img"
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="点击并拖拽以移动"
模式特点
- 和路由模式基础上增加了 使用通配符来进行 RoutingKey 匹配
- 使用 topic 交换机
- * 匹配 1 个词
- #匹配 0/1 / 多个词
主题模式也叫通配符模式。通配符有 <#> 和 <> #代表 0 级,1 级或多级。代表 1 级 设置通配符其实就是设置我们的 RoutingKey,使用 topic 交换机来进行规则匹配。
如下图,我们的规则设置为
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="img"
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="点击并拖拽以移动"
email 队列匹配 *.email.*,那么我们的交换机转发消息时的 RoutingKey 只能为 xxx.email.xxx (xxx 可以为任意长度),必须是有且只能有 1 级。比如 com.email.com,这个规则就可以被正确的转发到 email 队列中
1
| rabbitTemplate.convertAndSend("topic_exchange","com.email.com",code);
|
sms 队列匹配 #.sms.# #号是任意级,可以为 0,可以为 1,可以为多个。以下 6 种规则全部可以转发到 sms 队列中
1 2 3 4 5 6
| rabbitTemplate.convertAndSend("topic_exchange","sms",code); rabbitTemplate.convertAndSend("topic_exchange","com.sms",code); rabbitTemplate.convertAndSend("topic_exchange","com.sms.com",code); rabbitTemplate.convertAndSend("topic_exchange","com.com.sms.com",code); rabbitTemplate.convertAndSend("topic_exchange","com.sms.com.com",code); rabbitTemplate.convertAndSend("topic_exchange","com.com.com.com.sms.com",code);
|
有 6 条待处理消息
代码实现也很简单,首先依旧是配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| @Component public class RabbitmqTopicConfig {
@Bean public TopicExchange topicExchangeInit() { return new TopicExchange("topic_exchange", true,false); }
@Bean public Queue topicSmsQueueInit() { return new Queue("sms_topic_queue", true); }
@Bean public Queue topicEmailQueueInit() { return new Queue("email_topic_queue", true); }
@Bean public Binding topicBindingSmsQueue(Queue topicSmsQueueInit, TopicExchange topicExchangeInit) { return BindingBuilder.bind(topicSmsQueueInit).to(topicExchangeInit).with("#.sms.#"); }
@Bean public Binding topicBindingEmailQueue(Queue topicEmailQueueInit, TopicExchange topicExchangeInit) { return BindingBuilder.bind(topicEmailQueueInit).to(topicExchangeInit).with("*.email.*"); } }
|
绑定好交换机后记得绑定好路由规则 RoutingKey
消费者监听队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Component @Log4j2 public class ConsumerTopic {
@RabbitListener(queues = {"sms_topic_queue"}) public void getSmsConsumer(String message) { log.info("主题模式短信消费者接收到信息:序号:"+ message); }
@RabbitListener(queues = {"email_topic_queue"}) public void getEmailConsumer(String message) { log.info("主题模式邮箱消费者接收到信息:" + message); } }
|
生产者输入交换机名,输入 RoutingKey 发消息到交换机中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Component @Log4j2 public class Producer {
@Autowired private RabbitTemplate rabbitTemplate;
public void topicTest() { String code= UUID.randomUUID().toString(); rabbitTemplate.convertAndSend("topic_exchange","com.comcomcom.sms.sora33.33",code); rabbitTemplate.convertAndSend("topic_exchange","com.email.sora33",code); log.info("主题交换机已发出消息"); } }
|
测试类进行测试
1 2 3 4 5 6 7 8 9 10 11 12
| @SpringBootTest class RabbitmqdemoApplicationTests {
@Autowired private Producer producer;
@Test void contextLoads() { producer.topicTest(); }
}
|
因为我们的两个路由 RoutingKey 都可以匹配到对应的队列,所以成功路由到对应的队列当中进行消费
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="img"
data:image/s3,"s3://crabby-images/1c4fb/1c4fb4a004ac374ae735c210f8560be0dce354ac" alt="点击并拖拽以移动"
6.PRC 模式
远程调用,很少使用。后续补充…