rabbitmq使用笔记

docker安装rabbitmq

1. 拉取镜像

1
docker pull rabbitmq:3.8.7-management

1.1 启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

docker run -d --name rbbitmq3.8.7 -p 5672:5672 -p 15672:15672 -v /server/app/rabbitmq/ --hostname node2 -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin 5726af297dd4



docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --hostname node2 5726af297dd4


docker cp rbbitmq3.8.7:/etc/rabbitmq C:\apps\rabbitmq\conf
docker cp rbbitmq3.8.7:/var/log/rabbitmq C:\apps\rabbitmq\log
docker cp rbbitmq3.8.7:/var/lib/rabbitmq C:\apps\rabbitmq\data



docker run -d -p 5672:5672 -p 15672:15672 -v C:\apps\rabbitmq\data:/var/lib/rabbitmq -v C:\apps\rabbitmq\conf:/etc/rabbitmq -v C:\apps\rabbitmq\log:/var/log/rabbitmq --name rabbitmq --hostname node2 5726af297dd4


docker cp C:\Users\76065\Downloads\rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez rabbitmq:/opt/rabbitmq/plugins


rabbitmq-plugins enable rabbitmq_delayed_message_exchange

1.2 配置信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

spring:
rabbitmq:
username: guest
password: guest
host: 127.0.0.1
port: 5672
virtual-host: /
# 生产者配置
# 消息发送失败后返回,开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
# rabbitTemplate.setReturnCallback
publisher-returns: true
# 发送者确认,开启后,成功返回ack,失败返回nack
# simple:同步等待confirm结果直到超时,correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publisher-confirm-type: correlated
# 定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
template:
mandatory: true

#消费者配置
listener:
simple:
#manual:手动ack,需要在业务代码结束后,调用api发送ack。
#auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
#none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
acknowledge-mode: manual
retry:
# 在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。
# 在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
# RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
# ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
# RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
# 比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,
# 后续由人工集中处理。注意这种入错误队列的方式,必须要先抛出异常触发重试,重试耗尽后才会进入错误队列。
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初始的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

1.direct模式 根据路由键匹配,同一个消息只会被同个队列同个消费者消息。
发送失败策略关注:publisher-returns、publisher-confirm-type、template.mandatory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
    /**
* The type of publisher confirms to use.
*/
public enum ConfirmType {

/**
* Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()}
* within scoped operations.
*/
SIMPLE,

/**
* Use with {@code CorrelationData} to correlate confirmations with sent
* messsages.
*/
CORRELATED,

/**
* Publisher confirms are disabled (default).
*/
NONE
}

NONE值是禁用发布确认模式,是默认值
CORRELATED值是发布消息成功到交换器后会触发回调方法
SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;
SIMPLE是在发送消息后,设置等待时间,同步返回发送结果

1.3 消费者手动确认、重试

只有在消费过程中抛出了异常,才会触发重试,消息才会进入到配置的error队列。此时队列中的消息还是存在的,是unAck状态,
如果此时配置了prefetch,假如配置的是2,当unAck条数达到2后,后面的消息都不会被消费了,会一直堵着
如果重启mq,会消费到一次这条消息,error队列中也存在这条消息。

如果只是basicNack(),消息不会触发本地重试,此时如果配置了死信队列,则进入死信队列,否则直接丢弃消息。

basicReject(),此时如果配置了死信队列,则进入死信队列,否则直接丢弃消息。

1
2
3
4
5
6
try {
int i = 1/0;
}catch (Exception e){
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
throw e;
}

在捕获异常后又抛出,本地测试可以让异常消息从队列中移除到error队列中。
不再次抛出时,basicNack不重新放回队列,消息直接丢弃,也不会阻塞。
消费时如果开启手动,在basicAck等应答之前触发了异常,没有应答消息,消息还是会存在队列里面,状态是unAcked,配置了error时也会进入到error队列,

1.4 死信队列、队列超时、消息超时

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

1.消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false 2.消息是一个过期消息,超时无人消费 3.要投递的队列消息满了,无法投递

如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中

在消费死信队列(DLX)中的消息时,如果最后调用 basicNack() 方法,并且不将消息重新放入队列(即设置 requeue=false),
那么该消息将最终被丢弃。因为它已经是一个死信,即无法被正常消费的消息,所以RabbitMQ不会再次尝试将其投递给任何队列。
DLX:(dead-letter-exchange的缩写)死信队列交换机
DLK:(dead-letter-routing-key的缩写)死信队列routingKey

一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况: 消息所在的队列设置了超时时间 消息本身设置了超时时间

