zoukankan      html  css  js  c++  java
  • FLUME安装&环境(二):拉取MySQL数据库数据到Kafka

    Flume安装成功,环境变量配置成功后,开始进行agent配置文件设置。

    1.agent配置文件(mysql+flume+Kafka)

    #利用Flume将MySQL表数据准实时抽取到Kafka

    a1.channels = c1

    a1.sinks = k1

    a1.sources = s1

    #sources(mysql)

    a1.sources.s1.type = org.keedio.flume.source.SQLSource

    a1.sources.s1.channels = c1

    a1.sources.s1.connection.url = jdbc:mysql://192.168.121.4:3306/alarm

    a1.sources.s1.user = root

    a1.sources.s1.password = root

    a1.sources.s1.table = alarm_query

    a1.sources.s1.columns.to.select = *

    a1.sources.s1.incremental.column.name = id

    a1.sources.s1.incremental.value = 0

    a1.sources.s1.run.query.delay=5000

    #source状态写入路径(必须存在且可写入)

    a1.sources.s1.status.file.path = /opt/apps/flume-1.6.0-cdh5.14.4-bin

    a1.sources.s1.status.file.name = sqlsource.status

    #channels(memory)

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    #sinks(kafka)

    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

    # kfk29,kfk30,kfk31对应主机hosts配置的Kafka主机

    a1.sinks.k1.brokerList= D-QP-Safe-4:9092, D-QP-Safe-5:9092, D-QP-Safe-6:9092

    a1.sinks.k1.topic=qpdy

    a1.sinks.k1.requiredAcks = 1

    a1.sinks.k1.batchSize = 2

    a1.sinks.k1.channel = c1

    2.配置准备

    2.1创建flume状态写入的文件夹和文件

    mkdir  /var/lib/flume

    vi s1.status

    给文件写入的权力 chmod 777 s1.status

    2.2将flume内存空间设置增大(开始时没有进行设置,结果报了内存溢出的错误)

    在flume启动脚本flume-ng中,修改JAVA_OPTS="-Xmx20m"为JAVA_OPTS="-Xmx10240m"

    此处将堆内存的阈值跳转到了10G,实际生产环境中可以根据具体的硬件情况作出调整

    2.3添加主机对应的kafka主机

    (flume.conf配置文件需要添加主机对应的Kafka主机,否则无法找到对应的sink)

    # vim /etc/hosts

    #添加主机对应的kafka主机

    192.168.241.229    D-QP-Safe-4

    192.168.241.230    D-QP-Safe-5

    192.168.241.231    D-QP-Safe-6

    2.4向flume安装目标的/lib目录下添加启动mysql,Kafka等的jar包

    3.启动flume

    要在flume的安装目录的bin目录下启动

    #启动命令

    flume-ng agent -c /opt/apps/flume-1.6.0-cdh5.14.4-bin/conf -f /opt/apps/flume-1.6.0-cdh5.14.4-bin/conf/flume.conf -n a1 -Dflume.root.logger=INFO,console

    a1为配置的agent名,-c和-f后是flume的安装路径(必须一致才能启动成功)

    4.查看flume进程

    ps -aux | grep flume

    如果存在多个进程必须将多余进程kill

    为了避免一个个的kill,我们需要提取flume的进程号:

    ps -aux | grep flume | awk '{print $2}'

    然后全部删除

    ps -aux | grep flume | awk '{print $2}' | xargs kill

    以上,拉取mysql数据库数据到Kafka就配置好了

  • 相关阅读:
    2015年秋季阅读计划
    课程改进意见
    第二阶段冲刺站立会议报告四
    第二阶段冲刺站立会议报告三(补)
    第二阶段冲刺站立会议报告二(补)
    第二阶段冲刺站立会议报告一(补)
    第一阶段冲刺站立会议报告四(补)
    第一阶段冲刺站立会议报告三(补)
    第一阶段冲刺站立会议报告二(补)
    第一阶段冲刺站立会议报告一(补)
  • 原文地址:https://www.cnblogs.com/koudaiyoutang/p/10115488.html
Copyright © 2011-2022 走看看