RabbitMQ
2026/4/21 15:41:05 网站建设 项目流程

RabbitMQ

  • 快速入门
  • Spring AMQP
    • 快速入门
    • 一个队列绑定多个消费者
  • 交换机
    • 作用
    • Direct 交换机
    • Topic 交换机
    • 代码声明队列和交换机
    • 消息转换器
  • 消息的可靠性
    • 生产者的可靠性
      • 生产者重连
      • 生产者确认(尽量不要用)
    • MQ 的可靠性
      • 数据持久化
      • Lazy Queue(惰性队列)
    • 消费者可靠性
      • 消费者确认

快速入门

创建用户(每个项目创建一个用户)

创建虚拟主机

创建队列

发送消息


Spring AMQP

快速入门

# 配置类spring:rabbitmq:host:192.168.16.128port:5672username:lwypassword:123456virtual-host:/lwy
<!-- 导入依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
// 发送消息StringqueueName="mqtest1";// 队列名字Stringmessage="hello world";rabbitTemplate.convertAndSend(queueName,message);
// 接收消息并处理@RabbitListener(queues="mqtest1")// 可对应多个队列publicvoidlisten(Stringmessage){System.out.println("mqtest1 收到消息:"+message);}

一个队列绑定多个消费者

默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积(当消费者1 一秒处理 50 条消息,消费者 2 一秒处理 5 条消息,有 50 条消息的时候 mq 仍然会让消费者 1 处理 25 条,消费者 2 处理 25 条,就会导致消费者 1 处理完了不工作了,消费者 2 还在工作)
因此我们需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息:

spring:rabbitmq:host:192.168.16.128port:5672username:lwypassword:123456virtual-host:/lwylistener:simple:prefetch:1

交换机

作用

  1. 接收 publisher 发送的消息
  2. 将消息按照规则路由到与之绑定的队列
    Fanout 交换机
    FanoutExchange会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式
// 发送消息到交换机StringexchangeName="lwy.fanout";Stringmessage="hello world";rabbitTemplate.convertAndSend(exchangeName,"",message);

Direct 交换机

DirectExchange会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的Routingkey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

StringexchangeName="lwy.direct";Stringmessage="blue";// 第二个参数是路由关键字rabbitTemplate.convertAndSend(exchangeName,"yellow",message);

Topic 交换机

TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以“ . ”分割
Queue与Exchange指定BindingKey时可以使用通配符:

  • #:代指0个或多个单词。例:china.#可以匹配 china.news、china.weather
  • *:代指一个单词。china.*可以匹配 china.news 不可匹配 china.news.sports

代码声明队列和交换机

// 如果没有指定的队列和交换机会自动创建@RabbitListener(bindings=@QueueBinding(value=@Queue(name="direct1"),exchange=@Exchange(name="lwy.direct",type=ExchangeTypes.DIRECT),key={"red","blue"}// 集合))publicvoiddirect1(Stringmessage)throwsInterruptedException{System.out.println("direct1 收到消息:"+message);}

消息转换器

rabbitMQ 本身只接受和传输字节流,传输对象的时候会乱码,通常需要消息转换器自动序列化和反序列化

<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
@BeanpublicMessageConvertermessageConverter(){returnnewJackson2JsonMessageConverter();}

消息的可靠性

生产者的可靠性

生产者重连

有的时候由于网络波动,可能会出现客户端连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:

spring:rabbitmq:connection-timeout:1s# 连接超时时间template:retry:enabled:true# 开启超时重试initial-interval:1000ms# 初始重试间隔时间multiplier:1# 重试间隔时间倍数max-attempts:3# 最大重试次数

注意:

  • 当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。
  • 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

生产者确认(尽量不要用)

RabbitMQ 有 PublisherConfirm和PublisherReturn两种确认机制。开启确认机制认后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:

  • 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功(路由失败一般是代码有问题)
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

MQ 的可靠性

数据持久化

非持久的消息会存储到内存中,如果 mq 重启消息就会消失,持久化的会存到磁盘中
注意:如果开启了持久化和生产者确认,只有在消息 持久化完成后才会给生产者返回 ACK 回执

Lazy Queue(惰性队列)

特性:

  • 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储
@RabbitListener(queuesToDeclare=@Queue(name="lazy.queue",durable="true",// 持久化队列arguments=@Argument(name="x-queue-mode",value="lazy")// lazy队列))publicvoidlazy(Stringmessage){System.out.println("lazy.queue 收到消息:"+message);}

消费者可靠性

消费者确认

当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

可通过配置文件选择 ACK 处理方式

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
  • manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack,当业务出现异常时:
    • 如果是业务异常,会自动返回nack
    • 如果是消息处理或校验异常,自动返回 reject
spring:rabbitmq:listener:simple:prefetch:1acknowledge-mode:none

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询