利用插件,实现延迟队列 在申明延迟队列时需要注意,directExchange.setDelayed(true)
在发送延迟消息时,一直会收到失败的回调,原因如下:

如果配置了发送回调ReturnCallback,rabbitmq_delayed_message_exchange插件会回调该方法,因为发送方确实没有投递到队列上,只是在交换器上暂存,等过期/时间到了才会发往队列。
所以如果是延迟队列的交换器,则直接放过,并不是bug

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public String send(Integer seconds){
DateTime dateTime=DateUtil.offsetSecond(new Date(),seconds);
String msg="通知时间("+DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss")+"),通知内容("+DateUtil.formatDateTime(dateTime)+"召开会议)";
Message message=MessageBuilder.withBody(msg.getBytes()).build();
// RabbitMQ只会检查队列头部的消息是否过期,如果过期就放到死信队列
// 假如第一个过期时间很长,10s,第二个消息3s,则系统先看第一个消息,等 到第一个消息过期,放到DLX
// 此时才会检查第二个消息,但实际上此时第二个消息早已经过期了,但是并没 有先于第一个消息放到DLX。
// 插件rabbitmq_delayed_message_exchange帮我们搞定这个。
MessageProperties messageProperties=message.getMessageProperties();
// 设置到期时间,也就是提前10s提醒
messageProperties.setDelay((seconds-10)*1000);
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_DELAY,RabbitMQConstant.ROUTING_KEY_DELAY,message,new CorrelationData(UUID.randomUUID().toString().replaceAll("-","")));
return seconds+"秒后召开会议,已经定好闹钟了,到时提前告诉大家";
}

消息变成死信的情况:

  1. 消息被拒绝,并且设置requeue参数为false(basicNack\basicReject)
  2. 消息过期(队列ttl、消息ttl)
  3. 队列达到最大长度。
  4. springboot设置死信队列

队列最大长度可配置,命令式配置、代码配置
在声明期间使用 x-arguments 定义最大队列长度
可以通过为队列声明参数 x-max-length 提供一个非负整数值来设置最大消息数。
可以通过为队列声明参数 x-max-length-bytes 提供一个非负整数值,设置最大字节长度。
如果设置了两个参数,那么两个参数都将适用;无论先达到哪个限制,都将强制执行。
溢出行为可以通过向队列声明参数 x-overflow 提供字符串值来设置。可能的值是 drop-head (默认)或 reject-publish

消息堆积

增加消费者数量,增加并发数,分析堆积原因,修改消息分发策略,降低消费能力弱的预处理数

消息丢失

队列和交换机都需要设置为持久化,异常的消息最好能进入单独的队列或者进入死信队列

重复消费

借助mysql、redis先存在消费时再查询,redis的setIfExist等

消息分发策略

公平分发:
在不配置listener.simple.prefetch时消息为轮询分发,默认是250条。
非公平分发:
当prefetch配置为1时,消费越快,处理的消息越多。

basicQos、prefetch

prefetch:
prefetch允许为每个consumer指定最大的unacked messages数目。简单来说就是用来指定一个consumer一次可以从Rabbit中获取多少条message并缓存在client中(RabbitMQ提供的各种语言的client library)。一旦缓冲区满了,Rabbit将会停止投递新的message到该consumer中直到它发出ack。
假设prefetch值设为10,共有两个consumer。意味着每个consumer每次会从queue中预抓取 10 条消息到本地缓存着等待消费。同时该channel的unacked数变为20。而Rabbit投递的顺序是,先为consumer1投递满10个message,再往consumer2投递10个message。如果这时有新message需要投递,先判断channel的unacked数是否等于20,如果是则不会将消息投递到consumer中,message继续呆在queue中。之后其中consumer对一条消息进行ack,unacked此时等于19,Rabbit就判断哪个consumer的unacked少于10,就投递到哪个consumer中。
总的来说,consumer负责不断处理消息,不断ack,然后只要unacked数少于prefetch * consumer数目,broker就不断将消息投递过去。
prefetch的设置与以下几点有关:
1.客户端服务端之间网络传输时间
2.consumer消耗一条消息所执行的业务逻辑的耗时
3.网络状况
配置文件中设置后对所有消费者生效

1
2
3
listener:
simple:
prefetch: 250 #1

对单个消费者限制,需要定义一个ContainerFactory并且在@RabbitListener中指定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Autowired
private CachingConnectionFactory cachingConnectionFactory;

