在消息传递的过程中,可能会遇到各种问题,比如网络故障,服务不可用,资源不足等,这些问题都可能会导致消息处理失败.为了解决这些问题,RabbitMQ提供了重试机制,允许消息在处理失败之后重新发送.
我们也可以对重试机制设置重试次数.超过了重试的次数,如果没有对队列进行死信队列的绑定,那么该消息就会发生丢失.
rabbitmq:
host: 182.92.204.253
port: 5672
username: jiangruijia
password: *****
virtual-host: /
listener:
simple:
acknowledge-mode: auto
retry:
initial-interval: 5000ms
enabled: true
max-attempts: 5
public static final String RETRY_QUEUE = "retry_queue";
public static final String RETRY_EXCHANGE = "retry_exchange";
@Bean
public DirectExchange retryExchange(){
return ExchangeBuilder.directExchange(Constant.RETRY_EXCHANGE).durable(true).build();
}
@Bean
public Queue retryQueue(){
return QueueBuilder.durable(Constant.RETRY_QUEUE).build();
}
@Bean
public Binding retryBinding(@Qualifier("retryExchange") DirectExchange exchange,@Qualifier("retryQueue") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("retry");
}
@RequestMapping("/retry")
public String retry(){
rabbitTemplate.convertAndSend(Constant.RETRY_EXCHANGE,"retry","retry消息");
return "发送成功";
}
@Component
public class RetryListener {
@RabbitListener(queues = Constant.RETRY_QUEUE)
public void retryListener(Message message) throws UnsupportedEncodingException {
System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(),"UTF-8")
,message.getMessageProperties().getDeliveryTag());
int i = 3/0;
System.out.println("处理完成");
}
}
使用Postman调用接口,观察控制台信息
@Component
public class RetryListener {
@RabbitListener(queues = Constant.RETRY_QUEUE)
public void retryListener(Message message) throws UnsupportedEncodingException {
System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(),"UTF-8")
,message.getMessageProperties().getDeliveryTag());
try {
int i = 3/0;
}catch (Exception e){
System.out.println("处理失败");
}
System.out.println("处理完成");
}
}
如果我们把确认接收机制改为手动确认机制
@Component
public class RetryListener {
@RabbitListener(queues = Constant.RETRY_QUEUE)
public void retryListener(Message message, Channel channel) throws IOException {
System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(),"UTF-8")
,message.getMessageProperties().getDeliveryTag());
try {
Thread.sleep(1000);
int i = 3/0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
System.out.println("处理失败");
channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);
}
System.out.println("处理完成");
}
}
运行结果:
TTL,即过期时间,RabbitMQ可以对消息和队列设置TTL.
当消息到达存活时间之后,还没有被消费,就会被自动清除.
目前有两种方法可以设置消息的TTL.一是设置队列的TTL,队列中所有消息都有相同的过期时间,二是设置每条消息的过期时间,每条消息都可以有不同的TTL.如果两种方法一起使用,则消息的TTL以两者之间较小的那个数值为准.
首先我们针对每条消息设置TTL.方法就是我们在发送消息的时候对消息的expiretion参数进行设置,单位为毫秒.
我们先来配置交换机和队列.
public static final String TTL_QUEUE = "ttl_queue";
public static final String TTL_EXCHANGE = "ttl_exchange";
@Bean
public DirectExchange ttlExchange(){
return ExchangeBuilder.directExchange(Constant.TTL_EXCHANGE).durable(true).build();
}
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable(Constant.TTL_QUEUE).build();
}
@Bean
public Binding ttlBinding(@Qualifier("ttlExchange") DirectExchange exchange,@Qualifier("ttlQueue") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("ttl");
}
发送消息
@RequestMapping("/ttl")
public String ttl(){
String msg = "ttl消息";
Message message = new Message(msg.getBytes(StandardCharsets.UTF_8));
message.getMessageProperties().setExpiration("10000");//设置消息过期时间为10s
rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE,"ttl",message);
return "发送信息";
}
运行程序,观察结果:
设置队列TTL的方法是在创建队列时,加入x-message-ttl参数实现,单位为毫秒.
设置队列和绑定关系
@Bean
public Queue ttlQueue2(){
Map<String,Object> map = new HashMap<>();
map.put("x-message-ttl",10000);//设置10s过期
return QueueBuilder.durable(Constant.TTL_QUEUE2).withArguments(map).build();
}
@Bean
public Binding ttl2Binding(@Qualifier("ttlExchange") DirectExchange exchange,@Qualifier("ttlQueue2") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("ttl2");
}
队列的声明也可以简写为:
@Bean
public Queue ttlQueue2(){
return QueueBuilder.durable(Constant.TTL_QUEUE2).ttl(10000).build();
}
发送消息:
@RequestMapping("/ttl2")
public String ttl2(){
rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE,"ttl2","ttl2消息");
return "发送成功";
}
运行程序之后观察结果:
运行之后发现,新增了一个队列,队列有一个TTL标识.
设置队列的ttl属性方法,一旦消息过期,就会立即从队列中删除.如果消息设置ttl,即使消息过期,也不会立即从队列中删除,而是在即将投递到消费者之前进行判定,如果过期了,才进行删除.
这两种方法处理过期消息的原理如下:
设置队列的过期时间,队列中已经过期的消息肯定在队列头部,RabbitMQ只需要定期扫描队头是否有过期的消息即可.
而设置消息的TTL的方式,每条消息的过期时间不同,如果定期扫描整个队列,效率是非常低的,所以不如等到此消息即将被消费时再判定是否过期.
死信简单理解就是因为种种原因,无法被消费的信息,就是死信.
有死信,就会有死信队列,当一个消息在一个队列中变为死信之后,它能被重新被发送到另一个交换机中,这个交换机就是DLX(Dead Letter Exchange),绑定DLX的队列,就被称为死信队列(DLQ,Dead Letter Queue).
消息变为死信一般是由于一下的几种情况:
交换机和队列的声明包含两部分:
public static final String NORMAL_QUEUE = "normal_queue";
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String DLX_EXCHANGE = "dlx_exchange";
public static final String DLX_QUEUE = "dlx_queue";
@Bean
public DirectExchange normalExchange(){
return ExchangeBuilder.directExchange(Constant.NORMAL_EXCHANGE).durable(true).build();
}
@Bean
public Queue normalQueue(){
return QueueBuilder.durable(Constant.NORMAL_QUEUE).build();
}
@Bean
public Binding normalBinding(@Qualifier("normalExchange") DirectExchange exchange,@Qualifier("normalQueue") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("normal");
}
@Bean
public DirectExchange dlxExchange(){
return ExchangeBuilder.directExchange(Constant.DLX_EXCHANGE).durable(true).build();
}
@Bean
public Queue dlxQueue(){
return QueueBuilder.durable(Constant.DLX_QUEUE).build();
}
@Bean
public Binding dlxBinding(@Qualifier("dlxExchange") DirectExchange exchange,@Qualifier("dlxQueue") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("dlx");
}
当一个队列中存在死信的时候,RabbitMQ会自动把这个消息重新发布到设置的DLX交换机上,进而被路由到另一个队列,即死信队列.
绑定的时候需要为普通队列设置x-dead-letter-exchange和x-dead-letter-routing-key两个参数,第一个参数是绑定的死信交换机,第二个参数是死信队列的路由键.
@Bean
public Queue normalQueue(){
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange",Constant.DLX_EXCHANGE);
map.put("x-dead-letter-routing-key","dlx");
return QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(map).build();
}
上面的map可以简写为:
@Bean
public Queue normalQueue(){
return QueueBuilder.durable(Constant.NORMAL_QUEUE).
deadLetterExchange(Constant.DLX_EXCHANGE).
deadLetterRoutingKey("dlx").
build();
}
我们通过设置队列长度或者是设置消息的过期时间来制造死信.如果队列中的消息过期,或者是队列中的消息超过队列的长度,全部会被路由到死信队列.
@Bean
public Queue normalQueue(){
return QueueBuilder.durable(Constant.NORMAL_QUEUE).
deadLetterExchange(Constant.DLX_EXCHANGE).
deadLetterRoutingKey("dlx").
ttl(10000).
maxLength(10L).
build();
}
@RequestMapping("/normal")
public String normal(){
rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal","normal消息");
return "发送成功";
}
normal_queue有5个标签,则会五个标签分别代表该队列的5个属性:
@Component
public class NormalListener {
@RabbitListener(queues = Constant.NORMAL_QUEUE)
public void normalListener(Message message, Channel channel) throws IOException, InterruptedException {
System.out.printf("接收到消息%s,tag:%d\n",new String(message.getBody(),"UTF-8"),
message.getMessageProperties().getDeliveryTag());
try {
System.out.println("处理消息");
int i = 3/0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}catch (Exception e){
System.out.println("消息处理失败");
Thread.sleep(1000);
//拒绝接收之后不放回原队列中,则会放入死信队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
}
}
}
@Component
public class DLXListener {
@RabbitListener(queues = Constant.DLX_QUEUE)
public void dlxListener(Message message) throws UnsupportedEncodingException {
System.out.printf("死信队列接收到消息:%s,tag:%d\n",new String(message.getBody(),"UTF-8")
,message.getMessageProperties().getDeliveryTag());
}
}
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- baijiahaobaidu.com 版权所有 湘ICP备2023023988号-9
违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务