  • foreachRDD

    Requirement: write the word-count results to MySQL

    create table wordcount(
      word varchar(50) default null,
      wordcount int(10) default null
    );
    

    The results are written to MySQL with the following SQL:

    "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
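    A short sketch of why building the statement by concatenation is fragile: the word is spliced into the SQL text, so a word containing a single quote produces invalid SQL, and because the words come straight from the socket, a crafted input line could inject SQL. `ConcatSqlPitfall` and `concatSql` are hypothetical names for illustration; the string expression is the one above with `values` spelled correctly.

```scala
object ConcatSqlPitfall {
  // Builds the INSERT by string concatenation, as in the statement above
  def concatSql(word: String, count: Int): String =
    "insert into wordcount(word, wordcount) values('" + word + "'," + count + ")"

  def main(args: Array[String]): Unit = {
    // The quote in "it's" closes the SQL string literal early; this prints:
    // insert into wordcount(word, wordcount) values('it's',1)
    println(concatSql("it's", 1))
  }
}
```

    Binding the values through a `java.sql.PreparedStatement` (`setString`, `setInt`) sends them to the server as data rather than as SQL text, which avoids both problems.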
    

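    Problems with this approach:
    1) Existing rows are never updated: every record is inserted, so the same word piles up in many rows instead of having its count updated.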

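    Possible improvements:

    a) Before inserting, check whether the word already exists: if it does, UPDATE it; otherwise INSERT it.
    b) In real-world projects, HBase or Redis is typically used instead.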

    2) A new connection is created for every partition of every RDD; switching to a connection pool is recommended.
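    Point 2 can be addressed with a pool that exists once per executor JVM and is created lazily on first use, which is also the pattern the Spark Streaming programming guide describes for `foreachRDD`. The sketch below is a deliberately minimal, hypothetical pool (`SimplePool` and `MysqlConnectionPool` are illustrative names, not a library API): it reuses idle connections but does not cap the pool size or detect stale connections.

```scala
import java.sql.{Connection, DriverManager}
import java.util.concurrent.ConcurrentLinkedQueue

/** Hypothetical minimal pool: idle resources wait in a thread-safe queue
  * and are handed out again instead of opening a new one per partition. */
class SimplePool[T <: AnyRef](create: () => T) {
  private val idle = new ConcurrentLinkedQueue[T]()

  /** Reuses an idle resource if there is one, otherwise creates a new one. */
  def borrow(): T = {
    val r = idle.poll() // null when the queue is empty
    if (r == null) create() else r
  }

  /** Puts a resource back so a later borrow() on this JVM can reuse it. */
  def giveBack(r: T): Unit = idle.offer(r)
}

/** One pool per JVM (so one per executor), built on first access.
  * URL and credentials are the ones used by createNewConnection() in the code. */
object MysqlConnectionPool {
  lazy val pool: SimplePool[Connection] = new SimplePool[Connection](() =>
    DriverManager.getConnection("jdbc:mysql://192.168.1.100:3306/streaming_mysql", "root", "root"))
}
```

    Inside `foreachPartition`, a connection would be taken with `MysqlConnectionPool.pool.borrow()` and handed back with `MysqlConnectionPool.pool.giveBack(connection)` in a `finally` block instead of being closed. For real workloads, an established pool such as HikariCP or Apache Commons DBCP also handles maximum size, validation and timeouts.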

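    import java.sql.{Connection, DriverManager}
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    // Spark recommends a main() method over extending scala.App,
    // whose delayed initialization may not work correctly with Spark
    object outputMysqlApp {
    
      def main(args: Array[String]): Unit = {
        // Entry point: local mode with 2 threads (one receives from the socket, one processes)
        val conf = new SparkConf().setAppName(getClass.getSimpleName).setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(1))
    
        // Input DStream: lines of text read from a TCP socket
        val lines = ssc.socketTextStream("localhost", 9999)
    
        // Word count within each 1-second batch
        val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    
        // Approach 3: open one connection per partition rather than per record
        words.foreachRDD { rdd =>
          rdd.foreachPartition { partitionOfRecords =>
            // hasNext only peeks; partitionOfRecords.size would consume the iterator
            // and leave nothing for the foreach below to write
            if (partitionOfRecords.hasNext) {
              val connection = createNewConnection()
              try {
                // Bind values as parameters instead of concatenating them into the SQL
                val stmt = connection.prepareStatement(
                  "insert into wordcount(word, wordcount) values(?, ?)")
                partitionOfRecords.foreach { case (word, count) =>
                  stmt.setString(1, word)
                  stmt.setInt(2, count)
                  stmt.executeUpdate()
                }
                stmt.close()
              } finally {
                connection.close() // also releases the statement if a write failed
              }
            }
          }
        }
    
        // Start the StreamingContext: receive and process data until stopped
        ssc.start()
        ssc.awaitTermination()
      }
    
      /**
        * Obtain a MySQL database connection.
        * @return a new JDBC connection; the result must not be null
        */
      def createNewConnection(): Connection = {
        // Connector/J 5.x driver class; Connector/J 8.x uses com.mysql.cj.jdbc.Driver
        Class.forName("com.mysql.jdbc.Driver")
        DriverManager.getConnection("jdbc:mysql://192.168.1.100:3306/streaming_mysql", "root", "root")
      }
    }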
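    For improvement a) of problem 1, MySQL can perform "update if present, otherwise insert" in a single statement with `INSERT ... ON DUPLICATE KEY UPDATE`. This swaps the separate check-then-write of a) for MySQL's built-in upsert, which avoids one lookup per word and the race between the check and the insert when partitions write concurrently. The sketch assumes a unique key on `word` (for example `alter table wordcount add unique key (word)`), which the table definition above does not have, and assumes the table should hold running totals, so each batch count is added to the stored value. `WordCountUpsert` and `writePartition` are hypothetical names.

```scala
import java.sql.Connection

object WordCountUpsert {
  // Assumption: `word` has a unique key; without it MySQL never sees a
  // duplicate and this statement behaves like a plain INSERT.
  // The batch count is bound twice: as the initial value and as the increment.
  val UpsertSql =
    "insert into wordcount(word, wordcount) values(?, ?) " +
      "on duplicate key update wordcount = wordcount + ?"

  /** Writes one partition: opens a connection only if there are records,
    * inserts new words and adds the batch count to existing ones. */
  def writePartition(records: Iterator[(String, Int)], openConnection: () => Connection): Unit = {
    if (records.hasNext) { // peek without consuming the iterator
      val connection = openConnection()
      try {
        val stmt = connection.prepareStatement(UpsertSql)
        records.foreach { case (word, count) =>
          stmt.setString(1, word)
          stmt.setInt(2, count) // initial total for a new word
          stmt.setInt(3, count) // increment for an existing word
          stmt.addBatch()       // send all rows in one JDBC batch
        }
        stmt.executeBatch()
        stmt.close()
      } finally connection.close() // closing the connection also frees the statement
    }
  }
}
```

    With the code above, it could be called as `rdd.foreachPartition(records => WordCountUpsert.writePartition(records, () => createNewConnection()))`. Passing the connection factory as a function keeps the write logic independent of where connections come from.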
    
    
  • Original article: https://www.cnblogs.com/suixingc/p/foreachrdd.html