当前位置: 首页 > news >正文

怎么免费建网站政府免费培训 面点班

怎么免费建网站,政府免费培训 面点班,天元建设集团有限公司 企查查,无锡网站制作优化排名RabbitMq的确认机制和延时通知 一、消息发送确认 在RabbitConfig中两个回调函数,一个叫 ConfirmCallback ,一个叫 RetrunCallback; 1、交换机确认:ConfirmCallback方法 ConfirmCallback 是一个回调接口,消息发送到…

RabbitMq的确认机制和延时通知

一、消息发送确认

在RabbitConfig中两个回调函数,一个叫 ConfirmCallback ,一个叫 RetrunCallback;

1、交换机确认:ConfirmCallback方法

ConfirmCallback 是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中。

我们需要在生产者的配置中添加下面配置,表示开启发布者确认。

spring.rabbitmq.publisher-confirm-type=correlated # 新版本
spring.rabbitmq.publisher-confirms=true # 老版本

实现接口 ConfirmCallback ,重写其confirm()方法,方法内有三个参数correlationData、ack、cause。

  • correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
  • ack:消息投递到broker 的状态,true表示成功。
  • cause:表示投递失败的原因。

2、队列确认:ReturnCallback方法

交换机接收到消息后可以判断当前的路径发送没有问题,但是不能保证消息能够发送到路由队列的。而发送者是不知道这个消息有没有送达队列的,因此,我们需要在队列中进行消息确认。这就是回退消息。

实现接口ReturnCallback,重写 returnedMessage() 方法,方法有五个参数message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。

添加以下配置:

spring.rabbitmq.publisher-returns=true

3、消息发送确认代码实现

在rabbitConfig中实现接口

package com.it520.bookkeeping.config;import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author : huliupan* @CreateTime : 2022-10-13* @Description : RabbitMQ的config**/
@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("ConfirmCallback:     "+"相关数据:"+correlationData);System.out.println("ConfirmCallback:     "+"确认情况:"+ack);System.out.println("ConfirmCallback:     "+"原因:"+cause);}});rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("ReturnCallback:     "+"消息:"+returnedMessage.getMessage());System.out.println("ReturnCallback:     "+"回应码:"+returnedMessage.getReplyCode());System.out.println("ReturnCallback:     "+"回应信息:"+returnedMessage.getReplyText());System.out.println("ReturnCallback:     "+"交换机:"+returnedMessage.getExchange());System.out.println("ReturnCallback:     "+"路由键:"+returnedMessage.getRoutingKey());}});return rabbitTemplate;}}

4、回调的触发情况

那么以上这两种回调函数都是在什么情况会触发呢?

①消息推送到server,但是在server里找不到交换机
②消息推送到server,找到交换机了,但是没找到队列
③消息推送到server,交换机和队列啥都没找到
④消息推送成功

那么我先写几个接口来分别测试和认证下以上4种情况,消息确认触发回调函数的情况:

  • 消息推送到server,但是在server里找不到交换机
    写个测试接口,把消息推送到名为‘non-existent-exchange’的交换机上(这个交换机是没有创建没有配置的):
@GetMapping("/TestMessageAck")
public String TestMessageAck() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "message: non-existent-exchange test message ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", map);return "ok";
}

调用接口,查看rabbitmq-provuder项目的控制台输出情况(原因里面有说,没有找到交换机’non-existent-exchange’):

ConfirmCallback:     相关数据:null
ConfirmCallback:     确认情况:false
ConfirmCallback:     原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)
2022-10-14 16:15:58.721 ERROR 4868 --- [.98.153.34:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)

​ 结论: ①这种情况触发的是 ConfirmCallback 回调函数。

  • 消息推送到server,找到交换机了,但是没找到队列
    这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,我来简单地在DirectRabitConfig里面新增一个直连交换机,名叫‘lonelyDirectExchange’,但没给它做任何绑定配置操作:
@Bean
DirectExchange lonelyDirectExchange() {return new DirectExchange("lonelyDirectExchange");
}

然后写个测试接口,把消息推送到名为‘lonelyDirectExchange’的交换机上(这个交换机是没有任何队列配置的):

