zoukankan      html  css  js  c++  java
  • 自定义Flume Sink:ElasticSearch Sink

    Flume Sink的目的是从Flume Channel中获取数据然后输出到存储或者其他Flume Source中。Flume Agent启动的时候,它会为每一个Sink都启动一个SinkRunner的对象,SinkRunner.start()方法会启动一个新的线程去管理每一个Sink的生命周期。每一个Sink需要实现start()、Stop()和process()方法。你可以在start方法中去初始化Sink的参数和状态,在stop方法中清理Sink的资源。最关键的是process方法,它将处理从Channel中拿出来的数据。另外如果Sink有一些配置则需要实现Configurable接口。

    由于Flume官方提供的Sink往往不能满足要求,所以我们自定义Sink来实现定制化的需求,这里以ElasticSearch为例。在Sink中实现所以文档的简单的Insert功能。例子使用Flume 1.7。

    1. 编写代码

    首先新建类ElasticSearchSink类继承AbstractSink类,由于还希望有自定义的Sink的配置,所以实现Configurable接口。

    public class ElasticSearchSink extends AbstractSink implements Configurable

    ElasticSearch的IP以及索引的名称可以配置在配置文件里面,配置文件就是使用flume的conf文件。你可以重写Configurable的configure的方法去获取配置,代码如下:

    @Override
        public void configure(Context context)
        {
            esHost = context.getString("es_host");
            esIndex = context.getString("es_index");
        }

    注意里面的配置项“es_host”和“es_index”在conf配置文件中的语法:

    agent.sinks = sink1
    agent.sinks.sink1.type = nick.test.flume.ElasticSearchSink
    agent.sinks.sink1.es_host = 192.168.50.213
    agent.sinks.sink1.es_index = vehicle_event_test

    接下来就是实现process方法,在这个方法中需要获取channel,因为数据都是从channel中获得的。获取消息之前,需要先获取一个Channel是事务,处理完成之后需要commit和关闭这个事务。这样才能让channel知道这个消息已经消费完成,它可以从它的内部队列中删除这个消息。如果消费失败,需要重新消费的话,可以rollback这个事务。事务的引入是flume对消息可靠性保证的关键。

    process方法需要返回一个Status类型的枚举,Ready和BackOff。如果你到了一个消息,并正常处理了,需要使用Ready。如果拿到的消息是null,则可以返回BackOff。所谓BackOff(失效补偿)就是当sink获取不到 消息的时候, Sink的PollingRunner 线程需要等待一段backoff时间,等channel中的数据得到了补偿再来进行pollling 操作。

    完整的代码如下:

    public class ElasticSearchSink extends AbstractSink implements Configurable
    {
        private String esHost;
        private String esIndex;
    
        private TransportClient client;
    
        @Override
        public Status process() throws EventDeliveryException
        {
    
            Status status = null;
            // Start transaction
            Channel ch = getChannel();
            Transaction txn = ch.getTransaction();
            txn.begin();
            try
            {
                Event event = ch.take();
    
                if (event != null)
                {
                    String body = new String(event.getBody(), "UTF-8");
    
                    BulkRequestBuilder bulkRequest = client.prepareBulk();
                    List<JSONObject> jsons = new ArrayList<JSONObject>();
    
                    JSONObject obj = JSONObject.parseObject(body);
    
                    String vehicleId = obj.getString("vehicle_id");
                    String eventBeginCode = obj.getString("event_begin_code");
                    String eventBeginTime = obj.getString("event_begin_time");
    
                    //doc id in index
                    String id = (vehicleId + "_" + eventBeginTime + "_" + eventBeginCode).trim();
    
    
                    JSONObject json = new JSONObject();
                    json.put("vehicle_id", vehicleId);
    
                    bulkRequest.add(client.prepareIndex(esIndex, esIndex).setSource(json));
    
                    BulkResponse bulkResponse = bulkRequest.get();
    
                    status = Status.READY;
                }
                else
                {
                    status = Status.BACKOFF;
                }
    
                txn.commit();
            }
            catch (Throwable t)
            {
                txn.rollback();
                t.getCause().printStackTrace();
    
                status = Status.BACKOFF;
            }
            finally
            {
                txn.close();
            }
    
            return status;
    
        }
    
        @Override
        public void configure(Context context)
        {
            esHost = context.getString("es_host");
            esIndex = context.getString("es_index");
        }
    
        @Override
        public synchronized void stop()
        {
            super.stop();
        }
    
        @Override
        public synchronized void start()
        {
            try
            {
                Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();
                client = new PreBuiltTransportClient(settings).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esHost), 9300));
                super.start();
    
                System.out.println("finish start");
            }
            catch (Exception ex)
            {
                ex.printStackTrace();
            }
        }
    }

    2. 打包、配置和运行

    由于是自定义的Sink,所以需要打成jar包,然后copy到flume的lib文件夹下。然后配置agent的配置文件,最后启动flume就可以了。本例中,我使用了kafkasource、memorychannel和自定义的sink,完整的配置文件如下:

     

    agent.sources = source1
    agent.channels = channel1
    agent.sinks = sink1
    
    agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
    agent.sources.source1.channels = channel1
    agent.sources.source1.batchSize = 1
    agent.sources.source1.batchDurationMillis = 2000
    agent.sources.source1.kafka.bootstrap.servers = 192.168.50.116:9092,192.168.50.117:9092,192.168.50.118:9092,192.168.50.226:9092
    agent.sources.source1.kafka.topics = iov-vehicle-event
    agent.sources.source1.kafka.consumer.group.id = flume-vehicle-event-nick
    
    
    agent.sinks.sink1.type = nick.test.flume.ElasticSearchSink
    agent.sinks.sink1.es_host = 192.168.50.213
    agent.sinks.sink1.es_index = vehicle_event_test
    
    agent.sinks.sink1.channel = channel1
    
    agent.channels.channel1.type = memory
    agent.channels.channel1.capacity = 1000

    架构点滴

  • 相关阅读:
    优化MyBatis配置文件中的配置
    Java多线程---同步与锁
    Runtime.getRuntime().exec()
    java ---线程wait/notify/sleep/yield/join
    redis配置详情
    httpcline
    线程
    Bootstrap学习(一)
    springmvc注解配置
    salesforce上上传和导出.csv格式文件
  • 原文地址:https://www.cnblogs.com/haoxinyue/p/7517919.html
Copyright © 2011-2022 走看看