rabbitmq使用笔记
docker安装rabbitmq
1. 拉取镜像
1 | docker pull rabbitmq:3.8.7-management |
1.1 启动
1 |
|
1.2 配置信息
1 |
|
1.direct模式 根据路由键匹配,同一个消息只会被同个队列同个消费者消息。
发送失败策略关注:publisher-returns、publisher-confirm-type、template.mandatory
1 | /** |
NONE值是禁用发布确认模式,是默认值
CORRELATED值是发布消息成功到交换器后会触发回调方法
SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;
SIMPLE是在发送消息后,设置等待时间,同步返回发送结果
1.3 消费者手动确认、重试
只有在消费过程中抛出了异常,才会触发重试,消息才会进入到配置的error队列。此时队列中的消息还是存在的,是unAck状态,
如果此时配置了prefetch,假如配置的是2,当unAck条数达到2后,后面的消息都不会被消费了,会一直堵着
如果重启mq,会消费到一次这条消息,error队列中也存在这条消息。
如果只是basicNack(),消息不会触发本地重试,此时如果配置了死信队列,则进入死信队列,否则直接丢弃消息。
basicReject(),此时如果配置了死信队列,则进入死信队列,否则直接丢弃消息。
1 | try { |
在捕获异常后又抛出,本地测试可以让异常消息从队列中移除到error队列中。
不再次抛出时,basicNack不重新放回队列,消息直接丢弃,也不会阻塞。
消费时如果开启手动,在basicAck等应答之前触发了异常,没有应答消息,消息还是会存在队列里面,状态是unAcked,配置了error时也会进入到error队列,
1.4 死信队列、队列超时、消息超时
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
1.消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false 2.消息是一个过期消息,超时无人消费 3.要投递的队列消息满了,无法投递
如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中
在消费死信队列(DLX)中的消息时,如果最后调用 basicNack() 方法,并且不将消息重新放入队列(即设置 requeue=false),
那么该消息将最终被丢弃。因为它已经是一个死信,即无法被正常消费的消息,所以RabbitMQ不会再次尝试将其投递给任何队列。
DLX:(dead-letter-exchange的缩写)死信队列交换机
DLK:(dead-letter-routing-key的缩写)死信队列routingKey
一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况: 消息所在的队列设置了超时时间 消息本身设置了超时时间
利用插件,实现延迟队列 在申明延迟队列时需要注意,directExchange.setDelayed(true)
在发送延迟消息时,一直会收到失败的回调,原因如下:
如果配置了发送回调ReturnCallback,rabbitmq_delayed_message_exchange插件会回调该方法,因为发送方确实没有投递到队列上,只是在交换器上暂存,等过期/时间到了才会发往队列。
所以如果是延迟队列的交换器,则直接放过,并不是bug
1 | public String send(Integer seconds){ |
消息变成死信的情况:
- 消息被拒绝,并且设置requeue参数为false(basicNack\basicReject)
- 消息过期(队列ttl、消息ttl)
- 队列达到最大长度。
- springboot设置死信队列
队列最大长度可配置,命令式配置、代码配置
在声明期间使用 x-arguments 定义最大队列长度
可以通过为队列声明参数 x-max-length 提供一个非负整数值来设置最大消息数。
可以通过为队列声明参数 x-max-length-bytes 提供一个非负整数值,设置最大字节长度。
如果设置了两个参数,那么两个参数都将适用;无论先达到哪个限制,都将强制执行。
溢出行为可以通过向队列声明参数 x-overflow 提供字符串值来设置。可能的值是 drop-head (默认)或 reject-publish
消息堆积
增加消费者数量,增加并发数,分析堆积原因,修改消息分发策略,降低消费能力弱的预处理数
消息丢失
队列和交换机都需要设置为持久化,异常的消息最好能进入单独的队列或者进入死信队列
重复消费
借助mysql、redis先存在消费时再查询,redis的setIfExist等
消息分发策略
公平分发:
在不配置listener.simple.prefetch时消息为轮询分发,默认是250条。
非公平分发:
当prefetch配置为1时,消费越快,处理的消息越多。
basicQos、prefetch
prefetch:
prefetch允许为每个consumer指定最大的unacked messages数目。简单来说就是用来指定一个consumer一次可以从Rabbit中获取多少条message并缓存在client中(RabbitMQ提供的各种语言的client library)。一旦缓冲区满了,Rabbit将会停止投递新的message到该consumer中直到它发出ack。
假设prefetch值设为10,共有两个consumer。意味着每个consumer每次会从queue中预抓取 10 条消息到本地缓存着等待消费。同时该channel的unacked数变为20。而Rabbit投递的顺序是,先为consumer1投递满10个message,再往consumer2投递10个message。如果这时有新message需要投递,先判断channel的unacked数是否等于20,如果是则不会将消息投递到consumer中,message继续呆在queue中。之后其中consumer对一条消息进行ack,unacked此时等于19,Rabbit就判断哪个consumer的unacked少于10,就投递到哪个consumer中。
总的来说,consumer负责不断处理消息,不断ack,然后只要unacked数少于prefetch * consumer数目,broker就不断将消息投递过去。
prefetch的设置与以下几点有关:
1.客户端服务端之间网络传输时间
2.consumer消耗一条消息所执行的业务逻辑的耗时
3.网络状况
配置文件中设置后对所有消费者生效
1 | listener: |
对单个消费者限制,需要定义一个ContainerFactory
并且在@RabbitListener
中指定
1 |
|
对于同一个消费者,单个broker下面只能配置一次containerFactory
并发
配置文件中
1 | listener: |
这种是对所有消费者生效的,对于单个消费者如果需要增加并发,concurrency = "1-3"
1 | "1-3") (queues = DirectExchangeConfig.DIRECT_QUEUE2,concurrency = |
优先级队列
在创建队列时
1 |
|
maxPriority
取值是0-255,100表示的是,在发送消息时可以设置100以内的优先级。
发送消息时
1 | rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE, |
优先级只有在队列阻塞时才会排序,即消息还在ready状态,未发送到消费者客户端
messageId、correlationId
在Spring Boot集成RabbitMQ进行消息消费时,messageId和correlationId是两个不同的属性,它们在AMQP(Advanced Message Queuing Protocol)规范中有各自的定义和作用。
messageId:
messageId是消息的唯一标识符,用于在消息的生命周期中标识和跟踪消息。
它由消息的生产者设置,并且一旦设置后就不能更改。
messageId允许最多255个字节的UTF-8编码数据,并以未压缩的方式存储在Basic.Properties数据结构中。
messageId的主要用途是在消息系统中唯一地识别消息,特别是在消息流对系统中的各个组件进行耦合时,它使得消息能够在消息头中携带数据,从而唯一地识别该消息。
correlationId:
correlationId用于标识一组相关的消息,它主要用于请求/响应模式中的消息关联。
当一个消费者(或客户端)发送一个请求消息并期望得到一个响应时,它可以在请求消息中设置一个correlationId。
当响应消息返回时,响应消息也会包含相同的correlationId,这样消费者就可以将响应与原始请求关联起来。
correlationId也允许最多255个字节的UTF-8编码数据,并同样以未压缩的方式存储在Basic.Properties数据结构中。
在Spring Boot应用中,当使用RabbitMQ进行消息消费时,你可以通过API设置或获取messageId和correlationId。例如,在发送消息时,你可以使用rabbitTemplate的convertAndSend方法,并在Message对象中设置messageId和correlationId。同样,在接收消息时,你可以从接收到的Message对象中获取这些属性。
需要注意的是,虽然messageId和correlationId都是消息属性,但它们的用途和设置方式有所不同。messageId主要用于唯一标识消息,而correlationId则用于标识相关的一组消息,特别是在请求/响应模式中。
内存预警、磁盘预警、内存换页
内存预警:
默认内存阈值是0.4,当已使用内存高于系统内存*0.4
后,会触发系统预警,阻塞客户端链接,停止心跳检查
有两种配置方式
阀值设定(默认0.4)
具体内存设定
1 | 命令式设置 |
磁盘预警
当磁盘剩余空间低于确定的阀值(默认50MB)时,RabbitMQ同样会阻塞生产者,目的是避免填充整个磁盘,导致Rabbit中断,可避免因非持久化的消息持续换页而耗尽磁盘空间导致服务器奔溃。
这个阀值可以减小,但不能完全消除因磁盘耗尽而导致奔溃的可能性,比如在两次磁盘空间检查空隙内,第一次检查时:60MB,第二次检查可能就是1MB,就会出现警告
1 |
|
与内存空间不同的是,磁盘预警的触发机制是:低于下限模式,而内存预警是,高于上限模式
内存换页
磁盘空间远远大于内存空间,因此需要进行资源的置换,不可能等到内存空间触及达到0.4阀值的时候,再把消息写入到磁盘中去
在某个 Broker 节点及内存阻塞生产者之前,它会尝试将队列中的消息,换页到磁盘以释放内存空间,持久化和非持久化的消息都会写入磁盘,其中持久化的消息本身就在磁盘中有一个副本,所以在转移的过程中持久化的消息会先从内存中清除掉。
➳ 结论:为了避免内存空间爆满 和 消息的持久化,会将内存中的消息定时写入到磁盘中去。
★ 触发换页机制
默认情况下,内存达到50% 的阈值时就会换页处理,也就是说,在默认情况下该内存的阈值是 0.4 的情况下,当内存超过 0.2 时,就会进行换页动作。
比如:电脑1000MB 内存,内存阀值为0.4,配置的换页阀值是0.5,rabbit 最大能够使用的内存空间是 1000*0.4=400MB,由于配置的换页内存为 0.5,因此使用率在达到极限400MB之前,会把内存中的 200MB 进行转移到磁盘中,从而达到稳健的运行。
★ 配置换页值
可通过conf配置文件设置换页阀值
1 | vm_memory_high_watermark.ralative = 0.4 |
思考:为什么设置小于1的值,如果设置为1,内部都已经达到极限了,再去换页意义不是很大了
消息过期时间
通过两种设置实现
队列设置过期时间,队列中的所有消息都有同样的过期时间。
消息设置过期时间,指定一条消息的过期时间。
如果同时指定了Message TTL和Queue TTL,则优先时间较小的那一个:
1 |
|
1 | properties1.setExpiration("6000"); |
消息推送到队列后,如果指定时间内没有被消费,则会自动过期。上面我绑定了死信队列,过期了消息会在里面。否则消息过期会被丢弃。
注意:
RabbitMQ只会对队列头部的消息进行过期淘汰。如果单独给消息设置TTL,先入队列的消息过期时间如果设置比较长,后入队列的设置时间比较短。会造成消息不会及时地过期淘汰,导致消息的堆积。
两种设置有效期的删除策略是不同的:
- 通过队列设置的,一旦消息过期,就会从队列中抹去,因为过期的消息肯定在队列头部,RabbitMQ只需要定期处理头部过期消息即可。
- 而单独设置有效期的,如果要删除则需要遍历整个队列,所以采取消费时判定是否过期处理删除
x-expires、x-message-ttl区别
x-expires
队列有效期
用来控制队列在指定时间未被使用过后删除,未被使用包括以下三点:
- 没有任何消费者
- 未被重新声明过期时间
- 未调用过Basic.Get命令
x-message-ttl
消息有效期
异常
1 | ERROR 32664 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'directQueue2' in vhost '/': received the value 'dl.direct' of type 'longstr' but current is none, class-id=50, method-id=10) |
解决方法:
未安装延迟队列插件/安装后需要重新绑定交换机
1 | 【Rabbitmq】【02】确认消息 This operation is only available within the scope of an invoke operation |
解决方法:
使用invoke调用
1 | boolean res=rabbitTemplate.invoke(t->{ |
- 本文标题:rabbitmq使用笔记
- 本文作者:HeRui
- 本文链接:https://hr2812.cn/2024/02/20/about-rabbitmq-notes/
- 版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!