在发送消息的时候,生产者设置了RoutingKey为orange,消息就会路由到Q1.
BindingKey其实也属于RoutingKey的一种,我们为了把这两个键混淆,我们可以这样理解:
在交换机和队列绑定的时候,需要的路由键是BindingKey.在发送消息的时候,需要的路由键是RoutingKey.
比如国家地震局发送地震预警,地震发生的时候,需要把预警消息发送给可能有震感地区的所有电子设备.
路由模式是发布订阅模式的变种,在发布订阅模式的基础上,增加了路由key.
相比广播模式,路由模式是Exchange根据RoutingKey的规则,将数据筛选后发给对应的消费者队列.
适合场景: 需要根据特定的规则分发消息的场景.
比如我们在Spring中学习的日志,日志等级分为error,warning,info,debug,就可以通过这种模式,把不同的日志发送到不同的队列.
路由模式的升级版,在routingKey的基础上,增加了通配符的功能,使之更加灵活.其中,一个.是一个节,使用*代表的是一个节,使用#代表的是多个节.Topics和Routing的基本原理相同.不同之处是:routingKey的匹配方式不同,Routing模式是相等匹配,topics模式是通配符匹配.
适用场景: 需要灵活匹配和过滤消息的场景.
在RPC模式中没有生产者和消费者,大概就是通过两个队列实现了一个消息回调的过程.有点类似与我们在网络中学习的"请求和响应",这个功能是MQ的额外功能.
Publish Confirms模式是RabbitMQ提供的一种确保消息可靠发送到RabbitMQ服务器的机制.在这种模式之下,生产者可以等待RabbitMQ服务器确认,可以确保消息已经被服务器接收并处理.
前面我们对这几种工作模式有了简单的了解,接下来我们学习他们的写法.
就是快速上手中的程序,此处忽略.
就是简单模式的增强版,和简单模式下最大的区别就是,工作队列模式支持多个消费者接收消息,消费者之间是竞争关系,每个消息只能被⼀个消费者接收.
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
public class Constant {
public static String HOST = "39.105.137.";
public static int PORT = 5672;
public static String USER_NAME = "jiangruijia";
public static String PASSWORD = "qwe123524";
public static String QUEUE_NAME = "work";
public static String VIRTUAL_HOST = "/";
}
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASSWORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constant.QUEUE_NAME,true,false,false,null);
for (int i = 0; i < 10 ; i++){//发送10次消息
String message = "hello work~" + i;
channel.basicPublish("",Constant.QUEUE_NAME,null,message.getBytes());
}
channel.close();
connection.close();
}
}
public class Consumer2 {//两个消费者是竞争关系
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASSWORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constant.QUEUE_NAME,true,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("接收到消息" + message);
}
};
channel.basicConsume(Constant.QUEUE_NAME,true,consumer);
//先不要关闭资源,因为需要先开启消费者,等待生产者发送消息
}
}
另一个消费者和这个消费者相同,直接复制粘贴一份.
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASSWORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//声明交换机.
channel.exchangeDeclare(Constant.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT,true,false,false,null);
//声明队列
channel.queueDeclare(Constant.FANOUT_QUEUE1,true,false,false,null);
channel.queueDeclare(Constant.FANOUT_QUEUE2,true,false,false,null);
//绑定队列与交换机
channel.queueBind(Constant.FANOUT_QUEUE1,Constant.FANOUT_EXCHANGE,"");
channel.queueBind(Constant.FANOUT_QUEUE2,Constant.FANOUT_EXCHANGE,"");
String message = "发送广播消息";
channel.basicPublish(Constant.FANOUT_EXCHANGE,"",null,message.getBytes());
channel.close();
connection.close();
}
}
参数解释:
- exchangeDeclare: 第一个参数是交换机名称,第二个参数是交换机的路由规则(这里指定为FANOUT广播类型),第三个参数是是否持久化,如果设置持久化,那么在重启服务之后,交换机不会被释放,第四个参数是是否自动删除,当没有队列与其绑定的时候,它就会被删除.第五个参数是是否是内部使用的,一般情况下为false,第六个参数是指定相关参数.
- queueDeclare: 声明队列,我们在之前解释过,这里不再赘述.
- queueBind: 绑定队列与交换机,第一个参数是队列名称,第二个参数是交换机名称,第三个参数是交换机和队列之间的路由规则,在这里我们是广播模式,所以我们没有指定路由规则,指定为默认的"".
- basicPublish: 第一个参数是发送消息的交换机,第二个参数是发送消息时的路由关键字,这里为广播模式,所以我们指定为"",第三个参数是一些相关配置,第四个参数是发送的消息.
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASSWORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//队列声明可以省略,如果队列已经存在,则不会创建队列
channel.queueDeclare(Constant.FANOUT_QUEUE1,true,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("获取到广播消息:"+message);
}
};
channel.basicConsume(Constant.FANOUT_QUEUE1,true,consumer);
}
}
第二个消费者直接复制一份,改掉消费者于队列的绑定和队列的声明即可.
相比于发布订阅模式,交换机和队列不可以是任意绑定了==,而是需要指定一个BindingKey(RoutingKey的一种)==.生产者在向交换机发送消息的时候,也需要指定RoutingKey.这时,Exchange也不再把消息交给每⼀个绑定的key,而是根据消息RoutingKey进行判断,只有队列绑定时的BindingKey和发送消息的RoutingKey完全⼀致,才会接收到消息.
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASSWORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(Constant.ROUTING_EXCHANGE, BuiltinExchangeType.DIRECT,true,false,false,null);
channel.queueDeclare(Constant.ROUTING_QUEUE1,true,false,false,null);
channel.queueDeclare(Constant.ROUTING_QUEUE2,true,false,false,null);
channel.queueBind(Constant.ROUTING_QUEUE1,Constant.ROUTING_EXCHANGE,"a");
channel.queueBind(Constant.ROUTING_QUEUE2,Constant.ROUTING_EXCHANGE,"a");
channel.queueBind(Constant.ROUTING_QUEUE2,Constant.ROUTING_EXCHANGE,"b");
channel.queueBind(Constant.ROUTING_QUEUE2,Constant.ROUTING_EXCHANGE,"c");
String message1 = "routingKey_a";
String message2 = "routingKey_b";
String message3 = "routingKey_c";
channel.basicPublish(Constant.ROUTING_EXCHANGE,"a",null,message1.getBytes());
channel.basicPublish(Constant.ROUTING_EXCHANGE,"b",null,message2.getBytes());
channel.basicPublish(Constant.ROUTING_EXCHANGE,"c",null,message3.getBytes());
connection.close();
channel.close();
}
}
和上面广播模式不同的是,在绑定队列与交换机的时候,需要指定bindingKey.channel.queueBind(Constant.ROUTING_QUEUE1,Constant.ROUTING_EXCHANGE,"a");比如这一行指定了bindingKey为a.
在发送消息的时候,需要指定消息的RoutingKey,比如:channel.basicPublish(Constant.ROUTING_EXCHANGE,"a",null,message1.getBytes());.
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASSWORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constant.ROUTING_QUEUE1,true,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("接收到消息:"+message);
}
};
channel.basicConsume(Constant.ROUTING_QUEUE1,true,consumer);
}
}
Topic和Routing模式的区别就是:
1. Topic模式使用的交换机类型为topic(Routing模式使⽤的交换机类型为direct).
2. Topic类型的交换机在匹配规则上进行了扩展,bindingKey支持通配符匹配.
在Topic类型的交换机在匹配的规则上,有一些要求:
1. RoutingKey是一系列由.分割的单词,比如"a.b.c".
2. BindingKey和RoutingKey一样,也是点.分割的字符串.
3. BindingKey中可以存在两种特殊字符串,用于模糊匹配.其中*表示一个单词,#表示多个单词.
比如:
• BindingKey为"d.a.b"会同时路由到Q1和Q2.
• BindingKey为"d.a.f"会路由到Q1.
• BindingKey为"c.e.f"会路由到Q2.
• BindingKey为"d.b.f"会被丢弃,或者返回给⽣产者(需要设置mandatory参数).
接下来我们就来实现Topic模式:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASSWORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constant.TOPIC_QUEUE1,true,false,false,null);
channel.queueDeclare(Constant.TOPIC_QUEUE2,true,false,false,null);
channel.exchangeDeclare(Constant.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC,true,false,false,null);//交换机类型不同
channel.queueBind(Constant.TOPIC_QUEUE1,Constant.TOPIC_EXCHANGE,"*.a.*");
channel.queueBind(Constant.TOPIC_QUEUE2,Constant.TOPIC_EXCHANGE,"*.*.b");
channel.queueBind(Constant.TOPIC_QUEUE2,Constant.TOPIC_EXCHANGE,"c.#");//bindingKey不同
String message1 = "hello_b.a.c";
String message2 = "hello_a.c.b";
String message3 = "hello_c.a.b";
channel.basicPublish(Constant.TOPIC_EXCHANGE,"b.a.c",null,message1.getBytes());
channel.basicPublish(Constant.TOPIC_EXCHANGE,"c.a.b",null,message3.getBytes());
channel.basicPublish(Constant.TOPIC_EXCHANGE,"a.c.b",null,message2.getBytes());
channel.basicPublish(Constant.TOPIC_EXCHANGE,"c.a.b",null,message3.getBytes());
channel.close();
connection.close();
}
}
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASSWORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constant.TOPIC_QUEUE1,true,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("收到消息: "+s);
}
};
channel.basicConsume(Constant.TOPIC_QUEUE1,true,consumer);
}
}
第二个消费者直接修改一下队列名称就可以.
RPC通信,是远程过程调用,它是一种发送请求,得到响应的模式,有点类似与我们之前学习网络时候的http协议.
RabbitMQ实现RPC通信的过程,大概是通过两个队列实现⼀个可回调的过程.
在这个模式中,没有明确的生产者和消费者,在发送请求的时候,客户端是生产者,服务端是消费者,在接收响应的时候,服务端是生产者,客户端是消费者.
大概的流程如下:
接下来我们来实现PCR模式:
public class Client {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASSWORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constant.SRC_REQUEST_QUEUE,true,false,false,null);
channel.queueDeclare(Constant.SRC_RESPONSE_QUEUE,true,false,false,null);
//发送请求
String corrId = UUID.randomUUID().toString();
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.correlationId(corrId)
.replyTo(Constant.SRC_RESPONSE_QUEUE)//指定相关属性
.build();
String request = "发送请求";
//如果没有交换机的时候,RoutingKey就是队列的名称
channel.basicPublish("",Constant.SRC_REQUEST_QUEUE,properties,request.getBytes());
//接收请求
BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
if(corrId.equals(properties.getCorrelationId())){//判断corrId是否相等
queue.offer(new String(body));
}
}
};
channel.basicConsume(Constant.SRC_RESPONSE_QUEUE,true,consumer);
String ret = queue.take();
System.out.println("收到请求:" + ret);
channel.close();
connection.close();
}
}
public class Server {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASSWORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constant.SRC_REQUEST_QUEUE,true,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("成功接收到请求:"+new String(body));
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
.correlationId(properties.getCorrelationId())
.build();
String message = "返回响应";
channel.basicPublish("",Constant.SRC_RESPONSE_QUEUE,basicProperties,message.getBytes());
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(Constant.SRC_REQUEST_QUEUE,false,consumer);//设置收到消息之后不自动应答,在发送响应之后手动应答
}
}
RabbitMQ消息确定的机制:
在RabbitMQ中,basicConsumer方法的autoAck参数用于指定消费者是否应该自动向消息队列确认消息.
自动确认(autoAck=true):消息队列在将消息发送给消费者后,会立即从内存中删除该消息.这意味着,如果消费者处理消息失败,消息将丢失,因为消息队列认为消息已经被成功消费
手动确认(autoAck=false):消息队列在将消息发送给消费者后,需要消费者显式地调用basicAck方法来确认消息.手动确认提供了更高的可靠性,确保消息不会被意外丢失,适用于消息处理重要且需要确保每个消息都被正确处理的场景.
作为消息中间件,都会面临丢失的问题.
消息丢失大概分为三种类型:
Channel channel = connection.createChannel();
channel.confirmSelect();//开启信道确认模式
首先我们需要建立连接,建立连接需要放入try语句中,所以我们可以把建立连接单独提出一个静态方法.
private static Connection createConnection() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASSWORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
return connectionFactory.newConnection();
}
之后在主方法中调用三种策略,
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
publishingMessagesIndividually();
publishMessagesInBatch();
handlePublishConfirmsAsynchronously();
}
之后我们来编写单条确认模式:
/**
* 单条确认
*/
private static void publishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {
try (Connection connection = createConnection()){//首先建立连接
Channel channel = connection.createChannel();
channel.confirmSelect();//开启信道确认模式
channel.queueDeclare(Constant.CONFIRM_QUEUE1,true,false,false,null);//创建队列
String message = "发送信息";
for (int i = 0; i < 200; i++){
channel.basicPublish("",Constant.CONFIRM_QUEUE1,null,(message+i).getBytes());
channel.waitForConfirms(5000);//等待消息确认,如果超过规定的等待时间还没有确认,则抛出异常
}
}
}
在编写代码之前,我们首先要使用try语句与服务器建立连接.
之后在创建Channel之后,需要开启信道的确认模式.channel.confirmSelect().
这里我们在发送消息之后,需要做的最重要的一件事就是等待消息的确认channel.waitForConfirms(5000),在这个方法中可以指定阻塞时间,如果在指定的时间内消息被确认,这个方法就会立即返回,如果在指定时间之内没有确认消息,则会抛出异常.
/**
* 批量确认
*/
private static void publishMessagesInBatch() throws IOException, TimeoutException, InterruptedException {
try (Connection connection = createConnection()){
Channel channel = connection.createChannel();
channel.confirmSelect();//设置为确认模式
channel.queueDeclare(Constant.CONFIRM_QUEUE2,true,false,false,null);
String message = "批量确认发送消息";
int batchSize = 100;//每次批量发送的消息条数
int outstandingMessageCount = 0;//记录已经发送的条数
for (int i = 0 ; i < 200 ; i++){
channel.basicPublish("",Constant.CONFIRM_QUEUE2,null,(message+i).getBytes());
outstandingMessageCount++;//每发送一条消息,参数就进行++
if(outstandingMessageCount == batchSize){//达到了一次性批量发送的指定数量,等待确认,确认完成之后,将参数清零
channel.waitForConfirms(5000);
outstandingMessageCount = 0;
}
}
//如果发送的消息不是100的倍数,就还有消息没有确认
if (outstandingMessageCount > 0){
channel.waitForConfirms(5000);
}
}
}
这里需要注意的几点就是,在发送的消息达到一次性最大的批量数量的时候,就要确认,如果确认成功之后,需要把记录的发送数量清零.
之后,就是在出循环之后,如果发送的消息条数不是batchSize的整数倍的时候,这时候不满足循环之内的if条件,还是有一些消息没有确认完成,就需要在循环之外再次进行确认.
异步确认就是,生产者在发送消息的同时,还可以确认消息是否收到.
Channel接口中为我们提供了一个方法,addConfirmListener.这个方法可以添加ConfirmListener回调接口.
ConfirmListener中包含两个方法: handleAck(long deliveryTag, boolean multiple) 和handleNack(long deliveryTag, boolean multiple),分别对应处理的是MQ发送给生产者的ack和nack.其中ack代表的是消息确认成功,即消息都收到的情况下,nack指的是消息在确认的时候出现了一些问题.deliveryTag表示的是发送消息的序号,multiple表示是否批量确认.
这里我们还需要一个有序集合来存储为确认的消息.
/**
* 异步确认
*/
private static void handlePublishConfirmsAsynchronously() throws IOException, TimeoutException, InterruptedException {
try (Connection connection = createConnection()){
Channel channel = connection.createChannel();
channel.confirmSelect();
channel.queueDeclare(Constant.CONFIRM_QUEUE3,true,false,false,null);
SortedSet<Long> set = Collections.synchronizedSortedSet(new TreeSet<>());//设置一个有序集合,用来存储未确认消息的序号
channel.addConfirmListener(new ConfirmListener() {//为信道添加,监听消息的确认情况
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {//表示消息被成功确认
if (multiple){//判断消息的处理是否批量
set.headSet(deliveryTag+1).clear();//将小于deliveryTag的消息全部清除,证明这批消息已经被ack了
}else {
set.remove(set.last());//如果不是批量,清除最后一个即可
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple){//判断消息的处理是否批量
set.headSet(deliveryTag+1).clear();//将小于deliveryTag的消息全部清除,证明这批消息已经被ack了
}else {
set.remove(set.last());//如果不是批量,清除最后一个即可
}
//这里在消息确认不成功的时候,需要重传,这里省略
}
});
String message = "异步确认发送消息";
for (int i = 0 ;i < 200 ;i++){
long nextPublishSeqNo = channel.getNextPublishSeqNo();//获取到消息发送的序号
channel.basicPublish("",Constant.CONFIRM_QUEUE3,null,(message+i).getBytes());
set.add(nextPublishSeqNo);//把这些消息都添加到集合中
}
while (!set.isEmpty()){//等待集合中的消息都被确认完成
Thread.sleep(1000);
}
}
}
这里我们在消息确认成功之后,即handleAck方法被调用的时候,需要把这些消息都从集合中清除掉.一种是批量的情况,直接清除掉deliveryTag之前所有的消息,另一种是没有批量的情况,直接清除掉最后一个元素即可.当然在没有确认成功的情况下,我们需要根据具体的业务逻辑进行消息的重发.在给队列中发送消息的时候,我们需要从Channel中获取到下次发送消息开始的序号,之后我们把开始的序号放入set中,代表这些消息还没有被处理过.最后我们需要等待消息确认完成,只要存放未确认消息的set中不为空,就证明还有消息没有被确认,我们就进行阻塞等待.
上面三种方式中,假如发送的消息较多,这三种策略的执行时间:单个确认>批量确认>异步确认.
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- baijiahaobaidu.com 版权所有 湘ICP备2023023988号-9
违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务