zoukankan      html  css  js  c++  java
  • pull类型消息中间件-消息发布者(一)

    消息集群架构

    对于发送方来说的关键几要素

    • topic

    消息的主题,由用户定义。类似于知乎的话题,Producer发送消息的时候需要指定发送到某一个topic下面,Consumer从某一个topic下面消费消息。

    • tag

    每次发送一条消息的时候,给消息加一个Tag,方便Consumer过滤消息

    • message

    消息,负载发送的消息的信息。在生产者,服务端和 消费者之间传输

    • queue

    queue就是metaq中具体用来存数消息的数据结构,每一个topic下面对应多个queue,以目录的形式分开存储在磁盘上。我们在最初的网络结构中就描述了brokerserver集群,然而对应到具体某一个brokerserver的存储,就是queue+commitlog

    之前介绍brokerserver的时候有简单提过DefaultMessageStore类,这是metaq消息存储的默认实现类,里面存储的具体配置,也包含了commitlog,consumequeue,关于消息加载,刷盘,恢复,清理等存储设计都在这里。

    • Group

    消费者可以是多个消费者共同消费一个 topic 下的消息,每个消费者消费部分消息。这些消费者就组成一个分组,拥有同一个分组名称,通常也称为消费者集群。

    集群消费,一条消息只会被同一个group里一个消费者消费。 不同group之间相互不影响。
    广播消费,一条消息会被同一个group里每一个消费端消费

    消息发送者实例化

    Producer 消息生产者,负责产生消息,一般由业务系统负责产生消息。
    一般一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例。
    消息子类型用 tags 来标识,tags 可以由应用自由设置。只有发送消息设置了 tags, 消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤。

    我们可以看出start()方法主要做这么几件事:

    1. 检查配置是否正确,checkConfig();
    2. 注册produce,registerProducer();
    3. mQClientFactory.start();
    4. 向所有的broker发送的Heartbeat。

    发送方的核心服务:

     //Start request-response channel
     this.mQClientAPIImpl.start();
     //Start various schedule tasks
     this.startScheduledTask();
     //Start pull service
     this.pullMessageService.start();
     //Start rebalance service
     this.rebalanceService.start();
     //Start push service
     this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
    

    消息发送流程

    metaQ的底层通信还是基于netty的。

    消息载体

    private String topic;
    private int flag;
    private Map<String, String> properties;
    private byte[] body;
    
    private int queueId;
    private int storeSize;
    private long queueOffset;
    private int sysFlag;
    private long bornTimestamp;
    private SocketAddress bornHost;
    private long storeTimestamp;
    private SocketAddress storeHost;
    private String msgId;
    private long commitLogOffset;
    private int bodyCRC;
    private int reconsumeTimes;
    private long preparedTransactionOffset;
    

    这里需要强调的是,Message是为了便于metaq逻辑操作而定义的偏向于业务逻辑的类,实际上的网络传输类是RemotingCommand,结构如下

     public ByteBuffer encode() {
            int length = 4;
            byte[] headerData = this.buildHeader();
            length += headerData.length;
            if (this.body != null) {
                length += body.length;
            }
            ByteBuffer result = ByteBuffer.allocate(4 + length);
            result.putInt(length);
            result.putInt(headerData.length);
            result.put(headerData);
            if (this.body != null) {
                result.put(this.body);
            }
            result.flip();
            return result;
        }
    

    producer的send()方法:


    客户端从zookeeper上获取publish的topic对应的broker和分区列表(按照brokerId和partition的顺序排列组织成一个有序的分区列表),生产者在发送消息的时候必须选择一个分区来发送消息,发送的时候按照从头到尾循环往复的方式选择来发送消息。

    默认调用的是同步发送方法。
    调用netty中的 com.alibaba.rocketmq.remoting.netty.NettyRemotingClient#invokeSync

      public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
                throws InterruptedException, RemotingConnectException, RemotingSendRequestException,
                RemotingTimeoutException {
            final Channel channel = this.getAndCreateChannel(addr);
            if (channel != null && channel.isActive()) {
                try {
                    if (this.rpcHook != null) {
                        this.rpcHook.doBeforeRequest(addr, request);
                    }
                    RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis);
                    if (this.rpcHook != null) {
                        this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel),
                            request, response);
                    }
                    return response;
                }
                catch (RemotingSendRequestException e) {
                    log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                    this.closeChannel(addr, channel);
                    throw e;
                }
                catch (RemotingTimeoutException e) {
                    log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                    throw e;
                }
            }
            else {
                this.closeChannel(addr, channel);
                throw new RemotingConnectException(addr);
            }
        }
    

    同时可以设置调用异步发送方法。
    调用com.alibaba.rocketmq.remoting.netty.NettyRemotingClient#invokeAsync

     public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis,
                InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
                RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
            final Channel channel = this.getAndCreateChannel(addr);
            if (channel != null && channel.isActive()) {
                try {
                    if (this.rpcHook != null) {
                        this.rpcHook.doBeforeRequest(addr, request);
                    }
                    this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
                }
                catch (RemotingSendRequestException e) {
                    log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
                    this.closeChannel(addr, channel);
                    throw e;
                }
            }
            else {
                this.closeChannel(addr, channel);
                throw new RemotingConnectException(addr);
            }
        }
    

    这两个方法一个为同步,一个为异步。而同步与异步的主要实现则来自于ResponseFuture类。通过countDownLatch的使用,完成了同步操作(详情参看此类的putResponse方法及waitResponse方法);通过传递invokeCallback完成异步操作

  • 相关阅读:
    C# 全局热键
    Frida hook 初识
    xposed hook 复杂函数参数问题
    C# http post 中文乱码问题
    Fiddler 抓包https 问题
    C# HttpWebRequest 多线程超时问题
    Android Studio 无 Generate signed apk 菜单选项问题
    c#调用c++ dll const char* String类型转换问题。传值,与接收返回值问题
    C++中GB2312字符串和UTF-8之间的转换
    The underlying connection was closed: An unexpected error occurred on a send
  • 原文地址:https://www.cnblogs.com/zhulongchao/p/5792766.html
Copyright © 2011-2022 走看看