Flume: introduction, installation, and high-availability cluster deployment

I: Flume introduction and installation

Flume study notes: Flume installation and basic usage

Learning Flume (1): an introduction to Flume

Flume study notes: Avro RPC in a Flume cluster

Learning Flume (2): Flume source types

Learning Flume (3): Flume configuration

Flume interceptors

Flume study series (4): Interceptors

HDFS sink configuration in Flume

A roundup of common Flume errors

(I) Connecting Flume to a Hadoop cluster

1. Start the Hadoop cluster

1) HA-mode distributed cluster deployment: https://www.cnblogs.com/ssyfj/p/12369486.html
2) If the startup log keeps printing retry.RetryInvocationHandler (RetryInvocationHandler.java:invoke...), restart zkfc: hadoop-daemon.sh start zkfc
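Once HDFS is up, you can confirm that exactly one NameNode is active before starting Flume. A minimal check, assuming the logical NameNode IDs are nn1 and nn2 (they must match dfs.ha.namenodes.ns1 in your hdfs-site.xml):

hdfs haadmin -getServiceState nn1
hdfs haadmin -getServiceState nn2
#expected: one NameNode reports "active", the other "standby"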

2. Configure Flume

(1) Copy the jars under share/hadoop/common/ and share/hadoop/common/lib/ in the Hadoop root directory into the lib directory of the Flume root.
(2) Copy the jars under share/hadoop/hdfs/ and share/hadoop/hdfs/lib/ in the Hadoop root directory into the lib directory of the Flume root.
(3) Copy core-site.xml and hdfs-site.xml from etc/hadoop/ in the Hadoop root directory into the conf directory of the Flume root (a command sketch follows).
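As a concrete sketch, assuming HADOOP_HOME and FLUME_HOME point at the two installation roots (adjust if your layout differs):

cp $HADOOP_HOME/share/hadoop/common/*.jar $FLUME_HOME/lib/
cp $HADOOP_HOME/share/hadoop/common/lib/*.jar $FLUME_HOME/lib/
cp $HADOOP_HOME/share/hadoop/hdfs/*.jar $FLUME_HOME/lib/
cp $HADOOP_HOME/share/hadoop/hdfs/lib/*.jar $FLUME_HOME/lib/
cp $HADOOP_HOME/etc/hadoop/core-site.xml $HADOOP_HOME/etc/hadoop/hdfs-site.xml $FLUME_HOME/conf/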

3. Write the Flume configuration file

    #agent1
    #name the component on this agent
    a1.sources = r1
    a1.channels = c1
    a1.sinks = s1
    #config source
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /home/hadoop/App/apache-flume-1.6.0-bin/SourceData
    a1.sources.r1.channels = c1
    #config channel --- file
    a1.channels.c1.type = file
    a1.channels.c1.dataDirs = /home/hadoop/App/apache-flume-1.6.0-bin/ChannelData
    #config sink
    a1.sinks.s1.type = hdfs
    a1.sinks.s1.channel = c1
    a1.sinks.s1.hdfs.path = hdfs://ns1/flume/logdfs/%y-%m-%d/%H%M/%S
    a1.sinks.s1.hdfs.round = true
    a1.sinks.s1.hdfs.roundValue = 10
    a1.sinks.s1.hdfs.roundUnit = minute
    a1.sinks.s1.hdfs.useLocalTimeStamp = true
    a1.sinks.s1.hdfs.fileType = DataStream
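For clarity on the rounding settings: with round = true, roundValue = 10, and roundUnit = minute, every time escape in hdfs.path is rounded down to the nearest 10-minute boundary, so an event written at 17:38:59 on 2020-03-09 lands under 20-03-09/1730/00 instead of a new directory every second. useLocalTimeStamp = true makes the sink use the agent's local clock rather than requiring a timestamp header on each event.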

4. Start Flume

    flume-ng agent --conf $FLUME_HOME/conf/ --conf-file ./agentdfs.conf --name a1 -Dflume.root.logger=INFO,console

5. Upload files to the spoolDir directory

6. View the files in HDFS: they are the data Flume collected (example below)
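For example (any text file will do; /etc/profile is just a convenient sample):

cp /etc/profile /home/hadoop/App/apache-flume-1.6.0-bin/SourceData/
hdfs dfs -ls -R /flume/logdfs

Note that the spooldir source marks each ingested file by renaming it with a .COMPLETED suffix (the default), so the same file is never collected twice.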

7. Note: because the channel stores its data on the local file system, events are persisted to disk, so we can inspect them under the configured data directory
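A quick way to see this: list the channel's data directory, where the file channel writes its log-N data files (it also keeps a checkpoint, which defaults to ~/.flume/file-channel/checkpoint when checkpointDir is not configured):

ls /home/hadoop/App/apache-flume-1.6.0-bin/ChannelData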

(II) Connecting Flume to a Kafka cluster

1. Start the ZooKeeper cluster first

2. From the Kafka root directory, start the Kafka process (on each broker node)

    bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &

3. Create a topic that will receive the data from Flume

    bin/kafka-topics.sh --create --zookeeper hadoopH5:2181 --replication-factor 3 --partitions 1 --topic flume

The topic only needs to be created on one node; it is automatically propagated to the others.
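You can verify the topic and its replica placement from any node:

bin/kafka-topics.sh --describe --zookeeper hadoopH5:2181 --topic flume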

4. Write the Flume configuration file

    #agent1
    #name the component on this agent
    a1.sources = r1
    a1.channels = c1
    a1.sinks = s1
    #config source
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /home/hadoop/App/apache-flume-1.6.0-bin/SourceData
    a1.sources.r1.channels = c1
    #config channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity = 100
    #config sink
    a1.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.s1.topic = flume
#note: these hostnames are mapped to IPs in the hosts file
a1.sinks.s1.brokerList = hadoopH5:9092,hadoopH6:9092,hadoopH7:9092
    a1.sinks.s1.channel = c1

5. Start Flume

    flume-ng agent --conf $FLUME_HOME/conf/ --conf-file ./agentkafka.conf --name a1 -Dflume.root.logger=INFO,console

6. Copy files into the spoolDir directory

7. Start a Kafka console consumer to read the flume topic

    bin/kafka-console-consumer.sh --zookeeper hadoopH5:2181 --from-beginning --topic flume

II: Distributed Flume installation

Flume high-availability cluster installation

Flume NG high-availability cluster setup in detail (based on flume-1.7.0)

Connecting a highly available Flume cluster to a Kafka cluster

Flume architecture

A Flume-based log collection system (1): architecture and design

(I) A simple distributed Flume setup

The first node is agent1; the second is agent2.

(II) Configuration files

    agent1.conf

    #agent1
    #name the component on this agent
    a1.sources = r1
    a1.channels = c1
    a1.sinks = s1
    #config source
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /home/hadoop/App/apache-flume-1.6.0-bin/data
    a1.sources.r1.channels = c1
    #config channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity = 100
    #config sink
    a1.sinks.s1.type = avro
    a1.sinks.s1.channel = c1
    a1.sinks.s1.hostname = hadoopH2
    a1.sinks.s1.port = 4141

    agent2.conf

#agent2
    #name the component on this agent
    a1.sources = r1
    a1.channels = c1
    a1.sinks = s1
    #config source
    a1.sources.r1.type = avro
#bind to the local address and port and listen for avro events
a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 4141
    a1.sources.r1.channels = c1
    #config channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity = 100
    #config sink
    a1.sinks.s1.type = logger
    a1.sinks.s1.channel = c1

(III) Start Flume

1. Start the downstream node, agent2, first

    flume-ng agent --conf $FLUME_HOME/conf/ --conf-file ./agent2.conf --name a1 -Dflume.root.logger=INFO,console

2. Then start agent1

    flume-ng agent --conf $FLUME_HOME/conf/ --conf-file ./agent1.conf --name a1 -Dflume.root.logger=INFO,console

(IV) Test data collection

1. Copy a file into the configured spool directory

2. View the collected data on agent2
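Since agent2's sink type is logger, each collected event is printed to its console. The output looks roughly like this (the body is rendered as hex plus a readable preview, truncated to 16 bytes by default):

Event: { headers:{} body: 68 65 6C 6C 6F 20 66 6C 75 6D 65       hello flume }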

III: Comparing distributed Flume architectures (NG vs. OG)

(I) Flume overview

Flume NG is a distributed, reliable, and available system for efficiently collecting and aggregating large volumes of log data from many different sources and moving it into a centralized data store for analysis. In fact Flume can collect other kinds of data as well, not only logs. The move from Flume OG to Flume NG was an architectural rewrite, and NG is completely incompatible with the old OG versions. In comparison, Flume NG is simpler and easier to manage and operate.

In short, Flume NG is a clear improvement over Flume OG.

(II) Flume OG cluster components: closer to the classic distributed systems studied earlier (for understanding only; we will not deploy it)

OG has three components: agent, collector, and master. Agents collect the logs from the individual log servers and aggregate them to collectors (multiple collectors can be configured); the master manages the agents and collectors; finally, the collectors write the collected logs to HDFS, or alternatively to the local file system, to Storm, or to HBase.

(III) Flume NG: no master node; the system as a whole supports load balancing

NG's biggest change is the removal of the fixed roles: every node is an agent, and agents can be chained together. When several agents feed into one agent, that agent effectively plays the collector role. NG also supports load balancing.

IV: Installing a highly available distributed Flume cluster (NG)

The goal is to keep the failure of a single agent from breaking data collection:

Limit the number of connections per source: a single source cannot sustain a very large number of simultaneous connections, so the upstream agents are split into groups.

Limit the number of connections per sink: each agent sink holds one connection to the data store, and as agents multiply the connection count would eventually exceed the store's limit, so the topology is layered.

We build the cluster following this topology. Note that data travels along only one path at a time; if a node on that path fails, another path is chosen by priority, which gives failover without duplicating data across links.

(I) Cluster node plan

(II) Start the ZooKeeper cluster (as above)

(III) Start HDFS (as above)

(IV) Writing the Flume configuration: data-source agents (note: do not append trailing spaces or inline comments to configuration lines)

1. Write agentdata.conf on hadoopH3

#agent on the data-source node, here hadoopH3
    ag.sources = r1
    ag.channels = c1
    ag.sinks = k1 k2 k3 k4
    #set group
    ag.sinkgroups = g1
    #set source
    ag.sources.r1.channels = c1
    ag.sources.r1.type = spooldir
    ag.sources.r1.spoolDir = /home/hadoop/App/apache-flume-1.6.0-bin/data
    #set channel
    ag.channels.c1.type = memory
    ag.channels.c1.capacity = 1000
    ag.channels.c1.transactionCapacity = 100
#set sink1: the agent1 host
    ag.sinks.k1.channel = c1
    ag.sinks.k1.type = avro
    ag.sinks.k1.hostname = hadoopH1
    ag.sinks.k1.port = 6666
#set sink2: the agent2 host
    ag.sinks.k2.channel = c1
    ag.sinks.k2.type = avro
    ag.sinks.k2.hostname = hadoopH2
    ag.sinks.k2.port = 6666
#set sink3: the agent3 host
    ag.sinks.k3.channel = c1
    ag.sinks.k3.type = avro
    ag.sinks.k3.hostname = hadoopH3
    ag.sinks.k3.port = 6666
#set sink4: the agent4 host
    ag.sinks.k4.channel = c1
    ag.sinks.k4.type = avro
    ag.sinks.k4.hostname = hadoopH4
    ag.sinks.k4.port = 6666
    #set sink group
    ag.sinkgroups.g1.sinks = k1 k2 k3 k4
    ag.sinkgroups.g1.processor.type = failover
    ag.sinkgroups.g1.processor.priority.k1 = 10
    ag.sinkgroups.g1.processor.priority.k2 = 9
    ag.sinkgroups.g1.processor.priority.k3 = 5
    ag.sinkgroups.g1.processor.priority.k4 = 4
    ag.sinkgroups.g1.processor.maxpenalty = 10000
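The failover processor always routes events to the highest-priority sink that is still alive (k1 on hadoopH1 here), and maxpenalty caps a failed sink's back-off at 10 seconds before it is retried. If spreading load matters more than a fixed preference, Flume's load_balance sink processor is a drop-in alternative; a sketch, not part of this deployment:

ag.sinkgroups.g1.processor.type = load_balance
ag.sinkgroups.g1.processor.backoff = true
ag.sinkgroups.g1.processor.selector = round_robin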

2. Write agentdata.conf on hadoopH4

#agent on the data-source node, here hadoopH4
    ag.sources = r1
    ag.channels = c1
    ag.sinks = k1 k2 k3 k4
    #set group
    ag.sinkgroups = g1
    #set source
    ag.sources.r1.channels = c1
    ag.sources.r1.type = spooldir
    ag.sources.r1.spoolDir = /home/hadoop/App/apache-flume-1.6.0-bin/data
    #set channel
    ag.channels.c1.type = memory
    ag.channels.c1.capacity = 1000
    ag.channels.c1.transactionCapacity = 100
#set sink1: the agent1 host
    ag.sinks.k1.channel = c1
    ag.sinks.k1.type = avro
    ag.sinks.k1.hostname = hadoopH1
    ag.sinks.k1.port = 6666
#set sink2: the agent2 host
    ag.sinks.k2.channel = c1
    ag.sinks.k2.type = avro
    ag.sinks.k2.hostname = hadoopH2
    ag.sinks.k2.port = 6666
#set sink3: the agent3 host
    ag.sinks.k3.channel = c1
    ag.sinks.k3.type = avro
    ag.sinks.k3.hostname = hadoopH3
    ag.sinks.k3.port = 6666
#set sink4: the agent4 host
    ag.sinks.k4.channel = c1
    ag.sinks.k4.type = avro
    ag.sinks.k4.hostname = hadoopH4
    ag.sinks.k4.port = 6666
    #set sink group
    ag.sinkgroups.g1.sinks = k1 k2 k3 k4
    ag.sinkgroups.g1.processor.type = failover
    ag.sinkgroups.g1.processor.priority.k1 = 5
    ag.sinkgroups.g1.processor.priority.k2 = 4
    ag.sinkgroups.g1.processor.priority.k3 = 10
    ag.sinkgroups.g1.processor.priority.k4 = 9
    ag.sinkgroups.g1.processor.maxpenalty = 10000

(V) Writing the Flume configuration: collection agents

1. Configure agent1.conf on hadoopH1 and hadoopH2

#collection-agent nodes agent1 and agent2
    ag.sources = r1
    ag.channels = c1
    ag.sinks = k1
    #set source
    ag.sources.r1.channels = c1
    ag.sources.r1.type = avro
    ag.sources.r1.bind = 0.0.0.0
    ag.sources.r1.port = 6666
    #set channel
    ag.channels.c1.type = memory
    ag.channels.c1.capacity = 1000
    ag.channels.c1.transactionCapacity = 100
    #set sink1
    ag.sinks.k1.channel = c1
    ag.sinks.k1.type = avro
#set the controller host
ag.sinks.k1.hostname = hadoopH3
ag.sinks.k1.port = 8888

2. Configure agent1.conf on hadoopH3 and hadoopH4

#collection-agent nodes agent3 and agent4
    ag.sources = r1
    ag.channels = c1
    ag.sinks = k1
    #set source
    ag.sources.r1.channels = c1
    ag.sources.r1.type = avro
    ag.sources.r1.bind = 0.0.0.0
    ag.sources.r1.port = 6666
    #set channel
    ag.channels.c1.type = memory
    ag.channels.c1.capacity = 1000
    ag.channels.c1.transactionCapacity = 100
    #set sink1
    ag.sinks.k1.channel = c1
    ag.sinks.k1.type = avro
#set the controller host
    ag.sinks.k1.hostname = hadoopH4
    ag.sinks.k1.port = 8888

(VI) Writing the Flume configuration: the controller

1. Configure controller.conf on hadoopH3 and hadoopH4

#controller node: reads the collected data and writes it out
    ag.sources = r1
    ag.channels = c1
    ag.sinks = k1
    #set source
    ag.sources.r1.channels = c1
    ag.sources.r1.type = avro
    ag.sources.r1.bind = 0.0.0.0
    ag.sources.r1.port = 8888
    #set channel
    ag.channels.c1.type = memory
    ag.channels.c1.capacity = 1000
    ag.channels.c1.transactionCapacity = 100
    #set sink1
    ag.sinks.k1.type = hdfs
    ag.sinks.k1.channel = c1
    ag.sinks.k1.hdfs.path = hdfs://ns1/flume/logdfs/%y-%m-%d/%H%M/%S
    ag.sinks.k1.hdfs.round = true
    ag.sinks.k1.hdfs.roundValue = 10
    ag.sinks.k1.hdfs.roundUnit = minute
    ag.sinks.k1.hdfs.useLocalTimeStamp = true
    ag.sinks.k1.hdfs.fileType = DataStream
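One caveat: by default the HDFS sink rolls a new file every 30 seconds (hdfs.rollInterval = 30) or every 10 events (hdfs.rollCount = 10), which produces many small files under the per-second directories. If that matters, the standard roll parameters can be tuned; a sketch using real HDFS-sink options, not part of the original config:

#roll only on size: disable time- and count-based rolling, roll at 128 MB
ag.sinks.k1.hdfs.rollInterval = 0
ag.sinks.k1.hdfs.rollCount = 0
ag.sinks.k1.hdfs.rollSize = 134217728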

(VII) Start the Flume cluster

1. Start the controller nodes on hadoopH3 and hadoopH4

    flume-ng agent --conf $FLUME_HOME/conf/ --conf-file ./controller.conf --name ag -Dflume.root.logger=INFO,console

2. Start the collection agents on hadoopH1, hadoopH2, hadoopH3, and hadoopH4

    flume-ng agent --conf $FLUME_HOME/conf/ --conf-file ./agent1.conf --name ag -Dflume.root.logger=INFO,console

3. Start the data-source agents (agentdata.conf) on hadoopH3 and hadoopH4

    flume-ng agent --conf $FLUME_HOME/conf/ --conf-file ./agentdata.conf --name ag -Dflume.root.logger=INFO,console

(VIII) Test data collection

1. Copy data into the spoolDir directory

2. Check the HDFS output directory (sketch below)
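A sketch of both the data-path check and a failover check (the process-lookup details vary by environment):

hdfs dfs -ls -R /flume/logdfs
#failover test: stop the hadoopH3 controller, copy another file into spoolDir,
#then confirm new data still reaches HDFS via the hadoopH4 controller
kill $(ps -ef | grep 'controller.conf' | grep -v grep | awk '{print $2}')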
