当前位置: 首页 > news >正文

门户网站的首页模板本站3天更换一次域名yw

门户网站的首页模板,本站3天更换一次域名yw,有什么可以做建筑模型的网站,南充网站建设目录 1.消息可靠性 1.1.生产者消息确认 1.1.1.修改配置 1.1.2.定义Return回调 1.1.3.定义ConfirmCallback 1.2.消息持久化 1.2.1.交换机持久化 1.2.2.队列持久化 1.2.3.消息持久化 1.3.消费者消息确认 1.3.1.演示none模式 1.3.2.演示auto模式 1.4.消费失败重试机制…

目录

1.消息可靠性

1.1.生产者消息确认

1.1.1.修改配置

1.1.2.定义Return回调

1.1.3.定义ConfirmCallback

1.2.消息持久化

1.2.1.交换机持久化

1.2.2.队列持久化

1.2.3.消息持久化

1.3.消费者消息确认

1.3.1.演示none模式

1.3.2.演示auto模式

1.4.消费失败重试机制

1.4.1.本地重试

1.4.2.失败策略

1.5.总结

1.消息可靠性

消息从发送,到消费者接收,会经理多个过程:

其中的每一步都可能导致消息丢失,常见的丢失原因包括:

  • 发送时丢失:

    • 生产者发送的消息未送达exchange

    • 消息到达exchange后未到达queue

  • MQ宕机,queue将消息丢失

  • consumer接收到消息后未消费就宕机

针对这些问题,RabbitMQ分别给出了解决方案:

  • 生产者确认机制

  • mq持久化

  • 消费者确认机制

  • 失败重试机制

下面我们就通过案例来演示每一个步骤。

首先,导入课前资料提供的demo工程:

项目结构如下:

1.1.生产者消息确认

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。

返回结果有两种方式:

  • publisher-confirm,发送者确认

    • 消息成功投递到交换机,返回ack

    • 消息未投递到交换机,返回nack

  • publisher-return,发送者回执

    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

注意:

1.1.1.修改配置

首先,修改publisher服务中的application.yml文件,添加下面的内容:

spring:rabbitmq:publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true

说明:

  • publish-confirm-type:开启publisher-confirm,这里支持两种类型:

    • simple:同步等待confirm结果,直到超时

    • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback

  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback

  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

1.1.2.定义Return回调

每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置:

修改publisher服务,添加一个:

package cn.itcast.mq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 投递失败,记录日志log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",replyCode, replyText, exchange, routingKey, message.toString());// 如果有业务需要,可以重发消息});}
}

1.1.3.定义ConfirmCallback

ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同。

在publisher服务的cn.itcast.mq.spring.SpringAmqpTest类中,定义一个单元测试方法:

public void testSendMessage2SimpleQueue() throws InterruptedException {// 1.消息体String message = "hello, spring amqp!";// 2.全局唯一的消息ID,需要封装到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 3.添加callbackcorrelationData.getFuture().addCallback(result -> {if(result.isAck()){// 3.1.ack,消息成功log.debug("消息发送成功, ID:{}", correlationData.getId());}else{// 3.2.nack,消息失败log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());}},ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage()));// 4.发送消息rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);// 休眠一会儿,等待ack回执Thread.sleep(2000);
}

1.2.消息持久化

生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。

要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。

  • 交换机持久化

  • 队列持久化

  • 消息持久化

1.2.1.交换机持久化

RabbitMQ中交换机默认是非持久化的,mq重启后就丢失。

SpringAMQP中可以通过代码指定交换机持久化:

@Bean
public DirectExchange simpleExchange(){// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new DirectExchange("simple.direct", true, false);
}

事实上,默认情况下,由SpringAMQP声明的交换机都是持久化的。

可以在RabbitMQ控制台看到持久化的交换机都会带上D的标示:

1.2.2.队列持久化

RabbitMQ中队列默认是非持久化的,mq重启后就丢失。

SpringAMQP中可以通过代码指定交换机持久化:

@Bean
public Queue simpleQueue(){// 使用QueueBuilder构建队列,durable就是持久化的return QueueBuilder.durable("simple.queue").build();
}

