当前位置: 首页 > news >正文

南昌网站建设优化公司排名网站开发用什么架构

南昌网站建设优化公司排名,网站开发用什么架构,铆钉机 东莞网站建设,做策划的人经常浏览的网站RocketMQ的Broker分为Master和Slave两个角色,为了保证高可用性,Master角色的机器接收到消息后,要把内容同步到Slave机器上,这样一旦Master宕机,Slave机器依然可以提供服务。下面分析Master和Slave角色机器间同步功能实…

        RocketMQ的Broker分为Master和Slave两个角色,为了保证高可用性,Master角色的机器接收到消息后,要把内容同步到Slave机器上,这样一旦Master宕机,Slave机器依然可以提供服务。下面分析Master和Slave角色机器间同步功能实现的源码。

1 同步属性信息

Slave需要和Master同步的不只是消息本身,一些元数据信息也需要同步,比如TopicConfig信息、ConsumerOffset信息、DelayOffset和SubscriptionGroupConfig信息。Broker在启动的时候,判断自己的角色是否是Slave,是的话就启动定时同步任务,如代码清单12-1所示。

代码清单12-1 Slave角色定时同步元数据信息

if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
        this.updateMasterHAServerAddrPeriodically = false;
    } else {
        this.updateMasterHAServerAddrPeriodically = true;
    }
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.slaveSynchronize.syncAll();
            } catch (Throwable e) {
                log.error("ScheduledTask syncAll slave exception", e);
            }
        }
    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}

 

在syncAll函数里,调用syncTopicConfig()、getAllConsumerOffset()、syncDelayOffset()和syncSubscriptionGroupConfig()进行元数据同步。我们以syncConsumerOffset为例,来看看底层的具体实现,如代码清单12-2所示。

代码清单12-2 getAllConsumerOffset具体实现

public ConsumerOffsetSerializeWrapper getAllConsumerOffset(
    final String addr) throws InterruptedException, RemotingTimeoutException,
    RemotingSendRequestException, RemotingConnectException, MQBroker-Exception {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class);
        }
        default:
            break;
    }
    throw new MQBrokerException(response.getCode(), response.getRemark());
}

 

getAllConsumerOffset()的基本逻辑是组装一个RemotingCommand,底层通过Netty将消息发送到Master角色的Broker,然后获取Offset信息。

2 同步消息体

下面介绍Master和Slave之间同步消息体内容的方法,也就是同步CommitLog内容的方法。CommitLog和元数据信息不同:首先,CommitLog的数据量比元数据要大;其次,对实时性和可靠性要求也不一样。元数据信息是定时同步的,在两次同步的时间差里,如果出现异常可能会造成Master上的元数据内容和Slave上的元数据内容不一致,不过这种情况还可以补救(手动调整Offset,重启Consumer等)。CommitLog在高可靠性场景下如果没有及时同步,一旦Master机器出故障,消息就彻底丢失了。所以有专门的代码来实现Master和Slave之间消息体内容的同步。

主要的实现代码在Broker模块的org.apache.rocketmq.store.ha包中,里面包括HAService、HAConnection和WaitNotifyObject这三个类。

HAService是实现commitLog同步的主体,它在Master机器和Slave机器上执行的逻辑不同,默认是在Master机器上执行,见代码清单12-3。

代码清单12-3 根据Broker角色,确定是否设置HaMasterAddress

if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig
        .getHaMasterAddress().length() >= 6) {
        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
        this.updateMasterHAServerAddrPeriodically = false;
    } else {
        this.updateMasterHAServerAddrPeriodically = true;
    }

 

当Broker角色是Slave的时候,MasterAddr的值会被正确设置,这样HAService在启动的时候,在HAClient这个内部类中,connectMaster会被正确执行,如代码清单12-4所示。

代码清单12-4 Slave角色连接Master

private boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        String addr = this.masterAddress.get();
        if (addr != null) {
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }
        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

        this.lastWriteTimestamp = System.currentTimeMillis();
    }
    return this.socketChannel != null;
}

 

从代码中可以看出,HAClient试图通过Java NIO函数去连接Master角色的Broker。Master角色有相应的监听代码,如代码清单12-5所示。

代码清单12-5 监听Slave的HA连接

public void beginAccept() throws Exception {
    this.serverSocketChannel = ServerSocketChannel.open();
    this.selector = RemotingUtil.openSelector();
    this.serverSocketChannel.socket().setReuseAddress(true);
    this.serverSocketChannel.socket().bind(this.socketAddressListen);
    this.serverSocketChannel.configureBlocking(false);
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}