@GetMapping("/TestMessageAck2")
public String TestMessageAck2() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "message: lonelyDirectExchange test message ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestDirectRouting", map);return "ok";
}

调用接口,查看rabbitmq-provuder项目的控制台输出情况:

ConfirmCallback:     相关数据:null
ConfirmCallback:     确认情况:true
ConfirmCallback:     原因:null
ReturnCallback:     消息:(Body:'{createTime=2022-10-14 16:17:47, messageId=c001cbbe-7792-465f-b6bd-cb6f3bdde27b, messageData=message: lonelyDirectExchange test message }' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
ReturnCallback:     回应码:312
ReturnCallback:     回应信息:NO_ROUTE
ReturnCallback:     交换机:lonelyDirectExchange
ReturnCallback:     路由键:TestDirectRouting

可以看到这种情况,两个函数都被调用了;
这种情况下,消息是推送成功到服务器了的,所以ConfirmCallback对消息确认情况是true;
而在RetrunCallback回调函数的打印参数里面可以看到,消息是推送到了交换机成功了,但是在路由分发给队列的时候,找不到队列,所以报了错误 NO_ROUTE 。
结论:这种情况触发的是 ConfirmCallback和RetrunCallback两个回调函数。

  • 消息推送到sever,交换机和队列啥都没找到
 @GetMapping("/TestMessageAck2")public String TestMessageAck2() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "message: lonelyDirectExchange test message ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestDirectRouting", map);return "ok";}

返回结果:和没有交换机的一样

022-10-14 16:19:11.882 ERROR 4868 --- [.98.153.34:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)
ConfirmCallback:     相关数据:null
ConfirmCallback:     确认情况:false
ConfirmCallback:     原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)
  • 消息推送成功

那么测试下,按照正常调用之前消息推送的接口就行,就调用下 /sendFanoutMessage接口,可以看到控制台输出:

ConfirmCallback:     相关数据:null
ConfirmCallback:     确认情况:true
ConfirmCallback:     原因:null

结论: 这种情况触发的是 ConfirmCallback 回调函数。

二、消息消费确认机制

Springboot 的确认模式有三种,配置如下:

spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • NONE : 不确认 :
    • 1、默认所有消息消费成功,会不断的向消费者推送消息
    • 2、因为 rabbitmq 认为所有消息都被消费成功。所以队列中存在丢失消息风险。
  • AUTO:自动确认
    • 1、根据消息处理逻辑是否抛出异常自动发送 ack(正常)和nack(异常)给服务端,如果消费者本身逻辑没有处理好这条数据就存在丢失消息的风险。
    • 2、使用自动确认模式时,需要考虑的另一件事情就是消费者过载。
  • MANUAL:手动确认
    • 1、手动确认在业务失败后进行一些操作,消费者调用 ack、nack、reject 几种方法进行确认,如果消息未被 ACK 则发送到下一个消费者或重回队列。
    • 2、ack 用于肯定确认;nack 用于 否定确认 ;reject 用于否定确认(一次只能拒绝单条消息)
1、自动确认

自动确认是指消费者在消费消息的时候,当消费者收到消息后,消息就会被 RabbitMQ 从队列中删除掉。这种模式认为 “发送即成功”。这是不安全的,因为消费者可能在业务中并没有成功消费完就中断了

2、手动确认

手动确认又分为肯定确认和否定确认。

2.1 basicAck 方法(肯定确认)

basicAck 方法用于确认当前消息,Channel 类中的 basicAck 方法定义如下:

void basicAck(long deliveryTag, boolean multiple) throws IOException;

参数说明:

long deliveryTag:唯一标识 ID,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。

boolean multiple:是否批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。

2.2 basicNack 方法(否定确认)

basicNack 方法用于否定当前消息。 由于 basicReject 方法一次只能拒绝一条消息,如果想批量拒绝消息,则可以使用 basicNack 方法。消费者客户端可以使用 channel.basicNack 方法来实现,方法定义如下:

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

参数说明:

long deliveryTag:唯一标识 ID。

boolean multiple:是否批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。

boolean requeue:如果 requeue 参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者; 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,而不会把它发送给新的消费者。

