zoukankan      html  css  js  c++  java
  • Flume的Source、Sink总结,及常用使用场景

    数据源Source

    RPC异构流数据交换

    • Avro Source
    • Thrift Source

    文件或目录变化监听

    • Exec Source
    • Spooling Directory Source
    • Taildir Source

    MQ或队列订阅数据持续监听

    • JMS Source
    • SSL and JMS Source
    • Kafka Source

    Network类数据交换

    • NetCat TCP Source
    • NetCat UDP Source
    • HTTP Source
    • Syslog Sources
    • Syslog TCP Source
    • Multiport Syslog TCP Source
    • Syslog UDP Source

    定制源

    • Custom Source

    Sink

    • HDFS Sink
    • Hive Sink
    • Logger Sink
    • Avro Sink
    • Thrift Sink
    • IRC Sink
    • File Roll Sink
    • HBaseSinks
    • HBaseSink
    • HBase2Sink
    • AsyncHBaseSink
    • MorphlineSolrSink
    • ElasticSearchSink
    • Kite Dataset Sink
    • Kafka Sink
    • HTTP Sink
    • Custom Sink

    案例

    1、监听文件变化 

    exec-memory-logger.properties

    #指定agent的sources,sinks,channels
    a1.sources = s1
    a1.sinks = k1
    a1.channels = c1
    
    #配置sources属性
    a1.sources.s1.type = exec
    a1.sources.s1.command = tail -F /tmp/log.txt
    a1.sources.s1.shell = /bin/bash -c
    a1.sources.s1.channels = c1
    
    #配置sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = 192.168.1.103
    a1.sinks.k1.port = 8888
    a1.sinks.k1.batch-size = 1
    a1.sinks.k1.channel = c1
    
    #配置channel类型
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    启动

    flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/exec-memory-logger.properties --name a1 -Dflume.root.logger=INFO,console

    测试

    echo "asfsafsf" >> /tmp/log.txt

    2、TCP NetCat监听

    netcat.properties
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    启动

    flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/netcat.properties --name a1 -Dflume.root.logger=INFO,console

    测试

    telnet localhost 44444

    3、Kafka读、写 (读:从kafka到log,写:从file到kafka)

    read-kafka.properties 、write-kafka.properties

    #指定agent的sources,sinks,channels
    a1.sources = s1  
    a1.sinks = k1  
    a1.channels = c1  
       
    #配置sources属性
    a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.s1.channels = c1
    a1.sources.s1.batchSize = 5000
    a1.sources.s1.batchDurationMillis = 2000
    a1.sources.s1.kafka.bootstrap.servers = 192.168.1.103:9092
    a1.sources.s1.kafka.topics = test1
    a1.sources.s1.kafka.consumer.group.id = custom.g.id
    
    #将sources与channels进行绑定
    a1.sources.s1.channels = c1
       
    #配置sink 
    a1.sinks.k1.type = logger
    
    #将sinks与channels进行绑定  
    a1.sinks.k1.channel = c1  
       
    #配置channel类型
    a1.channels.c1.type = memory
    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1                                                                                         
    
    a1.sources.s1.type=exec
    a1.sources.s1.command=tail -F /tmp/kafka.log
    a1.sources.s1.channels=c1 
    
    #设置Kafka接收器
    a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
    #设置Kafka地址
    a1.sinks.k1.brokerList=192.168.1.103:9092
    #设置发送到Kafka上的主题
    a1.sinks.k1.topic=test1
    #设置序列化方式
    a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
    a1.sinks.k1.channel=c1     
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=10000
    a1.channels.c1.transactionCapacity=100   

    启动

    flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/read-kafka.properties --name a1 -Dflume.root.logger=INFO,console
    
    flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/write-kafka.properties --name a1 -Dflume.root.logger=INFO,console

    测试

    # 创建用于测试主题
    bin/kafka-topics.sh --create 
                        --bootstrap-server 192.168.1.103:9092 
                        --replication-factor 1 
                        --partitions 1  
                        --topic test1
    # 启动 Producer,用于发送测试数据:
    bin/kafka-console-producer.sh --broker-list 192.168.1.103:9092 --topic test1

    4、定制源

    a1.sources = r1
    a1.channels = c1
    a1.sources.r1.type = org.example.MySource
    a1.sources.r1.channels = c1

    5、HDFS Sink 

    spooling-memory-hdfs.properties ,监听目录变化,将新建的文件传到HDFS

    #指定agent的sources,sinks,channels
    a1.sources = s1  
    a1.sinks = k1  
    a1.channels = c1  
       
    #配置sources属性
    a1.sources.s1.type =spooldir  
    a1.sources.s1.spoolDir =/tmp/log2
    a1.sources.s1.basenameHeader = true
    a1.sources.s1.basenameHeaderKey = fileName 
    #将sources与channels进行绑定  
    a1.sources.s1.channels =c1 
    
       
    #配置sink 
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/
    a1.sinks.k1.hdfs.filePrefix = %{fileName}
    #生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
    a1.sinks.k1.hdfs.fileType = DataStream  
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    #将sinks与channels进行绑定  
    a1.sinks.k1.channel = c1
       
    #配置channel类型
    a1.channels.c1.type = memory

    测试

    hdfs dfs -ls /flume/events/19-11-21/15

    6、Hive Sink 

    a1.channels = c1
    a1.channels.c1.type = memory
    a1.sinks = k1
    a1.sinks.k1.type = hive
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
    a1.sinks.k1.hive.database = logsdb
    a1.sinks.k1.hive.table = weblogs
    a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
    a1.sinks.k1.useLocalTimeStamp = false
    a1.sinks.k1.round = true
    a1.sinks.k1.roundValue = 10
    a1.sinks.k1.roundUnit = minute
    a1.sinks.k1.serializer = DELIMITED
    a1.sinks.k1.serializer.delimiter = "	"
    a1.sinks.k1.serializer.serdeSeparator = '	'
    a1.sinks.k1.serializer.fieldnames =id,,msg

    7、Avro Source、Avro Sink

    exec-memory-avro.properties、avro-memory-log.properties

    #指定agent的sources,sinks,channels
    a1.sources = s1
    a1.sinks = k1
    a1.channels = c1
    
    #配置sources属性
    a1.sources.s1.type = exec
    a1.sources.s1.command = tail -F /tmp/log.txt
    a1.sources.s1.shell = /bin/bash -c
    a1.sources.s1.channels = c1
    
    #配置sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = 192.168.1.103
    a1.sinks.k1.port = 8888
    a1.sinks.k1.batch-size = 1
    a1.sinks.k1.channel = c1
    
    #配置channel类型
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    #指定agent的sources,sinks,channels
    a2.sources = s2
    a2.sinks = k2
    a2.channels = c2
    
    #配置sources属性
    a2.sources.s2.type = avro
    a2.sources.s2.bind = 192.168.1.103
    a2.sources.s2.port = 8888
    
    #将sources与channels进行绑定
    a2.sources.s2.channels = c2
    
    #配置sink
    a2.sinks.k2.type = logger
    
    #将sinks与channels进行绑定
    a2.sinks.k2.channel = c2
    
    #配置channel类型
    a2.channels.c2.type = memory
    a2.channels.c2.capacity = 1000
    a2.channels.c2.transactionCapacity = 100

    启动

    先
    flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/avro-memory-log.properties --name a2 -Dflume.root.logger=INFO,console
    后
    flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/exec-memory-avro.properties --name a1 -Dflume.root.logger=INFO,console

    测试,使用一个Avro客户端发送数据

    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.event.EventBuilder;
    import org.apache.flume.api.SecureRpcClientFactory;
    import org.apache.flume.api.RpcClientConfigurationConstants;
    import org.apache.flume.api.RpcClient;
    import java.nio.charset.Charset;
    import java.util.Properties;
    
    public class MyApp {
      public static void main(String[] args) {
        MySecureRpcClientFacade client = new MySecureRpcClientFacade();
        // Initialize client with the remote Flume agent's host, port
        Properties props = new Properties();
        props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "thrift");
        props.setProperty("hosts", "h1");
        props.setProperty("hosts.h1", "client.example.org"+":"+ String.valueOf(8888));
    
        // Initialize client with the kerberos authentication related properties
        props.setProperty("kerberos", "true");
        props.setProperty("client-principal", "flumeclient/client.example.org@EXAMPLE.ORG");
        props.setProperty("client-keytab", "/tmp/flumeclient.keytab");
        props.setProperty("server-principal", "flume/server.example.org@EXAMPLE.ORG");
        client.init(props);
    
        // Send 10 events to the remote Flume agent. That agent should be
        // configured to listen with an AvroSource.
        String sampleData = "Hello Flume!";
        for (int i = 0; i < 10; i++) {
          client.sendDataToFlume(sampleData);
        }
    
        client.cleanUp();
      }
    }
    
    class MySecureRpcClientFacade {
      private RpcClient client;
      private Properties properties;
    
      public void init(Properties properties) {
        // Setup the RPC connection
        this.properties = properties;
        // Create the ThriftSecureRpcClient instance by using SecureRpcClientFactory
        this.client = SecureRpcClientFactory.getThriftInstance(properties);
      }
    
      public void sendDataToFlume(String data) {
        // Create a Flume Event object that encapsulates the sample data
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
    
        // Send the event
        try {
          client.append(event);
        } catch (EventDeliveryException e) {
          // clean up and recreate the client
          client.close();
          client = null;
          client = SecureRpcClientFactory.getThriftInstance(properties);
        }
      }
    
      public void cleanUp() {
        // Close the RPC connection
        client.close();
      }
    }

    8、Elasticsearch Sink

    a1.channels = c1
    a1.sinks = k1
    a1.sinks.k1.type = elasticsearch
    a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
    a1.sinks.k1.indexName = foo_index
    a1.sinks.k1.indexType = bar_type
    a1.sinks.k1.clusterName = foobar_cluster
    a1.sinks.k1.batchSize = 500
    a1.sinks.k1.ttl = 5d
    a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
    a1.sinks.k1.channel = c1

    9、定制Source、Sink开发

    public class MySink extends AbstractSink implements Configurable {
      private String myProp;
    
      @Override
      public void configure(Context context) {
        String myProp = context.getString("myProp", "defaultValue");
    
        // Process the myProp value (e.g. validation)
    
        // Store myProp for later retrieval by process() method
        this.myProp = myProp;
      }
    
      @Override
      public void start() {
        // Initialize the connection to the external repository (e.g. HDFS) that
        // this Sink will forward Events to ..
      }
    
      @Override
      public void stop () {
        // Disconnect from the external respository and do any
        // additional cleanup (e.g. releasing resources or nulling-out
        // field values) ..
      }
    
      @Override
      public Status process() throws EventDeliveryException {
        Status status = null;
    
        // Start transaction
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
          // This try clause includes whatever Channel operations you want to do
    
          Event event = ch.take();
    
          // Send the Event to the external repository.
          // storeSomeData(e);
    
          txn.commit();
          status = Status.READY;
        } catch (Throwable t) {
          txn.rollback();
    
          // Log exception, handle individual exceptions as needed
    
          status = Status.BACKOFF;
    
          // re-throw all Errors
          if (t instanceof Error) {
            throw (Error)t;
          }
        }
        return status;
      }
    }
    public class MySource extends AbstractSource implements Configurable, PollableSource {
      private String myProp;
    
      @Override
      public void configure(Context context) {
        String myProp = context.getString("myProp", "defaultValue");
    
        // Process the myProp value (e.g. validation, convert to another type, ...)
    
        // Store myProp for later retrieval by process() method
        this.myProp = myProp;
      }
    
      @Override
      public void start() {
        // Initialize the connection to the external client
      }
    
      @Override
      public void stop () {
        // Disconnect from external client and do any additional cleanup
        // (e.g. releasing resources or nulling-out field values) ..
      }
    
      @Override
      public Status process() throws EventDeliveryException {
        Status status = null;
    
        try {
          // This try clause includes whatever Channel/Event operations you want to do
    
          // Receive new data
          Event e = getSomeData();
    
          // Store the Event into this Source's associated Channel(s)
          getChannelProcessor().processEvent(e);
    
          status = Status.READY;
        } catch (Throwable t) {
          // Log exception, handle individual exceptions as needed
    
          status = Status.BACKOFF;
    
          // re-throw all Errors
          if (t instanceof Error) {
            throw (Error)t;
          }
        } finally {
          txn.close();
        }
        return status;
      }
    }
  • 相关阅读:
    [OpenJudge] 反正切函数的应用 (枚举)(数学)
    [OpenJudge] 摘花生 (模拟)
    [OpenJudge] 宇航员(模拟)
    [OpenJudge] 显示器(模拟)
    背包问题
    BFS_最短路径
    链表
    网站
    网站
    洛谷_递归整理
  • 原文地址:https://www.cnblogs.com/starcrm/p/11909979.html
Copyright © 2011-2022 走看看