RabbitMQ
- 快速入门
- Spring AMQP
- 快速入门
- 一个队列绑定多个消费者
- 交换机
- 作用
- Direct 交换机
- Topic 交换机
- 代码声明队列和交换机
- 消息转换器
- 消息的可靠性
- 生产者的可靠性
- 生产者重连
- 生产者确认(尽量不要用)
- MQ 的可靠性
- 数据持久化
- Lazy Queue(惰性队列)
- 消费者可靠性
- 消费者确认
快速入门
创建用户(每个项目创建一个用户)
创建虚拟主机
创建队列
发送消息
Spring AMQP
快速入门
# 配置类spring:rabbitmq:host:192.168.16.128port:5672username:lwypassword:123456virtual-host:/lwy<!-- 导入依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>// 发送消息StringqueueName="mqtest1";// 队列名字Stringmessage="hello world";rabbitTemplate.convertAndSend(queueName,message);// 接收消息并处理@RabbitListener(queues="mqtest1")// 可对应多个队列publicvoidlisten(Stringmessage){System.out.println("mqtest1 收到消息:"+message);}一个队列绑定多个消费者
默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积(当消费者1 一秒处理 50 条消息,消费者 2 一秒处理 5 条消息,有 50 条消息的时候 mq 仍然会让消费者 1 处理 25 条,消费者 2 处理 25 条,就会导致消费者 1 处理完了不工作了,消费者 2 还在工作)
因此我们需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息:
spring:rabbitmq:host:192.168.16.128port:5672username:lwypassword:123456virtual-host:/lwylistener:simple:prefetch:1交换机
作用
- 接收 publisher 发送的消息
- 将消息按照规则路由到与之绑定的队列
Fanout 交换机
FanoutExchange会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式
// 发送消息到交换机StringexchangeName="lwy.fanout";Stringmessage="hello world";rabbitTemplate.convertAndSend(exchangeName,"",message);Direct 交换机
DirectExchange会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的Routingkey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
StringexchangeName="lwy.direct";Stringmessage="blue";// 第二个参数是路由关键字rabbitTemplate.convertAndSend(exchangeName,"yellow",message);Topic 交换机
TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以“ . ”分割
Queue与Exchange指定BindingKey时可以使用通配符:
- #:代指0个或多个单词。例:china.#可以匹配 china.news、china.weather
- *:代指一个单词。china.*可以匹配 china.news 不可匹配 china.news.sports
代码声明队列和交换机
// 如果没有指定的队列和交换机会自动创建@RabbitListener(bindings=@QueueBinding(value=@Queue(name="direct1"),exchange=@Exchange(name="lwy.direct",type=ExchangeTypes.DIRECT),key={"red","blue"}// 集合))publicvoiddirect1(Stringmessage)throwsInterruptedException{System.out.println("direct1 收到消息:"+message);}消息转换器
rabbitMQ 本身只接受和传输字节流,传输对象的时候会乱码,通常需要消息转换器自动序列化和反序列化
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>@BeanpublicMessageConvertermessageConverter(){returnnewJackson2JsonMessageConverter();}消息的可靠性
生产者的可靠性
生产者重连
有的时候由于网络波动,可能会出现客户端连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:
spring:rabbitmq:connection-timeout:1s# 连接超时时间template:retry:enabled:true# 开启超时重试initial-interval:1000ms# 初始重试间隔时间multiplier:1# 重试间隔时间倍数max-attempts:3# 最大重试次数注意:
- 当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。
- 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
生产者确认(尽量不要用)
RabbitMQ 有 PublisherConfirm和PublisherReturn两种确认机制。开启确认机制认后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:
- 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功(路由失败一般是代码有问题)
- 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
- 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
- 其它情况都会返回NACK,告知投递失败
MQ 的可靠性
数据持久化
非持久的消息会存储到内存中,如果 mq 重启消息就会消失,持久化的会存到磁盘中
注意:如果开启了持久化和生产者确认,只有在消息 持久化完成后才会给生产者返回 ACK 回执
Lazy Queue(惰性队列)
特性:
- 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
@RabbitListener(queuesToDeclare=@Queue(name="lazy.queue",durable="true",// 持久化队列arguments=@Argument(name="x-queue-mode",value="lazy")// lazy队列))publicvoidlazy(Stringmessage){System.out.println("lazy.queue 收到消息:"+message);}消费者可靠性
消费者确认
当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
- ack:成功处理消息,RabbitMQ从队列中删除该消息
- nack:消息处理失败,RabbitMQ需要再次投递消息
- reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
可通过配置文件选择 ACK 处理方式
- none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
- manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
- auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack,当业务出现异常时:
- 如果是业务异常,会自动返回nack
- 如果是消息处理或校验异常,自动返回 reject
spring:rabbitmq:listener:simple:prefetch:1acknowledge-mode:none