2.3 basicReject 方法(否定确认)

basicReject 方法用于明确拒绝当前的消息而不是确认。 RabbitMQ 在 2.0.0 版本开始引入 Basic.Reject 命令,消费者客户端可以调用与其对应的 channel.basicReject 方法来告诉 RabbitMQ 拒绝这个消息。

Channel 类中的basicReject 方法定义如下:

void basicReject(long deliveryTag, boolean requeue) throws IOException;

参数说明:

long deliveryTag:唯一标识 ID。

boolean requeue:上面已经解释。

利用之前的Fanout交换机的消息发送来测试消息确认

package com.it520.bookkeeping.receiver;import com.it520.bookkeeping.config.FanoutRabbitConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** @Author : huliupan* @CreateTime : 2022/10/13* @Description :Fanout交换机的消费者**/
@Component
public class FanoutReceiver {@RabbitHandler@RabbitListener(queues = FanoutRabbitConfig.FANOUT_QUEUE_A)public void processA(Map testMessage, Message message, Channel channel) throws IOException, ClassNotFoundException {String consumerQueueName = message.getMessageProperties().getConsumerQueue();long deliveryTag = message.getMessageProperties().getDeliveryTag();if (FanoutRabbitConfig.FANOUT_QUEUE_A.equals(consumerQueueName)) {/*** 确认消息,参数说明:* long deliveryTag:唯一标识 ID。* boolean multiple:是否批处理,当该参数为 true 时,* 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。*/channel.basicAck(deliveryTag, true);System.out.println("fanout.a收到肯定确认了" + deliveryTag);}}@RabbitHandler@RabbitListener(queues = FanoutRabbitConfig.FANOUT_QUEUE_B)public void processB(Map testMessage, Message message, Channel channel) throws IOException {String consumerQueueName = message.getMessageProperties().getConsumerQueue();long deliveryTag = message.getMessageProperties().getDeliveryTag();if (FanoutRabbitConfig.FANOUT_QUEUE_B.equals(consumerQueueName)) {/*** 否定消息,参数说明:* long deliveryTag:唯一标识 ID。* boolean multiple:是否批处理,当该参数为 true 时,* 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。* boolean requeue:如果 requeue 参数设置为 true,* 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;* 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,* 而不会把它发送给新的消费者。*/channel.basicNack(deliveryTag, true, false);System.out.println("fanout.B收到否定确认了" + deliveryTag + "未重新放入队列 ");}}@RabbitHandler@RabbitListener(queues = FanoutRabbitConfig.FANOUT_QUEUE_C)public void processC(Map testMessage, Message message, Channel channel) throws IOException, InterruptedException {String consumerQueueName = message.getMessageProperties().getConsumerQueue();long deliveryTag = message.getMessageProperties().getDeliveryTag();Thread.sleep(5000);if (FanoutRabbitConfig.FANOUT_QUEUE_C.equals(consumerQueueName)) {/*** 拒绝消息,参数说明:* long deliveryTag:唯一标识 ID。* boolean requeue:如果 requeue 参数设置为 true,* 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;* 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,* 而不会把它发送给新的消费者。*/channel.basicReject(deliveryTag, true);System.out.println("fanout.C收到否定确认了" + deliveryTag + "重新放入队列 ");}}/*** Fanout.c的QUEUE的第二个消费者,避免无限循环* @param testMessage* @param message* @param channel* @throws IOException* @throws InterruptedException*/@RabbitHandler@RabbitListener(queues = FanoutRabbitConfig.FANOUT_QUEUE_C)public void processC2(Map testMessage, Message message, Channel channel) throws IOException, InterruptedException {String consumerQueueName = message.getMessageProperties().getConsumerQueue();long deliveryTag = message.getMessageProperties().getDeliveryTag();Thread.sleep(5000);if (FanoutRabbitConfig.FANOUT_QUEUE_C.equals(consumerQueueName)) {channel.basicAck(deliveryTag, true);System.out.println("fanout.C2收到keng定确认了" + deliveryTag );}}}

返回的结果

