zoukankan      html  css  js  c++  java
  • RocketMQ-生产者

    简述

    rocketmq发送的消息有以下类型,普通并发消息,分区有序消息,全局有序消息(topic分区数=1),延迟消息和事务消息。

    发送方式分为同步发送,异步发送,指定队列发送,单向发送,批量发送。

    DefaultMQProducerImpl#start

        public void start(final boolean startFactory) throws MQClientException {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
    
                    this.checkConfig();
    
                    if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                        // 当前应用的进程ID
                        this.defaultMQProducer.changeInstanceNameToPID();
                    }
                    // 一个clientId 对应  一个MQclientInstance实例  缓存到  FactoryManager的 factoryTable里面
                    this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
    
                    //一个client实例里面保存着一个服务提供者组对应一个服务提供者的实现
                    boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                    if (!registerOK) {
                        this.serviceState = ServiceState.CREATE_JUST;
                        throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                                + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                                null);
                    }
    
                    // 在服务提供者的实现里面保存一份默认的服务发布者的路由信息
                    this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
    
                    if (startFactory) {
                        // client实例的启动  ScheduledTask任务启动  需要加锁
                        mQClientFactory.start();
                    }
    
                    log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                            this.defaultMQProducer.isSendMessageWithVIPChannel());
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                case START_FAILED:
                case SHUTDOWN_ALREADY:
                    throw new MQClientException("The producer service state not OK, maybe started once, "//
                            + this.serviceState//
                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                            null);
                default:
                    break;
            }
    
            // 心跳
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        }

    DefaultMQProducerImpl#send

    发送消息的过程相对来说比较简单,主要是通过负载获取到一个messageQueue,然后发送消息,如果是异步,或者单向消息返回null

        private SendResult sendDefaultImpl(//
                                           Message msg, //
                                           final CommunicationMode communicationMode, //
                                           final SendCallback sendCallback, //
                                           final long timeout//
        ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            this.makeSureStateOK();
            Validators.checkMessage(msg, this.defaultMQProducer);
    
            final long invokeID = random.nextLong();
            long beginTimestampFirst = System.currentTimeMillis();
            long beginTimestampPrev = beginTimestampFirst;
            long endTimestamp = beginTimestampFirst;
            // 获取namesrv路由信息
            TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
            if (topicPublishInfo != null && topicPublishInfo.ok()) {
                MessageQueue mq = null;
                Exception exception = null;
                SendResult sendResult = null;
                // 同步请求默认重试3次  其他的默认重试次数为1次
                int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
                int times = 0;
                String[] brokersSent = new String[timesTotal];
                for (; times < timesTotal; times++) {
                    String lastBrokerName = null == mq ? null : mq.getBrokerName();
                    // producer 负载均衡
                    MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                    if (tmpmq != null) {
                        mq = tmpmq;
                        brokersSent[times] = mq.getBrokerName();
                        try {
                            beginTimestampPrev = System.currentTimeMillis();
                            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                            endTimestamp = System.currentTimeMillis();
                            // 更新brokerName的延迟
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                            switch (communicationMode) {
                                case ASYNC:
                                    return null;
                                case ONEWAY:
                                    return null;
                                case SYNC:
                                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                        if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                            continue;
                                        }
                                    }
    
                                    return sendResult;
                                default:
                                    break;
                            }
                        } catch (RemotingException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());
                            exception = e;
                            continue;
                        } catch (MQClientException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());
                            exception = e;
                            continue;
                        } catch (MQBrokerException e) {
                            // private SendResult processSendResponse   在MQApiClient中的这个方法中处理
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());
                            exception = e;
                            switch (e.getResponseCode()) {
                                case ResponseCode.TOPIC_NOT_EXIST:
                                case ResponseCode.SERVICE_NOT_AVAILABLE:
                                case ResponseCode.SYSTEM_ERROR:
                                case ResponseCode.NO_PERMISSION:
                                case ResponseCode.NO_BUYER_ID:
                                case ResponseCode.NOT_IN_CURRENT_UNIT:
                                    continue;
                                default:
                                    if (sendResult != null) {
                                        return sendResult;
                                    }
    
                                    throw e;
                            }
                        } catch (InterruptedException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                            log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());
    
                            log.warn("sendKernelImpl exception", e);
                            log.warn(msg.toString());
                            throw e;
                        }
                    } else {
                        break;
                    }
                }
    
                if (sendResult != null) {
                    return sendResult;
                }
    
                String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                        times,
                        System.currentTimeMillis() - beginTimestampFirst,
                        msg.getTopic(),
                        Arrays.toString(brokersSent));
    
                info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
    
                MQClientException mqClientException = new MQClientException(info, exception);
                if (exception instanceof MQBrokerException) {
                    mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
                } else if (exception instanceof RemotingConnectException) {
                    mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
                } else if (exception instanceof RemotingTimeoutException) {
                    mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
                } else if (exception instanceof MQClientException) {
                    mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
                }
    
                throw mqClientException;
            }

     对于同步请求 如下,具体的实现细节可以参考通信层

        private SendResult sendMessageSync(//
                                           final String addr, //
                                           final String brokerName, //
                                           final Message msg, //
                                           final long timeoutMillis, //
                                           final RemotingCommand request//
        ) throws RemotingException, MQBrokerException, InterruptedException {
            RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
            assert response != null;
            return this.processSendResponse(brokerName, msg, response);
        }

    服务提供者负载均衡

    MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

        public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
            if (this.sendLatencyFaultEnable) {
                try {
                    // 每一个TopicPublishInfo里面ThreadLocalIndex  维护着当前获取messagequeue的index
                    // 在重试的过程中,能够获取到上一次index的下一个
                    int index = tpInfo.getSendWhichQueue().getAndIncrement();
                    for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                        int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                        if (pos < 0)
                            pos = 0;
                        MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                        // 这里边做了一些策略  针对不同的延迟,设置不同时间的保护期,如果对延迟相对较高的broker,在相对较长的时间内不希望被请求
                        // 过了这个保护期  则broker Available
                        if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                            // 如果获取到的mq和上次的mq是一样的  则直接返回
                            if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                                return mq;
                        }
                    }
    
                    /**
                     * 因为topic可能分布在不同的brokername上,在发送完数据后  一版都会统计到brokername的延迟,
                     * 排序后,在相对延迟较低的前一半broker中获取到一个brokerName,或许这个brokername并不是延迟最低的
                     */
    
                    final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                    int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                    if (writeQueueNums > 0) {
                        final MessageQueue mq = tpInfo.selectOneMessageQueue();
                        if (notBestBroker != null) {
                            mq.setBrokerName(notBestBroker);
                            mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                        }
                        return mq;
                    } else {
                        // 没有可写的queue  则移除
                        latencyFaultTolerance.remove(notBestBroker);
                    }
                } catch (Exception e) {
                    log.error("Error occurred when selecting message queue", e);
                }
    
                // 轮询下一个
                return tpInfo.selectOneMessageQueue();
            }
    
            // 轮询选择一个非上次的brokerName
            return tpInfo.selectOneMessageQueue(lastBrokerName);
        }
  • 相关阅读:
    SQLITE3在php中的运用
    C# DllImport用法和路径问题
    ExtJs2.0学习系列(9)Ext.TabPanel之第一式
    ExtJs2.0学习系列(11)Ext.XTemplate
    ExtJs2.0学习系列(12)Ext.TreePanel之第一式
    ExtJs2.0学习系列(10)Ext.TabPanel之第二式
    ExtJs2.0学习系列(6)Ext.FormPanel之第三式(ComboBox篇)
    ExtJs2.0学习系列(15)extjs换肤
    ExtJs2.0学习系列(8)Ext.FormPanel之第五式(综合篇)
    ExtJs2.0学习系列(5)Ext.FormPanel之第二式
  • 原文地址:https://www.cnblogs.com/gaojy/p/15077189.html
Copyright © 2011-2022 走看看