zoukankan      html  css  js  c++  java
  • rocketMQ 的 msgId

    producer 在发送消息的时候,会生成一个 "唯一" 的 msgId,broker 会为这个 msgId 创建哈希索引

    UNIQ_KEY 由客户端生成

    org.apache.rocketmq.common.message.MessageClientIDSetter#createUniqID

    msgId 由 前缀 + 内容 组成:
    前缀
    ip 地址,进程号,classLoader 的 hashcode
    内容
    时间差(当前时间减去当月一日),计数器

    static {
        byte[] ip;
        try {
            // 获取 ip 地址,字节数组
            ip = UtilAll.getIP();
        } catch (Exception e) {
            ip = createFakeIP();
        }
        LEN = ip.length + 4 + 4 + 4 + 2;
        ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 4 + 4);
        tempBuffer.position(0);
        // 写入 ip
        tempBuffer.put(ip);
        tempBuffer.position(ip.length);
        // 写入进程号
        tempBuffer.putInt(UtilAll.getPid());
        tempBuffer.position(ip.length + 4);
        // 写入 hashcode
        tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
        // 二进制转十六进制字符串
        FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
        // 设置起始时间为当月1日
        setStartTime(System.currentTimeMillis());
        COUNTER = new AtomicInteger(0);
    }

    byte 数组转十六进制字符串

    // org.apache.rocketmq.common.UtilAll#bytes2string
    // final static char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();
    public static String bytes2string(byte[] src) { char[] hexChars = new char[src.length * 2]; for (int j = 0; j < src.length; j++) { // & 过之后,byte 转成 int int v = src[j] & 0xFF; // 无符号右移 4 位,高位补 0 ,即取字节的高 4 位 hexChars[j * 2] = HEX_ARRAY[v >>> 4]; // 取字节低 4 位 hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F]; } return new String(hexChars); }

    设置开始时间

    // org.apache.rocketmq.common.message.MessageClientIDSetter#setStartTime
    private synchronized static void setStartTime(long millis) {
        Calendar cal = Calendar.getInstance();
        cal.setTimeInMillis(millis);
        cal.set(Calendar.DAY_OF_MONTH, 1);
        cal.set(Calendar.HOUR_OF_DAY, 0);
        cal.set(Calendar.MINUTE, 0);
        cal.set(Calendar.SECOND, 0);
        cal.set(Calendar.MILLISECOND, 0);
        // 当月1日为开始时间
        startTime = cal.getTimeInMillis();
        // 下月1日
        cal.add(Calendar.MONTH, 1);
        nextStartTime = cal.getTimeInMillis();
    }
    // org.apache.rocketmq.common.message.MessageClientIDSetter#createUniqIDBuffer
    private static byte[] createUniqIDBuffer() {
        ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
        long current = System.currentTimeMillis();
        if (current >= nextStartTime) {
            setStartTime(current);
        }
        buffer.position(0);
        // 当前时间减去当月一日
        buffer.putInt((int) (System.currentTimeMillis() - startTime));
        // 计数器
        buffer.putShort((short) COUNTER.getAndIncrement());
        return buffer.array();
    }
    public static String createUniqID() {
        // 1 个字节,8 位,每 4 位一个十六进制字符
        StringBuilder sb = new StringBuilder(LEN * 2);
        // 前缀:ip,pid,hashcode
        sb.append(FIX_STRING);
        // 时间差 + 计数器
        sb.append(UtilAll.bytes2string(createUniqIDBuffer()));
        return sb.toString();
    }

    ByteBufer 写入数据的时候,默认是大端字节序,比如写入一个 int,先写高位字节,再写低位字节。

  • 相关阅读:
    SSH2练习中问题之org.apache.jasper.JasperException: /findAllProductions_list.jsp(31,1) TLD又はタグファイル中のattribute指示子に従って属性itemsはどんな式も受け付けません解决方案
    关于HibernateTempleate模版很多代码可以直接使用,是开发人员不可多得选择
    Java实现权限控制之2
    struts 之 constant
    JSTL表达式之<c:forEach>
    关于事后检查和确认
    关于删除
    jcommander 解析boolean参数
    多线程
    关于proto文件的更新
  • 原文地址:https://www.cnblogs.com/allenwas3/p/12242565.html
Copyright © 2011-2022 走看看