@Bean(name = "myContainerFactory")
public SimpleRabbitListenerContainerFactory myContainerFactory(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cachingConnectionFactory);
factory.setPrefetchCount(100);
return factory;
}

@RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE2,concurrency = "1-3",containerFactory = "myContainerFactory")
public void queue2Customer(String content, Message message, Channel channel) throws Exception {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}

对于同一个消费者,单个broker下面只能配置一次containerFactory

并发

配置文件中

1
2
3
4
5
6
listener:
simple:
#最多消费者数量
max-concurrency: 3
#最少消费者数量
concurrency: 1

这种是对所有消费者生效的,对于单个消费者如果需要增加并发,concurrency = "1-3"

1
2
3
4
@RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE2,concurrency = "1-3")
public void queue2Customer4(String content, Message message, Channel channel) throws Exception {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}

优先级队列

在创建队列时

1
2
3
4
5
@Bean
public Queue directQueue() {
return QueueBuilder.durable(DIRECT_QUEUE)
.maxPriority(100).build();
}

maxPriority取值是0-255,100表示的是,在发送消息时可以设置100以内的优先级。
发送消息时

1
2
3
4
5
6
7
8
9
10
11
12
rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE,
key, msg+"_" + i, message -> {
MessageProperties properties1 = message.getMessageProperties();
String msgId = "_" + System.currentTimeMillis();
properties1.setMessageId(msgId);
if(first){
properties1.setPriority(10);
}else{
properties1.setPriority(1);
}
return message;
}, correlationData);

优先级只有在队列阻塞时才会排序,即消息还在ready状态,未发送到消费者客户端

messageId、correlationId

在Spring Boot集成RabbitMQ进行消息消费时,messageId和correlationId是两个不同的属性,它们在AMQP(Advanced Message Queuing Protocol)规范中有各自的定义和作用。

messageId:
messageId是消息的唯一标识符,用于在消息的生命周期中标识和跟踪消息。
它由消息的生产者设置,并且一旦设置后就不能更改。
messageId允许最多255个字节的UTF-8编码数据,并以未压缩的方式存储在Basic.Properties数据结构中。
messageId的主要用途是在消息系统中唯一地识别消息,特别是在消息流对系统中的各个组件进行耦合时,它使得消息能够在消息头中携带数据,从而唯一地识别该消息。
correlationId:
correlationId用于标识一组相关的消息,它主要用于请求/响应模式中的消息关联。
当一个消费者(或客户端)发送一个请求消息并期望得到一个响应时,它可以在请求消息中设置一个correlationId。
当响应消息返回时,响应消息也会包含相同的correlationId,这样消费者就可以将响应与原始请求关联起来。
correlationId也允许最多255个字节的UTF-8编码数据,并同样以未压缩的方式存储在Basic.Properties数据结构中。
在Spring Boot应用中,当使用RabbitMQ进行消息消费时,你可以通过API设置或获取messageId和correlationId。例如,在发送消息时,你可以使用rabbitTemplate的convertAndSend方法,并在Message对象中设置messageId和correlationId。同样,在接收消息时,你可以从接收到的Message对象中获取这些属性。

需要注意的是,虽然messageId和correlationId都是消息属性,但它们的用途和设置方式有所不同。messageId主要用于唯一标识消息,而correlationId则用于标识相关的一组消息,特别是在请求/响应模式中。

内存预警、磁盘预警、内存换页

内存预警:

默认内存阈值是0.4,当已使用内存高于系统内存*0.4后,会触发系统预警,阻塞客户端链接,停止心跳检查
有两种配置方式
阀值设定(默认0.4)
具体内存设定

设置内存阀值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 命令式设置
# 1.内存阀值设置
sudo rabbitmqctl set_vm_memory_high_watermark [建议0.4到0.7之间]

# 2.设置具体内存值:单位为:KB、MB、GB,根据需求所用
rabbitmqctl set_vm_memory_high_watermark absolute 50MB

# 配置文件设置
# 设置阀值:建议取值在0.4~0.7之间,不建议超过0.7
vm_memory_high_watermark.relative=0.4

# 设置具体值:单位为:KB、MB、GB
vm_memory_high_watermark.absolute=2GB

# 注意:两种方式二选一即可
磁盘预警

当磁盘剩余空间低于确定的阀值(默认50MB)时,RabbitMQ同样会阻塞生产者,目的是避免填充整个磁盘,导致Rabbit中断,可避免因非持久化的消息持续换页而耗尽磁盘空间导致服务器奔溃。
这个阀值可以减小,但不能完全消除因磁盘耗尽而导致奔溃的可能性,比如在两次磁盘空间检查空隙内,第一次检查时:60MB,第二次检查可能就是1MB,就会出现警告

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 命令方式设置
# 1.磁盘阀值设置
sudo rabbitmqctl set_disk_free_limit memory_limit [建议1.0到2.0之间]