fanout.a收到肯定确认了1
fanout.B收到否定确认了1未重新放入队列 
fanout.C收到否定确认了1重新放入队列 
fanout.C2收到keng定确认了1

processC和processC2随机消费fanout.C队列,多个消费者避免一个消费者重新放入队列造成的循环

三、实现延时队列

RabbitMQ本身是不支持延时队列的,但是延时队列的使用还是很广泛的,例如一个订单下单之后有30分钟的支付时间,30分钟后要对订单的支付状态进行判断,这时候就需要用到延时队列。

目前基于RabbitMq实现延时队列的方法有两种:

1、死信队列

特性1、Time To Live(TTL)

RabbitMQ可以针对Queue设置x-expires 或者 针对Message设置 x-message-ttl,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)

RabbitMQ针对队列中的消息过期时间有两种方法可以设置。

  • A: 通过队列属性设置,队列中所有消息都有相同的过期时间。

  • B: 对消息进行单独设置,每条消息TTL可以不同。

    如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead letter

特性2、Dead Letter Exchanges(DLX)

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。

  • x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
  • x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送

队列出现dead letter的情况有:

  • 消息或者队列的TTL过期
  • 队列达到最大长度
  • 消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false

综合上述两个特性,设置了TTL规则之后当消息在一个队列中变成死信时,利用DLX特性它能被重新转发到另一个Exchange或者Routing Key,这时候消息就可以重新被消费了。

死信队列的配置

package com.it520.bookkeeping.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** @Author : huliupan* @CreateTime : 2022/10/13* @Description :直连交换机的配置**/
@Configuration
public class DirectRabbitConfig {public static final String DIRECT_EXCHANGE_NAME = "TestDirectExchange";public static final String DIRECT_QUEUE_NAME = "TestDirectQueue";public static final String DIRECT_QUEUE_NAME1 = "TestDirectQueue1";public static final String DIRECT_EXCHANGE_ROUTE_KEY = "TestDirectRouting";public static final String DIRECT_DELAY_EXCHANGE_NAME = "TestDirectDelayExchange";public static final String DIRECT_DELAY_QUEUE_NAME = "TestDirectDelayQueue";public static final String DIRECT_DELAY_QUEUE_NAME1 = "TestDirectDelayQueue1";public static final String DIRECT_DELAY_EXCHANGE_ROUTE_KEY = "TestDirectDelayRouting";//队列 起名:TestDirectQueue//声明用于失效的队列的延时失效属性,并绑定到对应的死信交换机@Beanpublic Queue TestDirectQueue() {Map<String, Object> args = new HashMap<>(2);// x-dead-letter-exchange    这里声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", DIRECT_DELAY_EXCHANGE_NAME);// x-dead-letter-routing-key  这里声明当前队列的死信路由keyargs.put("x-dead-letter-routing-key", DIRECT_DELAY_EXCHANGE_ROUTE_KEY);// x-message-ttl  声明队列的TTLargs.put("x-message-ttl", 1000 * 10);return new Queue(DIRECT_QUEUE_NAME, true, false, false, args);}//队列 起名:TestDirectQueue1@Beanpublic Queue TestDirectQueue1() {Map<String, Object> args = new HashMap<>(2);// x-dead-letter-exchange    这里声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", DIRECT_DELAY_EXCHANGE_NAME);// x-dead-letter-routing-key  这里声明当前队列的死信路由keyargs.put("x-dead-letter-routing-key", DIRECT_DELAY_EXCHANGE_ROUTE_KEY);// x-message-ttl  声明队列的TTLargs.put("x-message-ttl", 1000 * 5);return new Queue(DIRECT_QUEUE_NAME1, true, false, false, args);}//Direct交换机 起名:TestDirectExchange@BeanDirectExchange TestDirectExchange() {//  return new DirectExchange("TestDirectExchange",true,true);return new DirectExchange(DIRECT_EXCHANGE_NAME, true, false);}//声明延时交换机@BeanDirectExchange createDelayExchange() {return new DirectExchange(DIRECT_DELAY_EXCHANGE_NAME, true, false);}//声明死信队列@BeanQueue delayQueue() {return new Queue(DIRECT_DELAY_QUEUE_NAME, true);}//声明死信队列@BeanQueue delayQueue1() {return new Queue(DIRECT_DELAY_QUEUE_NAME1, true);}//延时交换机和死信队列绑定@BeanBinding bindingDelayQueue() {return BindingBuilder.bind(delayQueue()).to(createDelayExchange()).with(DIRECT_DELAY_EXCHANGE_ROUTE_KEY);}//延时交换机和死信队列绑定@BeanBinding bindingDelayQueue1() {return BindingBuilder.bind(delayQueue1()).to(createDelayExchange()).with(DIRECT_DELAY_EXCHANGE_ROUTE_KEY);}//绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting@BeanBinding bindingDirect() {return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with(DIRECT_EXCHANGE_ROUTE_KEY);}@BeanBinding bindingDirect1() {return BindingBuilder.bind(TestDirectQueue1()).to(TestDirectExchange()).with(DIRECT_EXCHANGE_ROUTE_KEY);}@BeanDirectExchange lonelyDirectExchange() {return new DirectExchange("lonelyDirectExchange");}}

消息发送者;

