  • Learning Flume

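    【Download & Installation】

    https://flume.apache.org/download.html

    The bin package is ready to use: download it and unpack it with tar. The src package contains the source code, for developers who want to extend or modify Flume.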

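    【Development & Documentation】

    User Guide: https://flume.apache.org/FlumeUserGuide.html

    Developer Guide: https://flume.apache.org/FlumeDeveloperGuide.html

    The User Guide describes the configuration parameters of every kind of source, channel, and sink; the Developer Guide provides templates for developing clients, sinks, channels, and sources.

    【Single-node configuration】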

    # example.conf: A single-node Flume configuration
    # Local netcat mode; for debugging, connect with: telnet localhost 44444

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
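    Instead of typing into telnet, the netcat source can also be exercised from a small program. The sketch below is a hypothetical helper class (NetcatClient is not part of Flume); it assumes the default ack-every-event = true, under which the netcat source answers each received line with "OK".

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: a scripted stand-in for "telnet localhost 44444".
// Sends each line to a netcat source and collects the reply for that line.
class NetcatClient {
    static List<String> send(String host, int port, List<String> lines) throws IOException {
        List<String> replies = new ArrayList<>();
        try (Socket socket = new Socket(host, port);
             Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            for (String line : lines) {
                // The netcat source turns each '\n'-terminated line into one event
                out.write(line + "\n");
                out.flush();
                // With ack-every-event = true (the default) the reply is "OK"
                replies.add(in.readLine());
            }
        }
        return replies;
    }
}
```

    For example, NetcatClient.send("localhost", 44444, Arrays.asList("hello flume")) should return [OK] while the agent above is running, and the logger sink prints the event in the agent's log.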

    # example.conf: A single-node Flume configuration
    # Local spooldir mode

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /home/yimiao/test/logs

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
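    The spooldir source expects every file in the spool directory to be complete and immutable: according to the User Guide, Flume logs an error and stops processing if a file is written to after being placed there, or if a file name is reused. A minimal sketch of a producer that respects this, assuming a staging directory (hypothetical) on the same file system as spoolDir: write the whole file there first, then move it into spoolDir in one atomic rename under a unique name. SpoolDropper is an illustrative name, not a Flume class.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.UUID;

// Hypothetical helper (not part of Flume): hands a finished file to a
// spooldir source without the source ever seeing a partial file.
class SpoolDropper {
    static Path drop(Path stagingDir, Path spoolDir, String baseName, List<String> lines) {
        // A UUID keeps names unique, since reusing a file name stops the source
        String uniqueName = baseName + "-" + UUID.randomUUID() + ".log";
        try {
            // 1. Write the complete file outside the spool directory
            Path staged = stagingDir.resolve(uniqueName);
            Files.write(staged, lines, StandardCharsets.UTF_8);
            // 2. Move it in atomically; this throws AtomicMoveNotSupportedException
            //    if stagingDir and spoolDir are on different file systems
            return Files.move(staged, spoolDir.resolve(uniqueName), StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

    Once the source has ingested a file, it renames it with the default .COMPLETED suffix, so the spool directory also shows which files are done.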

    【Agent-collector mode】

    # push.conf: configuration of the agent that pushes data
    #Name the components on this agent  
    a2.sources= r1
    a2.sinks= k1
    a2.channels= c1
    
    #Describe/configure the source  
    a2.sources.r1.type= spooldir
    a2.sources.r1.spoolDir= /home/yimiao/test/logs
    a2.sources.r1.channels= c1
    
    #Use a channel which buffers events in memory  
    a2.channels.c1.type= memory
    a2.channels.c1.keep-alive= 10
    a2.channels.c1.capacity= 100000
    a2.channels.c1.transactionCapacity= 100000
    
    #Describe the sink
    a2.sinks.k1.type= avro
    a2.sinks.k1.channel= c1
    a2.sinks.k1.hostname= 192.168.81.131
    a2.sinks.k1.port= 44444

    # pull.conf: configuration of the collector agent that aggregates the data
    #Name the components on this agent  
    a1.sources= r1
    a1.sinks= k1
    a1.channels= c1
    
    #Describe/configure the source  
    a1.sources.r1.type= avro
    a1.sources.r1.channels= c1
    a1.sources.r1.bind= 192.168.81.131
    a1.sources.r1.port= 44444
    
    #Describe the sink  
    a1.sinks.k1.type= logger
    a1.sinks.k1.channel = c1
    
    #Use a channel which buffers events in memory  
    a1.channels.c1.type= memory
    a1.channels.c1.keep-alive= 10
    a1.channels.c1.capacity= 100000
    a1.channels.c1.transactionCapacity= 100000
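    In this topology the two files are linked only by address: the Avro sink of a2 (push.conf) must send to the same host and port that the Avro source of a1 (pull.conf) binds to. Flume configuration files follow the Java properties format, so the hypothetical helper below (AvroWiringCheck is an illustrative name) loads both with java.util.Properties and compares the values. It compares strings literally, so a source bound to 0.0.0.0 would need extra handling.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: checks that an Avro sink in one agent's config
// points at the address an Avro source in another agent's config listens on.
class AvroWiringCheck {
    static Properties parse(String confText) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    private static String get(Properties p, String key) {
        String value = p.getProperty(key);
        if (value == null) {
            throw new IllegalArgumentException("missing key: " + key);
        }
        return value.trim();
    }

    // True if sink <sinkAgent>.sinks.<sinkName> sends to source <sourceAgent>.sources.<sourceName>
    static boolean sinkTargetsSource(Properties push, String sinkAgent, String sinkName,
                                     Properties pull, String sourceAgent, String sourceName) {
        String sink = sinkAgent + ".sinks." + sinkName + ".";
        String source = sourceAgent + ".sources." + sourceName + ".";
        return "avro".equals(get(push, sink + "type"))
                && "avro".equals(get(pull, source + "type"))
                && get(push, sink + "hostname").equals(get(pull, source + "bind"))
                && get(push, sink + "port").equals(get(pull, source + "port"));
    }
}
```

    Loading the full text of push.conf and pull.conf above and calling sinkTargetsSource(push, "a2", "k1", pull, "a1", "r1") should return true.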

    【Integrating with HDFS】

    # example.conf: A single-node Flume configuration
    # HADOOP_HOME must be set, otherwise the agent fails to start

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444

    # Describe the sink
    #a1.sinks.k1.type = logger
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://singlenode:9000/flume/events/%y-%m-%d/%H%M/%S
    a1.sinks.k1.hdfs.filePrefix = events-
    # Required: the time escapes in hdfs.path need a timestamp, and netcat events
    # carry no timestamp header, so use the agent's local time instead
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    # Round the timestamp down to 10 minutes, so a new directory starts every 10 minutes
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
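    To see which HDFS directory an event lands in, the sketch below rounds a timestamp down to 10 minutes and then expands the escape sequences used in hdfs.path above. It is a simplified model of the sink's behavior, not Flume's own implementation, and it handles only %y (two-digit year), %m, %d, %H, %M and %S.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Simplified model (not Flume's code) of hdfs.path expansion with
// hdfs.round = true, hdfs.roundUnit = minute.
class HdfsPathSketch {
    // Round down to a multiple of roundValue minutes; seconds become 0
    static LocalDateTime roundDown(LocalDateTime ts, int roundValue) {
        int minute = (ts.getMinute() / roundValue) * roundValue;
        return ts.truncatedTo(ChronoUnit.HOURS).plusMinutes(minute);
    }

    // Expand the escapes used in this article; any other escape is copied as-is
    static String expand(String pattern, LocalDateTime ts) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < pattern.length(); i++) {
            char c = pattern.charAt(i);
            if (c != '%' || i + 1 == pattern.length()) {
                out.append(c);
                continue;
            }
            char esc = pattern.charAt(++i);
            switch (esc) {
                case 'y': out.append(two(ts.getYear() % 100)); break;
                case 'm': out.append(two(ts.getMonthValue())); break;
                case 'd': out.append(two(ts.getDayOfMonth())); break;
                case 'H': out.append(two(ts.getHour())); break;
                case 'M': out.append(two(ts.getMinute())); break;
                case 'S': out.append(two(ts.getSecond())); break;
                default: out.append('%').append(esc); break;
            }
        }
        return out.toString();
    }

    private static String two(int value) {
        return String.format("%02d", value);
    }
}
```

    For example, an event received at 11:54:34 on 2012-06-12 is rounded to 11:50:00 and written under hdfs://singlenode:9000/flume/events/12-06-12/1150/00, the same directory as every other event from 11:50:00 to 11:59:59.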

    【Replicating mode: each event is copied to every channel】

    # Config file: replicate_source_case11.conf
    # Name the components on this agent  
    a1.sources = r1
    a1.sinks = k1 k2
    a1.channels = c1 c2
    
    # Describe/configure the source  
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir= /home/yimiao/test/logs
    a1.sources.r1.selector.type = replicating
    a1.sources.r1.channels = c1 c2
    
    # Describe the sink  
    a1.sinks.k1.type = avro
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hostname = 192.168.81.131
    a1.sinks.k1.port = 50000
    
    a1.sinks.k2.type = avro
    a1.sinks.k2.channel = c2
    a1.sinks.k2.hostname = 192.168.81.131
    a1.sinks.k2.port = 50001
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 1000
    a1.channels.c2.transactionCapacity = 100

    # Config file: replicate_sink1_case11.conf
    # Name the components on this agent  
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    
    # Describe/configure the source  
    a2.sources.r1.type = avro
    a2.sources.r1.channels = c1
    a2.sources.r1.bind = 192.168.81.131
    a2.sources.r1.port = 50000
    
    # Describe the sink  
    a2.sinks.k1.type = logger
    a2.sinks.k1.channel = c1
    
    # Use a channel which buffers events in memory
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100

    # Config file: replicate_sink2_case11.conf
    # Name the components on this agent  
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c1
    
    # Describe/configure the source  
    a3.sources.r1.type = avro
    a3.sources.r1.channels = c1
    a3.sources.r1.bind = 192.168.81.131
    a3.sources.r1.port = 50001
    
    # Describe the sink  
    a3.sinks.k1.type = logger
    a3.sinks.k1.channel = c1
    
    # Use a channel which buffers events in memory
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 1000
    a3.channels.c1.transactionCapacity = 100
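    With selector.type = replicating, every event read by r1 is put into both c1 and c2, so the agents a2 (port 50000) and a3 (port 50001) each log a full copy of every spooled file. A toy model of that behavior in plain Java; ReplicatingModel is illustrative and is not Flume's ChannelSelector API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model (not Flume's API): a replicating selector hands every event
// to every channel configured on the source.
class ReplicatingModel {
    final Map<String, Deque<String>> channels = new LinkedHashMap<>();

    ReplicatingModel(String... channelNames) {
        for (String name : channelNames) {
            channels.put(name, new ArrayDeque<>());
        }
    }

    void put(String event) {
        // Same event, one copy per channel
        for (Deque<String> queue : channels.values()) {
            queue.addLast(event);
        }
    }
}
```

    Putting one event into new ReplicatingModel("c1", "c2") leaves one copy in each queue, mirroring how k1 and k2 each receive the full stream.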

    【Multiplexing mode: each event is routed to specific channels based on a header value】

    # Config file: multi_source_case12.conf
    a1.sources= r1
    a1.sinks= k1 k2
    a1.channels= c1 c2
    
    #Describe/configure the source  
    a1.sources.r1.type= org.apache.flume.source.http.HTTPSource
    a1.sources.r1.port= 50000
    a1.sources.r1.bind= 192.168.81.130
    a1.sources.r1.selector.type= multiplexing
    a1.sources.r1.channels= c1 c2
    
    a1.sources.r1.selector.header= state
    a1.sources.r1.selector.mapping.CZ= c1
    a1.sources.r1.selector.mapping.US= c2
    a1.sources.r1.selector.default= c1
    
    #Describe the sink  
    a1.sinks.k1.type= avro
    a1.sinks.k1.channel= c1
    a1.sinks.k1.hostname= 192.168.81.131
    a1.sinks.k1.port= 50000
    
    a1.sinks.k2.type= avro
    a1.sinks.k2.channel= c2
    a1.sinks.k2.hostname= 192.168.81.131
    a1.sinks.k2.port= 50001
    # Use a channel which buffers events in memory
    a1.channels.c1.type= memory
    a1.channels.c1.capacity= 1000
    a1.channels.c1.transactionCapacity= 100
    
    a1.channels.c2.type= memory
    a1.channels.c2.capacity= 1000
    a1.channels.c2.transactionCapacity= 100
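    The multiplexing selector reads the header named by selector.header (here state) and looks its value up in the selector.mapping.* entries; a missing, empty, or unmapped value falls back to selector.default. A toy model of that lookup in plain Java; MultiplexingModel is illustrative and is not Flume's ChannelSelector API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Flume's API) of the multiplexing selector configured above.
class MultiplexingModel {
    private final String header;
    private final List<String> defaults;
    private final Map<String, List<String>> mapping = new HashMap<>();

    MultiplexingModel(String header, List<String> defaults) {
        this.header = header;
        this.defaults = defaults;
    }

    // Equivalent of one selector.mapping.<value> = <channels> line
    MultiplexingModel map(String headerValue, List<String> channels) {
        mapping.put(headerValue, channels);
        return this;
    }

    List<String> select(Map<String, String> eventHeaders) {
        String value = eventHeaders.get(header);
        if (value == null || value.trim().isEmpty()) {
            return defaults;  // no usable header: selector.default
        }
        return mapping.getOrDefault(value, defaults);  // unmapped: selector.default
    }
}
```

    With the HTTP source's default JSONHandler, the header is set in the request body. For example, posting [{"headers": {"state": "US"}, "body": "hello"}] to http://192.168.81.130:50000 should route the event to c2 and therefore to the agent listening on port 50001.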

    【Custom development: sink】

    Create a new Maven project.

    pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>org.apache.flume.flume-dev-sinks</groupId>
      <artifactId>flume-dev-mysql-sink</artifactId>
      <version>1.0</version>
      <name>Flume Mysql Sink</name>
      <description>Flume Mysql Sink</description>
      
      <dependencies>
        <dependency>
          <groupId>org.apache.flume</groupId>
          <artifactId>flume-ng-sdk</artifactId>
          <version>1.6.0</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.flume</groupId>
          <artifactId>flume-ng-core</artifactId>
          <version>1.6.0</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.flume</groupId>
          <artifactId>flume-ng-configuration</artifactId>
          <version>1.6.0</version>
        </dependency>
    
      </dependencies>
      
    </project>

    MysqlSink.java

    package com.hejia.flume.sink;
    
    import org.apache.flume.Channel;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Transaction;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    
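    public class MysqlSink extends AbstractSink implements Configurable {
        
        private String myProp;
    
        @Override
        public void configure(Context context) {
            // Read "myProp" from this sink's section of the agent config
            // (e.g. a1.sinks.k1.myProp = ...), falling back to a default
            String myProp = context.getString("myProp", "defaultValue");
            this.myProp = myProp;
        }
        
        @Override
        public Status process() throws EventDeliveryException {
            Status status = null;
            Channel ch = getChannel();
            Transaction txn = ch.getTransaction();
            txn.begin();
            
            try {
                // take() returns null when the channel is currently empty
                Event event = ch.take();
                
                if (event == null) {
                    // Nothing to consume: commit the empty transaction and
                    // ask the runner to back off instead of spinning
                    txn.commit();
                    status = Status.BACKOFF;
                } else {
                    // Write event.getBody() to MySQL here; this template
                    // only consumes the event
                    //System.out.println("============= *Hejia Yimiao Test* ================");
                    //System.out.println(new String(event.getBody()));
                    
                    txn.commit();
                    status = Status.READY;
                }
                
            } catch (Throwable t) {
                // Roll back so the event stays in the channel and is retried
                txn.rollback();
                status = Status.BACKOFF;
                
                // Re-throw Errors instead of swallowing them
                if (t instanceof Error) {
                    throw (Error) t;
                }
            } finally {
                txn.close();
            }
            
            return status;
        }
        
        @Override
        public void start() {
            // Open the MySQL connection here; super.start() marks the sink as started
            super.start();
        }
        
        @Override
        public void stop() {
            // Close the MySQL connection here; super.stop() marks the sink as stopped
            super.stop();
        }
    
    }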

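    After the code is written, run mvn install; this builds flume-dev-mysql-sink-1.0.jar under target/.

    Copy the generated jar into the lib directory of the Flume installation.

    Then write a configuration file that uses the custom sink: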

    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe the sink
    #a1.sinks.k1.type = logger
    a1.sinks.k1.type = com.hejia.flume.sink.MysqlSink
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

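    Templates for the other component types are all in the official Developer Guide and are worth working through on your own. With that, the journey into Flume has begun.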

  • Original article: https://www.cnblogs.com/yimiao/p/6125076.html