CommitLog的同步,不是经过netty command的方式,而是直接进行TCP连接,这样效率更高。连接成功以后,通过对比Master和Slave的Offset,不断进行同步。

3 sync_master和async_master

sync_master和async_master是写在Broker配置文件里的配置参数,这个参数影响的是主从同步的方式。从字面意思理解,sync_master是同步方式,也就是Master角色Broker中的消息要立刻同步过去;async_master是异步方式,也就是Master角色Broker中的消息是通过异步处理的方式同步到Slave角色的机器上的。下面结合代码来分析,sync_master下的消息同步如代码清单12-6所示。

代码清单12-6 sync_master下的消息同步

public void handleHA(AppendMessageResult result,
    PutMessageResult putMessageResult, MessageExt messageExt) {
    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore
        .getMessageStoreConfig().getBrokerRole()) {
        HAService service = this.defaultMessageStore.getHaService();
        if (messageExt.isWaitStoreMsgOK()) {
            // Determine whether to wait
            if (service.isSlaveOK(result.getWroteOffset() + result
                .getWroteBytes())) {
                GroupCommitRequest request = new GroupCommitRequest
                    (result.getWroteOffset() + result
                    .getWroteBytes());
                service.putRequest(request);
                service.getWaitNotifyObject().wakeupAll();
                boolean flushOK =
                    request.waitForFlush(this.defaultMessageStore
                        .getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do sync transfer other node, wait return, " +
                        "but failed, topic: " + messageExt
                        .getTopic() + " tags: "
                        + messageExt.getTags() + " client address: " +
                        messageExt.getBornHostNameString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus
                        .FLUSH_SLAVE_TIMEOUT);
                }
            }
            // Slave problem
            else {
                // Tell the producer, slave not available
                putMessageResult.setPutMessageStatus(PutMessageStatus
                    .SLAVE_NOT_AVAILABLE);
            }
        }
    }
}

 

在CommitLog类的putMessage函数末尾,调用handleHA函数。代码中的关键词是wakeupAll和waitForFlush,在同步方式下,Master每次写消息的时候,都会等待向Slave同步消息的过程,同步完成后再返回,如代码清单12-7所示。(putMessage函数比较长,仅列出关键的代码)。

代码清单12-7 putMessage中调用handleHA

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore
        .getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

  ……

    handleDiskFlush(result, putMessageResult, msg);
    handleHA(result, putMessageResult, msg);

    return putMessageResult;
}

 

http://www.15wanjia.com/news/163743.html

相关文章:

  • 襄阳微信网站建设备案 个人网站名称
  • 网站服务器升级一般多久网站跟系统的区别是
  • 网站开发 技术问题微网站 地图
  • 自微网站襄阳网站推广优化技巧
  • 曼斯特(北京)网站建设公司免费外贸网站模板
  • 上海营销平台网站建设查询网站空间的服务商
  • 网站服务合同模板东莞英文网站设计
  • 松江做网站需要多少钱全自动推广软件
  • 福建网站开发公司电话新品发布会ppt
  • 做电影网站都需要什么工具176网站入口
  • 鼓楼徐州网站开发wordpress小说主题模板
  • 网站开发江西wordpress做门户
  • 宝洁公司网站做的怎么样移动端开发语言
  • 哪些网站用python做服务框架奥林匹克做校服的网站
  • ui设计的网站有哪些WordPress国外打赏
  • 二环建设部网站手机端关键词排名优化
  • 网站建设团队山西省经济建设投资公司网站
  • 关于旅行的网站怎样做网站建设课程的建议
  • 万网怎么建立网站wordpress文章导出
  • 架设个人网站二级域名免费发放
  • 上海网站建设021360甘肃做网站价格
  • 家居公司网站建设方案ppt专做美食的网站
  • 怎么给自己做个网站吗外贸推广服务公司
  • 长春网站制作机构wordpress 大图 主题
  • 中国工程建设招标网官方网站做产品类网站
  • 小公司做网站需要注意什么广州网站建设哪个平台好
  • 官网建站模板库上海网站建设熊掌号
  • 吉林省科瑞建设项目管理有限公司网站网站网址有哪些
  • 专做外贸的网站如何创建网站
  • 大学生商品网站建设移动互联网开发方向包含哪些课程