  • Spark Streaming + HDFS with Spark JDBC External Data Sources: a worked example

    Scenario: use Spark Streaming to receive file data from HDFS and run join queries against a table in a relational database.

    Technologies used: Spark Streaming + Spark JDBC external data sources

     
    The data files on HDFS have the format id, name, cityId, separated by tabs:
    1       zhangsan        1
    2       lisi    1
    3       wangwu  2
    4       zhaoliu 3
    The MySQL table city has the schema (id int, name varchar) and holds:
    1    bj
    2    sz
    3    sh
    The result this example computes is: select s.id, s.name, s.cityId, c.name from student s join city c on s.cityId=c.id;
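
    With the sample data above, the join should print one row per student (Spark SQL rows render as bracketed value lists), roughly:

    [1,zhangsan,1,bj]
    [2,lisi,1,bj]
    [3,wangwu,2,sz]
    [4,zhaoliu,3,sh]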

    Example code:

    // Student.scala -- case class describing one record in the HDFS files
    package com.asiainfo.ocdc

    case class Student(id: Int, name: String, cityId: Int)

    // HDFSStreaming.scala
    package com.asiainfo.ocdc
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.hive.HiveContext
    
    /**
     * Processes data arriving on HDFS with Spark Streaming and joins it against a MySQL table loaded via the Spark JDBC external data source.
     *
     * @author luogankun
     */
    object HDFSStreaming {
      def main(args: Array[String]) {
    
        if (args.length < 1) {
          System.err.println("Usage: HDFSStreaming <path>")
          System.exit(1)
        }
    
        val location = args(0)
    
        val sparkConf = new SparkConf()
        val sc = new SparkContext(sparkConf)
        val ssc = new StreamingContext(sc, Seconds(5))
    
        val sqlContext = new HiveContext(sc)
        import sqlContext._
    
        import com.luogankun.spark.jdbc._
    //load the MySQL city table through the JDBC external data source
    val cities = sqlContext.jdbcTable("jdbc:mysql://hadoop000:3306/test", "root", "root", "select id, name from city")
    //register the cities result as a temporary table named city
        cities.registerTempTable("city")
    
        val inputs = ssc.textFileStream(location)
        inputs.foreachRDD(rdd => {
          if (rdd.partitions.length > 0) {
        //register the records received in this batch as a temporary table named student
        rdd.map(_.split("\t")).map(x => Student(x(0).toInt, x(1), x(2).toInt)).registerTempTable("student")

        //join the streaming data with the MySQL-backed city table
            sqlContext.sql("select s.id, s.name, s.cityId, c.name from student s join city c on s.cityId=c.id").collect().foreach(println)
          }
        })
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
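
    The jdbcTable helper above comes from the third-party com.luogankun.spark.jdbc library. On Spark 1.4+, the built-in JDBC data source offers the same capability; a minimal sketch, assuming the same MySQL instance and credentials as in the example (the MySQL JDBC driver must be on the classpath):

    import java.util.Properties

    val props = new Properties()
    props.setProperty("user", "root")     // same credentials as above
    props.setProperty("password", "root")
    // Load the city table through Spark's built-in JDBC source
    val cities = sqlContext.read.jdbc("jdbc:mysql://hadoop000:3306/test", "city", props)
    cities.registerTempTable("city")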

    Script to submit the job to the cluster: sparkstreaming_hdfs_jdbc.sh

    #!/bin/sh
    . /etc/profile
    set -x
    
    cd $SPARK_HOME/bin
    
    spark-submit \
    --name HDFSStreaming \
    --class com.asiainfo.ocdc.HDFSStreaming \
    --master spark://hadoop000:7077 \
    --executor-memory 1G \
    --total-executor-cores 1 \
    /home/spark/software/source/streaming-app/target/streaming-app-V00B01C00-SNAPSHOT-jar-with-dependencies.jar \
    hdfs://hadoop000:8020/data/hdfs
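
    Note: textFileStream only picks up files that appear in the monitored directory after the application has started; files already present at launch are ignored, so new data must be copied into hdfs://hadoop000:8020/data/hdfs (for example with hdfs dfs -put) while the job is running.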
  • Original post: https://www.cnblogs.com/luogankun/p/4250297.html