  • RocketMQ Source Code, Part 3: The Producer Message-Sending Process

    Producer

    Message sending

    producer start

    The producer startup process is shown below.

    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
    
                // check  GroupName
                this.checkConfig();
                // change ClientConfig.instanceName to the PID
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
    
                // initialize mQClientFactory as an MQClientInstance and add that instance to factoryTable
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
    
                // register the producer in MQClientInstance.producerTable
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                            null);
                }
    
                // store the routeInfo for the topic
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
    
                if (startFactory) {
                    // start the MQClientInstance
                    mQClientFactory.start();
                }
    
                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                        this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "//
                        + this.serviceState//
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                        null);
            default:
                break;
        }
    
        // on startup, send a heartbeat to all brokers
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    }
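
The state handling in start can be read as a start-once guard: the state is set to START_FAILED before any work is done, so an exception part-way through leaves the object unusable, and it only becomes RUNNING at the very end. The following is a minimal sketch of that pattern under simplified assumptions; `StartOnceService` and `doStart` are hypothetical names, not RocketMQ classes.

```java
// Hypothetical illustration of the start-once pattern; not RocketMQ code.
class StartOnceService {
    enum State { CREATE_JUST, RUNNING, START_FAILED, SHUTDOWN_ALREADY }

    private State state = State.CREATE_JUST;
    private final Runnable doStart; // the actual startup work (assumed)

    StartOnceService(Runnable doStart) {
        this.doStart = doStart;
    }

    synchronized void start() {
        switch (state) {
            case CREATE_JUST:
                // Pessimistically mark as failed; if doStart throws,
                // the state stays START_FAILED.
                state = State.START_FAILED;
                doStart.run();
                state = State.RUNNING;
                break;
            default:
                // RUNNING, START_FAILED and SHUTDOWN_ALREADY all reject a second start.
                throw new IllegalStateException("service state not OK: " + state);
        }
    }

    synchronized State state() {
        return state;
    }

    public static void main(String[] args) {
        StartOnceService service = new StartOnceService(() -> { });
        service.start();
        System.out.println(service.state());
    }
}
```

Unlike this sketch, the original method puts the state back to CREATE_JUST when registration fails because the group name is already taken, so the caller can retry with another name.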
    

    Message sending flow

    The producer first wraps the message and sends it to the broker over Netty; the broker then stores it. The process is as follows.

    ============================= Producer sends the message =============================
    The main logic is in DefaultMQProducerImpl.sendDefaultImpl.

    1. Get the topicRouteInfo
      DefaultMQProducerImpl.tryToFindTopicPublishInfo
      ->
      MQClientInstance.updateTopicRouteInfoFromNameServer
    private final ConcurrentHashMap<String/* group */, MQProducerInner> producerTable: holds all producers
    updateTopicRouteInfoFromNameServer
    - gets the topicRouteInfo
    - then iterates over producerTable and updates every producer's topicRouteInfo
    - iterates over consumerTable and updates every consumer's topicSubscribeInfo
    - adds the result to topicRouteTable
    

    ->
    MQClientAPIImpl.getTopicRouteInfoFromNameServer
    Sends a request over Netty to the nameServer asking for the topicRouteInfo
    2. Select a messageQueue
    Take one MessageQueue from messageQueueList
    3. Send the message over Netty
    DefaultMQProducerImpl.sendKernelImpl
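
Step 2 picks one queue out of messageQueueList. A minimal sketch of one way to do that, assuming plain round-robin over the list with a shared counter, is shown here. `RoundRobinSelector` is a hypothetical helper; the real client keeps its position in sendWhichQueue, and any further selection logic it applies is not modeled.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: hands out queues in round-robin order.
class RoundRobinSelector<T> {
    private final List<T> queues;
    private final AtomicInteger index = new AtomicInteger(0);

    RoundRobinSelector(List<T> queues) {
        if (queues.isEmpty()) {
            throw new IllegalArgumentException("no queues");
        }
        this.queues = queues;
    }

    T selectOne() {
        // floorMod keeps the position non-negative even after the counter overflows.
        int pos = Math.floorMod(index.getAndIncrement(), queues.size());
        return queues.get(pos);
    }

    public static void main(String[] args) {
        RoundRobinSelector<String> selector =
                new RoundRobinSelector<>(Arrays.asList("q0", "q1", "q2"));
        for (int i = 0; i < 4; i++) {
            System.out.println(selector.selectOne());
        }
    }
}
```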

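    ============================= Broker receives the message =============================

    Because Netty is the network transport, the broker also first receives the data through Netty and then calls the registered processor to handle it.

    1. parseRequestHeader
      Builds the requestHeader via reflection
    2. Build the SendMessageContext
    3. Run the beforeHook
    4. Send the message
      • Build the MessageExtBrokerInner
      • Store the message with DefaultMessageStore.putMessage
        • CommitLog.putMessage
          • Take a mapedFile from mapedFileQueue; appendMessage writes into the commitLog through a directBuffer
          • Synchronous or asynchronous flush to disk
          • Synchronous double write
    5. Run the afterHook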

    issue

    How does the broker end up using SendMessageProcessor to handle messages sent by a producer?

    // when the broker initializes, it registers all processors via registerProcessor
    SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
    sendProcessor.registerSendMessageHook(sendMessageHookList);
    sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
    
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
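
The registration calls above amount to a lookup table keyed by request code: when a request arrives, the server finds the processor registered for its code and hands the request over. The sketch below illustrates that dispatch idea only. `ProcessorRegistry`, the `Function`-based processor type, and the numeric codes are all assumptions made for the example, and the per-processor executor from the original calls is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical dispatcher: maps a request code to the handler registered for it.
class ProcessorRegistry {
    // Made-up codes for the example; not the real RequestCode values.
    static final int SEND_MESSAGE = 1;
    static final int SEND_MESSAGE_V2 = 2;

    private final Map<Integer, Function<String, String>> processorTable = new HashMap<>();

    void registerProcessor(int requestCode, Function<String, String> processor) {
        processorTable.put(requestCode, processor);
    }

    String dispatch(int requestCode, String body) {
        Function<String, String> processor = processorTable.get(requestCode);
        if (processor == null) {
            return "REQUEST_CODE_NOT_SUPPORTED";
        }
        return processor.apply(body);
    }

    public static void main(String[] args) {
        ProcessorRegistry registry = new ProcessorRegistry();
        // One processor instance can serve several request codes.
        Function<String, String> sendProcessor = body -> "stored:" + body;
        registry.registerProcessor(SEND_MESSAGE, sendProcessor);
        registry.registerProcessor(SEND_MESSAGE_V2, sendProcessor);
        System.out.println(registry.dispatch(SEND_MESSAGE, "hello"));
        System.out.println(registry.dispatch(99, "hello"));
    }
}
```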
     
    

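    How does the broker receive a message and write it to a file?

    1. Find the mapedFile to write to (the last one, or create a new one)
    2. Call mapedFile.appendMessage

    How is the mapedFile to write to obtained?

    See the sequence diagram: RocketMQ.asta (Broker receives sendRequest)

    The mapedFile.appendMessage process

    See the sequence diagram: RocketMQ.asta (Broker receives sendRequest)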

    Classes involved in message sending

    DefaultMQProducerImpl

    topicPublishInfoTable

    Holds information such as the messageQueues for each topic

    // whether messages for this topic are ordered
    private boolean orderTopic = false;
    // whether routerInfo is available
    private boolean haveTopicRouterInfo = false;
    // the messageQueues for this topic
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    // which queue the next message goes to
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(0);
    private TopicRouteData topicRouteData;
    

    private final ConcurrentHashMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>();
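    put:

    1. When the producer starts, a new TopicPublishInfo is created for the producer's current topic and put into the table
    2. Before a message is sent, the topicPublishInfo for its topic is looked up; at that point the nameServer is queried for the latest information and all entries in the table are updated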

    defaultMQProducer

    private final DefaultMQProducer defaultMQProducer;
    The class that sends messages. DefaultMQProducer's constructor creates a DefaultMQProducerImpl and passes itself in, so DefaultMQProducerImpl.defaultMQProducer is that DefaultMQProducer by default.

    mQClientFactory

    private MQClientInstance mQClientFactory;
    MQClientManager is a singleton with two fields

    // used to generate the id of each MQClientInstance; incremented for each one
    private AtomicInteger factoryIndexGenerator = new AtomicInteger();
    // maps clientId to MQClientInstance; manages the clients
    private ConcurrentHashMap<String/* clientId */, MQClientInstance> factoryTable =
                new ConcurrentHashMap<String, MQClientInstance>();
    

    After the manager is obtained via getInstance, getAndCreateMQClientInstance is called to create the MQClientInstance.
    It is started by the mQClientFactory.start call in DefaultMQProducerImpl.start.
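
The two fields work together as a get-or-create cache: look the client up by id, and if none exists yet, create one with the next index and publish it atomically. Here is a minimal sketch of that idea, assuming `putIfAbsent` is used to resolve races; `ClientRegistry` and `Client` are hypothetical stand-ins for MQClientManager and MQClientInstance.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a singleton manager with a get-or-create table.
class ClientRegistry {
    static final class Client {
        final int index;
        final String clientId;

        Client(int index, String clientId) {
            this.index = index;
            this.clientId = clientId;
        }
    }

    private static final ClientRegistry INSTANCE = new ClientRegistry();

    private final AtomicInteger indexGenerator = new AtomicInteger();
    private final ConcurrentHashMap<String, Client> table = new ConcurrentHashMap<>();

    private ClientRegistry() { }

    static ClientRegistry getInstance() {
        return INSTANCE;
    }

    Client getAndCreate(String clientId) {
        Client client = table.get(clientId);
        if (client == null) {
            Client created = new Client(indexGenerator.getAndIncrement(), clientId);
            // If another thread won the race, keep its instance and drop ours.
            Client previous = table.putIfAbsent(clientId, created);
            client = (previous != null) ? previous : created;
        }
        return client;
    }

    public static void main(String[] args) {
        ClientRegistry registry = ClientRegistry.getInstance();
        Client a = registry.getAndCreate("10.0.0.1@1234");
        Client b = registry.getAndCreate("10.0.0.1@1234");
        System.out.println(a == b);
    }
}
```

Because the table is keyed by clientId, a producer and a consumer that resolve to the same clientId in one process would share one client instance in this sketch.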

    MQClientInstance

    Fields

    // when MQClientManager creates the MQClientInstance, the DefaultMQProducer is passed in, because DefaultMQProducer extends ClientConfig
    private final ClientConfig clientConfig;
    // passed in at construction; generated by MQClientManager
    private final int instanceIndex;
    // passed in at construction; generated by clientConfig.buildMQClientId, in the form ip@instanceName
    private final String clientId;
    
    // the MQProducerInner for each group
    // producers register here when they start
    // each time a message is sent, topicRouteData is fetched from the nameServer and each producer's info is updated
    private final ConcurrentHashMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    // the MQConsumerInner for each group
    // consumers register here when they start
    // each time a message is sent, topicRouteData is fetched from the nameServer and each consumer's info is updated
    private final ConcurrentHashMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
    // the adminExtInner for each group; DefaultMQAdminExt is registered when the NameServer starts
    private final ConcurrentHashMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
    
    // the TopicRouteData for each topic
    // each time a message is sent, topicRouteData is fetched from the nameServer and the entry is updated
    private final ConcurrentHashMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
    
    // addresses of all brokers
    // each time a message is sent, topicRouteData is fetched from the nameServer and the entries are updated
    private final ConcurrentHashMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
                new ConcurrentHashMap<String, HashMap<Long, String>>();
    
    // scheduled-task thread pool; tasks include fetchNameServerAddr, updateTopicRouteInfoFromNameServer, etc.
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "MQClientFactoryScheduledThread");
        }
    });
    
    // service
    private final PullMessageService pullMessageService;
    private final RebalanceService rebalanceService;
    
    private final NettyClientConfig nettyClientConfig;
    // used for the client's remote communication, over Netty
    private final MQClientAPIImpl mQClientAPIImpl;
    

    Key methods

    public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }
    
    private void startScheduledTask() {
        if (null == this.clientConfig.getNamesrvAddr()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        // fetch the nameServer address
                        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    } catch (Exception e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }
    
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
            @Override
            public void run() {
                try {
                    // update topicRouteData
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInteval(), TimeUnit.MILLISECONDS);
    
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
            @Override
            public void run() {
                try {
                    // remove brokers that have gone offline
                    MQClientInstance.this.cleanOfflineBroker();
                    // send a heartbeat to the brokers
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                } catch (Exception e) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                }
            }
        }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
    
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
            @Override
            public void run() {
                try {
                    // persist the consumerOffset
                    MQClientInstance.this.persistAllConsumerOffset();
                } catch (Exception e) {
                    log.error("ScheduledTask persistAllConsumerOffset exception", e);
                }
            }
        }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
            @Override
            public void run() {
                try {
                    // decide from the size of processQueueTable whether the threadPool should grow or shrink
                    // the actual grow/shrink logic is not implemented yet
                    MQClientInstance.this.adjustThreadPool();
                } catch (Exception e) {
                    log.error("ScheduledTask adjustThreadPool exception", e);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }
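
Every task body in startScheduledTask is wrapped in try/catch. That matters because `ScheduledExecutorService.scheduleAtFixedRate` suppresses all later executions once one run throws. A minimal sketch of the guard follows; `GuardedScheduleDemo`, `scheduleGuarded`, and the 10 ms period are hypothetical choices for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of a periodic task that survives its own exceptions.
class GuardedScheduleDemo {
    // Runs task at a fixed rate; exceptions are caught so later runs are not suppressed.
    static ScheduledFuture<?> scheduleGuarded(ScheduledExecutorService ses, Runnable task,
                                              long initialDelayMs, long periodMs) {
        return ses.scheduleAtFixedRate(() -> {
            try {
                task.run();
            } catch (Exception e) {
                System.err.println("scheduled task failed: " + e);
            }
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
    }

    // Returns true if a task that always throws still ran at least three times.
    static boolean runDemo() {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(3);
        scheduleGuarded(ses, () -> {
            latch.countDown();
            throw new IllegalStateException("simulated failure");
        }, 0, 10);
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            ses.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```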
    
    

    DefaultMessageStore

    Fields

    private final MessageStoreConfig messageStoreConfig;
    // CommitLog
    private final CommitLog commitLog;
    
    // a topic and a queueId together uniquely identify a consumeQueue
    // put:
    // 1. getMessage calls findConsumeQueue; if consumeQueueTable has no entry for (topic, queueId), a new one is created and added to the table
    // 2. loaded at startup
    private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
    
    /** service */
    // flushes to disk: loops over consumeQueueTable and commits the mapedFileQueue of each consumeQueue
    private final FlushConsumeQueueService flushConsumeQueueService;
    // deletes expired commitlog files
    private final CleanCommitLogService cleanCommitLogService;
    // deletes consumeQueue files
    private final CleanConsumeQueueService cleanConsumeQueueService;
    // created when the DefaultMessageStore is constructed
    // started when the messageStore starts
    // its run method loops, taking requests from requestQueue (a PriorityBlockingQueue, whose take() blocks) and handling them, i.e. creating new mapedFiles
    private final AllocateMapedFileService allocateMapedFileService;
    // TODO: I don't understand IndexService yet
    private final IndexService indexService;
    // TODO
    private final ReputMessageService reputMessageService;
    // HA (high availability): synchronous double write, asynchronous replication
    private final HAService haService;
    
    private final ScheduleMessageService scheduleMessageService;
    
    private final StoreStatsService storeStatsService;
    

    Key methods

    public void start() throws Exception {
        this.flushConsumeQueueService.start();
        this.commitLog.start();
        this.storeStatsService.start();
    
    
        if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) {
            this.scheduleMessageService.start();
        }
    
        if (this.getMessageStoreConfig().isDuplicationEnable()) {
            this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
        } else {
            this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
        }
        this.reputMessageService.start();
    
        this.haService.start();
    
        this.createTempFile();
        this.addScheduleTask();
        this.shutdown = false;
    }
    

    CommitLog

    Where messages are written to memory and saved to files

    Fields

    // queue of message files; contains all the files saved on disk
    private final MapedFileQueue mapedFileQueue;
    //
    private final DefaultMessageStore defaultMessageStore;
    // flushes messages to disk
    private final FlushCommitLogService flushCommitLogService;
    // callback for appending messages; its doAppend method appends the message to memory
    private final AppendMessageCallback appendMessageCallback;
    // records the offset of each queue of a topic
    private HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
    

    MapedFileQueue

    Manages mapedFiles: creates, fetches, and deletes them, and writes messages into files

    Fields

    // max number of files deleted per batch; one of the parameters used when deciding whether to delete
    private static final int DeleteFilesBatchMax = 10;
    // storage path of the files
    private final String storePath;
    // size of each file
    private final int mapedFileSize;
    // list of all files
    private final List<MapedFile> mapedFiles = new ArrayList<MapedFile>();
    // read-write lock
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    // where new mapedFiles are created: the request to create a mapedFile is added to requestTable and requestQueue
    // when the MessageStore starts it starts the AllocateMapedFileService thread, which executes the requests in requestQueue and creates the mapedFiles
    private final AllocateMapedFileService allocateMapedFileService;
    

    MapedFile

    Has a one-to-one relationship with a file

    Fields

    // name of the file that stores the messages
    private final String fileName;
    // global offset of this file, i.e. the file-name prefix
    private final long fileFromOffset;
    // file size
    private final int fileSize;
    // file object
    private final File file;
    // the memory the file is mapped into
    private final MappedByteBuffer mappedByteBuffer;
    // current write position in the file
    private final AtomicInteger wrotePostion = new AtomicInteger(0);
    // position after flushing to disk
    private final AtomicInteger committedPosition = new AtomicInteger(0);
    // NIO file channel (blocking)
    private FileChannel fileChannel;
    
    private volatile long storeTimestamp = 0;
    private boolean firstCreateInQueue = false;
    

    Methods

    public int commit(final int flushLeastPages) {
        // check whether a flush is needed
        if (this.isAbleToFlush(flushLeastPages)) {
            // try to hold the file; this can fail, so a commit does not succeed every time
            if (this.hold()) {
                // hold succeeded: flush memory to disk
                int value = this.wrotePostion.get();
                // force the in-memory contents to disk
                this.mappedByteBuffer.force();
                this.committedPosition.set(value);
                // release the hold
                this.release();
            } else {
                // hold failed: reset committedPosition
                log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
                this.committedPosition.set(this.wrotePostion.get());
            }
        }
    
        return this.getCommittedPosition();
    }
    
    public AppendMessageResult appendMessage(final Object msg, final AppendMessageCallback cb) {
        assert msg != null;
        assert cb != null;
    
        // current write position of the file
        int currentPos = this.wrotePostion.get();
    
    
        if (currentPos < this.fileSize) {
            // take a slice of the direct buffer: mark and position are reset, limit is the remaining size, and it shares memory with the original buffer
            ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);
            AppendMessageResult result =
                    cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);
            this.wrotePostion.addAndGet(result.getWroteBytes());
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
    
    
        log.error("MapedFile.appendMessage return null, wrotePostion: " + currentPos + " fileSize: "
                + this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }
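
The append-then-commit cycle above can be tried in isolation with plain java.nio. The sketch below maps a small file, writes through a slice positioned at the current write offset, and then calls `force()` to flush. `TinyMappedFile` and its methods are hypothetical simplifications: there is no hold/release reference counting, no flush-page threshold, and no callback.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, heavily simplified stand-in for a memory-mapped log file.
class TinyMappedFile implements AutoCloseable {
    private final RandomAccessFile raf;
    private final MappedByteBuffer mapped;
    private final int fileSize;
    private final AtomicInteger wrotePosition = new AtomicInteger(0);
    private final AtomicInteger committedPosition = new AtomicInteger(0);

    TinyMappedFile(File file, int fileSize) throws IOException {
        this.fileSize = fileSize;
        this.raf = new RandomAccessFile(file, "rw");
        this.mapped = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
    }

    // Appends data at the current write position; returns -1 if it does not fit.
    int append(byte[] data) {
        int pos = wrotePosition.get();
        if (data.length > fileSize - pos) {
            return -1;
        }
        // The slice shares memory with the mapping but has its own position.
        ByteBuffer slice = mapped.slice();
        slice.position(pos);
        slice.put(data);
        wrotePosition.addAndGet(data.length);
        return data.length;
    }

    // Forces the mapped contents to disk and records how far we have flushed.
    int commit() {
        int value = wrotePosition.get();
        mapped.force();
        committedPosition.set(value);
        return committedPosition.get();
    }

    @Override
    public void close() throws IOException {
        raf.close();
    }

    // Writes "hello" to a temp file through the mapping and reads it back from disk.
    static String roundTrip() {
        try {
            File file = File.createTempFile("tiny-mapped", ".log");
            file.deleteOnExit();
            try (TinyMappedFile mf = new TinyMappedFile(file, 1024)) {
                mf.append("hello".getBytes(StandardCharsets.US_ASCII));
                mf.commit();
            }
            byte[] all = Files.readAllBytes(file.toPath());
            return new String(all, 0, 5, StandardCharsets.US_ASCII);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip());
    }
}
```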
    
  • Original article: https://www.cnblogs.com/sunshine-2015/p/6291116.html