    @GetMapping("/sendDirectMessage")public String sendDirectMessage() {String messageData = "test message, hello!";//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchangerabbitTemplate.convertAndSend(DirectRabbitConfig.DIRECT_EXCHANGE_NAME, DirectRabbitConfig.DIRECT_EXCHANGE_ROUTE_KEY, messageData);return "ok";}

死信消息的接收者

  @RabbitHandler@RabbitListener(queues = DirectRabbitConfig.DIRECT_DELAY_QUEUE_NAME)//监听的队列名称 TestDirectDelayQueuepublic void processDelay(String testMessage, Message message , Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("死信消息properties" + message.getMessageProperties());System.out.println("DirectReceiver Delay消费者收到消息  : " + testMessage.toString());channel.basicAck(deliveryTag , false);}@RabbitHandler@RabbitListener(queues = DirectRabbitConfig.DIRECT_DELAY_QUEUE_NAME1)//监听的队列名称 TestDirectDelayQueue1public void processDelay1(String testMessage, Message message , Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("死信消息properties" + message.getMessageProperties());System.out.println("DirectReceiver Delay1消费者收到消息  : " + testMessage.toString());channel.basicAck(deliveryTag , false);}
2、RabbitMq的插件

插件实现参考:https://blog.csdn.net/u014308482/article/details/53036770

http://www.15wanjia.com/news/16933.html

相关文章:

  • 摄影作品欣赏网站搜索引擎 磁力吧
  • 音乐外链网站bing搜索 国内版
  • 国际业务网站有哪些网络广告代理
  • 做景观设计比赛的网站长春头条新闻今天
  • 临汾做网站电话宁波网络优化seo
  • 高密网站制作汕头seo优化培训
  • 互联网推广方法南京seo整站优化技术
  • 网站建设的用户体验适合seo软件
  • 商城网站建设需要多少钱谷歌优化推广
  • 一站式网站建设顾问百度指数首页
  • 网站建设驻地开发合同推广方案如何写
  • 推广b2c网站百度广告竞价排名
  • 企业网站建设和管理十大经典案例
  • 酒庄企业网站win10系统优化工具
  • 丹江口做网站蜜雪冰城网络营销案例分析
  • 电子商务网站建设规划书厦门seo厦门起梦
  • 佛山seo扣费优化师和运营区别
  • 教学网站前台模板推广管理
  • wordpress 固定链接 无法访问厦门网站优化公司
  • wordpress静态网站博客郑州网站建设公司排行榜
  • 佛山网站建设有限公司外包公司被辞退有补偿吗
  • 南京中建乡旅建设投资有限公司网站seo优化诊断工具
  • 温州高端网站建设公司市场调研报告ppt
  • 浩森宇特北京网站建设营销网课
  • 专门做电商的招聘网站seo教程seo优化
  • 公司注册费用多少百度seo营销公司
  • php网站开发pdf优化大师官网入口
  • 在海口注册公司需要什么条件长沙seo工作室
  • 整个网站的关键词苏州百度推广公司地址
  • 网站公告弹窗源码网络营销的四大基础理论