SpringAMQP: Sender Reliability
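Sender Reliability
First, let's analyze where messages can be lost. A message travels from the producer, through MQ, to the consumer that processes it, and it can be lost at any step along the way: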
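- Lost while sending:
  - The producer fails to connect to MQ when sending
  - The message reaches MQ, but the target Exchange is not found
  - The message reaches the Exchange in MQ, but no suitable Queue is found
  - After the message reaches MQ, the process handling it inside MQ throws an exception
- Lost by MQ itself:
  - The message reaches MQ and is stored in a queue, but MQ suddenly crashes before it is consumed
- Lost while the consumer processes it:
  - The consumer crashes after receiving the message but before processing it
  - An exception is thrown while the message is being processed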
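In short, to prevent message loss and make MQ reliable, we have to work on three fronts:
- Make sure the producer actually delivers the message to MQ
- Make sure MQ does not lose the message
- Make sure the consumer actually processes the message
This chapter starts with the first one: making sure the producer gets the message to MQ.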
Producer Retry Mechanism
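The first case is a network failure while the producer is sending, which breaks its connection to MQ.
To deal with this, SpringAMQP provides a retry mechanism for sending: when the RabbitTemplate times out connecting to MQ, it retries several times.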
Modify the application.yaml file of the publisher module and add the following:
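spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /David
    username: Spike
    password: '#Alone117'
    connection-timeout: 1s # Connection timeout for MQ
    template:
      retry:
        enabled: true # Enable retry for sends that fail to connect
        initial-interval: 1000ms # Initial wait after a failure
        multiplier: 1 # Multiplier for the next wait: next wait = previous wait * multiplier
        max-attempts: 3 # Maximum number of attempts
        max-interval: 10000ms # Upper bound on any single wait
    listener:
      simple:
        prefetch: 1 # Consumer side: fetch one message at a time; the next one is fetched only after the current one is processed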
Stop the RabbitMQ service with the following command:
docker stop rabbitmq
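Then send a test message. The send is retried once per second, 3 attempts in total, which shows that retry on connection timeout is configured correctly: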
2024-11-22T20:14:55.833+08:00 DEBUG 198162 --- [ main] o.s.retry.support.RetryTemplate : Retry: count=0
2024-11-22T20:14:55.836+08:00 INFO 198162 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2024-11-22T20:14:55.840+08:00 DEBUG 198162 --- [ main] o.s.r.backoff.ExponentialBackOffPolicy : Sleeping for 1000
2024-11-22T20:14:56.840+08:00 DEBUG 198162 --- [ main] o.s.retry.support.RetryTemplate : Checking for rethrow: count=1
2024-11-22T20:14:56.840+08:00 DEBUG 198162 --- [ main] o.s.retry.support.RetryTemplate : Retry: count=1
2024-11-22T20:14:56.840+08:00 INFO 198162 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2024-11-22T20:14:56.841+08:00 DEBUG 198162 --- [ main] o.s.r.backoff.ExponentialBackOffPolicy : Sleeping for 1000
2024-11-22T20:14:57.841+08:00 DEBUG 198162 --- [ main] o.s.retry.support.RetryTemplate : Checking for rethrow: count=2
2024-11-22T20:14:57.841+08:00 DEBUG 198162 --- [ main] o.s.retry.support.RetryTemplate : Retry: count=2
2024-11-22T20:14:57.841+08:00 INFO 198162 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2024-11-22T20:14:57.842+08:00 DEBUG 198162 --- [ main] o.s.retry.support.RetryTemplate : Checking for rethrow: count=3
2024-11-22T20:14:57.842+08:00 DEBUG 198162 --- [ main] o.s.retry.support.RetryTemplate : Retry failed last attempt: count=3
Retry failed last attempt: count=3 shows that 3 attempts were made in total.
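The two `Sleeping for 1000` lines in the log above follow from how the retry properties combine. The plain-Java sketch below reproduces the wait schedule under one assumption: it mirrors the exponential back-off suggested by the `ExponentialBackOffPolicy` log lines. Under that assumption, the first wait is `initial-interval`, each later wait is the previous one times `multiplier`, and every wait is capped at `max-interval`. With `max-attempts` attempts there are `max-attempts - 1` waits between them. `RetryBackoffSketch` is a hypothetical name, and the code does not call Spring Retry.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring: reproduces the back-off schedule
// implied by initial-interval, multiplier, max-interval and max-attempts.
final class RetryBackoffSketch {

    /** Returns the waits (ms) between consecutive attempts; there are maxAttempts - 1 of them. */
    static List<Long> sleeps(long initialIntervalMs, double multiplier, long maxIntervalMs, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        long next = initialIntervalMs;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            // Each wait is capped at max-interval
            result.add(Math.min(next, maxIntervalMs));
            // The following wait grows by the multiplier
            next = (long) (next * multiplier);
        }
        return result;
    }

    public static void main(String[] args) {
        // Values matching the log: 1000 ms, multiplier 1, 3 attempts
        System.out.println(sleeps(1000, 1.0, 10000, 3)); // [1000, 1000]
        // Growing waits: 2000 ms, multiplier 1.5, 5 attempts
        System.out.println(sleeps(2000, 1.5, 10000, 5)); // [2000, 3000, 4500, 6750]
    }
}
```

With `multiplier: 1` the waits stay constant, which matches the log. A larger `multiplier` spreads later attempts further apart, and `max-interval` limits how long any single wait can block the sending thread.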
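Note: when the network is unstable, retrying can noticeably raise the send success rate. However, the retry in SpringAMQP is blocking: the current thread is blocked while it waits between attempts.
If business performance matters, consider disabling retry. If you must use it, configure the wait time and the number of attempts sensibly, or run the sending code on an asynchronous thread.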
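One way to follow the last suggestion is to move the blocking send onto a dedicated executor. The sketch below is plain Java. The `Runnable` passed to `sendAsync` stands in for a real `rabbitTemplate.convertAndSend(...)` call, which is an assumption for illustration, and `AsyncSendSketch` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: run a blocking send (for example one that includes retry waits)
// on a separate executor so the calling thread is not blocked.
final class AsyncSendSketch {

    /** Submits the blocking send to the executor and returns immediately. */
    static CompletableFuture<Void> sendAsync(Runnable blockingSend, ExecutorService executor) {
        return CompletableFuture.runAsync(blockingSend, executor)
                .whenComplete((ok, ex) -> {
                    if (ex != null) {
                        // The caller has already moved on, so failures are handled here
                        System.err.println("Send failed after retries: " + ex);
                    }
                });
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Stand-in for a blocking call such as rabbitTemplate.convertAndSend(...)
        Runnable slowSend = () -> {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("message sent");
        };
        CompletableFuture<Void> future = sendAsync(slowSend, executor);
        System.out.println("caller continues while the send is in progress");
        future.join(); // only so this demo waits before shutting down
        executor.shutdown();
    }
}
```

The sketch uses a dedicated executor instead of the common pool so that slow retries do not starve other asynchronous work. The trade-off is that the caller no longer sees a failed send directly and has to react in the `whenComplete` stage.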
Producer Confirmation Mechanism
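Normally, as long as the network connection between the producer and MQ is healthy, messages are rarely lost while being sent, so most of the time we don't need to worry about this.
In a few cases, however, a message can still be lost after it has been sent to MQ, for example:
- The process handling the message inside MQ throws an exception
- The message reaches MQ, but the target Exchange is not found
- The message reaches the Exchange in MQ, but no suitable Queue is found, so it cannot be routed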
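For these cases, RabbitMQ provides two producer acknowledgement mechanisms: Publisher Confirm and Publisher Return. With them enabled, after the producer sends a message, MQ returns a different receipt depending on how the message was handled.
The details are shown in the figure: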
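In summary:
- If the message reaches MQ but routing fails, Publisher Return reports the failure, and an ack is still returned, meaning delivery succeeded
- If a transient (non-persistent) message reaches MQ and is enqueued, an ACK is returned to report success
- If a persistent message reaches MQ, is enqueued, and has been persisted, an ACK is returned to report success
- In all other cases a NACK is returned to report failure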
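ack and nack belong to the Publisher Confirm mechanism: ack means delivery succeeded and nack means it failed. return belongs to the Publisher Return mechanism.
Both mechanisms are disabled by default and must be enabled in the configuration file.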
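The receipt rules summarized above can be written as a small decision function. This is a plain-Java model for illustration only, not a broker or Spring AMQP API. `PublisherReceiptSketch`, `Receipt`, and the boolean inputs are hypothetical. The model treats a missing exchange and a failed persist as "other cases" that produce a NACK.

```java
// Hypothetical model of the producer receipts summarized above; not a broker API.
final class PublisherReceiptSketch {

    enum Confirm { ACK, NACK }

    /** What the producer receives: a confirm, plus whether a Publisher Return fires. */
    record Receipt(Confirm confirm, boolean returned) { }

    static Receipt receiptFor(boolean exchangeExists, boolean routedToQueue,
                              boolean persistent, boolean persistedToDisk) {
        if (!exchangeExists) {
            // "Other cases": no exchange, so NACK and no return
            return new Receipt(Confirm.NACK, false);
        }
        if (!routedToQueue) {
            // Routing failed: Publisher Return fires, yet the confirm is still ACK
            return new Receipt(Confirm.ACK, true);
        }
        if (persistent && !persistedToDisk) {
            // In this model a persistent message is ACKed only once it is persisted
            return new Receipt(Confirm.NACK, false);
        }
        // Transient message enqueued, or persistent message enqueued and persisted
        return new Receipt(Confirm.ACK, false);
    }

    public static void main(String[] args) {
        // Wrong routing key on an existing exchange
        System.out.println(receiptFor(true, false, true, false)); // Receipt[confirm=ACK, returned=true]
    }
}
```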
Implementing Producer Confirmation
Enabling Producer Confirmation
Add the following configuration to application.yaml in the publisher module:
spring:
  rabbitmq:
    publisher-confirm-type: correlated # Enable publisher confirm and set the confirm type
    publisher-returns: true # Enable publisher return
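publisher-confirm-type has three modes:
- none: confirm is disabled
- simple: the sender blocks synchronously while waiting for MQ's receipt
- correlated: MQ returns the receipt asynchronously through a callback
We generally recommend correlated, the callback mode.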
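The difference between `simple` and `correlated` is the difference between blocking on a receipt and registering a callback for it. The plain-Java sketch below simulates the broker with a delayed `CompletableFuture`. `publish` and its `brokerAccepts` flag are hypothetical stand-ins, not Spring AMQP APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Plain-Java simulation of the two confirm styles; not Spring AMQP code.
final class ConfirmModeSketch {

    /** Hypothetical broker stand-in: the receipt (true = ack) arrives about 50 ms later on another thread. */
    static CompletableFuture<Boolean> publish(boolean brokerAccepts) {
        return CompletableFuture.supplyAsync(() -> brokerAccepts,
                CompletableFuture.delayedExecutor(50, TimeUnit.MILLISECONDS));
    }

    /** "simple": the sending thread blocks until the receipt arrives, with a timeout. */
    static boolean sendSimple(boolean brokerAccepts) {
        return publish(brokerAccepts).orTimeout(5, TimeUnit.SECONDS).join();
    }

    /** "correlated": register a callback for the receipt and return immediately. */
    static CompletableFuture<Void> sendCorrelated(boolean brokerAccepts, Consumer<Boolean> onReceipt) {
        return publish(brokerAccepts).thenAccept(onReceipt);
    }

    public static void main(String[] args) {
        System.out.println("simple mode ack: " + sendSimple(true));
        CompletableFuture<Void> pending = sendCorrelated(false,
                ack -> System.out.println("correlated mode ack: " + ack));
        System.out.println("correlated mode: sender continues without waiting");
        pending.join(); // only so this demo does not exit before the callback runs
    }
}
```

In `simple` mode every send pays the round-trip wait on the sending thread. `correlated` moves that work into a callback, which is why it is the recommended mode above.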
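Defining the ReturnCallback
In RabbitMQ, the Return Callback handles messages that reach an exchange but are returned because no matching queue is found.
Each RabbitTemplate can have only one ReturnCallback, so we can set it once in a configuration class. Define a configuration class in the publisher module with the following content: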
import jakarta.annotation.PostConstruct;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;

@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
    private final RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        // A RabbitTemplate supports only one ReturnsCallback, so it is registered once here
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                // Logs "return callback triggered"
                log.error("触发return callback,");
                log.debug("exchange: {}", returned.getExchange());
                log.debug("routingKey: {}", returned.getRoutingKey());
                log.debug("message: {}", returned.getMessage());
                log.debug("replyCode: {}", returned.getReplyCode());
                log.debug("replyText: {}", returned.getReplyText());
            }
        });
    }
}
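The publish logs further down show that each sent message carries a `spring_returned_message_correlation` header holding the `CorrelationData` id (`12345` in the test). A returned message can therefore be tied back to the original send by reading that header. The helper below is a hypothetical plain-Java sketch. Inside `returnedMessage(...)` it could be called as `correlationId(returned.getMessage().getMessageProperties().getHeaders())`.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: extract the correlation id stored in the
// "spring_returned_message_correlation" header (visible in the publish logs).
final class ReturnCorrelationSketch {

    static final String RETURN_CORRELATION_HEADER = "spring_returned_message_correlation";

    /** Returns the correlation id from the message headers, if present. */
    static Optional<String> correlationId(Map<String, Object> headers) {
        Object value = headers.get(RETURN_CORRELATION_HEADER);
        return Optional.ofNullable(value).map(Object::toString);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = Map.of(RETURN_CORRELATION_HEADER, "12345");
        System.out.println(correlationId(headers).orElse("unknown")); // 12345
    }
}
```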
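Defining the ConfirmCallback
Because the handling logic may differ from message to message, the ConfirmCallback is defined for each send. Specifically, when calling convertAndSend on RabbitTemplate, pass one extra argument, a CorrelationData object: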
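CorrelationData contains two key parts:
- id: the unique identifier of the message; MQ's receipts are matched back to messages by this id, so receipts for different messages are not mixed up
- future: the Future that carries the receipt result (a CompletableFuture<Confirm> in Spring AMQP 3.x, which the test below relies on via whenComplete; older 2.x versions used a SettableListenableFuture)
MQ's receipt is delivered through this Future, so we can attach a callback to the Future in CorrelationData in advance to handle the receipt: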
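Let's create a test that sends a message to an existing exchange (Spike.exchage) and adds a ConfirmCallback:
- Create a CorrelationData object
- Call getFuture() and register the callback on the returned future
- Send the message with convertAndSend, passing the CorrelationData as the last argument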
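import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@Slf4j
@SpringBootTest
class SpringAMPtest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void testPublisherConfirm() throws InterruptedException {
        // 1. Create the CorrelationData; its id is used to match the receipt
        CorrelationData correlationData = new CorrelationData("12345");
        // 2. Register the ConfirmCallback on the receipt future
        correlationData.getFuture().whenComplete((result, ex) -> {
            if (ex != null) {
                // The future completed exceptionally: "send failed, exception:"
                log.error("消息发送失败,异常信息:", ex);
            } else if (result.isAck()) {
                // ACK received: "sent successfully, ACK received"
                log.info("消息发送成功,收到 ACK");
            } else {
                // NACK received: "send failed, NACK received, reason: {}"
                log.error("消息发送失败,收到 NACK,原因:{}", result.getReason());
            }
        });
        // 3. Send the message, passing correlationData as the last argument
        rabbitTemplate.convertAndSend("Spike.exchage", "x", "Hello, RabbitMQ!", correlationData);
        log.info("消息已发送,等待回执..."); // "message sent, waiting for receipt..."
        // Receipts arrive asynchronously; wait briefly so the test does not finish first
        Thread.sleep(2000);
    }
}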
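Receipts arrive asynchronously while other messages may still be in flight, which is why each `CorrelationData` carries an id. The plain-Java model below shows the idea: a map from id to a pending `CompletableFuture`, completed when the matching receipt arrives. `PendingConfirmsSketch` is hypothetical because Spring AMQP does this bookkeeping itself. The `PC:Ack:1:false` lines in the logs below appear to be the broker acknowledging by channel sequence number, which the framework then maps back to the `CorrelationData`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of receipt correlation: the id decides which pending future
// a receipt completes, even when receipts arrive in a different order than sends.
final class PendingConfirmsSketch {

    private final Map<String, CompletableFuture<Boolean>> pending = new ConcurrentHashMap<>();

    /** Called when sending: creates the future that the receipt will complete. */
    CompletableFuture<Boolean> register(String id) {
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        if (pending.putIfAbsent(id, future) != null) {
            throw new IllegalArgumentException("duplicate correlation id: " + id);
        }
        return future;
    }

    /** Called when a receipt arrives: true for ack, false for nack. */
    void onReceipt(String id, boolean ack) {
        CompletableFuture<Boolean> future = pending.remove(id);
        if (future != null) {
            future.complete(ack);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

The sketch rejects duplicate ids because a second registration would make it impossible to tell which send a receipt belongs to.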
- Log with a wrong RoutingKey (no queue matches):
2024-11-22T20:46:50.101+08:00 DEBUG 216383 --- [ main] o.s.retry.support.RetryTemplate : Retry: count=0
2024-11-22T20:46:50.104+08:00 INFO 216383 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2024-11-22T20:46:50.132+08:00 INFO 216383 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2503ec73:0/SimpleConnection@53982523 [delegate=amqp://Spike@127.0.0.1:5672//David, localPort=54554]
2024-11-22T20:46:50.134+08:00 DEBUG 216383 --- [ main] o.s.retry.support.RetryTemplate : Retry: count=0
2024-11-22T20:46:50.134+08:00 DEBUG 216383 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Initializing declarations
2024-11-22T20:46:50.135+08:00 DEBUG 216383 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Nothing to declare
2024-11-22T20:46:50.143+08:00 DEBUG 216383 --- [ main] o.s.a.r.c.CachingConnectionFactory : Creating cached Rabbit Channel from PublisherCallbackChannelImpl: AMQChannel(amqp://Spike@127.0.0.1:5672//David,1)
2024-11-22T20:46:50.154+08:00 DEBUG 216383 --- [ main] o.s.a.r.c.PublisherCallbackChannelImpl : Added listener org.springframework.amqp.rabbit.core.RabbitTemplate@2f0ccb34
2024-11-22T20:46:50.155+08:00 DEBUG 216383 --- [ main] o.s.amqp.rabbit.core.RabbitTemplate : Added publisher confirm channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://Spike@127.0.0.1:5672//David,1), conn: Proxy@5fb5ad40 Shared Rabbit Connection: SimpleConnection@53982523 [delegate=amqp://Spike@127.0.0.1:5672//David, localPort=54554] to map, size now 1
2024-11-22T20:46:50.155+08:00 DEBUG 216383 --- [ main] o.s.amqp.rabbit.core.RabbitTemplate : Executing callback RabbitTemplate$$Lambda$1056/0x00007574185952b0 on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://Spike@127.0.0.1:5672//David,1), conn: Proxy@5fb5ad40 Shared Rabbit Connection: SimpleConnection@53982523 [delegate=amqp://Spike@127.0.0.1:5672//David, localPort=54554]
2024-11-22T20:46:50.175+08:00 DEBUG 216383 --- [ main] o.s.amqp.rabbit.core.RabbitTemplate : Publishing message [(Body:'"Hello, RabbitMQ!"' MessageProperties [headers={spring_listener_return_correlation=c2117d2b-6d87-43c3-8e08-f5fe3afb3c28, spring_returned_message_correlation=12345, __TypeId__=java.lang.String}, messageId=e48ea850-2798-4cb5-bd3d-363eb9a85b14, contentType=application/json, contentEncoding=UTF-8, contentLength=18, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])] on exchange [Spike.exchage], routingKey = [x]
2024-11-22T20:46:50.187+08:00 DEBUG 216383 --- [ 127.0.0.1:5672] o.s.a.r.c.PublisherCallbackChannelImpl : Return PublisherCallbackChannelImpl: AMQChannel(amqp://Spike@127.0.0.1:5672//David,1)
2024-11-22T20:46:50.188+08:00 INFO 216383 --- [ main] org.example.publisher.SpringAMPtest : 消息已发送,等待回执...
2024-11-22T20:46:50.188+08:00 ERROR 216383 --- [nectionFactory1] org.example.publisher.config.MqConfig : 触发return callback,
2024-11-22T20:46:50.191+08:00 DEBUG 216383 --- [ 127.0.0.1:5672] o.s.a.r.c.PublisherCallbackChannelImpl : PublisherCallbackChannelImpl: AMQChannel(amqp://Spike@127.0.0.1:5672//David,1) PC:Ack:1:false
2024-11-22T20:46:50.192+08:00 INFO 216383 --- [ 127.0.0.1:5672] org.example.publisher.SpringAMPtest : 消息发送成功,收到 ACK
Because the RoutingKey passed in is wrong, routing fails and the return callback is triggered, but an ACK is still received.
- Log with a correct exchange and RoutingKey:
2024-11-22T20:41:16.057+08:00 DEBUG 213009 --- [ main] o.s.retry.support.RetryTemplate : Retry: count=0
2024-11-22T20:41:16.059+08:00 INFO 213009 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2024-11-22T20:41:16.083+08:00 INFO 213009 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#95eb320:0/SimpleConnection@7ac058a0 [delegate=amqp://Spike@127.0.0.1:5672//David, localPort=52584]
2024-11-22T20:41:16.084+08:00 DEBUG 213009 --- [ main] o.s.retry.support.RetryTemplate : Retry: count=0
2024-11-22T20:41:16.084+08:00 DEBUG 213009 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Initializing declarations
2024-11-22T20:41:16.085+08:00 DEBUG 213009 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Nothing to declare
2024-11-22T20:41:16.093+08:00 DEBUG 213009 --- [ main] o.s.a.r.c.CachingConnectionFactory : Creating cached Rabbit Channel from PublisherCallbackChannelImpl: AMQChannel(amqp://Spike@127.0.0.1:5672//David,1)
2024-11-22T20:41:16.100+08:00 DEBUG 213009 --- [ main] o.s.a.r.c.PublisherCallbackChannelImpl : Added listener org.springframework.amqp.rabbit.core.RabbitTemplate@cbf1997
2024-11-22T20:41:16.101+08:00 DEBUG 213009 --- [ main] o.s.amqp.rabbit.core.RabbitTemplate : Added publisher confirm channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://Spike@127.0.0.1:5672//David,1), conn: Proxy@2becfd4c Shared Rabbit Connection: SimpleConnection@7ac058a0 [delegate=amqp://Spike@127.0.0.1:5672//David, localPort=52584] to map, size now 1
2024-11-22T20:41:16.101+08:00 DEBUG 213009 --- [ main] o.s.amqp.rabbit.core.RabbitTemplate : Executing callback RabbitTemplate$$Lambda$1056/0x000078d76c594158 on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://Spike@127.0.0.1:5672//David,1), conn: Proxy@2becfd4c Shared Rabbit Connection: SimpleConnection@7ac058a0 [delegate=amqp://Spike@127.0.0.1:5672//David, localPort=52584]
2024-11-22T20:41:16.111+08:00 DEBUG 213009 --- [ main] o.s.amqp.rabbit.core.RabbitTemplate : Publishing message [(Body:'"Hello, RabbitMQ!"' MessageProperties [headers={spring_listener_return_correlation=81e21b29-ab6d-41f0-aa93-0ba6fd9a2f7c, spring_returned_message_correlation=12345, __TypeId__=java.lang.String}, messageId=13d73520-94a9-4161-9ea1-d5b4672472ad, contentType=application/json, contentEncoding=UTF-8, contentLength=18, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])] on exchange [Spike.exchage], routingKey = [blue]
2024-11-22T20:41:16.119+08:00 DEBUG 213009 --- [ 127.0.0.1:5672] o.s.a.r.c.PublisherCallbackChannelImpl : PublisherCallbackChannelImpl: AMQChannel(amqp://Spike@127.0.0.1:5672//David,1) PC:Ack:1:false
2024-11-22T20:41:16.123+08:00 INFO 213009 --- [ main] org.example.publisher.SpringAMPtest : 消息已发送,等待回执...
2024-11-22T20:41:16.123+08:00 INFO 213009 --- [ 127.0.0.1:5672] org.example.publisher.SpringAMPtest : 消息发送成功,收到 ACK
After switching to the correct RoutingKey, the return callback is no longer triggered and only an ACK is received.
- Log with a wrong exchange name:
2024-11-22T21:03:21.696+08:00 DEBUG 225130 --- [ main] o.s.retry.support.RetryTemplate : Retry: count=0
2024-11-22T21:03:21.698+08:00 INFO 225130 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2024-11-22T21:03:21.724+08:00 INFO 225130 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#7c2dfa2:0/SimpleConnection@2315052d [delegate=amqp://Spike@127.0.0.1:5672//David, localPort=36570]
2024-11-22T21:03:21.725+08:00 DEBUG 225130 --- [ main] o.s.retry.support.RetryTemplate : Retry: count=0
2024-11-22T21:03:21.725+08:00 DEBUG 225130 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Initializing declarations
2024-11-22T21:03:21.726+08:00 DEBUG 225130 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Nothing to declare
2024-11-22T21:03:21.733+08:00 DEBUG 225130 --- [ main] o.s.a.r.c.CachingConnectionFactory : Creating cached Rabbit Channel from PublisherCallbackChannelImpl: AMQChannel(amqp://Spike@127.0.0.1:5672//David,1)
2024-11-22T21:03:21.743+08:00 DEBUG 225130 --- [ main] o.s.a.r.c.PublisherCallbackChannelImpl : Added listener org.springframework.amqp.rabbit.core.RabbitTemplate@51e8d066
2024-11-22T21:03:21.744+08:00 DEBUG 225130 --- [ main] o.s.amqp.rabbit.core.RabbitTemplate : Added publisher confirm channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://Spike@127.0.0.1:5672//David,1), conn: Proxy@50cbcca7 Shared Rabbit Connection: SimpleConnection@2315052d [delegate=amqp://Spike@127.0.0.1:5672//David, localPort=36570] to map, size now 1
2024-11-22T21:03:21.744+08:00 DEBUG 225130 --- [ main] o.s.amqp.rabbit.core.RabbitTemplate : Executing callback RabbitTemplate$$Lambda$1056/0x00007e0bbc594bf8 on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://Spike@127.0.0.1:5672//David,1), conn: Proxy@50cbcca7 Shared Rabbit Connection: SimpleConnection@2315052d [delegate=amqp://Spike@127.0.0.1:5672//David, localPort=36570]
2024-11-22T21:03:21.758+08:00 DEBUG 225130 --- [ main] o.s.amqp.rabbit.core.RabbitTemplate : Publishing message [(Body:'"Hello, RabbitMQ!"' MessageProperties [headers={spring_listener_return_correlation=caa034df-45f5-41b9-8b0a-c5854b0138c5, spring_returned_message_correlation=12345, __TypeId__=java.lang.String}, messageId=74393c11-0dcd-473f-bb94-53d5d42ce70b, contentType=application/json, contentEncoding=UTF-8, contentLength=18, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])] on exchange [Spike.exchage.x], routingKey = [x]
2024-11-22T21:03:21.776+08:00 DEBUG 225130 --- [ main] o.s.a.r.c.PublisherCallbackChannelImpl : Closing AMQChannel(amqp://Spike@127.0.0.1:5672//David,1)
2024-11-22T21:03:21.777+08:00 ERROR 225130 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'Spike.exchage.x' in vhost '/David', class-id=60, method-id=40)
2024-11-22T21:03:21.777+08:00 INFO 225130 --- [ main] org.example.publisher.SpringAMPtest : 消息已发送,等待回执...
2024-11-22T21:03:21.777+08:00 DEBUG 225130 --- [nectionFactory1] o.s.a.r.c.PublisherCallbackChannelImpl : PublisherCallbackChannelImpl: AMQChannel(amqp://Spike@127.0.0.1:5672//David,1) PC:Nack:(close):1
2024-11-22T21:03:21.778+08:00 ERROR 225130 --- [nectionFactory1] org.example.publisher.SpringAMPtest : 消息发送失败,收到 NACK,原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'Spike.exchage.x' in vhost '/David', class-id=60, method-id=40)
2024-11-22T21:03:21.783+08:00 DEBUG 225130 --- [nectionFactory1] o.s.amqp.rabbit.core.RabbitTemplate : Removed publisher confirm channel: PublisherCallbackChannelImpl: AMQChannel(amqp://Spike@127.0.0.1:5672//David,1) from map, size now 0
2024-11-22T21:03:21.783+08:00 DEBUG 225130 --- [nectionFactory1] o.s.a.r.c.PublisherCallbackChannelImpl : PendingConfirms cleared
2024-11-22T21:03:21.783+08:00 DEBUG 225130 --- [nectionFactory2] o.s.a.r.c.PublisherCallbackChannelImpl : PendingConfirms cleared
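If even the exchange is wrong, only a NACK is received. The broker closes the channel with reply-code 404 (NOT_FOUND), and no return callback is triggered.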
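Note:
Enabling producer confirmation costs MQ a fair amount of performance, so it is generally not recommended. Consider the situations that trigger a failure receipt:
- Routing failure: usually caused by a wrong RoutingKey, which is typically a programming error
- Wrong exchange name: also a programming error
- MQ internal failure: this does need handling, but it is usually rare
Therefore only businesses with very high message-reliability requirements need to enable it, and even then it is enough to enable the ConfirmCallback and handle nack.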
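Handling only nack could look like the plain-Java sketch below, which resends a bounded number of times. The `publish` function is a hypothetical stand-in that returns the receipt as `CompletableFuture<Boolean>` (true = ACK). In Spring AMQP the resend would be triggered from the `CorrelationData` future callback in the test above, using a fresh `CorrelationData` for each send, since each one's future completes only once. Resending is an assumption here. The text above only says to handle nack, and logging the message or storing it for later are equally valid choices.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical nack handler: resend a bounded number of times on NACK.
final class NackResendSketch {

    /**
     * Publishes the message; 'publish' completes with true on ACK, false on NACK.
     * On NACK it resends until an ACK arrives or resendsLeft reaches 0.
     * The returned future completes with the final receipt.
     */
    static CompletableFuture<Boolean> sendWithNackRetry(
            String message, Function<String, CompletableFuture<Boolean>> publish, int resendsLeft) {
        return publish.apply(message).thenCompose(ack -> {
            if (ack || resendsLeft <= 0) {
                return CompletableFuture.completedFuture(ack);
            }
            // NACK with resends remaining: try again
            return sendWithNackRetry(message, publish, resendsLeft - 1);
        });
    }

    public static void main(String[] args) {
        // Stand-in broker that nacks the first two sends and acks the third
        int[] calls = {0};
        Function<String, CompletableFuture<Boolean>> flaky =
                m -> CompletableFuture.completedFuture(++calls[0] > 2);
        boolean acked = sendWithNackRetry("hello", flaky, 3).join();
        System.out.println("acked=" + acked + " after " + calls[0] + " sends"); // acked=true after 3 sends
    }
}
```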