zoukankan      html  css  js  c++  java
  • 4 分布式消息队列的协调者

    NameServer的功能主要维护RouteInfoManager对象里的这5个状态

        private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
        private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
        private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
        private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
        private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    public class QueueData implements Comparable<QueueData> {
        private String brokerName;
        private int readQueueNums;
        private int writeQueueNums;
        private int perm;
        private int topicSynFlag;

     private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    public class BrokerData implements Comparable<BrokerData> {
        private String cluster;
        private String brokerName;
        private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

    
        private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;


    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    class BrokerLiveInfo {
        private long lastUpdateTimestamp;
        private DataVersion dataVersion;
        private Channel channel;
        private String haServerAddr;

    清除时间默认是120秒

     private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

    NameServer 如何维护各个Bro ker 的实时状态,如
    何根据Broker 的情况更新各种集群的属性数据。因为其他角色会主动向
    Name Server 上报状态,所以NameServer 的主要逻辑在DefaultRequestProcessor
    类中,根据上报消息里的请求码做相应的处理, 更新存储的对应
    信息。
    连接断开的事件也会触发状态更新,具体逻辑在BrokerHousekeepingService类中

    各个角色间的交互流程

    updateTopic的选项

    所以我们用producer发消息时写一个新的topic,这个topic会在该集群下的每个master broker下创建Message Queue。

    创建一个topic的过程是,

    client先构建CreateTopicRequestHeader,命令是UPDATE_AND_CREATE_TOPIC,发送给broker包

    broker包下的AdminBrokerProcessor类接到命令后,先更新本地的topicConfig,之后添加REGISTER_BROKER命令,发送给NameSrv,

    NameSrv包下的DefaultRequestProcessor类接到命令后执行RouteInfoManager的registerBroker方法,更新topic信息。

    更新逻辑如下:

     代码如下:

    public RegisterBrokerResult registerBroker(
            final String clusterName,
            final String brokerAddr,
            final String brokerName,
            final long brokerId,
            final String haServerAddr,
            final TopicConfigSerializeWrapper topicConfigWrapper,
            final List<String> filterServerList,
            final Channel channel) {
            RegisterBrokerResult result = new RegisterBrokerResult();
            try {
                try {
                    this.lock.writeLock().lockInterruptibly();
    
                    Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                    if (null == brokerNames) {
                        brokerNames = new HashSet<String>();
                        this.clusterAddrTable.put(clusterName, brokerNames);
                    }
                    brokerNames.add(brokerName);
    
                    boolean registerFirst = false;
    
                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                    if (null == brokerData) {
                        registerFirst = true;
                        brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                        this.brokerAddrTable.put(brokerName, brokerData);
                    }
                    Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
                    //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
                    //The same IP:PORT must only have one record in brokerAddrTable
                    Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<Long, String> item = it.next();
                        if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                            it.remove();
                        }
                    }
    
                    String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                    registerFirst = registerFirst || (null == oldAddr);
    
                    if (null != topicConfigWrapper
                        && MixAll.MASTER_ID == brokerId) {
                        if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                            || registerFirst) {
                            ConcurrentMap<String, TopicConfig> tcTable =
                                topicConfigWrapper.getTopicConfigTable();
                            if (tcTable != null) {
                                for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                    this.createAndUpdateQueueData(brokerName, entry.getValue());
                                }
                            }
                        }
                    }
    
                    BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                        new BrokerLiveInfo(
                            System.currentTimeMillis(),
                            topicConfigWrapper.getDataVersion(),
                            channel,
                            haServerAddr));
                    if (null == prevBrokerLiveInfo) {
                        log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                    }
    
                    if (filterServerList != null) {
                        if (filterServerList.isEmpty()) {
                            this.filterServerTable.remove(brokerAddr);
                        } else {
                            this.filterServerTable.put(brokerAddr, filterServerList);
                        }
                    }
    
                    if (MixAll.MASTER_ID != brokerId) {
                        String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                        if (masterAddr != null) {
                            BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                            if (brokerLiveInfo != null) {
                                result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                                result.setMasterAddr(masterAddr);
                            }
                        }
                    }
                } finally {
                    this.lock.writeLock().unlock();
                }
            } catch (Exception e) {
                log.error("registerBroker Exception", e);
            }
    
            return result;
        }

    RocketMQ 中复杂的通信过程,被RemotingCommand
    统一起来,大部分的逻辑都是通过发送、接受并处理C ommand 来完成的。

      解码

    //前面的代表长度的4个字节已经读取了
    public static RemotingCommand decode(final ByteBuffer byteBuffer) { int length = byteBuffer.limit(); int oriHeaderLen = byteBuffer.getInt();//读取4个字节,head的前8位代表类型,后24位代表headLength int headerLength = getHeaderLength(oriHeaderLen);//headerLength长度为24位,也就是说header最大16MB byte[] headerData = new byte[headerLength]; byteBuffer.get(headerData); RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen)); int bodyLength = length - 4 - headerLength; byte[] bodyData = null; if (bodyLength > 0) { bodyData = new byte[bodyLength]; byteBuffer.get(bodyData); } cmd.body = bodyData; return cmd; } public static int getHeaderLength(int length) { return length & 0xFFFFFF; } private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) { switch (type) { case JSON: RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class); resultJson.setSerializeTypeCurrentRPC(type); return resultJson; case ROCKETMQ: RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData); resultRMQ.setSerializeTypeCurrentRPC(type); return resultRMQ; default: break; } return null; } public static SerializeType getProtocolType(int source) { return SerializeType.valueOf((byte) ((source >> 24) & 0xFF)); }

    编码

    public ByteBuffer encode() {
            // 1> header length size
            int length = 4;
    
            // 2> header data length
            byte[] headerData = this.headerEncode();
            length += headerData.length;
    
            // 3> body data length
            if (this.body != null) {
                length += body.length;
            }
    
            ByteBuffer result = ByteBuffer.allocate(4 + length);
    
            // length
            result.putInt(length);
    
            // header length
            result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
    
            // header data
            result.put(headerData);
    
            // body data;
            if (this.body != null) {
                result.put(this.body);
            }
    
            result.flip();
    
            return result;
        }
      public static byte[] markProtocolType(int source, SerializeType type) {
            byte[] result = new byte[4];
    
            result[0] = type.getCode();
            result[1] = (byte) ((source >> 16) & 0xFF);
            result[2] = (byte) ((source >> 8) & 0xFF);
            result[3] = (byte) (source & 0xFF);
            return result;
        }

  • 相关阅读:
    airprobe 安装 part2
    USRP Daugherboard: DBSRX
    电赛又见电赛!2011电赛之我见
    USRP Experiment 1: Data transmission
    How to Switch Between GDM and KDM on Ubuntu
    USRP Daugherboard: BasicRX
    Oracle Analyze 命令 详解
    Oracle SQL优化 总结
    Oracle SQL优化 总结
    Oracle 用拼接字符串更新表 测试
  • 原文地址:https://www.cnblogs.com/lakeslove/p/13047531.html
Copyright © 2011-2022 走看看