事实上,默认情况下,由SpringAMQP声明的队列都是持久化的。

可以在RabbitMQ控制台看到持久化的队列都会带上D的标示:

1.2.3.消息持久化

利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode:

  • 1:非持久化

  • 2:持久化

用java代码指定:

默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定。

1.3.消费者消息确认

RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。

而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。

设想这样的场景:

  • 1)RabbitMQ投递消息给消费者

  • 2)消费者获取消息后,返回ACK给RabbitMQ

  • 3)RabbitMQ删除消息

  • 4)消费者宕机,消息尚未处理

这样,消息就丢失了。因此消费者返回ACK的时机非常重要。

而SpringAMQP则允许配置三种确认模式:

•manual:手动ack,需要在业务代码结束后,调用api发送ack。

•auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack

•none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

由此可知:

  • none模式下,消息投递是不可靠的,可能丢失

  • auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack

  • manual:自己根据业务情况,判断什么时候该ack

一般,我们都是使用默认的auto即可。

1.3.1.演示none模式

修改consumer服务的application.yml文件,添加下面内容:

spring:rabbitmq:listener:simple:acknowledge-mode: none # 关闭ack

修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理异常:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {log.info("消费者接收到simple.queue的消息:【{}】", msg);// 模拟异常System.out.println(1 / 0);log.debug("消息处理完成!");
}

测试可以发现,当消息处理抛异常时,消息依然被RabbitMQ删除了。

1.3.2.演示auto模式

再次把确认机制修改为auto:

spring:rabbitmq:listener:simple:acknowledge-mode: auto # 关闭ack

在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unack(未确定状态):

抛出异常后,因为Spring会自动返回nack,所以消息恢复至Ready状态,并且没有被RabbitMQ删除:

1.4.消费失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:

怎么办呢?

1.4.1.本地重试

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容:

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 在重试3次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了

  • 查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试

  • 重试达到最大次数后,Spring会返回ack,消息会被丢弃

1.4.2.失败策略

在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

1)在consumer服务中定义处理失败消息的交换机和队列

@Bean
public DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

2)定义一个RepublishMessageRecoverer,关联队列和交换机

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

完整代码:

package cn.itcast.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;@Configuration
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

1.5.总结

如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列

  • 开启持久化功能,确保消息未消费前在队列中不会丢失

  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack

  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

http://www.15wanjia.com/news/48302.html

相关文章:

  • 滁州做网站优化爱站网收录
  • 怎么做送餐网站谷歌浏览器官网入口
  • 半成品个人在家加工北京谷歌seo
  • 腾讯云服务器网站域名备案正在直播足球比赛
  • 电子产品外包加工项目seo站内优化和站外优化
  • 陕西渭南住房和城乡建设厅网站黄页网站推广效果
  • 做网站西域数码阿里云抖音推广平台联系方式
  • 中山专业做网站公司苏州百度推广代理商
  • 郑州市建设工程造价信息网站武汉seo系统
  • 装修网站怎么做建个网站需要多少钱
  • 长滚动页网站怎么做的怀化网站seo
  • 做网站需要什么域名百度收录官网
  • 中国建设人才服务信息网是不是正规网站永久免费域名注册
  • 自己做网站怎么租服务器2024年最新一轮阳性症状
  • 地方门户网站的分类有什么推广软件
  • 关于教做鞋的网站百度助手下载
  • 做钟点工 网站网站制作企业
  • 河南网站网站制作360官方网站网址
  • 重庆沙坪坝做网站注册域名的步骤
  • 网站建设投诉去哪里投诉免费创建网站
  • 深圳做高端网站建设公司网盘app下载
  • 漳诈网站建设免费域名空间申请网址
  • 建筑公司企业信用分短视频seo搜索优化
  • 深圳单位网站建设服务公司百度贴吧官网app下载
  • 企业做网站的方案百度竞价点击神器奔奔
  • 做ppt网站深圳外包网络推广
  • 手机做任务赚钱的网站武汉seo人才
  • wordpress nicetheme优化设计答案六年级上册
  • 为什么网站建设需要每年续费深圳网络公司推广平台
  • 网站开发都是使用框架吗江苏seo哪家好