zoukankan      html  css  js  c++  java
  • 【慕课网实战】Spark Streaming实时流处理项目实战笔记十三之铭文升级版

    铭文一级:

    第10章 Spark Streaming整合Kafka

    spark-submit
    --class com.imooc.spark.KafkaReceiverWordCount
    --master local[2]
    --name KafkaReceiverWordCount
    --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0
    /home/hadoop/lib/sparktrain-1.0.jar hadoop000:2181 test kafka_streaming_topic 1


    spark-submit
    --class com.imooc.spark.KafkaDirectWordCount
    --master local[2]
    --name KafkaDirectWordCount
    --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0
    /home/hadoop/lib/sparktrain-1.0.jar hadoop000:9092 kafka_streaming_topic

    铭文二级:

    第10章 Spark Streaming整合Kafka

    Receiver方式的联调

    hadoop000:2181 test kafka_streaming_topic 1  //可直接到IDEA的edit configuration复制

    //test:group名、1:线程数

    setMaster("local[2]")    //一定要大于2

    mvn、scp、运行后看4040端口Spark Streaming的UI界面

    可发现Receiver是一直都在运作的,二Direct方式没有此Jobs

    Direct Approach(常用 spark1.3引入)

    特点:

    1、简化了并行度,不需要多个Input Stream,只需要一个DStream

    2、加强了性能,真正做到了0数据丢失,而Receiver方式需要写到WAL才可以(即副本存储),Direct方式没有Receiver

    3、只执行一次

    缺点:基于ZooKeeper的Kafka监控工具,无法展示出来,所以需要周期性地访问offset才能更新到ZooKeeper去

    操作:

    1、cp KafkaReceiverWordCount 为KafkaDirectWordCount

    将createStream改为createDirectStream

    参数只需要传brokers与topics,注意查看源码与泛型看返回类型并构造出来

    2、关键代码:

      val topicsSet = topics.split(",").toSet
      val kafkaParams = Map[String,String]("metadata.broker.list"-> brokers)
        // TODO... Spark Streaming如何对接Kafka
      val messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicsSet)

    3、联调方式跟Receiver完全一样

    第11章 Spark Streaming整合Flume&Kafka打造通用流处理基础

    整合日志输出到Flume、整合Flume到Kafka、整合Kafka到Spark Streaming

    将Spark Streaming接受到的数据进行处理

    日志产生器开发并结合log4j完成日志的输出=>

    项目结构的构建:

    在test文件夹建java文件夹(改颜色):

    新建类LoggerGenerator

    public class LoggerGenerator {
        private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());
        public static void main(String[] args) throws Exception{
            int index = 0;
            while(true) {
                Thread.sleep(1000);
                logger.info("value : " + index++);
            }
        }
    }

    在test文件夹建resources文件夹(改颜色):

    新建文件log4j.properties

    log4j.rootLogger=INFO,stdout,flume
    
    log4j.appender.stdout = org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.target = System.out
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
    

     含义:

      %m   输出代码中指定的消息
      %p   输出优先级,即DEBUG,INFO,WARN,ERROR,FATAL 
      %r   输出自应用启动到输出该log信息耗费的毫秒数 
      %c   输出所属的类目,通常就是所在类的全名 
      %t   输出产生该日志事件的线程名 
      %n   输出一个回车换行符,Windows平台为“
    ”,Unix平台为“
    ” 
      %d   输出日志时间点的日期或时间,默认格式为ISO8601,也可以在其后指定格式,比如:%d{yyy MMM dd HH:mm:ss , SSS},
         输出类似:2002年10月18日 22 : 10 : 28 , 921   %l 输出日志事件的发生位置,包括类目名、发生的线程,以及在代码中的行数。举例:Testlog4.main(TestLog4.java: 10 )

      

  • 相关阅读:
    【BZOJ1014】【JSOI2008】火星人prefix
    [agc011e]increasing numbers
    NOIp2018模拟赛四十一
    拉格朗日插值&&快速插值
    NOIp2018模拟赛四十
    (2016北京集训十四)【xsy1557】task
    (2016北京集训十四)【xsy1556】股神小D
    数据泵导入ORA-39082报错解决
    OracleDBA职责—备份与恢复技术—概念
    OracleDBA职责—备份与恢复技术—RMAN3
  • 原文地址:https://www.cnblogs.com/kkxwz/p/8391895.html
Copyright © 2011-2022 走看看