生产者发送消息之后,到达消费端之后,可能会有以下情况:一种是消息处理成功,一种是消息处理异常.
autoAck参数为true.RabbitMQ的队列会把发送出去的消息自动置为确认,然后自动从内存或者硬盘中删除,而不管消费者是否真正地消费到了这些消息.autoAck参数为false.RabbitMQ的队列会等待消费者显示调用Basic.Ack命令,回复确认信号之后才从内存或者硬盘中进行删除.消费者在收到消息之后,可以选择确认,也可以选择直接拒绝或者跳过,RabbitMQ也提供了不同的确认应答的方式.一共有以下三种:
下面我们通过SpringBoot来演示消息确认的机制.
Spring-AMQP对消息确认的机制提供了三种策略.
public enum AcknowledgeMode {
NONE,
MANUAL,
AUTO;
}
basicAck方法来确认消息,如果消息未被确认spring:
application:
name: rabbitmq-spring
rabbitmq:
host: 39.105.137.
port: 5672
username: jiangruijia
password: ******
virtual-host: /
listener:
simple:
acknowledge-mode: none
public static final String ACK_QUEUE = "ack_queue";
public static final String ACK_EXCHANGE = "ack_exchange";
@Bean
public DirectExchange ackExchange(){
return ExchangeBuilder.directExchange(Constant.ACK_EXCHANGE).durable(true).build();
}
@Bean
public Queue ackQueue(){
return QueueBuilder.durable(Constant.ACK_QUEUE).build();
}
@Bean
public Binding ackBinding(@Qualifier("ackExchange") DirectExchange exchange,@Qualifier("ackQueue") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("ack");
}
通过接口发送消息:
@RequestMapping("/ack")
public String ack(){
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE,"ack","Ack消息");
return "发送成功";
}
spring:
application:
name: rabbitmq-spring
rabbitmq:
host: 182.92.204.253
port: 5672
username: jiangruijia
password: *****
virtual-host: /
listener:
simple:
acknowledge-mode: auto
listener:
simple:
acknowledge-mode: manual
@Component
public class AckListener {
@RabbitListener(queues = Constant.ACK_QUEUE)
public void listener(Message message, Channel channel) throws IOException{
System.out.printf("接收到消息:%s,消息tag:%d\n",new String(message.getBody(),"UTF-8"),
message.getMessageProperties().getDeliveryTag());
System.out.println("处理消息");
// int i = 3/0;//消息处理异常
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
调用接口发送消息:
我们看到运行结果是正常的.队列中的消息成功被消费.
接下来我们把异常放开:
@Component
public class AckListener {
@RabbitListener(queues = Constant.ACK_QUEUE)
public void listener(Message message, Channel channel) throws IOException{
System.out.printf("接收到消息:%s,消息tag:%d\n",new String(message.getBody(),"UTF-8"),
message.getMessageProperties().getDeliveryTag());
System.out.println("处理消息");
int i = 3/0;//消息处理异常
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
我们发现队列中的消息发生了积压,消息直接被退回的队列,控制台抛出了异常,消息没有被处理成功.
之后我们可以把代码改为出现异常的时候拒绝接收,其中basicReject的requeue属性配置为true,让消息拒绝之后重新入队.
@Component
public class AckListener {
@RabbitListener(queues = Constant.ACK_QUEUE)
public void listener(Message message, Channel channel) throws IOException{
System.out.printf("接收到消息:%s,消息tag:%d\n",new String(message.getBody(),"UTF-8"),
message.getMessageProperties().getDeliveryTag());
try {
System.out.println("处理消息");
Thread.sleep(1000);
int i = 3/0;//消息处理异常
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}catch (Exception e){
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
}
运行结果: 消费异常会被捕捉,拒绝接收回到队列之后,会不停进行重试.
这里与直接发生异常回到队列的结果不一样,发生异常回到队列不会反复重新处理,如果捕捉到异常之后拒绝接收,后续还是会不停地重新处理,与atuo的结果差不多.
前面的消息确认机制是保证了消息从队列中到达消费者的过程中不发生丢失.但是如果RabbitMQ由于某种异常情况以外退出或者崩溃,交换机,队列或者消息可能会发生丢失.
RabbitMQ中的持久化分为三个部分: 交换机持久化,队列持久化,消息持久化.
我们之前在进行RabbitMQ配置的时候,就曾经反复使用交换机的持久化.他是在声明交换机的时候,通过durable方法的参数设置为true来实现的.
相当于将交换机的属性在服务器内部保存,当MQ的服务器发生意外或关闭之后,重启 RabbitMQ 时不需要重新去建立交换机,交换机会自动建立,相当于一直存在.
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange(Constant.DIRECT_EXCHANGE).durable(true).build();
}
和交换机持久化一样,我们在之前也一直使用queue的持久化.持久化是通过在声明队列的时候,使用durable方法把其中的属性设置为队列名来实现的.
public Queue directQueue1(){
return QueueBuilder.durable(Constant.DIRECT_QUEUE1).build();
}
当我们在查看队列持久化的源码的时候,我们发现队列的durable属性默认为true.
public static QueueBuilder durable(String name) {
return (new QueueBuilder(name)).setDurable();
}
private QueueBuilder setDurable() {
this.durable = true;
return this;
}
我们也可以使用nonDurable方法来创建非持久化的队列.
public Queue directQueue1(){
return QueueBuilder.nonDurable(Constant.DIRECT_QUEUE1).build();
}
消息实现持久化,需要把消息的投递模式设置为2(也就是MessageProperties中的deliveryMode设置为MessageDeliveryMode.PERSISTENT),MessageDeliveryMode为一个枚举类.
public enum MessageDeliveryMode {
NON_PERSISTENT,//⾮持久化
PERSISTENT;//持久化
只有我们同时设置了队列和消息的持久化,才可以保证RabbitMQ服务在重启之后,消息依然是存在的.如果只是设置了队列的持久化或者是消息的持久化,重启之后消息依然会丢失.
接下来我们让生产者来发送一条持久化的消息:
交换机和队列的配置还是我们之前的配置
@RequestMapping("/ack")
public String ack(){
String s = "hello ack";
Message message = new Message(s.getBytes(StandardCharsets.UTF_8),new MessageProperties());
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE,"ack",message);
return "发送成功";
}
在默认的条件下,消息是持久化,除非队列被声明为非持久化或者是在发送消息的时候消息被标记为持久化.
如果所有的消息都被标记为了持久化,会严重影响RabbitMQ的性能.这是因为写硬盘会拖慢速度.对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量.在选择是否要将消息持久化时,需要在可靠性和吐吞量之间做⼀个权衡.
如果把交换机,队列,消息都设置为持久化,就可以保证百分之百数据不丢失了吗?答案是不是的.
在使用RabbitMQ的时候,可以通过持久化来解决RabbitMQ宕机而导致的消息丢失的问题,如果有一种情况,消息在到达服务器之前就已经发生了丢失,这时候消息根本没有到达RabbitMQ,持久化也解决不了这个问题.
RabbitMQ为我们提供了两种方案,一种是事务机制,一种是发送方确认机制.
由于事务机制比较消耗性能,一般情况下,我们不会使用事务.我们主要介绍发送方确认的机制.
RabbitMQ为我们提供了两个方式来控制消息可靠性的投递:
rabbitmq:
host: 182.92.204.253
port: 5672
username: jiangruijia
password: ******
virtual-host: /
listener:
simple:
acknowledge-mode: manual
publisher-confirm-type: correlated #消息发送确认
ConfirmCallback中的confirm方法.如果消息成功发送到Broker,则ack为true.如果发送失败,ack为false,并且cause提供失败的原因.setConfirmCallback来设置发送方确认,其中使用匿名内部类的方式重写confirm方法,在其中编写确认成功和确认失败的逻辑.@Bean
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {//设置消息队列的确认机制
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack){//如果确认成功,ack为true
System.out.println("发送方确认成功:"+correlationData.getId());//correlationData中包含消息的id
}else {
System.out.println("发送方确认失败:"+correlationData.getId()+",原因:"+cause);
}
}
});
return rabbitTemplate;
}
confirm中的三个参数的意思分别是:
- correlationData:消息发送时候的一些附加信息,其中包含一个id属性,通常用于在确认回调中识别特定的消息.
- ack: 交换机是否收到消息发送方的信息,收到为true,未收到为false.
- cause: 当消息确认失败的时候,这个字符串会提供失败的原因.
注意: 在我们在SpringIoC容器中自定义一个RabbitTemplate的Bean对象的时候,在我们对RabbitTemplate对象进行DI注入的时候,由于Spring的Bean的优先级管理机制,Spring不再会调用RabbitMQ原生的Bean对象,而是调用我们自定义的Bean对象.
配置交换机和队列
@Bean
public DirectExchange confirmExchange(){
return ExchangeBuilder.directExchange(Constant.CONFIRM_EXCHANGE).durable(true).build();
}
@Bean
public Queue confirmQueue(){
return QueueBuilder.durable(Constant.CONFIRM_QUEUE).build();
}
@Bean
public Binding confirmBinding(@Qualifier("confirmExchange") DirectExchange exchange,@Qualifier("confirmQueue") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("confirm");
}
之后在生产者使用@Resource注解注入我们写好的具有发送者确认机制的RabbitTemplate.之后进行消息发送.
@Resource
private RabbitTemplate confirmRabbitTemplate;
@RequestMapping("/confirm")
public String confirm(){
CorrelationData correlationData = new CorrelationData("1");//指定消息id,用于生产者进行消息确认
confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE,"confirm","confirm消息",correlationData);
return "发送成功";
}
在发送消息的时候.我们需要指定消息的correlationData中的id,这个id可以让生产者进行对应消息的确认.
我们在之前的文章中也提到过发布确认模式,其中包含了一个确认,叫做ConfirmListener接口,其中提供了handleAck和handleNack,一个用于处理消息确认成功时候的业务逻辑,一个是消息否定确认时候的业务逻辑.和ConfirmCallback.confirm方法中的ack参数类似.
3. 测试
我们使用Postman对接口进行调用,观察控制台.
我们看到发送方成功确认了消息的接收,说明了消息已经成功到达了交换机.
如果我们把交换机的名称改掉.
@RequestMapping("/confirm")
public String confirm(){
CorrelationData correlationData = new CorrelationData("1");//指定消息id,用于生产者进行消息确认
confirmRabbitTemplate.convertAndSend("Constant.CONFIRM_EXCHANGE","confirm","confirm消息",correlationData);
return "发送成功";
}
我们看到发送方消息确认失败了,说明消息没有正确地到达交换机.
消息到达exchange之后,会根据路由规则进行匹配,把消息放入Queue中.Exchange到Queue的过程,如果一条消息服务被任何队列消费,可以选择把消息发回给生产者,我们可以设置一个一个返回回调方法,对消息进行处理.
@Bean
public RabbitTemplate returnRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("消息被退回:"+returned);
}
});
return rabbitTemplate;
}
这里在使用RabbitTemplate中的setMandatory方法的时候,如果设置为true,这个属性就是告诉Broker,如果一条消息没有被任何一条队列消费,那么就触发ReturnsCallback.
其中ReturnedMessage中有以下属性:
public class ReturnedMessage {
private final Message message;
private final int replyCode;
private final String replyText;
private final String exchange;
private final String routingKey;
message表示返回消息的对象,包含了消息体和消息的属性.
replyCode表示Broker提供的回复码,表示消息无法路由的原因,有点类似与错误码.
replyText表示无法路由消息的额外信息和错误描述.
exchange,routingKey分别表示交换机的名称和路由键.
发送消息
@Resource
private RabbitTemplate returnRabbitTemplate;
@RequestMapping("/return")
public String confirmReturn(){
CorrelationData correlationData = new CorrelationData("2");
returnRabbitTemplate.convertAndSend(Constant.RETURN_EXCHANGE,"return","return消息",correlationData);
return "发送成功";
}
@RequestMapping("/return")
public String confirmReturn(){
CorrelationData correlationData = new CorrelationData("2");
returnRabbitTemplate.convertAndSend(Constant.RETURN_EXCHANGE,"return1","return消息",correlationData);
return "发送成功";
}
我们再次进行测试,发现消息被回退:
这一个板块,会涉及到一个非常常见的面试题.就是如何保证RabbitMQ消息的可靠性.
我们可以根据消息可能丢失的场景来解决:
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- baijiahaobaidu.com 版权所有 湘ICP备2023023988号-9
违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务