zoukankan      html  css  js  c++  java
  • 重写Sink合并多行

    flume 1.6 + elasticsearch 6.3.2(注意:下方 Pom 中客户端依赖为 6.4.3)

    Pom

    <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>3.8.1</version>
                <scope>test</scope>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>6.4.3</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.elasticsearch.client/transport -->
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>transport</artifactId>
                <version>6.4.3</version>
            </dependency>
            <!-- <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> 
                <version>4.1.32.Final</version> </dependency> -->
            <!-- https://mvnrepository.com/artifact/org.apache.flume.flume-ng-sinks/flume-ng-elasticsearch-sink -->
            <dependency>
                <groupId>org.apache.flume.flume-ng-sinks</groupId>
                <artifactId>flume-ng-elasticsearch-sink</artifactId>
                <version>1.6.0</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
            <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
                <version>2.8.5</version>
            </dependency>
    
    </dependencies>

    ElasticSearchForLogSink.java

    package com.jachs.sink.elasticsearch;
    
    import org.apache.flume.Channel;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Transaction;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
    import org.apache.flume.sink.elasticsearch.client.RoundRobinList;
    import org.apache.http.client.HttpClient;
    import org.apache.http.impl.client.DefaultHttpClient;
    import org.elasticsearch.action.bulk.BulkRequestBuilder;
    import org.elasticsearch.action.index.IndexRequestBuilder;
    import org.elasticsearch.action.index.IndexResponse;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.bytes.BytesReference;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.TransportAddress;
    import org.elasticsearch.transport.client.PreBuiltTransportClient;
    
    import com.google.gson.Gson;
    
    import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME;
    import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME;
    
    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    
    import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.HOSTNAMES;
    
    /**
     * Custom Flume sink that accumulates events keyed by a trailing "token"
     * character and bulk-indexes them into Elasticsearch once enough distinct
     * tokens have been collected (batch size 10).
     *
     * Configuration (flume context): hostNames ("host:port"), indexName,
     * clusterName.
     *
     * NOTE(review): dataMap is a static mutable map shared across sink
     * instances; assumed single-sink deployment — confirm before running
     * multiple sinks in one agent.
     */
    public class ElasticSearchForLogSink extends AbstractSink implements Configurable {
        private String hostNames;
        private String indexName;
        private String clusterName;
        static TransportClient client;
        // Accumulates event bodies keyed by their one-character token suffix.
        static Map<String, String> dataMap = new HashMap<String, String>();

        /** Reads hostNames, indexName and clusterName from the flume context. */
        public void configure(Context context) {
            hostNames = context.getString(HOSTNAMES);
            indexName = context.getString(INDEX_NAME);
            clusterName = context.getString(CLUSTER_NAME);
        }

        @Override
        public void start() {
            Settings settings = Settings.builder().put("cluster.name", clusterName).build();
            try {
                // hostNames is expected in "host:port" form.
                client = new PreBuiltTransportClient(settings).addTransportAddress(new TransportAddress(
                        InetAddress.getByName(hostNames.split(":")[0]), Integer.parseInt(hostNames.split(":")[1])));
            } catch (UnknownHostException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void stop() {
            // Release the transport client's sockets/threads on shutdown.
            if (client != null) {
                client.close();
                client = null;
            }
            super.stop();
        }

        /**
         * Takes one event per invocation; events containing "token" are merged
         * into dataMap under their last character. Every 10 accumulated tokens
         * the map is flushed to Elasticsearch as one bulk request.
         */
        public Status process() throws EventDeliveryException {
            Status status = Status.BACKOFF;
            Channel ch = getChannel();
            Transaction txn = ch.getTransaction();
            txn.begin();
            try {
                Event event = ch.take();
                if (event == null) {
                    txn.rollback();
                    return status;
                }
                String data = new String(event.getBody(), "UTF-8");
                if (data.indexOf("token") != -1) {
                    // The token is the final character of the event body.
                    String token = data.substring(data.length() - 1, data.length());
                    System.out.println("获取标识" + token);
                    String sb = dataMap.get(token);
                    if (sb != null) {
                        // Fix: the merged value must be written back; the
                        // original assigned to a local and discarded it.
                        dataMap.put(token, sb + data);
                    } else {
                        dataMap.put(token, data);
                    }
                }
                System.out.println("打印" + dataMap.size());
                if (dataMap.size() >= 10) { // flush every 10 entries; tune as needed
                    BulkRequestBuilder bulkRequest = client.prepareBulk();

                    bulkRequest.add(client.prepareIndex(indexName, "text").setSource(dataMap));
                    bulkRequest.execute().actionGet();
                    dataMap.clear();
                    System.out.println("归零" + dataMap.size());
                }

                txn.commit();
                status = Status.READY;
            } catch (Throwable t) {
                txn.rollback();
                status = Status.BACKOFF;
                t.printStackTrace();
                if (t instanceof Error) {
                    throw (Error) t;
                }
            } finally {
                txn.close();
            }
            return status;
        }
    }

    kafka生产者模仿日志写入代码

    package com.test.Kafka;
    
    import java.util.Properties;
    
    import org.apache.commons.lang.RandomStringUtils;
    import org.apache.commons.lang.StringUtils;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import com.google.gson.Gson;
    
    /**
     * Kafka producer that simulates log writes: sends 10 records of the form
     * "tokenk" + one random character to topic "test1".
     */
    public class App {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "127.0.0.1:9092");
            properties.put("acks", "all");
            properties.put("retries", 3);
            properties.put("batch.size", 16384);
            properties.put("linger.ms", 1);
            properties.put("buffer.memory", 33554432);
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            Producer<String, String> producer = null;
            try {
                producer = new KafkaProducer<String, String>(properties);
                for (int i = 0; i < 10; i++) { // topic name is arbitrary for this test
                    // RandomStringUtils is a static utility; no instance needed.
                    producer.send(new ProducerRecord<String, String>("test1", "tokenk" + RandomStringUtils.random(1)));
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Guard: producer stays null if the constructor threw, and the
                // original code would then NPE inside finally.
                if (producer != null) {
                    producer.close();
                }
            }
        }
    }

    修改flume配置

    a1.sinks.elasticsearch.type=com.jachs.sink.elasticsearch.ElasticSearchForLogSink
  • 相关阅读:
    第十一周编程总结
    第十一周助教总结
    第十周编程总结
    第十周学习总结
    第十周助教总结
    第九周学习总结
    第九周编程总结
    第九周助教总结
    第八周学习总结
    第八周编程总结
  • 原文地址:https://www.cnblogs.com/zhanchaohan/p/10328980.html
Copyright © 2011-2022 走看看