数据源爬取自新浪、腾讯、搜狐三个财经网站,并已先用 C++ 程序完成分词。
这里对分词后的结果做进一步处理,代码如下:
/**
 * Created by lkl on 2017/6/26.
 */
// spark-shell --driver-class-path /home/hadoop/test/mysqljdbc.jar
import java.sql.{DriverManager, ResultSet}

import scala.util.control.NonFatal

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

/**
 * Reads crawler log lines from HDFS, stores five extracted fields in the MySQL
 * table `newstitles`, then reads rows back over JDBC, splits each `title` on
 * `|` and writes every four-part, space-separated segment to the `titlesplit`
 * table.
 *
 * The code targets the Spark 1.x `SQLContext` API (`insertIntoJDBC`, `jdbc`,
 * `registerTempTable`), exactly as the original did.
 */
object titlesplit {

  /** Input file used when no path is passed on the command line. */
  private val DefaultInputPath =
    "hdfs://192.168.0.211:9000/user/datacenter/home/datacenter/datacollect/logs/dataplatform/Crawler/Crawler_Common_WebPageNews/2017/07/14/events_192.168.0.217_datacenter4.1499994258650.gzip"

  /** Parameterized insert for one segment of a title. */
  private val InsertSql =
    "INSERT INTO titlesplit(innserSessionid,times,channelType,sourcetitle,title,words,characters,refer,role) VALUES (?,?,?,?,?,?,?,?,?)"

  // NOTE(review): credentials are embedded in the JDBC URL; consider moving them to configuration.
  val rl = "jdbc:mysql://192.168.0.37:3306/emotional?user=root&password=123456&useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false"

  // Referencing the class forces the MySQL driver to be loaded before the connection is opened.
  classOf[com.mysql.jdbc.Driver]

  val conn = DriverManager.getConnection(rl)
  val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE)

  /**
   * Runs the whole pipeline.
   *
   * @param args optional; `args(0)` overrides the HDFS input path, otherwise
   *             the original hard-coded file is used
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("test")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      import sqlContext.implicits._

      // The crawler writes its logs under
      // /user/datacenter/home/datacenter/datacollect/logs/dataplatform/Crawler/Crawler_Common_WebPageNews/<yyyy>/<MM>/<dd>/
      val inputPath = args.headOption.getOrElse(DefaultInputPath)

      // Lines that lack one of the required fields are skipped instead of
      // aborting the job with an ArrayIndexOutOfBoundsException.
      val parsed = sc.textFile(inputPath).flatMap(line => parseLine(line).toList)

      val df = parsed.toDF("channelType", "sourcetitle", "title", "time", "innerSessionId")
      df.printSchema()
      df.insertIntoJDBC(rl, "newstitles", true)

      // NOTE(review): rows are written to "newstitles" above but read back from
      // "newstitle" here — confirm that the two different table names are intended.
      val job = sqlContext.jdbc("jdbc:mysql://192.168.0.37:3306/emotional?user=root&password=123456", "newstitle")
      job.toDF().registerTempTable("job")
      val ed = sqlContext.sql("select `innerSessionId`,`time`,`channelType`,`sourcetitle`,`title` from job")

      val pp = ed.map { row =>
        val title = row.getString(4)
        // "|" is a regex metacharacter and has to be escaped; a null title yields no segments.
        val segments = Option(title).map(_.split("\\|")).getOrElse(Array.empty[String])
        (row.getString(0), row.getString(1), row.getString(2), row.getString(3), title, segments)
      }

      pp.foreach { case (sessionId, time, channelType, sourceTitle, title, segments) =>
        segments.foreach { segment =>
          println(segments.length) // debug trace kept from the original
          val parts = segment.split(" ")
          // Only segments with exactly four space-separated parts are stored.
          if (parts.length == 4) {
            println("12") // debug trace kept from the original
            insert(sessionId, time, channelType, sourceTitle, title, parts(0), parts(1), parts(2), parts(3))
          }
        }
      }
    } finally {
      // The shared JDBC resources are released exactly once, after all inserts
      // have run. The original closed `conn` after the first insert, which made
      // every later insert fail.
      closeQuietly(statement.close())
      closeQuietly(conn.close())
      sc.stop()
    }
  }

  /**
   * Extracts (channelType, sourcetitle, title, time, innerSessionId) from one log line.
   *
   * The line is split on "," and fields 1, 4, 12, 13 and 23 are used; for each
   * field the text between the first and second ":" is taken.
   * NOTE(review): a value that itself contains "," or ":" is truncated by this
   * scheme — confirm against the real log format.
   *
   * @return `None` when any of the five fields is missing
   */
  private def parseLine(line: String): Option[(String, String, String, String, String)] = {
    val fields = line.split(",") // split once instead of once per extracted field
    for {
      channelType    <- fieldValue(fields, 1)
      sourceTitle    <- fieldValue(fields, 4)
      title          <- fieldValue(fields, 12)
      time           <- fieldValue(fields, 13)
      innerSessionId <- fieldValue(fields, 23)
    } yield (channelType, sourceTitle, title, time, innerSessionId)
  }

  /** Returns the second ":"-separated part of `fields(index)`, if both exist. */
  private def fieldValue(fields: Array[String], index: Int): Option[String] =
    if (index < fields.length) {
      val keyValue = fields(index).split(":")
      if (keyValue.length > 1) Some(keyValue(1)) else None
    } else {
      None
    }

  /**
   * Inserts one row into `titlesplit` using the shared connection.
   *
   * Failures are printed and swallowed so that one bad row does not stop the
   * job (same best-effort policy as the original). The connection is left open
   * for later calls; only the per-call prepared statement is closed.
   */
  private def insert(value0: String, value1: String, value2: String, value3: String, value4: String,
                     value5: String, value6: String, value7: String, value8: String): Unit = {
    println((value0, value1, value2, value3, value4, value5, value6, value7, value8))
    // CREATE TABLE words2(innersessionId VARCHAR(100),words VARCHAR(100), VARCHAR(100),posit VARCHAR(100),va VARCHAR(100))
    try {
      val prep = conn.prepareStatement(InsertSql)
      try {
        val values = Seq(value0, value1, value2, value3, value4, value5, value6, value7, value8)
        // JDBC parameter indexes are 1-based.
        values.zipWithIndex.foreach { case (value, idx) => prep.setString(idx + 1, value) }
        prep.executeUpdate()
      } finally {
        prep.close()
      }
    } catch {
      case NonFatal(e) => e.printStackTrace()
    }
  }

  /** Runs a cleanup action; a failure is printed but does not stop later cleanup steps. */
  private def closeQuietly(action: => Unit): Unit =
    try action catch { case NonFatal(e) => e.printStackTrace() }
}