  • [Spark] Spark Streaming: Reading Data from Different Basic Data Sources


    Basic data sources

    File data source

    Notes

    1. Spark Streaming does not monitor nested directories; only files placed directly in the watched directory are picked up.
    2. Files must enter dataDirectory (the monitored folder) by being moved or renamed into it, so that they appear atomically (see the sketch below).
    3. Once a file has been moved into the directory it must not be modified; even if it is, the modified data will not be read.
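
    In practice, note 2 means a new file should first be written somewhere outside the watched folder and then renamed or moved into it, so that it appears atomically. A minimal sketch using the Hadoop FileSystem API (the node01:8020 NameNode address and the paths are the ones used in the example below; adjust them to your cluster):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object MoveIntoWatchedDir {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        // Assumed NameNode address, matching the HDFS URI used later in this post
        conf.set("fs.defaultFS", "hdfs://node01:8020")
        val fs: FileSystem = FileSystem.get(conf)

        // Upload the file to a staging location OUTSIDE the monitored folder first ...
        fs.copyFromLocalFile(new Path("/export/servers/wordcount.txt"), new Path("/tmp/wordcount.txt"))
        // ... then move it into the monitored folder with a single rename, which is
        // atomic on HDFS, so Spark Streaming never sees a half-written file
        fs.rename(new Path("/tmp/wordcount.txt"), new Path("/stream_data/wordcount.txt"))

        fs.close()
      }
    }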

    Steps

    1. Create a Maven project and add the dependencies
    <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.2.0</spark.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
    
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <!--    <verbal>true</verbal>-->
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    
    2. Create a directory on HDFS and upload test data
    Create a local test file first:
    cd /export/servers/
    vim wordcount.txt
    
    hello world
    abc test
    hadoop hive
    

    Create the directory on HDFS and upload the file:

    hdfs dfs -mkdir /stream_data
    hdfs dfs -put wordcount.txt /stream_data
    
    3. Write the Spark Streaming code
    package cn.itcast.sparkstreaming.demo1
    
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object getHdfsFiles {
    
      // Custom updateFunc used by updateStateByKey
      /**
       * updateFunc takes two parameters:
       *
       * @param newValues    counts that arrived for a key in the current batch
       * @param runningCount the accumulated count for that key from previous batches
       * @return the new running total wrapped in an Option
       *
       *         Option is a special Scala class, the parent of Some and None, used here to avoid dealing with null
       */
      def updateFunc(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
        val finalResult: Int = newValues.sum + runningCount.getOrElse(0)
        Option(finalResult)
      }
    
      def main(args: Array[String]): Unit = {
        // Create the SparkConf
        val sparkConf: SparkConf = new SparkConf().setAppName("getHdfsFiles_to_wordcount").setMaster("local[6]").set("spark.driver.host", "localhost")
        // Create the SparkContext
        val sparkContext = new SparkContext(sparkConf)
        // Set the log level
        sparkContext.setLogLevel("WARN")
        // Create the StreamingContext with a 5-second batch interval
        val streamingContext = new StreamingContext(sparkContext, Seconds(5))
        // Checkpoint directory where the running state (the historical counts) is kept
        streamingContext.checkpoint("./stream.check")

        // Read files from the monitored HDFS directory
        val fileStream: DStream[String] = streamingContext.textFileStream("hdfs://node01:8020/stream_data")
        // Split the lines and count the words
        val flatMapStream: DStream[String] = fileStream.flatMap(x => x.split(" "))
        val wordAndOne: DStream[(String, Int)] = flatMapStream.map(x => (x, 1))
        // reduceByKey only counts within a single batch; updateStateByKey carries the counts
        // across batches and takes the custom updateFunc defined above
        val byKey: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)
    
        // Print the results
        byKey.print()
    
        streamingContext.start()
        streamingContext.awaitTermination()
      }
    }
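
    textFileStream is a convenience wrapper around the more general fileStream. If you need control over which files get picked up, a sketch like the following (assuming the same directory; it would replace the textFileStream line inside main() above) reads only files ending in .txt:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    // Drop-in replacement for the textFileStream(...) call above
    val fileStream: DStream[String] = streamingContext
      .fileStream[LongWritable, Text, TextInputFormat](
        "hdfs://node01:8020/stream_data",
        (path: Path) => path.getName.endsWith(".txt"), // only pick up .txt files
        newFilesOnly = true)
      .map(_._2.toString) // keep the line text, drop the byte-offset key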
    
    4. With the job running, upload more files to the monitored HDFS directory

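    For example, uploading the same content again under a new name (the file name here is an assumption):
    hdfs dfs -put wordcount.txt /stream_data/wordcount2.txt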

    5. Console output. The counts jump from 1 to 2 once the same words arrive a second time, because updateStateByKey keeps accumulating across batches:
    -------------------------------------------
    Time: 1586856345000 ms
    -------------------------------------------
    
    -------------------------------------------
    Time: 1586856350000 ms
    -------------------------------------------
    
    -------------------------------------------
    Time: 1586856355000 ms
    -------------------------------------------
    (abc,1)
    (world,1)
    (hadoop,1)
    (hive,1)
    (hello,1)
    (test,1)
    
    -------------------------------------------
    Time: 1586856360000 ms
    -------------------------------------------
    (abc,1)
    (world,1)
    (hadoop,1)
    (hive,1)
    (hello,1)
    (test,1)
    
    -------------------------------------------
    Time: 1586856365000 ms
    -------------------------------------------
    (abc,1)
    (world,1)
    (hadoop,1)
    (hive,1)
    (hello,1)
    (test,1)
    
    -------------------------------------------
    Time: 1586856370000 ms
    -------------------------------------------
    (abc,2)
    (world,2)
    (hadoop,2)
    (hive,2)
    (hello,2)
    (test,2)
    
    -------------------------------------------
    Time: 1586856375000 ms
    -------------------------------------------
    (abc,2)
    (world,2)
    (hadoop,2)
    (hive,2)
    (hello,2)
    (test,2)
    
    

    Custom data source

    Steps

    1. Use the nc tool to send data to the chosen port (-l listen, -k keep accepting connections)
    nc -lk 9999
    
    2. Write the code
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    object CustomReceiver {
    
      /**
       * Custom updateFunc, same role as in the file-source example above
       * @param newValues    counts that arrived for a key in the current batch
       * @param runningCount accumulated count for that key from previous batches
       * @return the new running total wrapped in an Option
       */
      def updateFunc(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
        val finalResult: Int = newValues.sum + runningCount.getOrElse(0)
        Option(finalResult)
      }

      def main(args: Array[String]): Unit = {
        // Create the SparkConf
        val sparkConf: SparkConf = new SparkConf().setAppName("CustomReceiver").setMaster("local[6]").set("spark.driver.host", "localhost")
        // Create the SparkContext
        val sparkContext = new SparkContext(sparkConf)
        sparkContext.setLogLevel("WARN")
        // Create the StreamingContext (5-second batches) and a checkpoint directory for the running state
        val streamingContext = new StreamingContext(sparkContext, Seconds(5))
        streamingContext.checkpoint("./stream_check")

        // Read data from the custom receiver defined below
        val stream: ReceiverInputDStream[String] = streamingContext.receiverStream(new MyReceiver("node01", 9999))

        // Split the lines and count the words
        val mapStream: DStream[String] = stream.flatMap(x => x.split(" "))
        val wordAndOne: DStream[(String, Int)] = mapStream.map((_, 1))
        val byKey: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)

        // Print the results
        byKey.print()
    
        streamingContext.start()
        streamingContext.awaitTermination()
      }
    }
    
    import java.io.{BufferedReader, InputStream, InputStreamReader}
    import java.net.Socket
    import java.nio.charset.StandardCharsets
    
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver
    
    class MyReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
      /**
       * Receives data from the socket and passes every line to store(), which hands it over to Spark.
       */
      private def receiverDatas(): Unit = {
        // Connect to the socket source
        val socket = new Socket(host, port)
        // Get the socket input stream
        val stream: InputStream = socket.getInputStream
        // Wrap the stream in a BufferedReader so it can be read line by line as UTF-8 text
        val reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))

        // Keep reading while there is data and the receiver has not been stopped.
        // (In Scala an assignment evaluates to Unit, so the Java-style
        // `while ((line = reader.readLine()) != null)` idiom does not work here.)
        var line: String = reader.readLine()
        while (!isStopped() && line != null) {
          store(line)
          line = reader.readLine()
        }

        reader.close()
        stream.close()
        socket.close()
      }


      /**
       * onStart is called once when the receiver is started. It must not block,
       * so the actual reading runs on a separate thread.
       */
      override def onStart(): Unit = {
        // Start a thread that receives data over the connection
        new Thread() {
          override def run(): Unit = {
            // Read socket data and store it
            receiverDatas()
          }
        }.start()
      }

      // Called when the receiver is stopped; receiverDatas() closes its own resources
      // when the loop exits, so there is nothing extra to clean up here
      override def onStop(): Unit = {

      }
    }
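
    For comparison, the same job can be driven by Spark's built-in socket source, which does internally what MyReceiver does above. A minimal sketch (reduceByKey gives per-batch counts here instead of the running totals that updateStateByKey maintains):

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object SocketWordCount {
      def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setAppName("SocketWordCount").setMaster("local[6]").set("spark.driver.host", "localhost")
        val streamingContext = new StreamingContext(sparkConf, Seconds(5))
        streamingContext.sparkContext.setLogLevel("WARN")

        // Built-in socket source: reads UTF-8 text lines from node01:9999,
        // no custom Receiver class needed
        val lines: ReceiverInputDStream[String] =
          streamingContext.socketTextStream("node01", 9999, StorageLevel.MEMORY_AND_DISK_2)

        // Per-batch word count (no running state, so no checkpoint required)
        lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

        streamingContext.start()
        streamingContext.awaitTermination()
      }
    }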
    

    RDD queue

    Steps

    1. Write the code
    package cn.itcast.sparkstreaming.demo3
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    import scala.collection.mutable
    
    object QueneReceiver {
      def main(args: Array[String]): Unit = {
        // Create the SparkConf
        val sparkConf: SparkConf = new SparkConf().setMaster("local[6]").setAppName("queneReceiver").set("spark.driver.host", "localhost")
        // Create the SparkContext
        val sparkContext = new SparkContext(sparkConf)
        sparkContext.setLogLevel("WARN")
        // Create the StreamingContext with a 5-second batch interval
        val streamingContext = new StreamingContext(sparkContext, Seconds(5))

        // The queue of RDDs that will feed the stream
        val queue = new mutable.SynchronizedQueue[RDD[Int]]
        // queueStream takes a mutable Queue[RDD[T]]; by default it consumes one RDD per batch
        val inputStream: InputDStream[Int] = streamingContext.queueStream(queue)
        // Transform the DStream
        val mapStream: DStream[Int] = inputStream.map(x => x * 2)

        mapStream.print()

        streamingContext.start()
        // Push an RDD into the queue every 3 seconds
        for (x <- 1 to 100){
          queue += streamingContext.sparkContext.makeRDD(1 to 10)
          Thread.sleep(3000)
        }
        streamingContext.awaitTermination()
    
      }
    }
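
    By default queueStream pulls one RDD from the queue per batch interval. If each batch should instead drain everything currently queued, the method has an overload with a oneAtATime flag; a one-line variation of the call above:

    // Consume all RDDs currently in the queue in each 5-second batch,
    // instead of one RDD per batch (the default)
    val inputStream: InputDStream[Int] = streamingContext.queueStream(queue, oneAtATime = false)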
    
  • Original post: https://www.cnblogs.com/zzzsw0412/p/12772384.html