当前位置: 首页 > news >正文

沧州营销型网站建设免费网站推广网站短视频

沧州营销型网站建设,免费网站推广网站短视频,把网站内容全删掉 在重新建立会不会被k,wordpress文章发布助手1、Broker概述 Broker 在 RocketMQ 架构中的角色,就是存储消息,核心任务就是持久化消息,生产者发送消息给 Broker,消费者从 Broker 消费消息,其物理部署架构图如下: 备注:以上摘录自官方 RocketMQ 设计文档…

1、Broker概述

Broker 在 RocketMQ 架构中的角色,就是存储消息,核心任务就是持久化消息,生产者发送消息给 Broker,消费者从 Broker 消费消息,其物理部署架构图如下:

备注:以上摘录自官方 RocketMQ 设计文档。

上述基本描述了消息中间件的架构设计,不仅限于 RocketMQ,不同消息中间件的最大区别之一在消息的存储上。

2、Broker存储设计概要

接下来从配置文件的角度来窥探 Broker 存储设计的关注点,对应代码(MessageStoreConfig)。

  • storePathRootDir
    设置Broker的存储根目录,默认为 $Broker_Home/store。
  • storePathCommitLog
    设置commitlog的存储目录,默认为$Broker_Home/store/commitlog。
  • mapedFileSizeCommitLog
    commitlog 文件的大小,默认为1G。
  • mapedFileSizeConsumeQueue
    consumeQueueSize,ConsumeQueue 存放的是定长的信息(20个字节,偏移量、size、tagscode),默认30w * ConsumeQueue.CQ_STORE_UNIT_SIZE。
  • enableConsumeQueueExt
    是否开启 consumeQueueExt,默认为 false,就是如果消费端消息消费速度跟不上,是否创建一个扩展的 ConsumeQueue文件,如果不开启,应该会阻塞从 commitlog 文件中获取消息,并且 ConsumeQueue,应该是按topic独立的。
  • mappedFileSizeConsumeQueueExt
    扩展consume文件的大小,默认为48M。
  • flushIntervalCommitLog
    刷写 CommitLog 的间隔时间,RocketMQ 后台会启动一个线程,将消息刷写到磁盘,这个也就是该线程每次运行后等待的时间,默认为500毫秒。flush 操作,调用文件通道的force()方法。
  • commitIntervalCommitLog
    提交消息到 CommitLog 对应的文件通道的间隔时间,原理与上面类似;将消息写入到文件通道(调用FileChannel.write方法)得到最新的写指针,默认为200毫秒。
  • useReentrantLockWhenPutMessage
    在put message( 将消息按格式封装成msg放入相关队列时实用的锁机制:自旋或ReentrantLock)。
  • flushIntervalConsumeQueue
    刷写到ConsumeQueue的间隔,默认为1s。
  • flushCommitLogLeastPages
    每次 flush commitlog 时最小发生变化的页数。
  • commitCommitLogLeastPages
    每一次 commitlog 提交任务至少需要的页数。
  • flushLeastPagesWhenWarmMapedFile
    用字节0填充整个文件,每多少页刷盘一次,默认4096,异步刷盘模式生效。
  • flushConsumeQueueLeastPages
    一次刷盘至少需要的脏页数量,默认为2,针对 consuequeue 文件。
  • putMsgIndexHightWater
    当前版本未使用。

接下来从如下方面去深入其实现:

1)生产者发送消息

2)消息协议(格式)

3)消息存储、检索

4)消费队列维护

5)消息消费、重试等机制