# 2.设置具体内存值:单位为:KB、MB、GB,根据需求所用
sudo rabbitmqctl set_disk_free_limit 20GB

# 注意:以上两种方式二选一即可

# 配置文件设置
# 设置阀值:建议取值在1.0~2.0之间
disk_free_limit.relative=2.0

# 设置具体值:单位为:KB、MB、GB
disk_free_limit.absolute=2GB

# 注意:两种方式二选一即可

与内存空间不同的是,磁盘预警的触发机制是:低于下限模式,而内存预警是,高于上限模式

内存换页

磁盘空间远远大于内存空间,因此需要进行资源的置换,不可能等到内存空间触及达到0.4阀值的时候,再把消息写入到磁盘中去
在某个 Broker 节点及内存阻塞生产者之前,它会尝试将队列中的消息,换页到磁盘以释放内存空间,持久化和非持久化的消息都会写入磁盘,其中持久化的消息本身就在磁盘中有一个副本,所以在转移的过程中持久化的消息会先从内存中清除掉。

➳ 结论:为了避免内存空间爆满 和 消息的持久化,会将内存中的消息定时写入到磁盘中去。

★ 触发换页机制
默认情况下,内存达到50% 的阈值时就会换页处理,也就是说,在默认情况下该内存的阈值是 0.4 的情况下,当内存超过 0.2 时,就会进行换页动作。

比如:电脑1000MB 内存,内存阀值为0.4,配置的换页阀值是0.5,rabbit 最大能够使用的内存空间是 1000*0.4=400MB,由于配置的换页内存为 0.5,因此使用率在达到极限400MB之前,会把内存中的 200MB 进行转移到磁盘中,从而达到稳健的运行。

★ 配置换页值
可通过conf配置文件设置换页阀值

1
2
vm_memory_high_watermark.ralative = 0.4
vm_memory_high_watermark_paging_ratio = 0.7(设置值小于1)

思考:为什么设置小于1的值,如果设置为1,内部都已经达到极限了,再去换页意义不是很大了

消息过期时间

通过两种设置实现
队列设置过期时间,队列中的所有消息都有同样的过期时间。
消息设置过期时间,指定一条消息的过期时间。
如果同时指定了Message TTL和Queue TTL,则优先时间较小的那一个:

1
2
3
4
5
6
7
8
9
10
@Bean
public Queue expire2MinsQueue(){
return QueueBuilder.durable(EXPIRE1Mins)
//消息10s未被消费则过期
.ttl(10*1000)
.deadLetterExchange("dl.direct")
//到死信队列dl2
.deadLetterRoutingKey("dl2")
.build();
}
1
properties1.setExpiration("6000");

消息推送到队列后,如果指定时间内没有被消费,则会自动过期。上面我绑定了死信队列,过期了消息会在里面。否则消息过期会被丢弃。

注意:
RabbitMQ只会对队列头部的消息进行过期淘汰。如果单独给消息设置TTL,先入队列的消息过期时间如果设置比较长,后入队列的设置时间比较短。会造成消息不会及时地过期淘汰,导致消息的堆积。

两种设置有效期的删除策略是不同的:

  1. 通过队列设置的,一旦消息过期,就会从队列中抹去,因为过期的消息肯定在队列头部,RabbitMQ只需要定期处理头部过期消息即可。
  2. 而单独设置有效期的,如果要删除则需要遍历整个队列,所以采取消费时判定是否过期处理删除

x-expires、x-message-ttl区别

x-expires
队列有效期
用来控制队列在指定时间未被使用过后删除,未被使用包括以下三点:

  1. 没有任何消费者
  2. 未被重新声明过期时间
  3. 未调用过Basic.Get命令

x-message-ttl
消息有效期

异常

1
ERROR 32664 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'directQueue2' in vhost '/': received the value 'dl.direct' of type 'longstr' but current is none, class-id=50, method-id=10)

解决方法:
未安装延迟队列插件/安装后需要重新绑定交换机

1
【Rabbitmq】【02】确认消息 This operation is only available within the scope of an invoke operation

解决方法:
使用invoke调用

1
2
3
4
boolean res=rabbitTemplate.invoke(t->{
t.convertAndSend("delay.direct",key,message,correlationData);
return t.waitForConfirms(10*1000);
});
评论

:D 一言句子获取中...