zoukankan      html  css  js  c++  java
  • 《理财市场情绪监测系统》代码实现【2】之爬虫数据解析

    数据来源为从新浪、腾讯、搜狐三个财经网站爬取的新闻，已先用 C++ 程序完成分词；

    这边对分词后的词进行处理,代码如下:

    /**
      * Created by lkl on 2017/6/26.
      *///spark-shell --driver-class-path /home/hadoop/test/mysqljdbc.jar
    import java.sql.{DriverManager, ResultSet}
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    object titlesplit {

      // JDBC URL for the `emotional` MySQL database. The original repeated
      // this URL (with credentials) three times; consolidated to one val.
      // NOTE(review): credentials are hard-coded — move to configuration.
      val rl = "jdbc:mysql://192.168.0.37:3306/emotional?user=root&password=123456&useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false"

      // Referencing the class forces the MySQL JDBC driver to load and register.
      classOf[com.mysql.jdbc.Driver]
      // Shared driver-side connection, closed once at the end of main().
      val conn = DriverManager.getConnection(rl)
      val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE)

      /**
        * Reads one crawler log file from HDFS, extracts title fields, bulk-loads
        * them into MySQL via Spark, then re-reads them and inserts the
        * pipe/space-delimited word segments row by row into `titlesplit`.
        */
      def main(args: Array[String]) {
        val conf = new SparkConf().setMaster("local").setAppName("test")
        val sc = new SparkContext(conf)
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)

        // Date parts used to build the dated HDFS path; currently unused because
        // the path below is hard-coded, kept for the commented-out variant.
        val format = new java.text.SimpleDateFormat("yyyyMMdd")
        val yearformat = new java.text.SimpleDateFormat("yyyy")
        val year = yearformat.format(new java.util.Date().getTime())
        val monthformat = new java.text.SimpleDateFormat("MM")
        val month = monthformat.format(new java.util.Date().getTime())
        val dayformat = new java.text.SimpleDateFormat("dd")
        val day = dayformat.format(new java.util.Date().getTime())
        val dat01 = format.format(new java.util.Date().getTime() - 1 * 24 * 60 * 60 * 1000)
        val dat02 = format.format(new java.util.Date().getTime() - 0 * 24 * 60 * 60 * 1000)
        val dat03 = format.format(new java.util.Date().getTime() - 2 * 24 * 60 * 60 * 1000)
        val format2 = new java.text.SimpleDateFormat("yyyy-MM-dd")
        val dat = format2.format(new java.util.Date().getTime() - 1 * 24 * 60 * 60 * 1000)

        // val log01 = sc.textFile("hdfs://192.168.0.211:9000/user/datacenter/home/datacenter/datacollect/logs/dataplatform/Crawler/Crawler_Common_WebPageNews/" + year + "/" + month + "/" + day + "/events_192.168.0.217_datacenter4.1499879147814")
        val log01 = sc.textFile("hdfs://192.168.0.211:9000/user/datacenter/home/datacenter/datacollect/logs/dataplatform/Crawler/Crawler_Common_WebPageNews/2017/07/14/events_192.168.0.217_datacenter4.1499994258650.gzip")

        // Each log line is a comma-separated list of "key:value" fields; we keep
        // the value part of fields 1, 4, 12, 13 and 23.
        // FIX: the published snippet used split("","") / split("":"") (quotes
        // doubled by the blog's formatter), which does not compile; the intended
        // delimiters are "," and ":". Also split each line once, not five times.
        val l = log01.map { line =>
          val f = line.split(",")
          (f(1).split(":")(1),
           f(4).split(":")(1),
           f(12).split(":")(1),
           f(13).split(":")(1),
           f(23).split(":")(1))
        }

        import sqlContext.implicits._
        val df = l.toDF("channelType", "sourcetitle", "title", "time", "innerSessionId")
        df.printSchema()
        // FIX: reuse `rl` instead of a second inline copy of the same URL.
        df.insertIntoJDBC(rl, "newstitles", true)

        // NOTE(review): the write above targets table "newstitles" but the read
        // below targets "newstitle" — one of them is likely a typo; kept as-is
        // because the real schema cannot be confirmed from this file.
        val job = sqlContext.jdbc("jdbc:mysql://192.168.0.37:3306/emotional?user=root&password=123456", "newstitle")
        val jo = job.toDF().registerTempTable("job")
        val ed = sqlContext.sql("select `innerSessionId`,`time`,`channelType`,`sourcetitle`,`title` from job")

        // Attach the title split on '|' as a sixth tuple element.
        // FIX: "\|" is an invalid escape inside a Scala string literal; the
        // regex for a literal pipe must be written "\\|".
        val pp = ed.map(p => {
          val v0 = p.getString(0)
          val v1 = p.getString(1)
          val v2 = p.getString(2)
          val v3 = p.getString(3)
          val v4 = p.getString(4)
          val v5 = p.getString(4).split("\\|")
          (v0, v1, v2, v3, v4, v5)
        })

        /**
          * Inserts one segmented-word row into `titlesplit`.
          * FIX: the original closed `conn` in the finally block, killing the
          * shared connection after the very first insert; only the statement is
          * closed now, and the connection is closed once after the foreach.
          * NOTE(review): column "innserSessionid" looks misspelled but is kept
          * verbatim — it must match the actual table definition.
          */
        def insert(value0: String, value1: String, value2: String, value3: String,
                   value4: String, value5: String, value6: String, value7: String,
                   value8: String): Unit = {
          println(value0, value1, value2, value3, value4, value5, value6, value7, value8)
          var prep: java.sql.PreparedStatement = null
          try {
            prep = conn.prepareStatement("INSERT INTO titlesplit(innserSessionid,times,channelType,sourcetitle,title,words,characters,refer,role) VALUES (?,?,?,?,?,?,?,?,?) ")
            prep.setString(1, value0)
            prep.setString(2, value1)
            prep.setString(3, value2)
            prep.setString(4, value3)
            prep.setString(5, value4)
            prep.setString(6, value5)
            prep.setString(7, value6)
            prep.setString(8, value7)
            prep.setString(9, value8)
            prep.executeUpdate
          } catch {
            // Original behavior preserved: log the failure and carry on with
            // the remaining rows.
            case e: Exception => e.printStackTrace
          } finally {
            // FIX: close the statement (was leaked), not the shared connection.
            if (prep != null) prep.close()
          }
        }

        // NOTE(review): this closure runs on the driver only because the master
        // is "local"; on a real cluster the shared `conn` would not serialize —
        // switch to foreachPartition with a per-partition connection there.
        pp.foreach(p => {
          // Each pipe-separated segment is "word character refer role"
          // (space-delimited, exactly 4 parts); anything else is skipped.
          for (i <- 0 until p._6.size) {
            println(p._6.size)
            val parts = p._6(i).split(" ")
            if (parts.size == 4) {
              println("12")
              insert(p._1, p._2, p._3, p._4, p._5, parts(0), parts(1), parts(2), parts(3))
            }
          }
          // FIX: the original called conn.close() HERE, inside foreach, which
          // closed the connection after the first record was processed.
        })

        // Close the shared connection exactly once, after all inserts are done.
        conn.close()
      }
    }
  • 相关阅读:
    关于Git的一些常规操作
    .Net 常用ORM框架对比:EF Core、FreeSql、SqlSuger (下篇)
    Myeclipse打开许多JSP文件或者js文件之后非常卡-------的解决办法
    单点登录常用生成token的操作-----UUID.randomUUID().toString() 简介
    如何使用时间复杂度和空间复杂度来区分算法的效率?
    自定义注解的简单使用
    企业中常用的Git和Svn比较。
    框架中常见的注解分析
    处理大数据流常用的三种Apache框架:Storm、Spark和Samza。(主要介绍Storm)
    养生食谱
  • 原文地址:https://www.cnblogs.com/canyangfeixue/p/7192987.html
Copyright © 2011-2022 走看看