zoukankan      html  css  js  c++  java
  • Flume-NG源码阅读之AvroSink

      org.apache.flume.sink.AvroSink是用来通过网络来传输数据的,可以将event发送到RPC服务器(比如AvroSource),使用AvroSink和AvroSource可以组成分层结构。它继承自AbstractRpcSink  extends AbstractSink implements Configurable这跟其他的sink一样都得extends AbstractSink implements Configurable,所以重点也在confgure、start、process、stop这四个方法,实现了initializeRpcClient(Properties props)方法。

      一、configure(Context context)方法,先获取配置文件中的主机hostname和端口port;设置clientProps的属性hosts=h1,hosts.h1=hostname:port;然后将配置信息中的所有信息放入clientProps中;获取cxnResetInterval表示重复建立连接的时间间隔,默认是0就是不重复建立连接。

      二、start()方法是调用createConnection()建立连接,如果出现异常就调用destroyConnection()掐断连接,避免资源泄漏。createConnection()方法主要是初始化client = initializeRpcClient(clientProps)以及创建一个线程,并执行在给定延迟cxnResetInterval后执行一次销毁链接destroyConnection(),由于默认cxnResetInterval=0,所以是不会执行这个线程的。这点不是很明白,为什么要销毁???initializeRpcClient(clientProps)方法会根据配置文件中的信息进行构造相应的RpcClient:首先会获取"client.type"参数指定的类型可用的有四种(NettyAvroRpcClient(如果没有"client.type"则使用这个作为默认Client)、FailoverRpcClient、LoadBalancingRpcClient、ThriftRpcClient),实例化之后需要对其在进行必要的配置执行client.configure(properties)进行配置:

      (1)NettyAvroRpcClient.configure(Properties properties)方法首先会获取锁,检查connState连接状态要保证是没有配置过的;其次获取"batch-size"设置batchSize,如果配置的小于1则使用默认值100;获取“hosts”,如果配置了多个hosts则只使用第一个;获取"hosts."前缀,如果有多个则使用第一个,再解析出hostname和port,构建一个InetSocketAddress的对象address;获取连接超时时间"connect-timeout",设置connectTimeout,如果配置的小于1000则使用默认值20000,单位是ms;获取相应时间"request-timeout",设置requestTimeout,如果配置的小于1000,则使用默认值20000,单位ms;获取压缩类型"compression-type",如果有配置压缩还需要获取压缩的等级compressionLevel;最后调用connect()链接RPC服务器。

      实际的链接在connect(long timeout, TimeUnit tu)方法中,先构造一个线程池callTimeoutPool;然后根据是否有压缩构造相应的工厂类CompressionChannelFactory(有压缩配置)或者NioClientSocketChannelFactory(无压缩配置);构造一个

    NettyTransceiver(this.address,socketChannelFactory,tu.toMillis(timeout))收发器对象transceiver;根据transceiver返回一个avroClient;最后设置链接状态为READY。

      (2)FailoverRpcClient.configure(Properties properties)方法会调用configureHosts(Properties properties)方法,这个方法会获取配置文件中的host列表hosts;获取最大尝试次数"max-attempts",设置maxTries,默认是hosts的大小;获取批量大小

    "batch-size",设置batchSize,如果配置的小于1则使用默认大小100;将此client置为活动的isActive=true。可以看出这个client可以使用多个host。

      (3)LoadBalancingRpcClient.configure(Properties properties)会获取配置文件中的host列表hosts,且不允许少于两个,否则爆异常;获取主机选择器"host-selector",有两种内置的选择器:LoadBalancingRpcClient.RoundRobinHostSelector和LoadBalancingRpcClient.RandomOrderHostSelector,默认是ROUND_ROBIN(即RoundRobinHostSelector)轮询的方式(也可以自定义,要实现LoadBalancingRpcClient.HostSelector接口);获取"backoff",设置backoff(是否使用推迟算法,就是sink.process出问题后对这个sink设置惩罚时间,在此期间不再认为其可活动)的boolean值(默认false就是不启用);获取最大推迟时间"maxBackoff",设置maxBackoff;然后根据选择器是ROUND_ROBIN还是RANDOM选择对应的类并实例化selector,最后设置主机selector.setHosts(hosts)。

      这两个内置选择器:RoundRobinHostSelector实际使用的是RoundRobinOrderSelector;RandomOrderHostSelector实际使用的是RandomOrderSelector,这两个都在Flume-NG源码阅读之SinkGroups和SinkRunner 这篇文章中有介绍,这里不再说明。

      (4)ThriftRpcClient.configure(Properties properties)会获取状态锁stateLock.lock();获取配置文件中的host列表中的第一个,只需要一个;获取批量大小"batch-size",设置batchSize,如果配置的小于1则使用默认大小100;获取主机名hostname和端口port;获取响应时间requestTimeout,如果小于1000设置为默认的20000ms;获取连接池大小"maxConnections",设置connectionPoolSize,如果大小小于1则设置为默认的值5;创建连接池管理对象connectionManager= new ConnectionPoolManager(connectionPoolSize);设置连接状态为READY,connState = State.READY;最后状态锁解锁stateLock.unlock()。

      这四个Client都是extends AbstractRpcClient implements RpcClient。

      三、process()方法,代码如下:

     1   public Status process() throws EventDeliveryException {
     2     Status status = Status.READY;
     3     Channel channel = getChannel();    //获得channel
     4     Transaction transaction = channel.getTransaction();    //创建事务
     5 
     6     try {
     7       transaction.begin();    //事务开始
     8 
     9       verifyConnection();    //确保存在链接且处于活动状态,如果链接处于非活动状态销毁并重建链接
    10 
    11       List<Event> batch = Lists.newLinkedList();
    12 
    13       for (int i = 0; i < client.getBatchSize(); i++) {    //保证这批次的event数量不可能超过客户端批量处理的最大处理数量
    14         Event event = channel.take();
    15 
    16         if (event == null) {        //表示channel中没有数据了
    17           break;
    18         }
    19 
    20         batch.add(event);    //加入event列表
    21       }
    22 
    23       int size = batch.size();    //获取这批次取得的event的数量
    24       int batchSize = client.getBatchSize();        //获取客户端可以批量处理的大小
    25 
    26       if (size == 0) {
    27         sinkCounter.incrementBatchEmptyCount();
    28         status = Status.BACKOFF;
    29       } else {
    30         if (size < batchSize) {
    31           sinkCounter.incrementBatchUnderflowCount();
    32         } else {
    33           sinkCounter.incrementBatchCompleteCount();
    34         }
    35         sinkCounter.addToEventDrainAttemptCount(size);
    36         client.appendBatch(batch);        //批量处理event
    37       }
    38 
    39       transaction.commit();        //事务提交
    40       sinkCounter.addToEventDrainSuccessCount(size);
    41 
    42     } catch (Throwable t) {
    43       transaction.rollback();    //事务回滚
    44       if (t instanceof Error) {
    45         throw (Error) t;
    46       } else if (t instanceof ChannelException) {
    47         logger.error("Rpc Sink " + getName() + ": Unable to get event from" +
    48             " channel " + channel.getName() + ". Exception follows.", t);
    49         status = Status.BACKOFF;
    50       } else {
    51         destroyConnection();        //销毁链接
    52         throw new EventDeliveryException("Failed to send events", t);
    53       }
    54     } finally {
    55       transaction.close();    //事务关闭
    56     }
    57 
    58     return status;
    59   }

      即使本批次event的数量达不到client.getBatchSize()(channel中没数据了)也会立即发送到RPC服务器。verifyConnection()方法是确保存在链接且处于活动状态,如果链接处于非活动状态销毁并重建链接。如果本批次没有event,则不会想RPC发送任何数据。client.appendBatch(batch)方法是批量发送event。

      (1)NettyAvroRpcClient.appendBatch(batch)方法会调用appendBatch(events, requestTimeout, TimeUnit.MILLISECONDS)方法,该方法会首先确认链接处于READY状态,否则报错;然后将每个event重新封装成AvroFlumeEvent,放入avroEvents列表中;然后构造一个CallFuture和avroEvents一同封装成一个Callable放入线程池 handshake = callTimeoutPool.submit(callable)中去执行,其call方法内容是avroClient.appendBatch(avroEvents, callFuture)就是在此批量提交到RPC服务器;然后handshake.get(connectTimeout, TimeUnit.MILLISECONDS)在规定时间等待执行的返回结果以及等待append的完成waitForStatusOK(callFuture, timeout, tu),详细的可看这里Flume的Avro Sink和Avro Source研究之二 : Avro Sink ,有对于这两个future更深入的分析。一个批次传输的event的数量是min(batchSize,events.size())

      (2)FailoverRpcClient.appendBatch(batch)方法会做最多maxTries次尝试直到获取到可以正确发送events的Client,通过localClient=getClient()--》getNextClient()来获取client,这个方法每次会获取hosts中的下一个HostInfo,并使用NettyAvroRpcClient来作为RPC Client,这就又回到了(1)中,这个方法还有一个要注意的就是会先从当前的lastCheckedhost+1位置向后找可以使用的Client,如果不行会再从开始到到lastCheckedhost再找,再找不到就报错。使用localClient.appendBatch(events)来处理events,可参考(1)。

      (3)LoadBalancingRpcClient.appendBatch(batch)方法,首先会获取可以发送到的RPC服务器的迭代器Iterator<HostInfo> it = selector.createHostIterator();然后取一个HostInfo,RpcClient client = getClient(host)这个Client和(2)一样都是NettyAvroRpcClient,但是getClient方法会设置一个保存名字和client映射的clientMap;client.appendBatch(events)执行之后就会跳出循环,下一次appendBatch会选择下一个client执行。

      (4)ThriftRpcClient.appendBatch(batch)方法,从connectionManager.checkout()获取一个client,ConnectionPoolManager类主要维护俩对象availableClients用来存放可用的client(是一个ClientWrapper,维护一个ThriftSourceProtocol.Client client 是用来批量处理event的)、checkedOutClients用来存储从availableClients中拿出的Client表示正在使用的Client;ConnectionPoolManager.checkout()用于从availableClients中remove出client并放入checkedOutClients中,返回这个client;ConnectionPoolManager.checkIn(ClientWrapper client)方法用于将指定的Client从checkedOutClient中remove出并放入availableClients中;ConnectionPoolManager.destroy(ClientWrapper client)用于将checkedOutClients中的指定Client   remove并close。appendBatch方法中获得client后,会每次封装min(batchSize,events.size())个event,把他们封装成ThriftFlumeEvent加入thriftFlumeEvents列表,然后如果thriftFlumeEvents>0则执行doAppendBatch(client, thriftFlumeEvents).get(requestTimeout,TimeUnit.MILLISECONDS)阻塞等待传输完毕。doAppendBatch方法会构建一个Callable其call方法执行client.client.appendBatch(e),将这个Callable放入线程池callTimeoutPool中执行并返回执行结果Future。

      以上四种RpcClient的append(Event event)方法也比较容易理解,不再讲述。

      四、stop()方法主要是销毁链接,关闭cxnResetExecutor。

      

      其实flume支持avro和thrift两种(目前)传输,上面的(2)和(3)只不过是对(1)的上层业务做了一次封装而已,本质上还是一样的都是avro(基于netty)。同时记住avrosink是支持压缩的。

      在此,由于博主对avro、netty、thrift并未深入研究过,所以只能从flume层面讲解avrosink,对于某些人来说,可能讲的并不深入,相关内容请自行学习!!

  • 相关阅读:
    小波变换的引入,通俗易懂
    Leetcode 437. Path Sum III
    Leetcode 113. Path Sum II
    Leetcode 112 Path Sum
    Leetcode 520 Detect Capital
    Leetcode 443 String Compression
    Leetcode 38 Count and Say
    python中的生成器(generator)总结
    python的random模块及加权随机算法的python实现
    leetcode 24. Swap Nodes in Pairs(链表)
  • 原文地址:https://www.cnblogs.com/lxf20061900/p/3753630.html
Copyright © 2011-2022 走看看