  • foreachRDD

    Requirement: write the word-count results to MySQL.

    create table wordcount(
      word varchar(50) default null,
      wordcount int(10) default null
    );
    

    The results are written to MySQL with the following SQL:

    "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
    

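    Known problems:
    1) Existing rows are never updated; every record is written with an insert, so the same word ends up in many rows.

    Ideas for improvement:

    a) Before writing, check whether the word already exists: update it if it does, insert it if it does not.
    b) In production work, HBase or Redis is typically used instead.

    2) A connection is created for every partition of every RDD; switching to a connection pool is recommended.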
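
    Improvement (a) could be sketched roughly as follows. This is a minimal sketch, not the original author's code: the object name WordCountUpsert and its upsert helper are hypothetical, it assumes counts should accumulate across batches, and it tries the update first and treats "zero rows updated" as "the word is not in the table yet". It also uses PreparedStatement placeholders instead of string concatenation, so a word containing a quote does not break the statement.

```scala
import java.sql.Connection

// Hypothetical helper (not part of the original post).
object WordCountUpsert {
  // Assumption: the per-batch count is added to the stored total.
  val UpdateSql = "update wordcount set wordcount = wordcount + ? where word = ?"
  val InsertSql = "insert into wordcount(word, wordcount) values(?, ?)"

  // Returns true if an existing row was updated, false if a new row was inserted.
  def upsert(connection: Connection, word: String, count: Int): Boolean = {
    val update = connection.prepareStatement(UpdateSql)
    val updatedRows =
      try {
        update.setInt(1, count)
        update.setString(2, word)
        update.executeUpdate()
      } finally update.close()

    if (updatedRows > 0) true
    else {
      // No row matched the word, so insert it.
      val insert = connection.prepareStatement(InsertSql)
      try {
        insert.setString(1, word)
        insert.setInt(2, count)
        insert.executeUpdate()
      } finally insert.close()
      false
    }
  }
}
```

    Inside foreachPartition this would be called as WordCountUpsert.upsert(connection, record._1, record._2). The update-then-insert pair is not atomic, so two concurrent writers for the same word could still both insert; a unique key on word would be needed to rule that out.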

    import java.sql.DriverManager
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext }
    
    object outputMysqlApp extends App {
    
      // Configure the entry point
      val conf = new SparkConf().setAppName(getClass.getSimpleName).setMaster("local[2]")
      val ssc = new StreamingContext(conf, Seconds(1))
    
      // Input data stream (DStream)
      val lines = ssc.socketTextStream("localhost", 9999)
    
      // Split each line into words and count them within the batch
      val words = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    
    
    
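    //  Approach 3: one connection per partition
        words.foreachRDD ( rdd => {
          rdd.foreachPartition(partitionOfRecords => {
    
            // nonEmpty only checks hasNext; calling size would consume the iterator
            // and leave nothing for the foreach below
            if (partitionOfRecords.nonEmpty) {
              val connection = createNewConnection()
              try {
                partitionOfRecords.foreach(record => {
                  val sql = "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
                  val statement = connection.createStatement()
                  try statement.execute(sql) finally statement.close()
                })
              } finally {
                // Close the connection even if an insert fails
                connection.close()
              }
            }
          })
        })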
    
      // Start the StreamingContext, receive data, then process it
      ssc.start()
      ssc.awaitTermination()
    
      /**
        * Get a MySQL database connection.
        * @return the connection; mind the return value, it must not be null
        */
      def createNewConnection() = {
        Class.forName("com.mysql.jdbc.Driver")
        DriverManager.getConnection("jdbc:mysql://192.168.1.100:3306/streaming_mysql","root","root")
      }
    }
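
    For problem 2, the connection pool could look something like the sketch below. It is only an illustration under stated assumptions: SimplePool and ConnectionPool are hypothetical names, the pool has no size limit and does not check whether an idle connection has gone stale, and the JDBC URL and credentials are simply the ones used in the program above.

```scala
import java.sql.{Connection, DriverManager}
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical minimal pool: reuses idle resources instead of creating a new one each time.
class SimplePool[A <: AnyRef](create: () => A) {
  private val idle = new ConcurrentLinkedQueue[A]()

  // Take an idle resource if one is available, otherwise create a new one.
  def borrow(): A = Option(idle.poll()).getOrElse(create())

  // Hand the resource back so that a later caller can reuse it.
  def giveBack(resource: A): Unit = idle.offer(resource)
}

// A Scala object is initialized on first use in each JVM,
// so every executor ends up with its own pool.
object ConnectionPool {
  lazy val pool = new SimplePool[Connection](() =>
    DriverManager.getConnection("jdbc:mysql://192.168.1.100:3306/streaming_mysql", "root", "root"))
}
```

    In foreachPartition, val connection = ConnectionPool.pool.borrow() would replace createNewConnection(), and ConnectionPool.pool.giveBack(connection) in a finally block would replace connection.close(). A real job would more likely rely on an established pooling library such as HikariCP than on a hand-written pool.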
    
    
  • Original article: https://www.cnblogs.com/suixingc/p/foreachrdd.html