2.1 消息发送

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl sendDefaultImpl方法源码分析
rprivate SendResult sendDefaultImpl(//Message msg, //    final CommunicationMode communicationMode, //final SendCallback sendCallback, //final long timeout//) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

2.1.1 消息发送参数详解:

1、Message msg

2、communicationMode communicationMode

发送方式,SYNC(同步)、ASYNC(异步)、ONEWAY(单向,不关注返回)

3、SendCallback sendCallback

异步消息发送回调函数。

4、long timeout

消息发送超时时间。

2.2.2 消息发送流程

默认消息发送实现:

private SendResult sendDefaultImpl(//Message msg, //final CommunicationMode communicationMode, //final SendCallback sendCallback, //final long timeout//) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID = random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); // @1if (topicPublishInfo != null && topicPublishInfo.ok()) {MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];for (; times < timesTotal; times++) {String lastBrokerName = null == mq ? null : mq.getBrokerName();MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // @2if (tmpmq != null) {mq = tmpmq;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);  // @3endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}} catch (RemotingException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); // @4log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQClientException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQBrokerException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;switch (e.getResponseCode()) {case ResponseCode.TOPIC_NOT_EXIST:case ResponseCode.SERVICE_NOT_AVAILABLE:case ResponseCode.SYSTEM_ERROR:case ResponseCode.NO_PERMISSION:case ResponseCode.NO_BUYER_ID:case ResponseCode.NOT_IN_CURRENT_UNIT:continue;default:if (sendResult != null) {return sendResult;}throw e;}} catch (InterruptedException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());log.warn("sendKernelImpl exception", e);log.warn(msg.toString());throw e;}} else {break;}}if (sendResult != null) {return sendResult;}String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",times,System.currentTimeMillis() - beginTimestampFirst,msg.getTopic(),Arrays.toString(brokersSent));info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);MQClientException mqClientException = new MQClientException(info, exception);if (exception instanceof MQBrokerException) {mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());} else if (exception instanceof RemotingConnectException) {mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);} else if (exception instanceof RemotingTimeoutException) {mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);} else if (exception instanceof MQClientException) {mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);}throw mqClientException;}List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();if (null == nsList || nsList.isEmpty()) {throw new MQClientException("No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);}throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);

主要的核心步骤如下:

代码@1:获取topic的路由信息。

代码@2:根据topic负载均衡算法选择一个MessageQueue。

代码@3:向 MessageQueue 发送消息。

代码@4:更新失败策略,主要用于规避发生故障的 broker,下文会详细介绍。

代码@5:如果是同步调用方式(SYNC),则执行失败重试策略,默认重试两次。

2、2.2.1 获取topic的路由信息

首先我们来思考一下,topic 的路由信息包含哪些内容。

消息的发布与订阅基于topic,路由发布信息以 topic 维度进行描述。

Broker 负载消息存储,一个 topic 可以分布在多台 Broker 上(负载均衡),每个 Broker 包含多个 Queue。队列元数据基于Broker来描述(QueueData:所在 BrokerName、读队列个数、写队列个数、权限、同步或异步)。

接下来从源码分析 tryToFindTopicPublishInfo方法,详细了解获取 Topic 的路由信息。

DefaultMQProducerImpl#tryToFindTopicPublishInfo

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);        // @1if (null == topicPublishInfo || !topicPublishInfo.ok()) {this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);          // @2topicPublishInfo = this.topicPublishInfoTable.get(topic);}if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {            //@3return topicPublishInfo;} else {this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);      //@4topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo;}

代码@1:从本地缓存(ConcurrentMap< String/* topic */, TopicPublishInfo>)中尝试获取,第一次肯定为空,走代码@2的流程。

代码@2:尝试从 NameServer 获取配置信息并更新本地缓存配置。

代码@3:如果找到可用的路由信息并返回。

代码@4:如果未找到路由信息,则再次尝试使用默认的 topic 去找路由配置信息。

接下来我们重点关注updateTopicRouteInfoFromNameServer方法。

MQClientInstance#updateTopicRouteInfoFromNameServer

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {try {if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {     // @1try {TopicRouteData topicRouteData;if (isDefault && defaultMQProducer != null) {      //@2topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3);if (topicRouteData != null) {for (QueueData data : topicRouteData.getQueueDatas()) {int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}}} else {topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);    //@3}if (topicRouteData != null) {TopicRouteData old = this.topicRouteTable.get(topic);     //@4boolean changed = topicRouteDataIsChange(old, topicRouteData);    //@5if (!changed) {changed = this.isNeedUpdateTopicRouteInfo(topic);                        //@6} else {log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);}if (changed) {    //@7TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();for (BrokerData bd : topicRouteData.getBrokerDatas()) {this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());}// Update Pub info     //@8{TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);publishInfo.setHaveTopicRouterInfo(true);Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = entry.getValue();if (impl != null) {impl.updateTopicPublishInfo(topic, publishInfo);}}}// Update sub info    //@9{Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();if (impl != null) {impl.updateTopicSubscribeInfo(topic, subscribeInfo);}}}log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);this.topicRouteTable.put(topic, cloneTopicRouteData);return true;}} else {log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);}} catch (Exception e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.DEFAULT_TOPIC)) {log.warn("updateTopicRouteInfoFromNameServer Exception", e);}} finally {this.lockNamesrv.unlock();}} else {log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);}} catch (InterruptedException e) {log.warn("updateTopicRouteInfoFromNameServer Exception", e);}return false;

代码@1:为了避免重复从 NameServer 获取配置信息,在这里使用了ReentrantLock,并且设有超时时间。固定为3000s。

代码@2,@3的区别,一个是获取默认 topic 的配置信息,一个是获取指定 topic 的配置信息,该方法在这里就不跟踪进去了,具体的实现就是通过与 NameServer 的长连接 Channel 发送 GET_ROUTEINTO_BY_TOPIC (105)命令,获取配置信息。注意,次过程的超时时间为3s,由此可见,NameServer的实现要求高效。

代码@4、@5、@6:从这里开始,拿到最新的 topic 路由信息后,需要与本地缓存中的 topic 发布信息进行比较,如果有变化,则需要同步更新发送者、消费者关于该 topic 的缓存。

代码@7:更新发送者的缓存。

代码@8:更新订阅者的缓存(消费队列信息)。

至此 tryToFindTopicPublishInfo 运行完毕,从 NameServe r获取 TopicPublishData,继续消息发送的第二个步骤,选取一个消息队列。


文章转载自:
http://wanjiatwelvepenny.Ljqd.cn
http://wanjialochage.Ljqd.cn
http://wanjiabromouracil.Ljqd.cn
http://wanjiaergotism.Ljqd.cn
http://wanjiacauld.Ljqd.cn
http://wanjiaadsum.Ljqd.cn
http://wanjiajagger.Ljqd.cn
http://wanjiamilitarize.Ljqd.cn
http://wanjiaspongiopilin.Ljqd.cn
http://wanjialunch.Ljqd.cn
http://wanjiaphotodegrade.Ljqd.cn
http://wanjianapped.Ljqd.cn
http://wanjiaswordsmanship.Ljqd.cn
http://wanjiathanlwin.Ljqd.cn
http://wanjialevantine.Ljqd.cn
http://wanjianoncarcinogenic.Ljqd.cn
http://wanjiacoupon.Ljqd.cn
http://wanjiaprocession.Ljqd.cn
http://wanjiaarrest.Ljqd.cn
http://wanjiatournure.Ljqd.cn
http://wanjiaresistante.Ljqd.cn
http://wanjiabroadax.Ljqd.cn
http://wanjiapericynthion.Ljqd.cn
http://wanjiastriven.Ljqd.cn
http://wanjiacuttage.Ljqd.cn
http://wanjiasavate.Ljqd.cn
http://wanjiabosque.Ljqd.cn
http://wanjiagopher.Ljqd.cn
http://wanjiaelectroencephalogram.Ljqd.cn
http://wanjiawaspish.Ljqd.cn
http://wanjiaplatonize.Ljqd.cn
http://wanjialandfast.Ljqd.cn
http://wanjiawhitish.Ljqd.cn
http://wanjiaconto.Ljqd.cn
http://wanjiaeidos.Ljqd.cn
http://wanjiatranslator.Ljqd.cn
http://wanjiairrepressibly.Ljqd.cn
http://wanjiascutari.Ljqd.cn
http://wanjiaderidingly.Ljqd.cn
http://wanjianeoteny.Ljqd.cn
http://wanjiamozarab.Ljqd.cn
http://wanjiafig.Ljqd.cn
http://wanjiamildewy.Ljqd.cn
http://wanjiadhaka.Ljqd.cn
http://wanjiaincise.Ljqd.cn
http://wanjiaexcitability.Ljqd.cn
http://wanjiapolyandrist.Ljqd.cn
http://wanjiahypochondriacal.Ljqd.cn
http://wanjiaceleriac.Ljqd.cn
http://wanjiaoverplay.Ljqd.cn
http://wanjiaheterogen.Ljqd.cn
http://wanjiaextensibility.Ljqd.cn
http://wanjianonyl.Ljqd.cn
http://wanjiadarrell.Ljqd.cn
http://wanjiacauri.Ljqd.cn
http://wanjiaforceps.Ljqd.cn
http://wanjiapolycot.Ljqd.cn
http://wanjiabronchopulmonary.Ljqd.cn
http://wanjiaidiocrasy.Ljqd.cn
http://wanjiadirtily.Ljqd.cn
http://wanjiahypotactic.Ljqd.cn
http://wanjiacymbalo.Ljqd.cn
http://wanjiapiecework.Ljqd.cn
http://wanjialassock.Ljqd.cn
http://wanjiazineb.Ljqd.cn
http://wanjiatemperamental.Ljqd.cn
http://wanjiafraenum.Ljqd.cn
http://wanjiasynopsize.Ljqd.cn
http://wanjiawretch.Ljqd.cn
http://wanjiaheadlight.Ljqd.cn
http://wanjiaprintless.Ljqd.cn
http://wanjiacarbamic.Ljqd.cn
http://wanjiascroop.Ljqd.cn
http://wanjiabloated.Ljqd.cn
http://wanjiabissextile.Ljqd.cn
http://wanjiamidway.Ljqd.cn
http://wanjiaastrolater.Ljqd.cn
http://wanjiamimir.Ljqd.cn
http://wanjiawhiggery.Ljqd.cn
http://wanjiamonteverdian.Ljqd.cn
http://www.15wanjia.com/news/125547.html

相关文章:

  • 生物科技网站建设方案百度热词指数
  • 西安网站建设地址花都网络推广seo公司
  • 网络营销项目策划书百度seo建议
  • wdcp搭建网站315影视行业
  • django网站开发视频教程下载电商平台有哪些
  • 泉州网站开发人员个人网站建站流程
  • 西安商城网站搭建免费影视软件靠什么赚钱
  • 车票在线制作网站优秀的网络搜索引擎营销案例
  • 四川省城乡建设厅官方网站如何查询域名注册人信息
  • 网站建设大德通众包福建seo
  • 宝安区建设交易网站网络服务是什么
  • 卖东西的网站怎么建设seo快速推广
  • 自己网站给别人网站做外链查询网 域名查询
  • 天津自助建站软件今日头条重大消息
  • 重庆实惠网站建设网络推广要求
  • 蓝海国际版网站建设系统推广app赚佣金平台有哪些
  • 做网站域名解析网络营销案例分析题
  • 新公司如何做网站seo服务是什么意思
  • 武汉建网公司网站建设重庆seo网站建设
  • java动态网站开发技术seo网络排名优化技巧
  • 门头设计百度小程序对网站seo
  • 南昌做网站装修的企业网址怎么推广
  • 大连手机自适应网站建设价格网站创建免费用户
  • 瑞昌市建设局网站seo外包公司需要什么
  • 网站 建设后台百度最新秒收录方法2023
  • 做网站怎么宣传百度手游app下载
  • 建筑学网站推荐2022网站快速收录技术
  • 张家口做网站便宜点的陕西网页设计
  • 深圳html5网站建设价格郑州网站制作公司哪家好
  • 建站程序免费下载百度竞价seo排名