zoukankan      html  css  js  c++  java
  • 【慕课网实战】Spark Streaming实时流处理项目实战笔记十四之铭文升级版

    铭文一级:

    第11章 Spark Streaming整合Flume&Kafka打造通用流处理基础

    streaming.conf

    agent1.sources=avro-source
    agent1.channels=logger-channel
    agent1.sinks=log-sink

    #define source
    agent1.sources.avro-source.type=avro
    agent1.sources.avro-source.bind=0.0.0.0
    agent1.sources.avro-source.port=41414

    #define channel
    agent1.channels.logger-channel.type=memory

    #define sink
    agent1.sinks.log-sink.type=logger

    agent1.sources.avro-source.channels=logger-channel
    agent1.sinks.log-sink.channel=logger-channel

    flume-ng agent
    --conf $FLUME_HOME/conf
    --conf-file $FLUME_HOME/conf/streaming.conf
    --name agent1
    -Dflume.root.logger=INFO,console


    java.lang.ClassNotFoundException: org.apache.flume.clients.log4jappender.Log4jAppender


    ./kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic streamingtopic


    streaming2.conf
    agent1.sources=avro-source
    agent1.channels=logger-channel
    agent1.sinks=kafka-sink

    #define source
    agent1.sources.avro-source.type=avro
    agent1.sources.avro-source.bind=0.0.0.0
    agent1.sources.avro-source.port=41414

    #define channel
    agent1.channels.logger-channel.type=memory

    #define sink
    agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
    agent1.sinks.kafka-sink.topic = streamingtopic
    agent1.sinks.kafka-sink.brokerList = hadoop000:9092
    agent1.sinks.kafka-sink.requiredAcks = 1
    agent1.sinks.kafka-sink.batchSize = 20

    agent1.sources.avro-source.channels=logger-channel
    agent1.sinks.kafka-sink.channel=logger-channel


    flume-ng agent
    --conf $FLUME_HOME/conf
    --conf-file $FLUME_HOME/conf/streaming2.conf
    --name agent1
    -Dflume.root.logger=INFO,console

    我们现在是在本地进行测试的,在IDEA中运行LoggerGenerator,
    然后使用Flume、Kafka以及Spark Streaming进行处理操作。

    在生产上肯定不是这么干的,怎么干呢?
    1) 打包jar,执行LoggerGenerator类
    2) Flume、Kafka和我们的测试是一样的
    3) Spark Streaming的代码也是需要打成jar包,然后使用spark-submit的方式进行提交到环境上执行
    可以根据你们的实际情况选择运行模式:local/yarn/standalone/mesos

    在生产上,整个流处理的流程都一样的,区别在于业务逻辑的复杂性

    铭文二级:

    第11章 Spark Streaming整合Flume&Kafka打造通用流处理基础

    Flume整合log4j日志:streaming.conf=>avro-memory-logger

    log4j.properties:需添加内容(上面四行即可):

    #...
    log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
    log4j.appender.flume.Hostname = example.com
    log4j.appender.flume.Port = 41414
    log4j.appender.flume.UnsafeMode = true
    
    # configure a class's logger to output to the flume appender
    log4j.logger.org.example.MyClass = DEBUG,flume
    #...
    

    加上log4j.propertied内容为:

    log4j.rootLogger=INFO,stdout,flume
    
    log4j.appender.stdout = org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.target = System.out
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
    

      

      

    1.example.com改成hadoop000

    2.log4j.rootLogger=INFO,stdout  //右侧添加flume

    官网地址为:http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.5.0/FlumeUserGuide.html

    3.报错找不到类,根据内容添加依赖:

    org.apache.flume.flume-ng-clients

    flume-ng-log4jappender    //实际上打上这行就可以出现其他行

    1.6.0

    4.运行,若显示不全,将日志生成器的字符串减少一点

    ps:运行前可以将不必要的进程kill掉先

    Flume与Kafka整合=>

    启动zk、启动kafka

    修改类KafkaReceiverWordCount为KafkaStreamingApp

    ToDo内容改成count().print() //简便测试总数

    本地测试与生产环节使用拓展:

    即将KafkaStreamingApp打包!!

    第12章 Spark Streaming项目实战

    课程目录、需求说明 //前面已经提过

  • 相关阅读:
    MySql优化-你的SQL命中索引了吗
    php根据两点经纬度算距离
    二维数组排序
    php 求对数
    socket,websocket,socketio之间的关系
    PHP获取两个时间戳间的所有日期
    通过动画理解Raft公式算法
    Fabric创建通道、组织加入通道
    查看进程线程的方法
    Fabric添加节点
  • 原文地址:https://www.cnblogs.com/kkxwz/p/8392658.html
Copyright © 2011-2022 走看看