zoukankan      html  css  js  c++  java
  • Spark Streaming、Kafka结合Spark JDBC External DataSouces处理案例

    场景:使用Spark Streaming接收Kafka发送过来的数据与关系型数据库中的表进行相关的查询操作;

    Kafka发送过来的数据格式为:id、name、cityId,分隔符为tab 

    1       zhangsan        1
    2       lisi    1
    3       wangwu  2
    4       zhaoliu 3

    MySQL的表city结构为:id int, name varchar

    1    bj
    2    sz
    3    sh

    本案例的结果为:select s.id, s.name, s.cityId, c.name from student s join city c on s.cityId=c.id;

    Kafka安装参见:Kafka单机版环境搭建

    启动Kafka:

    zkServer.sh start
    kafka-server-start.sh  $KAFKA_HOME/config/server.properties  &
    kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1  --topic luogankun_topic
    kafka-console-producer.sh --broker-list hadoop000:9092 --topic luogankun_topic

    实例代码:

    package com.asiainfo.ocdc
    
    case class Student(id: Int, name: String, cityId: Int)
    package com.asiainfo.ocdc
    
    
    import org.apache.spark.streaming._
    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka._
    
    /**
     * Spark Streaming处理Kafka的数据并结合Spark JDBC外部数据源处理
     *
     * @author luogankun
     */
    object KafkaStreaming {
      def main(args: Array[String]) {
    
        if (args.length < 4) {
          System.err.println("Usage: KafkaStreaming <zkQuorum> <group> <topics> <numThreads>")
          System.exit(1)
        }
    
        val Array(zkQuorum, group, topics, numThreads) = args
        val sparkConf = new SparkConf()
        val sc = new SparkContext(sparkConf)
        val ssc = new StreamingContext(sc, Seconds(5))
    
        val sqlContext = new HiveContext(sc)
        import sqlContext._
    
        import com.luogankun.spark.jdbc._
        //使用External Data Sources处理MySQL中的数据
        val cities = sqlContext.jdbcTable("jdbc:mysql://hadoop000:3306/test", "root", "root", "select id, name from city")
        //将cities RDD注册成city临时表
        cities.registerTempTable("city")
    
        val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
        val inputs = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
    
        inputs.foreachRDD(rdd => {
          if (rdd.partitions.length > 0) {
            //将Streaming中接收到的数据注册成student临时表
            rdd.map(_.split("	")).map(x => Student(x(0).toInt, x(1), x(2).toInt)).registerTempTable("student")
            //关联Streaming和MySQL表进行查询操作
            sqlContext.sql("select s.id, s.name, s.cityId, c.name from student s join city c on s.cityId=c.id").collect().foreach(println)
          }
        })
    
        ssc.start()
        ssc.awaitTermination()
      }
    }

    提交到集群执行脚本:sparkstreaming_kafka_jdbc.sh

    #!/bin/sh
    . /etc/profile
    set -x
    
    cd $SPARK_HOME/bin
    
    spark-submit 
    --name KafkaStreaming 
    --class com.asiainfo.ocdc.KafkaStreaming 
    --master spark://hadoop000:7077 
    --executor-memory 1G 
    --total-executor-cores 1 
    /home/spark/software/source/streaming-app/target/streaming-app-V00B01C00-SNAPSHOT-jar-with-dependencies.jar 
    hadoop000:2181 test-consumer-group luogankun_topic 1
  • 相关阅读:
    The OpenGL pipeline
    HLS协议实现
    用C++设计一个不能被继承的类
    Ansible@一个高效的配置管理工具--Ansible configure management--翻译(八)
    史上最简单的软件破解——5行脚本代码完美破解99%的过期软件
    oracle11g创建新的用户和改动最大连接数
    【SICP感应】1 工艺和替代模式
    ant利用先进,ant订单具体解释,ant包,ant包装删除编译jar文件
    SqlServer表EXCEL数据复制的另一种方法
    【摘要干】如何执飞前写商业计划?
  • 原文地址:https://www.cnblogs.com/luogankun/p/4267093.html
Copyright © 2011-2022 走看看