  • [iMooc Hands-On] Spark Streaming Real-Time Stream Processing Project in Practice, Notes 4 (Upgraded Inscription Edition)

    Level-1 notes:

    Requirement 2:
    Agent selection: exec source + memory channel + logger sink
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /home/hadoop/data/data.log
    a1.sources.r1.shell = /bin/sh -c

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    Start the agent:
    flume-ng agent \
    --name a1 \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/conf/exec-memory-logger.conf \
    -Dflume.root.logger=INFO,console
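
    To sanity-check this agent, append a line to the tailed file from a second terminal; the logger sink should then print the new event in the agent's console (a minimal check, assuming the agent above is running):

    # in a second terminal: append a test record to the file the exec source tails
    echo "hello flume" >> /home/hadoop/data/data.log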


    Requirement 3:
    Technology selection:
    exec source + memory channel + avro sink
    avro source + memory channel + logger sink

    exec-memory-avro.conf

    exec-memory-avro.sources = exec-source
    exec-memory-avro.sinks = avro-sink
    exec-memory-avro.channels = memory-channel

    exec-memory-avro.sources.exec-source.type = exec
    exec-memory-avro.sources.exec-source.command = tail -F /home/hadoop/data/data.log
    exec-memory-avro.sources.exec-source.shell = /bin/sh -c

    exec-memory-avro.sinks.avro-sink.type = avro
    exec-memory-avro.sinks.avro-sink.hostname = hadoop000
    exec-memory-avro.sinks.avro-sink.port = 44444

    exec-memory-avro.channels.memory-channel.type = memory

    exec-memory-avro.sources.exec-source.channels = memory-channel
    exec-memory-avro.sinks.avro-sink.channel = memory-channel

    avro-memory-logger.conf
    avro-memory-logger.sources = avro-source
    avro-memory-logger.sinks = logger-sink
    avro-memory-logger.channels = memory-channel

    avro-memory-logger.sources.avro-source.type = avro
    avro-memory-logger.sources.avro-source.bind = hadoop000
    avro-memory-logger.sources.avro-source.port = 44444

    avro-memory-logger.sinks.logger-sink.type = logger

    avro-memory-logger.channels.memory-channel.type = memory

    avro-memory-logger.sources.avro-source.channels = memory-channel
    avro-memory-logger.sinks.logger-sink.channel = memory-channel

    Start avro-memory-logger first:
    flume-ng agent \
    --name avro-memory-logger \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/conf/avro-memory-logger.conf \
    -Dflume.root.logger=INFO,console


    Then start exec-memory-avro:
    flume-ng agent \
    --name exec-memory-avro \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/conf/exec-memory-avro.conf \
    -Dflume.root.logger=INFO,console

    Chapter 4: Kafka, a Distributed Publish-Subscribe Messaging System

    Kafka overview
    Kafka behaves much like a general messaging system.

    Message middleware: producers and consumers

    Mom: the producer
    You: the consumer
    Steamed buns: the data stream / the messages

    Normal case: one bun produced, one bun consumed.
    Other cases:
    Production keeps going, but while eating a bun you get stuck (a machine failure), so buns are lost.
    Production keeps going, and buns are made faster than you can eat them, so buns are also lost.

    Get a bowl/basket: each bun goes into the basket as soon as it is made, and you take one out of the basket whenever you want to eat.

    Basket: Kafka
    When the basket is full and no more buns fit, what then?
    Prepare a few more baskets === scaling Kafka out

    Kafka architecture
    producer: the one making the buns (mom)
    consumer: the one eating the buns (you)
    broker: the basket
    topic: a label on the buns; buns tagged topica are for you, buns tagged topicb are for your younger brother
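
    These four concepts map directly onto Kafka's command-line tools. A minimal sketch, assuming a single broker at hadoop000:9092 and ZooKeeper at hadoop000:2181 (those ports are the usual defaults; neither appears in the notes above):

    # create the topic "topica" (the label on the buns)
    kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic topica

    # producer (mom): each line typed here becomes a message on topica
    kafka-console-producer.sh --broker-list hadoop000:9092 --topic topica

    # consumer (you): prints messages from topica as they arrive
    kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic topica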

    Level-2 notes:

    Requirement 3 =>

    Monitor a file, collect newly appended data in real time, and print it on another node's console.

    Agent selection (exec source: monitors a file; avro sink: hops across server nodes):

    Config file 1: exec-memory-avro.conf (passed at start-up)

    Config file 2: avro-memory-logger.conf (passed at start-up)

    Steps:

    1. First create the file (the path must match the one in the exec source's tail command) =>

    touch /home/hadoop/data/data.log

    2. Edit the config files =>

    Config file 1 (note: the agent, source, channel, and sink names must be changed; they may not reuse a1, r1, k1, c1 from the earlier config):

    exec source: needs type: exec and command: tail -F /home/hadoop/data/data.log

    avro sink: needs type: avro, plus hostname: hadoop000 and port: 44444

    Config file 2:

    avro source: needs type: avro, plus bind: hadoop000 and port: 44444

    3. Start the agents =>

    Open two terminals: first start the downstream agent that events land in (avro-memory-logger), then the upstream one (exec-memory-avro).

    4. In a third terminal, cd into the data directory:

    echo "hello" >> data.log

    echo "world" >> data.log

    5. Watch the avro-memory-logger terminal: the content appears there (with this little input there is a slight delay, because the memory channel and sink batch by size and time limits; see the sketch below).
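
    The size and time limits mentioned in step 5 are tunable. A hedged sketch of the relevant properties for exec-memory-avro.conf (the values shown are common defaults, not taken from the course):

    # memory channel: max events buffered, and max events per transaction
    exec-memory-avro.channels.memory-channel.capacity = 10000
    exec-memory-avro.channels.memory-channel.transactionCapacity = 100

    # avro sink: how many events are batched together per send
    exec-memory-avro.sinks.avro-sink.batch-size = 100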

    Chapter 4: Kafka, a Distributed Publish-Subscribe Messaging System

    Kafka's four core concepts: producer, consumer, broker, topic

    Three deployment modes: single node with a single broker, single node with multiple brokers, multiple nodes with multiple brokers (a minimal single-broker config sketch follows)
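
    For the single-node single-broker mode, the broker is configured in $KAFKA_HOME/config/server.properties. A minimal sketch (the host name and paths follow this environment's conventions and are assumptions, not taken from the notes):

    # unique id of this broker
    broker.id=0
    # host and port the broker listens on
    host.name=hadoop000
    port=9092
    # where partition data (the message log) is stored
    log.dirs=/home/hadoop/app/tmp/kafka-logs
    # the ZooKeeper the broker registers itself with
    zookeeper.connect=hadoop000:2181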

    Kafka runs on ZooKeeper, so ZooKeeper must be installed first (a wget of the CDH5 tarball is enough).

    1. Create a data directory: mkdir /home/hadoop/app/tmp/zk (with the default location, the files are lost on every restart)

    2. After setting the environment variables, edit the config file under the conf directory: cp zoo_sample.cfg zoo.cfg, then change (a minimal zoo.cfg sketch follows):

    dataDir=/home/hadoop/app/tmp/zk
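
    A minimal zoo.cfg sketch, where dataDir is the only change from the shipped sample:

    # basic time unit (ms) used for heartbeats and timeouts
    tickTime=2000
    # where snapshots and transaction logs are kept (the directory from step 1)
    dataDir=/home/hadoop/app/tmp/zk
    # the port clients such as zkCli.sh and Kafka brokers connect to
    clientPort=2181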

    3. From the bin directory run: ./zkServer.sh start

    4. Attach a client (in the current terminal): ./zkCli.sh

    Then run jps to check which processes are up.

    Run: ls / (to browse the znode tree and its contents; a few example commands follow)
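
    A few standard zkCli.sh commands to poke around with once connected (the /test znode is purely illustrative):

    # list the top-level znodes; a fresh install shows /zookeeper
    ls /
    # create a znode with some data, then read it back
    create /test mydata
    get /test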

  • Original post: https://www.cnblogs.com/kkxwz/p/8351452.html