zoukankan      html  css  js  c++  java
  • 《OD学Flume》20160806Flume和Kafka

    一、Flume

    http://flume.apache.org/FlumeUserGuide.html

    Flume是一个分布式的,可靠的,可用的,非常有效率的对大数据量的日志数据进行收集、聚集、移动信息的服务。

    1. 架构方式

    1)所有应用使用一台flume服务器;

    2)所有应用共享flume集群;

    3)每个应用使用一台flume,然后使用一个flume节点收集分散的flume数据;

    2. flume组件

    1)启动的每个flume进程(jvm进程),称为agent

    每个flume agent:

    source:与外部数据源结合,获取或者接收数据

    (1)source主动从外部数据源上获取数据

    (2)source接收外部数据源发送过来的数据

    channel:缓存,类似队列,先进先出

    (1)容错: File channel

    (2)缓冲

    sink:从channel里面获取数据,发送出去

    flume:就是简单地选择合适的source、channel、sink类型

    二、安装部署

    1)下载安装包

    2)解压安装

    3)修改配置文件

    flume-env.sh.template  -> flume-env.sh.template

    JAVA_HOME

    JAVA_OPTS

    4)说明

    (1)agent里面的sink,只能从一个channel里面获取数据

    (2)source可以将数据发送到多个chanel

    (3)channel可以让多个sink来获取数据

    5)官方示例 http://flume.apache.org/FlumeUserGuide.html

    (1)在conf目录下创建agent子目录,配置example01.conf

    (2)如何启动flume agent

    进入$FLUME_HOME

    --conf  指定flume进程的配置

    --conf-file指定flume-agent的配置

    bin/flume-ng agent -n a1 -c conf/ -f conf/agents/example01.conf -Dflume.root.looger=INFO,CONSOLE

    netstat -tlnup | grep 44444

    测试结果: nc hostanme 44444 

    sudo yum -y install nc

    如果nc命令不存在,则安装nc命令

    socket client 

    netcat source 44444

    memory channel

    logger sink

    6)常用的source、channel、sink类型

    source:

    (1)avro source:

    avro 二进制数据传输协议

    protobuf

    json

    xml

    (2)netcat source

    (3)http source

    (4)spooldir source: 监听本地文件系统的目录

    (5)exec source: tail -F 监听本地文件系统的一个文件

    channel:

    (1)memory channel: 快、agent服务器重启时数据会丢失

    (2)file channel:慢、可以保证数据不会被丢失;会记录数据以及检查点checkpoint(记录上次处理后截止的地方,sink上一次从哪个位置获取数据)

    sink:

    (1)logger sink

    (2)hdfs sink: 上传数据到hdfs

    (3)kafka sink:将数据传送到kafka上

    (4)avro sink:连接不同的agent

    (5)file sink

    avro sink -> avro source

    7)案例avro source + file channel + hdfs sink

    如果hdfs目录使用了时间变量的话,一定要将useLocalTimeStamp值设为true

    hdfs.rollInterval: 间隔一定时间生成新的文件

    hdfs.roollSize: 接收到一定大小数据量写到新的文件

    hdfs.roolCount: 次数

    配置:以上三个互相之间是不兼容的

    将其中两个值设为0,启用第三个

    secondaryname 从namenode节点上拿edits文件进行合并:

    edits文件大小

    bin/flume-ng agent -n a1 -c conf/ -f conf/agents/example02.conf -Dflume.root.looger=INFO,CONSOLE

    bin/flume-ng avro-client help

    8)案例spool source + memory channel + hdfs sink

    flume数据在source/channel/sink之间传输的形式:

    event

    headers: 用来路由,根据规则多路复用

    body:

    注意:

    (1)spooldir监听的目录一定要先存在,而且要有访问权限

    (2)监听的目录里面文件名不能重复

    生产上每天要上传相同的文件到hdfs上,解决办法就是在原来的文件名称后面加个时间戳或者其他唯一字符串。

    9)exec source + memory channel  + logger sink

    nginx服务器:

    下载安装包,上传到服务器;

    解压,三步安装

    预编译

    编译

    编译安装

    添加nginx服务,先在

    /etc/init.d/目录下创建一个空的nginx文件,然后将windows上nginx文件内容复制进去,保存修改权限755

    sudo service nginx start 启动

    配置nigin日志数据接口地址:

    kill -9 `ps -ef | grep flume | awk '{print $2}' | head -n 1`

    curl -X POST -d 'content' host 

    http soure:

    http header ---> flume events headers

    http body  ----> flume events body

    10)

    failover容错模式(负载均衡模式):一个event只会进入其中一个channel里面,配置agent的时候,给每个sink一个权重值。

    多路复用模式:路由,根据event headers中的某个key值来选择某个channel;

    二、Kafka

    1. 分布式消息订阅发布系统

    消息队列

    J2EE 消息总线 esb

    消息队列: rabbitMQ activeMQ zeroMQ

    消息生产者:

    消息消费者:

    RPC服务: 通过消息队列

    服务提供方作为消费的消费者

    服务请求方作为消息的生产者,request请求

    服务请求方与服务提供方之间无需互相知道,进行解耦

    分布式部署

    结合实时流处理框架,消费者:storm、spark streaming流式处理框架

    kafka将数据写入磁盘上(顺序读写),随机读写(寻道时间)

    kafka消息队列与传统rabbitmq:

    log.dirs

    2. kafka安装部署

    http://kafka.apache.org/downloads.html

    5)检查运行

    producer:

    consumer:

    (1)创建topic:test

    ./kafka-topics.sh --create --zookeeper host --topic test --partitions 1 --

    kafka-topics.sh

    kafka中topic:逻辑概念

    物理上:每个topic在kafka集群上存储,以分区存储

    partition broker节点磁盘上一段连续存储空间

    备份数:跨节点备份

    每个分区的信息:

    Leader对于备份来说

    对于一条信息的多个备份,存储在不同的broker

    Isr 指定某个分区的备份在哪些broker上面

    Isr: 0 1 2

    Leader对写有影响:生产者往toipc发布消息,要确认是否发布成功,有三种确认机制

    参数: requireAcks

    不确认,只管发布消息    0

    只确认是否在Leader上发布成功   1

    确认所有备份都发布成功  -1

    (2)使用producer往topic发布消息

    producer:往往是由外部需要使用kafka集群的系统来编写

    flume:kafkasink

    log.dirs目录

    offset:偏移量

    消息在kafka存储位置的确认:topic、partition的位置、偏移量

    分区中存储的消息文件:

    test-0 test-1

    topic名称-partition编号

    xxxxx.index 索引文件

    xxxxx.log 数据文件

    partition: segment

    (3)使用consumer从topic消费消息

    外部谁要从kafka消费消息,由负责consumer编写

    storm: storm-kafka项目

    spark-streaming: spark kafaka消费者

    flume: kafka source

    ./kafka-console-consumer.sh --zookeeper host:2181 --topic test --from-beginning

    zookeeper:记录消费者上次消费者的位置offset,记录了集群的broker位置

    三、Kafka继承Flume

    1、kafka sink

    (1)netcat source + memory channel + kafka sink

    flume:

    先编写agent配置文件

    kafka消息保留时间,默认 7天 7* 24h = 168h

    2、kafka source(很少使用,了解)

  • 相关阅读:
    近来几个有用的网站
    军事视频网站
    美军武器命名
    区块链的五个关键要素
    处理多媒体的两个重要工具
    Python re模块将字符串分割为列表
    Python 自动刷新网页
    selenium:chromedriver与chrome版本的对应关系
    怎么批量删除QQ空间说说?
    ssm获取数据库名称
  • 原文地址:https://www.cnblogs.com/yeahwell/p/5743402.html
Copyright © 2011-2022 走看看