zoukankan      html  css  js  c++  java
  • 【RocketMQ】msgId与offsetMsgId

    一. 概念

    1. msgId(uniqId)

    producer客户端 生成,调用方法MessageClientIDSetter.createUniqID()生成全局唯一的Id

    
        private static final int LEN;
        private static final String FIX_STRING;
        private static final AtomicInteger COUNTER;
        private static long startTime;
        private static long nextStartTime;
    
        public static String createUniqID() {
    
            StringBuilder sb = new StringBuilder(LEN * 2);
            sb.append(FIX_STRING);   // 固定值,程序启动时生成
            sb.append(UtilAll.bytes2string(createUniqIDBuffer())); // 变化值 根据时间差+自增长生成
            return sb.toString();
        }
    
    1. FIX_STRING 固定前缀
    2. 生成规则:客户端的IP、进程ID、MessageClientIDSetter类加载器的hashcode
        static {
            byte[] ip;
            try {
                ip = UtilAll.getIP();
            } catch (Exception e) {
                ip = createFakeIP();
            }
            LEN = ip.length + 2 + 4 + 4 + 2;
            ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 2 + 4);
            tempBuffer.put(ip);                                 // 客户端IP
            tempBuffer.putShort((short) UtilAll.getPid());      // 进程ID
            tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode()); // 类加载器hashCode
            FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
            setStartTime(System.currentTimeMillis());
            COUNTER = new AtomicInteger(0);
        }
    
         private synchronized static void setStartTime(long millis) {
            Calendar cal = Calendar.getInstance();
            cal.setTimeInMillis(millis);
            cal.set(Calendar.DAY_OF_MONTH, 1);
            cal.set(Calendar.HOUR_OF_DAY, 0);
            cal.set(Calendar.MINUTE, 0);
            cal.set(Calendar.SECOND, 0);
            cal.set(Calendar.MILLISECOND, 0);
            startTime = cal.getTimeInMillis();
            cal.add(Calendar.MONTH, 1);
            nextStartTime = cal.getTimeInMillis();
        }
    
        // 获取进程ID
        public static int getPid() {
            RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
            String name = runtime.getName(); // format: "pid@hostname"
            try {
                return Integer.parseInt(name.substring(0, name.indexOf('@')));
            } catch (Exception e) {
                return -1;
            }
        }
    
    1. createUniqIDBuffer 变化值
    2. 生成规则:当前时间与系统启动时间的差值,以及自增序号
    
       private static byte[] createUniqIDBuffer() {
            ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
            long current = System.currentTimeMillis();
    
            // 每月1号重新计算 startTime,避免时间戳差值无限增加
            if (current >= nextStartTime) {
                setStartTime(current);
            }
    
            // 时间差值(当前时间戳-系统启动时时间戳)
            buffer.putInt((int) (System.currentTimeMillis() - startTime));
    
             // 1. short:毫秒内自增长,几万够了 
             // 2. COUNTER达到最大值后,会从最大的负数开始递增,如-2147483648 -2147483647
    
            buffer.putShort((short) COUNTER.getAndIncrement());
            return buffer.array();
        }
    

    2. offsetMsgId

    1. Broker 服务端将消息追加到内存后会返回其物理偏移量,即在 commitlog 文件中的偏移量,然后会生成一个id,代码中虽然也叫 msgId,其实是 offsetMsgId
    2. 组成规则:Broker 服务器的 IP 与端口号、消息的物理偏移量
    • ByteBuffer input: 用来存放 offsetMsgId 的字节缓存区
    • ByteBuffer addr: Broker 服务器的 IP 地址与端口号,即通过解析 offsetMsgId 从而得到消息服务器的地址信息
    • long offset:消息的物理偏移量
        类:org.apache.rocketmq.common.message.MessageDecoder
    
        public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
            input.flip();
            int msgIDLength = addr.limit() == 8 ? 16 : 28;
            input.limit(msgIDLength);
    
            input.put(addr);
            input.putLong(offset);
    
            return UtilAll.bytes2string(input.array());
        }
    

    二. 消息发送

    其中包含了msgId、offsetMsgId
    image

    三. 消息消费

    image

    1. 消费客户端返回的对象是 MessageClientExt ,继承自 MessageExt ,MessageExt继承自 Message
    2. 如果消息消费失败需要重试,RocketMQ 的做法是将消息重新发送到 Broker 服务器,此时 msgId 是不会发生变化的,但该消息的 offsetMsgId 会发生变化,因为其存储在 Broker 服务器中的位置发生了变化
    3. 在调用 MessageClientExt 中的 getMsgId() 方法时,先返回消息属性中的MsgId,不存在则返回消息的 offsetMsgId
    public String getMsgId() {
        String uniqID = MessageClientIDSetter.getUniqID(this);
        if (uniqID == null) {
            return this.getOffsetMsgId();
        } else {
            return uniqID;
        }
    }
    
    1. MessageExt 的toString 方法
        public String toString() {
            return "MessageExt [brokerName=" + brokerName + ", queueId=" + queueId + ", storeSize=" + storeSize + ", queueOffset=" + queueOffset
                + ", sysFlag=" + sysFlag + ", bornTimestamp=" + bornTimestamp + ", bornHost=" + bornHost
                + ", storeTimestamp=" + storeTimestamp + ", storeHost=" + storeHost + ", msgId=" + msgId
                + ", commitLogOffset=" + commitLogOffset + ", bodyCRC=" + bodyCRC + ", reconsumeTimes="
                + reconsumeTimes + ", preparedTransactionOffset=" + preparedTransactionOffset
                + ", toString()=" + super.toString() + "]";
        }
    
    1. MessageClientExt类( 本类没有重写toString()方法,调用的是父类MessageExt的方法)
    public class MessageClientExt extends MessageExt {
    
        public String getOffsetMsgId() {
            return super.getMsgId();
        }
    
        public void setOffsetMsgId(String offsetMsgId) {
            super.setMsgId(offsetMsgId);
        }
    
        @Override
        public String getMsgId() {
            String uniqID = MessageClientIDSetter.getUniqID(this);
            if (uniqID == null) {
                return this.getOffsetMsgId();
            } else {
                return uniqID;
            }
        }
    
        public void setMsgId(String msgId) {
            //DO NOTHING
            //MessageClientIDSetter.setUniqID(this);
        }
    }
    

    四. Dashboard根据ID查询

    1. 界面显示的是 msgId,即uniqId
    2. RocketMQ会先调用queryMsgById方法,先通过msgId查询,查询无结果再通过 offsetMsgId 查询
    3. 界面新增显示 offsetMsgId 字段
        MessageServiceImpl.viewMessage
        
        String offsetMsgId = ((MessageClientExt) messageExt).getOffsetMsgId();
        messageView.setOffsetMsgId(offsetMsgId);
    
  • 相关阅读:
    Leetcode 118:Pascal's Triangle 杨辉三角
    Leetcode 498:对角线遍历Diagonal Traverse(python3、java)
    Leetcode加一 (java、python3)
    Leetcode747至少是其他数字两倍的最大数
    python之爬虫(爬取.ts文件并将其合并为.MP4文件——以及一些异常的注意事项)
    杭电oj1717——小数化分数(java实现)
    杭电oj_1713——相遇周期(java实现)
    杭电oj1995——汉诺塔V(java实现)
    杭电oj 2098——分拆素数和(包含如何判断质数及优化),java实现
    杭电oj————2057(java)
  • 原文地址:https://www.cnblogs.com/gossip/p/15268995.html
Copyright © 2011-2022 走看看