  • A Study of Flume's Avro Sink and Avro Source, Part 2: Avro Sink

    Ah, AvroSink is a lot more complicated :(

    All right, let's first pin down the main questions:

    1. Why does AvroSink need so much code? Is all of it necessary? What logic does it have to implement?

      Look: this is how avro-rpc-quickstart creates a client and makes an RPC:

      

            NettyTransceiver client = new NettyTransceiver(new InetSocketAddress(65111));
            // client code - attach to the server and send a message
            Mail proxy = (Mail) SpecificRequestor.getClient(Mail.class, client);
            proxy.send(message);
    

      

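      So why isn't AvroSink that simple? Does it start multiple threads to make RPCs concurrently? Does it use a connection pool? Does it implement one of its own?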

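      AvroSink extends AbstractRpcSink. AbstractRpcSink implements the process method of the Sink interface by handing the actual work to the RpcClient object it holds, i.e. client.appendBatch(batch);. AvroSink, in turn, implements AbstractRpcSink's abstract method "protected abstract RpcClient initializeRpcClient(Properties props)" to supply a usable RpcClient. Its implementation is: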

      

      protected RpcClient initializeRpcClient(Properties props) {
        logger.info("Attempting to create Avro Rpc client.");
        return RpcClientFactory.getInstance(props);
      }
    

      When the "client.type" property is not set, RpcClientFactory.getInstance returns the default RpcClient, NettyAvroRpcClient.
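The division of labor described above is essentially the template-method pattern: the base sink fixes process(), and the subclass only decides which client to build. A minimal sketch of that shape, assuming heavily simplified types; MiniRpcClient, DefaultClient, MiniRpcSink, MiniAvroSink and getInstance are hypothetical stand-ins, not Flume's real API, and the event type is reduced to String.

```java
import java.util.List;
import java.util.Properties;

// Hypothetical, simplified stand-ins for AbstractRpcSink, AvroSink and
// RpcClientFactory; none of these names are Flume's real API.
public class RpcSinkSketch {

  public interface MiniRpcClient {
    void appendBatch(List<String> batch);
  }

  // Plays the role of NettyAvroRpcClient, the default client.
  public static class DefaultClient implements MiniRpcClient {
    public int delivered = 0;

    @Override
    public void appendBatch(List<String> batch) {
      delivered += batch.size(); // a real client would perform the RPC here
    }
  }

  // Plays the role of RpcClientFactory.getInstance(props): an unset
  // "client.type" selects the default client. This sketch knows no other types.
  public static MiniRpcClient getInstance(Properties props) {
    String type = props.getProperty("client.type");
    if (type == null || type.isEmpty()) {
      return new DefaultClient();
    }
    throw new IllegalArgumentException("sketch only supports the default client, got: " + type);
  }

  // Template method: process() is fixed here; subclasses only pick the client.
  public abstract static class MiniRpcSink {
    private MiniRpcClient client;

    protected abstract MiniRpcClient initializeRpcClient(Properties props);

    public void start(Properties props) {
      client = initializeRpcClient(props);
    }

    public void process(List<String> batch) {
      client.appendBatch(batch); // all delivery work is delegated to the client
    }

    public MiniRpcClient client() {
      return client;
    }
  }

  // Plays the role of AvroSink.
  public static class MiniAvroSink extends MiniRpcSink {
    @Override
    protected MiniRpcClient initializeRpcClient(Properties props) {
      return getInstance(props);
    }
  }

  public static void main(String[] args) {
    MiniAvroSink sink = new MiniAvroSink();
    sink.start(new Properties()); // no "client.type" -> default client
    sink.process(List.of("e1", "e2", "e3"));
    System.out.println(((DefaultClient) sink.client()).delivered); // 3
  }
}
```

With this split, the sink itself never deals with threads or connections; everything interesting happens inside the client, which is why the rest of this post focuses on NettyAvroRpcClient.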


    NettyAvroRpcClient

    In its "private void connect(long timeout, TimeUnit tu) throws FlumeException" method, it initializes the proxy needed for the RPC, i.e. the class's avroClient field:

          transceiver = new NettyTransceiver(this.address, socketChannelFactory, tu.toMillis(timeout));
          avroClient = SpecificRequestor.getClient(AvroSourceProtocol.Callback.class, transceiver);
    

      avroClient is a proxy for AvroSourceProtocol.Callback. This interface extends AvroSourceProtocol and, for each message, adds an overload that takes an extra callback parameter:

      

    @org.apache.avro.specific.AvroGenerated
    public interface AvroSourceProtocol {
      public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"AvroSourceProtocol\",\"namespace\":\"org.apache.flume.source.avro\",\"doc\":\"* Licensed to the Apache Software Foundation (ASF) under one\\n * or more contributor license agreements. See the NOTICE file\\n * distributed with this work for additional information\\n * regarding copyright ownership. The ASF licenses this file\\n * to you under the Apache License, Version 2.0 (the\\n * \\\"License\\\"); you may not use this file except in compliance\\n * with the License. You may obtain a copy of the License at\\n *\\n * http://www.apache.org/licenses/LICENSE-2.0\\n *\\n * Unless required by applicable law or agreed to in writing,\\n * software distributed under the License is distributed on an\\n * \\\"AS IS\\\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\\n * KIND, either express or implied. See the License for the\\n * specific language governing permissions and limitations\\n * under the License.\",\"types\":[{\"type\":\"enum\",\"name\":\"Status\",\"symbols\":[\"OK\",\"FAILED\",\"UNKNOWN\"]},{\"type\":\"record\",\"name\":\"AvroFlumeEvent\",\"fields\":[{\"name\":\"headers\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"body\",\"type\":\"bytes\"}]}],\"messages\":{\"append\":{\"request\":[{\"name\":\"event\",\"type\":\"AvroFlumeEvent\"}],\"response\":\"Status\"},\"appendBatch\":{\"request\":[{\"name\":\"events\",\"type\":{\"type\":\"array\",\"items\":\"AvroFlumeEvent\"}}],\"response\":\"Status\"}}}");
      org.apache.flume.source.avro.Status append(org.apache.flume.source.avro.AvroFlumeEvent event) throws org.apache.avro.AvroRemoteException;
      org.apache.flume.source.avro.Status appendBatch(java.util.List<org.apache.flume.source.avro.AvroFlumeEvent> events) throws org.apache.avro.AvroRemoteException;

      @SuppressWarnings("all")
      public interface Callback extends AvroSourceProtocol {
        public static final org.apache.avro.Protocol PROTOCOL = org.apache.flume.source.avro.AvroSourceProtocol.PROTOCOL;
        void append(org.apache.flume.source.avro.AvroFlumeEvent event, org.apache.avro.ipc.Callback<org.apache.flume.source.avro.Status> callback) throws java.io.IOException;
        void appendBatch(java.util.List<org.apache.flume.source.avro.AvroFlumeEvent> events, org.apache.avro.ipc.Callback<org.apache.flume.source.avro.Status> callback) throws java.io.IOException;
      }
    }
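The extra callback parameter is what allows append to be invoked without waiting for the reply. A minimal sketch of the same interface shape, assuming String events: MiniCallback mirrors the handleResult/handleError methods of org.apache.avro.ipc.Callback, while MiniCallFuture, LocalServer and everything else here are hypothetical simplifications (in Avro, CallFuture is the class that is both a callback and a Future).

```java
import java.util.concurrent.CompletableFuture;

public class CallbackProtocolSketch {

  public enum Status { OK, FAILED, UNKNOWN }

  // Same two methods as org.apache.avro.ipc.Callback.
  public interface MiniCallback<T> {
    void handleResult(T result);
    void handleError(Throwable error);
  }

  // Blocking form, like AvroSourceProtocol.append.
  public interface Protocol {
    Status append(String event);
  }

  // Callback form: same message plus a callback, like AvroSourceProtocol.Callback.
  public interface CallbackProtocol extends Protocol {
    void append(String event, MiniCallback<Status> callback);
  }

  // A callback whose result can be awaited as a future (the role of Avro's CallFuture).
  public static class MiniCallFuture<T> implements MiniCallback<T> {
    private final CompletableFuture<T> result = new CompletableFuture<>();

    @Override public void handleResult(T value) { result.complete(value); }
    @Override public void handleError(Throwable error) { result.completeExceptionally(error); }

    public CompletableFuture<T> future() { return result; }
  }

  // Toy in-process "server"; the callback form answers on another thread.
  public static class LocalServer implements CallbackProtocol {
    @Override
    public Status append(String event) {
      return event.isEmpty() ? Status.FAILED : Status.OK;
    }

    @Override
    public void append(String event, MiniCallback<Status> callback) {
      CompletableFuture.runAsync(() -> callback.handleResult(append(event)));
    }
  }

  public static void main(String[] args) throws Exception {
    MiniCallFuture<Status> callFuture = new MiniCallFuture<>();
    new LocalServer().append("hello", callFuture); // returns without waiting
    System.out.println(callFuture.future().get()); // OK
  }
}
```

In this toy the callback form really does return immediately; the point of the next section is that, over the network, Avro's first call does not.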

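    Next, let's look at how NettyAvroRpcClient implements the append and appendBatch methods of the RpcClient interface.

    Its override of append(Event event) hands the event to its own append(Event event, long timeout, TimeUnit tu) method.

    That append method first converts the Event into an AvroFlumeEvent for the RPC, then submits the RPC call to a thread pool: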

    try {
      // due to AVRO-1122, avroClient.append() may block
      handshake = callTimeoutPool.submit(new Callable<Void>() {

        @Override
        public Void call() throws Exception {
          avroClient.append(avroEvent, callFuture);
          return null;
        }
      });
    } catch (RejectedExecutionException ex) {
      throw new EventDeliveryException(this + ": Executor error", ex);
    }

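    avroClient.append takes a callFuture argument. A future represents the result of an asynchronous computation, so the intended behavior is that append returns immediately and another thread later obtains the result through callFuture. In practice, however, an Avro RPC is preceded by a handshake in which the two sides confirm that their schemas are compatible, and this handshake blocks the client-side RPC method, i.e. the client's append. If the code simply submitted the task and then called callFuture.get(timeout), that timeout would effectively cover the handshake time plus the time the server spends executing append.

    AvroSink therefore makes the two durations separately configurable: the user can set how long the handshake may take and how long to wait for the server to process a request. The handshake only happens the first time the client and server communicate, so subsequent client-side append calls return immediately without waiting for it.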

    See AVRO-1122.
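The behavior described above can be simulated without a network. A minimal sketch, assuming a hypothetical FakeAsyncClient whose callback-style append blocks its caller during a simulated first-call handshake and replies asynchronously afterwards; it is not Avro's code, only a model of the AVRO-1122 symptom and of running the call on a pool, as NettyAvroRpcClient does with callTimeoutPool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HandshakeBlockingSketch {

  // Hypothetical client: its callback-style append blocks its caller during a
  // simulated first-call handshake (the AVRO-1122 behavior), then replies async.
  public static class FakeAsyncClient {
    private final long handshakeMillis;
    private boolean handshakeDone = false;
    public final AtomicInteger handshakes = new AtomicInteger();

    public FakeAsyncClient(long handshakeMillis) {
      this.handshakeMillis = handshakeMillis;
    }

    public synchronized void append(String event, CompletableFuture<String> callFuture) {
      if (!handshakeDone) {
        handshakes.incrementAndGet();
        try {
          Thread.sleep(handshakeMillis); // the caller is stuck here
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // cancelled: give up without replying
          return;
        }
        handshakeDone = true;
      }
      // After the handshake the reply really is asynchronous.
      CompletableFuture.runAsync(() -> callFuture.complete("OK:" + event));
    }
  }

  public static void main(String[] args) throws Exception {
    FakeAsyncClient client = new FakeAsyncClient(300);
    ExecutorService callTimeoutPool = Executors.newCachedThreadPool();
    try {
      for (String event : new String[] {"e1", "e2"}) {
        CompletableFuture<String> callFuture = new CompletableFuture<>();
        // The possibly-blocking call runs on the pool, so this thread can bound
        // how long it waits instead of being stuck inside append.
        Future<Void> handshake = callTimeoutPool.submit(() -> {
          client.append(event, callFuture);
          return null;
        });
        handshake.get(1, TimeUnit.SECONDS);
        System.out.println(callFuture.get(1, TimeUnit.SECONDS));
      }
      System.out.println("handshakes: " + client.handshakes.get()); // handshakes: 1
    } finally {
      callTimeoutPool.shutdownNow();
    }
  }
}
```

Only the first iteration pays the simulated handshake; the second append returns right away, matching the observation that the handshake happens once per connection.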

    Here are the relevant AvroSink configuration options:

    Property          Default  Description
    connect-timeout   20000    Amount of time (ms) to allow for the first (handshake) request.
    request-timeout   20000    Amount of time (ms) to allow for requests after the first.
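The two options are independent: one bounds the first (handshake) request, the other bounds each wait for a reply afterwards, and both default to 20000 ms. A minimal sketch of reading them with those defaults, assuming plain java.util.Properties; the timeoutMillis helper is hypothetical and is not Flume's configuration code.

```java
import java.util.Properties;

public class SinkTimeoutConfigSketch {
  // Defaults from the table above, in milliseconds.
  public static final long DEFAULT_CONNECT_TIMEOUT_MS = 20000L;
  public static final long DEFAULT_REQUEST_TIMEOUT_MS = 20000L;

  // Hypothetical helper: reads one timeout option, falling back to its default.
  public static long timeoutMillis(Properties props, String key, long defaultMs) {
    String raw = props.getProperty(key);
    return (raw == null || raw.trim().isEmpty()) ? defaultMs : Long.parseLong(raw.trim());
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("request-timeout", "5000"); // only the request timeout is overridden
    long connect = timeoutMillis(props, "connect-timeout", DEFAULT_CONNECT_TIMEOUT_MS);
    long request = timeoutMillis(props, "request-timeout", DEFAULT_REQUEST_TIMEOUT_MS);
    System.out.println("connect=" + connect + " request=" + request); // connect=20000 request=5000
  }
}
```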

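    connect-timeout is enforced by the code below, which waits for the Callable to finish and cancels it if the wait times out. With the exception handling stripped out, the code looks like this: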

    try {
      handshake.get(connectTimeout, TimeUnit.MILLISECONDS);
    } finally {
      // if the wait timed out, cancel the still-running append call
      if (!handshake.isDone()) {
        handshake.cancel(true);
      }
    }

    waitForStatusOK(callFuture, timeout, tu);
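The connect-timeout step is the standard "bounded Future.get, then cancel in finally" idiom. A minimal, self-contained sketch of it, assuming a hypothetical runWithConnectTimeout helper that reports a timeout as false instead of throwing, which is a simplification of what the client does.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ConnectTimeoutSketch {

  // Runs a possibly-blocking call on the pool and waits at most connectTimeoutMs.
  // Returns true if it finished in time; otherwise the task is cancelled
  // (interrupting the pool thread) and false is returned.
  public static boolean runWithConnectTimeout(ExecutorService callTimeoutPool,
      Callable<Void> blockingCall, long connectTimeoutMs) throws Exception {
    Future<Void> handshake = callTimeoutPool.submit(blockingCall);
    try {
      handshake.get(connectTimeoutMs, TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException ex) {
      return false; // a real client would surface this as a delivery error
    } finally {
      if (!handshake.isDone()) {
        handshake.cancel(true);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newCachedThreadPool();
    try {
      Callable<Void> slowHandshake = () -> { Thread.sleep(5000); return null; };
      Callable<Void> fastHandshake = () -> null;
      System.out.println(runWithConnectTimeout(pool, slowHandshake, 100)); // false
      System.out.println(runWithConnectTimeout(pool, fastHandshake, 1000)); // true
    } finally {
      pool.shutdownNow();
    }
  }
}
```

cancel(true) interrupts the pool thread, so a task stuck in a blocking handshake does not keep occupying the pool after the caller has given up.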
    

      And waitForStatusOK looks like this:

        try {
          Status status = callFuture.get(timeout, tu);
          if (status != Status.OK) {
            throw new EventDeliveryException(this + ": Avro RPC call returned " +
                "Status: " + status);
          }
        } // catch clauses omitted
    

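      In other words, append waits according to the timeouts set in the Flume configuration file; the thread calling append still blocks until the event has been processed or a timeout expires.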
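The second phase can be sketched the same way: wait on the call future for at most the request timeout, then insist on an OK status. This is a minimal sketch, assuming a local Status enum and a hypothetical DeliveryException standing in for Flume's EventDeliveryException; the catch clauses and messages are this sketch's own choices, not a copy of Flume's.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class WaitForStatusSketch {

  public enum Status { OK, FAILED, UNKNOWN }

  // Hypothetical stand-in for Flume's EventDeliveryException.
  public static class DeliveryException extends Exception {
    public DeliveryException(String msg, Throwable cause) { super(msg, cause); }
  }

  // Blocks the calling thread for at most requestTimeoutMs, then insists on OK.
  public static void waitForStatusOK(Future<Status> callFuture, long requestTimeoutMs)
      throws DeliveryException {
    try {
      Status status = callFuture.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
      if (status != Status.OK) {
        throw new DeliveryException("RPC call returned status " + status, null);
      }
    } catch (TimeoutException ex) {
      throw new DeliveryException("no reply within " + requestTimeoutMs + " ms", ex);
    } catch (ExecutionException ex) {
      throw new DeliveryException("remote call failed", ex.getCause());
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      throw new DeliveryException("interrupted while waiting", ex);
    }
  }

  public static void main(String[] args) throws Exception {
    waitForStatusOK(CompletableFuture.completedFuture(Status.OK), 100); // returns normally
    try {
      waitForStatusOK(CompletableFuture.completedFuture(Status.FAILED), 100);
    } catch (DeliveryException ex) {
      System.out.println(ex.getMessage()); // RPC call returned status FAILED
    }
    try {
      waitForStatusOK(new CompletableFuture<>(), 100); // never completed
    } catch (DeliveryException ex) {
      System.out.println(ex.getMessage()); // no reply within 100 ms
    }
  }
}
```

Either way the caller's thread is held until a status arrives or the deadline passes, which is exactly the blocking behavior described above.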

      

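      Because the handshake blocks the RPC call and its duration is unpredictable, the client has no choice but to use a thread pool, callTimeoutPool, to run the append RPC in a separate Callable, monitor its progress through a Future, and cancel the task through that Future if append waits too long. A lot of care went into this...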

  • Original post: https://www.cnblogs.com/devos/p/3618033.html