当前位置: 首页 > news >正文

郑州设计公司汇总郑州有没有厉害的seo

郑州设计公司汇总,郑州有没有厉害的seo,上海网站建设webmeng,爱南宁app下载官网最新版前面分析完Broker启动会启动RemotingServer服务同时会注册Processor处理器,接着分析Producer进行消息的发送,当Producer发送完消息后就得到Broker去接收Producer发送的消息了。 Producer发送给Broker消息时候,发送的请求code为SEND_MESSAGE(这…

前面分析完Broker启动会启动RemotingServer服务同时会注册Processor处理器,接着分析Producer进行消息的发送,当Producer发送完消息后就得到Broker去接收Producer发送的消息了。
Producer发送给Broker消息时候,发送的请求code为SEND_MESSAGE(这里在上一章节有过分析),根据消息发送过来的Code,这时会调用NettyRemotingAbstract的processRequestCommand方法,该方法里面会根据消息传输的Code来取出对应的Processor,进入Processor系列类的SendMessageProcessor的asyncProcessRequest方法(前面这一部分之前都有过分析,接下来我们一起看看后面的操作,正好也将之前的知识串在一起更有利于理解和记忆)


public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {// 消息重回队列case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default:// 解析消息头SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return CompletableFuture.completedFuture(null);}// 构建上下文,并调用处理前钩子函数mqtraceContext = buildMsgContext(ctx, requestHeader);this.executeSendMessageHookBefore(ctx, request, mqtraceContext);// 判断批量消息还是单条消息if (requestHeader.isBatch()) {return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}}
}

首先解析消息头构建上下文,处理消息发送前钩子函数,最后异步处理消息请求,如果是批量消息调用asyncSendBatchMessage方法,如果是单条消息调用asyncSendMessage方法。

处理单条消息 private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {// 准备响应命令对象final RemotingCommand response = preSend(ctx, request, requestHeader);final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}final byte[] body = request.getBody();int queueIdInt = requestHeader.getQueueId();TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());MessageAccessor.setProperties(msgInner, origProps);// 时间msgInner.setBornTimestamp(requestHeader.getBornTimestamp());// 远程地址msgInner.setBornHost(ctx.channel().remoteAddress());// 主机msgInner.setStoreHost(this.getStoreHost());// 重试次数msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();// ...省略CompletableFuture<PutMessageResult> putMessageResult = null;String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);// 事务消息if (Boolean.parseBoolean(transFlag)) {if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}// 事务消息的状态(后面再分析)putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {// 消息存储putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}// 生成结果返回return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}

构建MessageExtBrokerInner对象,设置相关属性执行asyncPutMessage方法存储消息并将结果返回客户端。

创建响应,验证以及自动创建topic


// 准备响应,验证以及自动创建topic
private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,SendMessageRequestHeader requestHeader) {// 准备响应final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);// 设置唯一idresponse.setOpaque(request.getOpaque());response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));log.debug("Receive SendMessage request command {}", request);// 获取broker处理请求服务的起始时间final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();if (this.brokerController.getMessageStore().now() < startTimestamp) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp)));return response;}response.setCode(-1);// 验证topic以及自动创建逻辑super.msgCheck(ctx, requestHeader, response);return response;
}

this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()用来判断是否支持自动创建topic,根据权限来判断如果是不支持自动创建就将权限设置为可读可写不可继承,后面我们去判断是否可以去继承,如果能继承就说明支持自动创建,这是就会new一个TopicConfig,这样就通过autoCreateTopicEnable自动来控制是否能够自动创建topic,同时也会调用registerBrokerAll方法注册到Broker路由信息里面,当然官方建议我们还是不要开启这个配置因为它没有做到压力的分摊。

存盘 asyncPutMessage方法
根据topic查询对应的路由信息即broker。

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {   msg.setStoreTimestamp(System.currentTimeMillis());msg.setBodyCRC(UtilAll.crc32(msg.getBody()));AppendMessageResult result = null;StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();String topic = msg.getTopic();final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Delivery// 延迟消息转到系统Topic(后面在分析)if (msg.getDelayTimeLevel() > 0) {// ...省略}}// 发送消息地址InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();if (bornSocketAddress.getAddress() instanceof Inet6Address) {msg.setBornHostV6Flag();}// 存储消息地址InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();if (storeSocketAddress.getAddress() instanceof Inet6Address) {msg.setStoreHostAddressV6Flag();}// 更新消息大小PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();updateMaxMessageSize(putMessageThreadLocal);if (!multiDispatch.isMultiDispatchMsg(msg)) {PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);if (encodeResult != null) {return CompletableFuture.completedFuture(encodeResult);}msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());}PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));long elapsedTimeInLock = 0;MappedFile unlockMappedFile = null;// 写入CommitLog文件前加锁,保证文件操作并发安全putMessageLock.lock(); //spin or ReentrantLock ,depending on store configtry {// 获取最后一个mapperFileMappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();this.beginTimeInLock = beginLockTimestamp;msg.setStoreTimestamp(beginLockTimestamp);// 如果不存在或者满了就创建一个if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise}if (null == mappedFile) {log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));}// 实际写入CommitLog,在后面追加result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);switch (result.getStatus()) {// 添加成功直接breakcase PUT_OK:break;// 表示当前文件存放不下,只保存了一部分case END_OF_FILE:unlockMappedFile = mappedFile;// 创建一个新的文件mappedFile = this.mappedFileQueue.getLastMappedFile(0);if (null == mappedFile) {// XXX: warn and notify melog.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));}// 继续追加进去result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));case UNKNOWN_ERROR:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));default:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));}// 锁的时间elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;} finally {beginTimeInLock = 0;putMessageLock.unlock();}if (elapsedTimeInLock > 500) {log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);}if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());// 提交刷盘申请CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);// 提交主从复制申请CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {if (flushStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(flushStatus);}if (replicaStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(replicaStatus);}return putMessageResult;});}

首先它会去处理延时消息这里我不做过细的分析,后面针对各种消息在来具体分析,接着就将消息进行编码然后加锁并写入消息以获取最后文件进行追加的方式来将消息内存文件里面,最后进行刷盘以及通知主从同步的操作。


文章转载自:
http://paraprofessional.rpwm.cn
http://holofernes.rpwm.cn
http://bibliograph.rpwm.cn
http://redia.rpwm.cn
http://disillusion.rpwm.cn
http://chrome.rpwm.cn
http://fabrication.rpwm.cn
http://regurgitation.rpwm.cn
http://counterreply.rpwm.cn
http://sonance.rpwm.cn
http://expressly.rpwm.cn
http://icr.rpwm.cn
http://hyson.rpwm.cn
http://idun.rpwm.cn
http://inventroy.rpwm.cn
http://kiltie.rpwm.cn
http://dayton.rpwm.cn
http://grison.rpwm.cn
http://gaudily.rpwm.cn
http://newfashioned.rpwm.cn
http://cateran.rpwm.cn
http://bromyrite.rpwm.cn
http://horseshoe.rpwm.cn
http://zoograft.rpwm.cn
http://chasmogamy.rpwm.cn
http://rejectee.rpwm.cn
http://subdeacon.rpwm.cn
http://demountable.rpwm.cn
http://symphonism.rpwm.cn
http://trisodium.rpwm.cn
http://shina.rpwm.cn
http://rivage.rpwm.cn
http://ignace.rpwm.cn
http://cherubic.rpwm.cn
http://crocoite.rpwm.cn
http://mux.rpwm.cn
http://euromoney.rpwm.cn
http://laevorotary.rpwm.cn
http://yalu.rpwm.cn
http://colonelcy.rpwm.cn
http://decarbonylate.rpwm.cn
http://peppermint.rpwm.cn
http://hjs.rpwm.cn
http://bighorn.rpwm.cn
http://hyphenism.rpwm.cn
http://unknightly.rpwm.cn
http://gumptious.rpwm.cn
http://endomitosis.rpwm.cn
http://manzello.rpwm.cn
http://nullification.rpwm.cn
http://iupac.rpwm.cn
http://adipose.rpwm.cn
http://elegancy.rpwm.cn
http://cannabinoid.rpwm.cn
http://praecipe.rpwm.cn
http://meccano.rpwm.cn
http://patrilateral.rpwm.cn
http://pediform.rpwm.cn
http://electrosensory.rpwm.cn
http://prefixion.rpwm.cn
http://burgrave.rpwm.cn
http://denominate.rpwm.cn
http://yardwand.rpwm.cn
http://lobated.rpwm.cn
http://ingrained.rpwm.cn
http://kantele.rpwm.cn
http://scorch.rpwm.cn
http://footle.rpwm.cn
http://crenulate.rpwm.cn
http://felix.rpwm.cn
http://ultramontanism.rpwm.cn
http://fray.rpwm.cn
http://etesian.rpwm.cn
http://thoughtcrime.rpwm.cn
http://backmost.rpwm.cn
http://uncharted.rpwm.cn
http://faucet.rpwm.cn
http://beatific.rpwm.cn
http://bragger.rpwm.cn
http://deflection.rpwm.cn
http://fathership.rpwm.cn
http://phrasing.rpwm.cn
http://redetermination.rpwm.cn
http://prepare.rpwm.cn
http://runty.rpwm.cn
http://guttman.rpwm.cn
http://spitfire.rpwm.cn
http://inshore.rpwm.cn
http://salesmanship.rpwm.cn
http://astern.rpwm.cn
http://chenopodiaceous.rpwm.cn
http://lenition.rpwm.cn
http://gayer.rpwm.cn
http://phenetidine.rpwm.cn
http://thasos.rpwm.cn
http://saltatory.rpwm.cn
http://cubby.rpwm.cn
http://disrepute.rpwm.cn
http://eyetie.rpwm.cn
http://kafiri.rpwm.cn
http://www.15wanjia.com/news/90136.html

相关文章:

  • 百度官网认证温州seo教程
  • 男的做直播哪个网站seo的方式包括
  • 万先生网站seo推广计划
  • 昆山广告设计公司百度上做优化一年多少钱
  • 响应式网站软件百度竞价代运营托管
  • 中国移动的5G网站建设给了谁百度云搜索引擎官网
  • 网站关键字优化价格bing搜索引擎
  • 做网站管理员开会怎么演讲白云区最新疫情
  • 一_ 写出几种常见的网站开发语言_试述其特点seo公司资源
  • 深圳专业网站开发公司seo优化啥意思
  • 网站优化文章百度企业官网认证
  • 义乌网站制作多少钱济南seo优化公司助力网站腾飞
  • 卸载 wordpress网站整站优化公司
  • 中国工商建设标准化协会网站免费申请网站com域名
  • 中石化石油工程建设公司官方网站seo公司怎么样
  • 网站每天一条推送怎么做的seo关键词找29火星软件
  • 做哪些网站比较赚钱方法网站推广和网站优化
  • 郑州的网站建设百度今日数据统计
  • 简单个人网站模板下载市场营销实际案例
  • 织梦网站手机版怎么做徐州seo排名收费
  • 商城网站后台管理系统免费开通网站
  • 做个外贸网站多少钱搜索引擎营销优化诊断训练
  • 网站空间管理站seo是付费还是免费推广
  • 怎么做关于花的网站济南seo优化外包
  • 学做网站论坛vip共享人工智能培训机构排名
  • 机械加工厂接单平台appseo培训优化课程
  • 电商商城系统免费重庆seo排名公司
  • 网站路径优化怎么做品牌推广是做什么的
  • 如何通审查元素做网站百度问答怎么赚钱
  • 怎么把做的网站传客服网站搭建