  • News Real-Time Analysis System: Real-Time Data Analysis with Spark Streaming

    1. Spark Streaming overview
    1) Definition
    Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.

    2. Install nc and run Spark Streaming against it
    1) Install the nc command online with yum
    yum install -y nc
    2) Run the Spark Streaming WordCount example
    bin/run-example streaming.NetworkWordCount localhost 9999
    3) Pipe a file into nc as its input, then watch the results computed by Spark Streaming
    cat test.txt | nc -lk 9999
    Contents of test.txt:
    hadoop storm spark
    hbase spark flume
    spark dajiangtai spark
    hdfs mapreduce spark
    hive hdfs solr
    spark flink storm
    hbase storm es
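    If nc cannot be installed, a small JVM program can play roughly the same role as `cat test.txt | nc -lk 9999`. The sketch below is a hypothetical substitute, not part of the original setup: `LineServer` and `serveOnce` are illustrative names, it assumes `test.txt` sits in the working directory, it sends the file's lines to each client that connects, and, unlike nc, it ignores anything the client sends. It keeps each connection open after sending, because in Spark 2.x the socket receiver restarts and reconnects when the stream ends, which would replay the same lines.

```scala
import java.io.{IOException, PrintWriter}
import java.net.ServerSocket
import scala.io.Source

// Hypothetical stand-in for `cat test.txt | nc -lk 9999`; not a Spark or nc API.
object LineServer {
  /** Accepts one client, sends it every line, then holds the connection open
    * until the client disconnects. */
  def serveOnce(server: ServerSocket, lines: Seq[String]): Unit = {
    val client = server.accept()
    try {
      val out = new PrintWriter(client.getOutputStream, true) // autoflush on println
      lines.foreach(line => out.println(line))
      val in = client.getInputStream
      // Discard anything the client sends; read() returns -1 once it disconnects.
      try { while (in.read() != -1) {} }
      catch { case _: IOException => () } // e.g. connection reset by the client
    } finally client.close()
  }

  def main(args: Array[String]): Unit = {
    val port = if (args.nonEmpty) args(0).toInt else 9999 // same default port as above
    val src = Source.fromFile("test.txt")                 // assumed to be in the working directory
    val lines = try src.getLines().toList finally src.close()
    val server = new ServerSocket(port)
    // Serve clients one after another, similar in spirit to nc -lk.
    try while (true) serveOnce(server, lines)
    finally server.close()
  }
}
```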
    3. How Spark Streaming works
    1) Stream processing in Spark Streaming
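    Spark Streaming divides a live stream into small batches, one per batch interval, and hands each batch to the Spark engine as an RDD (a DStream is a sequence of such RDDs). The plain-Scala sketch below does not use Spark; it models only that discretization step under assumed inputs: the hypothetical `Event` type carries a receive time in milliseconds since the stream started, and each record lands in the batch whose interval contains it.

```scala
// Plain-Scala model of micro-batching; not Spark code. Event is a hypothetical type.
object MicroBatchModel {
  /** A line received at `timeMs` milliseconds after the stream started. */
  final case class Event(timeMs: Long, line: String)

  /** Groups events into consecutive batches of `batchMs` milliseconds.
    * Batch k covers [k * batchMs, (k + 1) * batchMs); empty batches are kept,
    * because every interval produces a batch even when no data arrived. */
  def toBatches(events: Seq[Event], batchMs: Long): Vector[Seq[String]] = {
    require(batchMs > 0, "batch interval must be positive")
    if (events.isEmpty) Vector.empty
    else {
      val byBatch = events.groupBy(e => e.timeMs / batchMs)
      val last = byBatch.keys.max
      (0L to last).map { k =>
        byBatch.getOrElse(k, Seq.empty).sortBy(_.timeMs).map(_.line)
      }.toVector
    }
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      Event(100, "hadoop storm spark"),
      Event(900, "hbase spark flume"),
      Event(2300, "hive hdfs solr"))
    // With a 1-second interval: batch 0 gets two lines, batch 1 is empty, batch 2 gets one.
    toBatches(events, 1000).zipWithIndex.foreach { case (b, i) => println(s"batch $i: $b") }
  }
}
```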

    2) How receivers work
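    According to the performance-tuning section of the Spark Streaming programming guide, a receiver chunks incoming data into blocks every `spark.streaming.blockInterval` (200 ms by default), and each block becomes one partition, and thus one map task, of that batch's RDD. Per receiver and batch this gives roughly batch interval / block interval tasks. The helper below is a hypothetical arithmetic aid, not a Spark API; it does not apply to the receiver-less Kafka direct stream.

```scala
// Hypothetical helper; the 200 ms default mirrors spark.streaming.blockInterval.
object ReceiverBlocks {
  val DefaultBlockIntervalMs = 200L

  /** Approximate blocks (= RDD partitions = map tasks) that the receivers
    * contribute to one batch: receivers * (batch interval / block interval). */
  def blocksPerBatch(batchIntervalMs: Long,
                     blockIntervalMs: Long = DefaultBlockIntervalMs,
                     receivers: Int = 1): Long = {
    require(batchIntervalMs > 0 && blockIntervalMs > 0 && receivers > 0)
    receivers * (batchIntervalMs / blockIntervalMs)
  }

  def main(args: Array[String]): Unit = {
    // Seconds(1) and Seconds(5) are the batch intervals used in this article.
    println(blocksPerBatch(1000)) // 5
    println(blocksPerBatch(5000)) // 25
  }
}
```

    A larger block interval means fewer, bigger tasks per batch; a smaller one raises parallelism at the cost of more scheduling overhead.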




    3) Overall workflow


    4. Spark Streaming programming model
    1) Two ways to initialize a StreamingContext
    // Option 1: from an existing SparkContext, such as sc in spark-shell
    val ssc = new StreamingContext(sc, Seconds(5))
    // Option 2: from a SparkConf
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
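    2) Spark Streaming socket code
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object NetworkWordCount {
      def main(args: Array[String]): Unit = {
        if (args.length < 2) {
          System.err.println("Usage: NetworkWordCount <hostname> <port>")
          System.exit(1)
        }

        // Create the StreamingContext with a 1-second batch interval
        val sparkConf = new SparkConf().setAppName("NetworkWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(1))

        // Listen on a network port to create the lines DStream.
        // Arguments: hostname, port, storage level for the received data
        val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
        // flatMap: split each line into words
        val words = lines.flatMap(_.split(" "))
        // map + reduceByKey: count each word within the batch
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
        wordCounts.print()
        ssc.start()            // start receiving and processing
        ssc.awaitTermination() // block until the job is stopped
      }
    }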
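    The transformation chain above can be mirrored on one batch with ordinary Scala collections, which makes it easy to check what `wordCounts.print()` should show. This is a plain-Scala sketch, not Spark code: `WordCountModel` is a hypothetical name and `groupBy` stands in for the shuffle that `reduceByKey` performs. The input is the test.txt content from section 2; if all seven lines arrive in the same batch, spark appears 6 times, storm 3 times, hbase and hdfs twice, and every other word once. If the lines straddle a batch boundary, the counts are split across batches.

```scala
// Plain-Scala mirror of flatMap -> map -> reduceByKey for a single batch.
object WordCountModel {
  def countBatch(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))  // like lines.flatMap(_.split(" "))
      .map(x => (x, 1))       // like words.map(x => (x, 1))
      .groupBy(_._1)          // stands in for the shuffle inside reduceByKey
      .map { case (word, pairs) => word -> pairs.map(_._2).reduce(_ + _) } // like reduceByKey(_ + _)

  def main(args: Array[String]): Unit = {
    // The contents of test.txt from section 2.
    val testTxt = Seq(
      "hadoop storm spark",
      "hbase spark flume",
      "spark dajiangtai spark",
      "hdfs mapreduce spark",
      "hive hdfs solr",
      "spark flink storm",
      "hbase storm es")
    // Most frequent first, ties in alphabetical order.
    countBatch(testTxt).toSeq.sortBy { case (w, n) => (-n, w) }.foreach(println)
  }
}
```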
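    5. Reading socket stream data with Spark Streaming
    1) To run a Streaming program in spark-shell, use either more than one local thread or a cluster. A receiver occupies one thread for as long as it runs, so with local or local[1] no thread is left to process the received data.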
    bin/spark-shell --master local[2]
    bin/spark-shell --master spark://bigdata-pro01.kfk.com:7077
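    Why one thread is not enough can be shown with a plain-JVM analogy, not Spark code: a fixed thread pool stands in for local[n], a task that blocks until told to stop stands in for the receiver, and a short task stands in for a batch job. `LocalThreadsDemo` and `processingRuns` are hypothetical names, and the 500 ms wait is an arbitrary assumption.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// JVM analogy for local[1] vs local[2]; no Spark classes involved.
object LocalThreadsDemo {
  /** Returns true if a short "batch job" finishes while a long-running
    * "receiver" task occupies one of `threads` worker threads. */
  def processingRuns(threads: Int): Boolean = {
    val pool = Executors.newFixedThreadPool(threads)
    val stopReceiver = new CountDownLatch(1)
    val processed = new CountDownLatch(1)
    // Simulated receiver: holds its thread until told to stop.
    pool.submit(new Runnable { def run(): Unit = stopReceiver.await() })
    // Simulated batch job: only needs a free thread to complete.
    pool.submit(new Runnable { def run(): Unit = processed.countDown() })
    val ok = processed.await(500, TimeUnit.MILLISECONDS)
    stopReceiver.countDown() // release the receiver so the pool can shut down
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
    ok
  }

  def main(args: Array[String]): Unit = {
    println(s"1 thread:  batch job ran = ${processingRuns(1)}")
    println(s"2 threads: batch job ran = ${processingRuns(2)}")
  }
}
```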
    2) Spark run modes

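    3) Reading socket stream data with Spark Streaming
    a) Write the test code and run it locally
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object TestStreaming {
      def main(args: Array[String]): Unit = {
        // Host and port are hard-coded below, so no command-line arguments are needed
        val spark = SparkSession.builder().master("local[2]").appName("streaming").getOrCreate()
        val sc = spark.sparkContext

        // 5-second batch interval
        val ssc = new StreamingContext(sc, Seconds(5))

        // Listen on a network port to create the lines DStream.
        // Arguments: hostname, port, storage level for the received data
        val lines = ssc.socketTextStream("bigdata-pro02.kfk.com", 9999, StorageLevel.MEMORY_AND_DISK_SER)
        // flatMap: split each line into words
        val words = lines.flatMap(_.split(" "))
        // map + reduceByKey: count each word within the batch
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }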
    b) Start nc on bigdata-pro02.kfk.com to send data
    nc -lk 9999
    6. Saving Spark Streaming data to external systems
    1) Save to a MySQL database
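    The Spark Streaming programming guide recommends writing to external stores such as MySQL from `foreachRDD`, opening one connection per partition with `rdd.foreachPartition` rather than one per record (creating a connection per record is expensive, and connections cannot be shipped from the driver). The plain-Scala sketch below models only that pattern: partitions are nested sequences and `FakeConnection` is a hypothetical stand-in for a JDBC connection, so the example just counts connections and rows instead of talking to a database.

```scala
import scala.collection.mutable.ArrayBuffer

// Model of the per-partition connection pattern; FakeConnection is hypothetical.
// In Spark the shape is roughly:
//   wordCounts.foreachRDD { rdd =>
//     rdd.foreachPartition { records =>
//       val conn = ... // open a connection on the executor
//       records.foreach(r => ...) // write each record
//       conn.close()
//     }
//   }
object PerPartitionSink {
  /** Stand-in for a database connection; records what would have been inserted. */
  final class FakeConnection {
    val written = ArrayBuffer.empty[(String, Int)]
    var closed = false
    def insert(row: (String, Int)): Unit = { require(!closed, "connection closed"); written += row }
    def close(): Unit = closed = true
  }

  /** Opens one connection per partition, writes that partition's rows, closes it. */
  def writePartitions(partitions: Seq[Seq[(String, Int)]]): Seq[FakeConnection] =
    partitions.map { partition =>
      val conn = new FakeConnection
      try partition.foreach(conn.insert)
      finally conn.close()
      conn
    }

  def main(args: Array[String]): Unit = {
    val conns = writePartitions(Seq(Seq(("spark", 6), ("storm", 3)), Seq(("hbase", 2))))
    println(s"connections opened: ${conns.size}, rows written: ${conns.map(_.written.size).sum}")
  }
}
```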

    2) Save to HDFS
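    One documented way to write each batch to HDFS is the DStream output operation `saveAsTextFiles(prefix, suffix)`, which writes every batch to its own directory named "prefix-TIME_IN_MS[.suffix]". The sketch below only reproduces that naming rule in plain Scala so the resulting layout is predictable; `BatchOutputNames` is a hypothetical name, and the HDFS prefix and batch time in `main` are made-up example values.

```scala
// Plain-Scala model of the per-batch directory naming used by saveAsTextFiles.
object BatchOutputNames {
  /** "prefix-TIME_IN_MS[.suffix]"; an empty prefix or suffix is left out. */
  def batchDir(prefix: String, suffix: String, batchTimeMs: Long): String = {
    val withPrefix =
      if (prefix == null || prefix.isEmpty) batchTimeMs.toString else s"$prefix-$batchTimeMs"
    if (suffix == null || suffix.isEmpty) withPrefix else s"$withPrefix.$suffix"
  }

  /** Directories for `n` consecutive batches starting at `firstBatchMs`, `intervalMs` apart. */
  def batchDirs(prefix: String, suffix: String, firstBatchMs: Long, intervalMs: Long, n: Int): Seq[String] =
    (0 until n).map(i => batchDir(prefix, suffix, firstBatchMs + i * intervalMs))

  def main(args: Array[String]): Unit =
    // Hypothetical prefix and start time; Seconds(5) gives batches 5000 ms apart.
    batchDirs("hdfs:///tmp/wordcount", "", 1568000000000L, 5000L, 3).foreach(println)
}
```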

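    7. Integrating Spark Streaming with Kafka
    1) Add the spark-streaming-kafka dependency with Maven
    2) Write the test code and run it
    // Uses the receiver-less "direct" API of the Kafka 0.8 integration
    // (package org.apache.spark.streaming.kafka)
    import kafka.serializer.StringDecoder
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object StreamingKafka8 {

      def main(args: Array[String]): Unit = {

        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("streaming").getOrCreate()

        val sc = spark.sparkContext
        // 5-second batch interval
        val ssc = new StreamingContext(sc, Seconds(5))

        // Create direct kafka stream with brokers and topics
        val topicsSet = Set("weblogs")
        val kafkaParams = Map[String, String]("metadata.broker.list" -> "bigdata-pro01.kfk.com:9092")
        val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topicsSet)

        // Each record is a (key, message) pair; keep only the message text
        val lines = kafkaStream.map(x => x._2)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
        wordCounts.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }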
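    The direct stream above has no receiver: for every batch, the driver decides which range of offsets to read from each Kafka partition, and the executors read exactly those ranges. The plain-Scala sketch below models only that planning step under simplifying assumptions: the `OffsetRange` case class is a hypothetical stand-in (Spark's own class also carries the topic name), current and latest offsets are supplied as maps, and the optional cap imitates the `spark.streaming.kafka.maxRatePerPartition` setting (messages per second per partition).

```scala
// Simplified model of per-batch offset planning in the Kafka direct approach; not Spark code.
object DirectStreamOffsets {
  /** Hypothetical mirror of an offset range: messages [fromOffset, untilOffset) of one partition. */
  final case class OffsetRange(partition: Int, fromOffset: Long, untilOffset: Long) {
    def count: Long = untilOffset - fromOffset
  }

  /** Each partition reads from its current offset up to the latest offset,
    * capped at rate * batchSeconds messages when a rate limit is given. */
  def planBatch(current: Map[Int, Long], latest: Map[Int, Long],
                batchSeconds: Long, maxRatePerPartition: Option[Long]): Seq[OffsetRange] =
    current.toSeq.sortBy(_._1).map { case (p, from) =>
      val end = latest(p)
      val until = maxRatePerPartition.fold(end)(rate => math.min(end, from + rate * batchSeconds))
      OffsetRange(p, from, until)
    }

  def main(args: Array[String]): Unit = {
    // Made-up offsets for two partitions of the weblogs topic, 5-second batches.
    val plan = planBatch(Map(0 -> 100L, 1 -> 50L), Map(0 -> 1000L, 1 -> 60L), 5L, Some(100L))
    plan.foreach(r => println(s"partition ${r.partition}: [${r.fromOffset}, ${r.untilOffset}) = ${r.count} msgs"))
  }
}
```

    After a batch completes, the next batch starts from each range's `untilOffset`, which is what lets the direct approach track offsets itself instead of relying on a receiver.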
    3) Start the Kafka broker and produce test data
    bin/kafka-server-start.sh config/server.properties
    bin/kafka-console-producer.sh --broker-list bigdata-pro01.kfk.com:9092 --topic weblogs

  • Original article: https://www.cnblogs.com/misliu/p/11559228.html