  • Saving Spark Streaming results to a Redshift database

    1. Code that saves to the Redshift database

    package test05

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object SaveDataToMysql {
      def main(args: Array[String]): Unit = {

        // Suppress unneeded logging so only the relevant output shows in the terminal
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
        Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.OFF)

        // Initialize Spark Streaming
        val conf = new SparkConf().setAppName("SaveDataToMysql").setMaster("local[*]")
        val sc = new SparkContext(conf)
        val ssc = new StreamingContext(sc, Seconds(10))

        // Kafka connection settings
        val zkQuorum = "192.168.1.112:2181"   // ZooKeeper quorum, as IP:port,IP:port,IP:port
        val group = "testgroup"               // the group.id configured in consumer.properties
        val topics = "huiliyang"              // producers are distinguished by topic; the topic created here is huiliyang
        val numThreads = 2                    // consumer threads per topic
        // Several topics may supply data at once, so split them on spaces, map each one to (topic, numThreads), and build a Map
        val topicMap = topics.split(" ").map((_, numThreads)).toMap
        ssc.checkpoint("checkpoint")
        val lines: DStream[String] = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) // create the stream

        lines.print()

        // Save to Redshift
        lines.map(_.split(",")).foreachRDD(rdd => {
          rdd.foreachPartition(partition => {
            // ConnectPoolUtil is a connection pool I wrote; getConnection is one of its methods
            val conn = ConnectPoolUtil.getConnection
            conn.setAutoCommit(false) // switch to manual commit
            val stmt = conn.createStatement()
            partition.foreach(word => {
              stmt.addBatch("insert into test_log2(time, ip, user_id, user_type, source, scene) values('" + word(0) + "','" + word(1) + "','" + word(2) + "','" + word(3) + "','" + word(4) + "','" + word(5) + "')")
            })
            stmt.executeBatch()
            conn.commit()
            conn.close()
          })
        })
        ssc.start()
        ssc.awaitTermination()
      }
    }
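
    The insert above assembles the SQL by string concatenation, which breaks as soon as a field contains a quote and leaves the statement open to SQL injection. Below is a minimal alternative sketch of the same per-partition write using a PreparedStatement batch; it assumes the same ConnectPoolUtil and test_log2 table as in this post, and the helper name saveToRedshift is hypothetical (it would live inside the SaveDataToMysql object).

    import org.apache.spark.streaming.dstream.DStream

    // Hypothetical variant of the write path above: same table and column order,
    // but values are bound as parameters instead of being concatenated into the SQL text.
    def saveToRedshift(lines: DStream[String]): Unit = {
      lines.map(_.split(",")).foreachRDD(rdd => {
        rdd.foreachPartition(partition => {
          val conn = ConnectPoolUtil.getConnection
          conn.setAutoCommit(false)
          val ps = conn.prepareStatement(
            "insert into test_log2(time, ip, user_id, user_type, source, scene) values (?, ?, ?, ?, ?, ?)")
          partition.foreach(word => {
            for (i <- 0 until 6) ps.setString(i + 1, word(i)) // JDBC parameters are 1-based
            ps.addBatch()
          })
          ps.executeBatch()
          conn.commit()
          ConnectPoolUtil.closeCon(null, ps, conn) // close the statement and return the connection
        })
      })
    }

    Calling saveToRedshift(lines) from main, in place of the foreachRDD block above, keeps the rest of the job unchanged.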


    Database connection pool code:
    package test05

    import java.sql.{Connection, PreparedStatement, ResultSet}
    import org.apache.commons.dbcp.BasicDataSource

    object ConnectPoolUtil {

      private var bs: BasicDataSource = null

      /**
       * Create the data source
       * @return
       */
      def getDataSource(): BasicDataSource = {
        if (bs == null) {
          bs = new BasicDataSource()
          bs.setDriverClassName("org.postgresql.Driver")
          bs.setUrl("jdbc:postgresql://172.30.11.61:5439/test")
          bs.setUsername("*****")
          bs.setPassword("*****")
          bs.setMaxActive(200)    // maximum number of active connections
          bs.setInitialSize(30)   // connections created when the pool is initialized
          bs.setMinIdle(50)       // minimum number of idle connections
          bs.setMaxIdle(200)      // maximum number of idle connections
          bs.setMaxWait(1000)     // maximum wait (ms) when borrowing a connection
          bs.setMinEvictableIdleTimeMillis(60 * 1000)        // release connections idle for more than 60 seconds
          bs.setTimeBetweenEvictionRunsMillis(5 * 60 * 1000) // run the idle-connection evictor every 5 minutes
          bs.setTestOnBorrow(true)
        }
        bs
      }

      /**
       * Shut down the data source
       */
      def shutDownDataSource() {
        if (bs != null) {
          bs.close()
        }
      }

      /**
       * Get a database connection
       * @return
       */
      def getConnection(): Connection = {
        var con: Connection = null
        try {
          if (bs != null) {
            con = bs.getConnection()
          } else {
            con = getDataSource().getConnection()
          }
        } catch {
          case e: Exception => println(e.getMessage)
        }
        con
      }

      /**
       * Close the ResultSet, PreparedStatement and Connection
       */
      def closeCon(rs: ResultSet, ps: PreparedStatement, con: Connection) {
        if (rs != null) {
          try {
            rs.close()
          } catch {
            case e: Exception => println(e.getMessage)
          }
        }
        if (ps != null) {
          try {
            ps.close()
          } catch {
            case e: Exception => println(e.getMessage)
          }
        }
        if (con != null) {
          try {
            con.close()
          } catch {
            case e: Exception => println(e.getMessage)
          }
        }
      }
    }
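
    To sanity-check the pool outside of Spark, here is a small standalone sketch. The object name PoolSmokeTest is made up for illustration; it assumes the test_log2 table exists and that real credentials have been filled into getDataSource.

    package test05

    import java.sql.Connection

    object PoolSmokeTest {
      def main(args: Array[String]): Unit = {
        // borrow a connection from the pool and run a trivial query
        val conn: Connection = ConnectPoolUtil.getConnection
        val ps = conn.prepareStatement("select count(*) from test_log2")
        val rs = ps.executeQuery()
        if (rs.next()) println("rows in test_log2: " + rs.getLong(1))
        ConnectPoolUtil.closeCon(rs, ps, conn) // close ResultSet, statement and connection
        ConnectPoolUtil.shutDownDataSource()   // release the pool when the process exits
      }
    }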

    pom file:
    <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.2.0</spark.version>
        <hadoop.version>2.7.2</hadoop.version>
        <spark.pom.scope>compile</spark.pom.scope>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <!--<scope>${spark.pom.scope}</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <!--<scope>${spark.pom.scope}</scope>-->
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>9.4.1212</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.39</version>
        </dependency>
    </dependencies>
     
  • Original post: https://www.cnblogs.com/yhl-yh/p/7741539.html