zoukankan      html  css  js  c++  java
  • 数据处理,简易流程开发

        跟项目经理,确认业务
        自己思考,写出大概流程,画出草体,跟同事交流,
        写出具体流程,画出具体流程图,
        确定业务,跟项目经理交流,进一步确定项目,
        开始进行项目的开发
    

    0 、创建项目scala项目:G A V ,构建项目目录树,导入依赖

        向pom.xml中导入依赖
    
        <scala.version>2.11.8</scala.version>
        <kafka.version>0.9.0.0</kafka.version>
        <spark.version>2.2.0</spark.version>
    
    
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>${scala.version}</version>
        </dependency>
    
        <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-streaming_2.11</artifactId>
         <version>${spark.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.flume.flume-ng-clients</groupId>
          <artifactId>flume-ng-log4jappender</artifactId>
          <version>1.6.0</version>
        </dependency>
    

    1 、编写log4j,进行本地测试,测试log4j是否生成日志

    src/main/java/com/imooc/LoggerGenerator.java
    
    
    import org.apache.log4j.Logger;
    /**
     * 模拟日志生成
     */
    public class GenerateLog4j {
        private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());
        public static void main(String[] args) throws InterruptedException {
            int index = 0;
            while (true) {
                Thread.sleep(1000);
                logger.info("current : " + index++);
            }
        }
    }
    
    
    
    src/resources/log4j.properties:
    
    
     log4j.rootLogger=info,stdout,flume
    
        log4j.appender.stdout=org.apache.log4j.ConsoleAppender
        log4j.appender.stdout.Target=System.out
        log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
        log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
    

    测试正确后,下一步


    2 、flume从log4j收集数据

    在flume官网中到log4jAppender中对接flume的属性值
    放入本地资源共享库resources内的log4j.properties中

    src/resources/log4j.properties:
    
    
    log4j.rootLogger=info,stdout,flume
    
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.Target=System.out
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
    
    #log4j对接flume,输出到控制台
    log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
    log4j.appender.flume.Hostname = Master
    log4j.appender.flume.Port = 41414
    log4j.appender.flume.UnsafeMode = true
    

    编写log4j对接flume配置的测试,log4j-channels-flume.conf
    然后进行控制台进行测试
    Note:编写flume的配置文件时,直接取官网查找,然后粘贴复制,最好不要手敲,容易出错的

    log4j-channels-flume.conf:
    
    
    agent1.sources=avro-source      
    agent1.channels=logger-channel  
    agent1.sinks=log-sink   
    
    #define source
    agent1.sources.avro-source.type = avro
    agent1.sources.avro-source.bind = 0.0.0.0
    agent1.sources.avro-source.port = 41414
    
    #define channel
    agent1.channels.logger-channel.type = memory
    
    #define sink
    agent1.sinks.log-sink.type = logger
    
    #linked sources and sinks
    agent1.sources.avro-source.channels = logger-channel
    agent1.sinks.log-sink.channel = logger-channel
    
    启动flume:
    flume-ng agent 
    --conf /Users/hadoop/app/flume/conf 
    --conf-file /Users/hadoop/app/flume/Configure/log4j-channels-flume.conf 
    --name agent1 
    -Dflume.root.logger=INFO,console
    

    测试正确后,下一步


    3 、flume输送数据到kafka

    先启动zookeeper服务器,然后启动kafka服务器,创建并查看kafka的topic
    编写flume配置文件flume-channels-kafka.conf,去官网找kafka sink的配置信息,进行更改
    编写kafka对接SparkStreaming的测试代码。
    启动flume,
    启动kafka的消费者
    启动log4jGenerated.java程序,
    启动kafkaStreaming.scala程序
    观察控制台和kafka消费者终端的信息。

    $ ./zkServer.sh start
    $ ./kafka-server-start.sh -daemon /Users/hadoop/app/kafka/config/server.properties &
    $ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streamingtopicdemo
    $ ./kafka-topics.sh --list --zookeeper localhost:2181
    $ ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic streamingtopicdemo
    $ ./kafka-console-consumer.sh --zookeeper Master:2181 --topic streamingtopic
    $ flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/Configure/flume-channels-kafka.conf --name agent1 -Dflume.root.logger=INFO,consol
    IDEA下 运行log4jGenerated.java
    IDEA下 运行kafkaStreaming.scala
    
    
    /src/main/scala/com/imooc/KafkaStreaming.scala
    
    Program arguments:  Master:2181 test streamingtopic 1
    
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * Kafka对接Spark Streaming
      */
    object KafkaStreaming {
    
      def main(args: Array[String]): Unit = {
    
        if (args.length != 4) {
          System.err.println("Usage: <zkQuorum> <groupId> <topics> <numThreads>")
        }
    
        val Array(zkQuorum, groupId, topics, numThreads) = args
    
        val sparkConf = new SparkConf().setAppName("KafkaStreaming").setMaster("local[2]");
        val ssc = new StreamingContext(sparkConf,Seconds(5))
    
        val topicsMap =  topics.split(",").map((_, numThreads.toInt)).toMap
    
        // TODO...  Spark Streaming如何对接Kafka
        val message = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicsMap)
    
        // TODO...  Spark Streaming的测试代码
        message.map(_._2).count().print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    测试正确后,下一步


    4 、开始SparkStreaming代码的正式开发

    现在是在本地进行测试的,在IDEA中运行LoggerGenerator,
    然后使用Flume、Kafka以及Spark Streaming进行处理操作。

    在生产上肯定不是这么干的,怎么干呢?
    1)打包jar,执行LoggerGenerator类
    2)Flume、Kafka和我们的测试是一样的
    3)Spark Streaming的代码也是需要打成jar包,然后使用spark-submit的方式进行提交到环境上执行
    到底采用什么方式? 可以根据实际情况选择运行模式: local/yarn/standalone/mesos

    在生产上,整个流处理的流程都一样的,区别在于业务逻辑的复杂性

  • 相关阅读:
    快速删除段落间多余的空行
    平时一些mysql小技巧及常识
    mysql中常用的控制流函数
    按年、季度、月分组&&计算日期和时间的函数
    Excel通过身份证获取出生年月,性别,年龄,生肖,星座,省份等信息总结归纳
    统计图表类型选择应用总结&表数据挖掘方法及应用
    EXCEL如何提取文字中包含的数字?
    一篇说尽Excel常见函数用法
    RStudio中,出现中文乱码问题的解决方案
    R-RMySQL包介绍学习
  • 原文地址:https://www.cnblogs.com/suixingc/p/que-ding-ye-wu-jin-xing-hua-tu-gou-si-xie-chu-zhen.html
Copyright © 2011-2022 走看看