-- Per-session / per-category aggregate table.
-- One row per (innserSessionid, times, number, category) combination.
CREATE TABLE `middle` (
    `id`              INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,  -- surrogate key
    `innserSessionid` VARCHAR(250),
    `times`           VARCHAR(250),
    `number`          VARCHAR(250),
    `category`        VARCHAR(250),
    `svalue`          DECIMAL(32,7),   -- summed value
    `wordscount`      BIGINT,          -- count of words
    `categoryscount`  BIGINT,          -- count of category entries
    `rank`            DECIMAL(24,4),   -- backticks required: RANK is reserved in MySQL 8.0+
    PRIMARY KEY (`id`)
) DEFAULT CHARSET=utf8;

-- Secondary index to support lookups by category.
ALTER TABLE `middle`
    ADD INDEX (`category`);
/**
 * Created by lkl on 2017/7/31.
 */
/**
 * Created by lkl on 2017/6/26.
 */
// spark-shell --driver-class-path /home/hadoop/test/mysqljdbc.jar
import java.math.BigDecimal
import java.sql.{DriverManager, ResultSet}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.text.SimpleDateFormat
import java.util.Date

/**
 * Spark job that joins the `titlesplit`, `categorys` and `layer` MySQL tables,
 * aggregates them per (innserSessionid, times, number, category) and writes
 * one row per group into the MySQL table `middle`.
 */
object middle {

  // NOTE(review): database credentials are embedded in these URLs; consider
  // moving them to configuration before this leaves a test environment.

  /** JDBC URL used for the write connection (`conn`). */
  val rl = "jdbc:mysql://192.168.0.37:3306/emotional?user=root&password=123456&useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false"

  /** JDBC URL used when loading the source tables through Spark. */
  private val readUrl = "jdbc:mysql://192.168.0.37:3306/emotional?user=root&password=123456"

  // References the MySQL driver class so a missing driver jar surfaces here.
  classOf[com.mysql.jdbc.Driver]

  /** Connection shared by every `insert` call; closed at the end of `main`. */
  val conn = DriverManager.getConnection(rl)

  // `rank` is back-quoted because RANK is a reserved word in MySQL 8.0+;
  // back-quoting is also accepted by older servers.
  private val insertSql =
    "INSERT INTO middle(innserSessionid,times,number,category,svalue,wordscount,categoryscount,`rank`) VALUES (?,?,?,?,?,?,?,?) "

  /**
   * Entry point: loads the three source tables, runs the aggregation query,
   * prints the number of result rows and inserts each row into `middle`.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("test")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)

      // Expose each MySQL table to Spark SQL under its own name.
      Seq("titlesplit", "categorys", "layer").foreach { table =>
        sqlContext.jdbc(readUrl, table).registerTempTable(table)
      }

      // Column names passed to toDF follow the SELECT order
      // (number before category).
      val value = sqlContext.sql(
        """SELECT titlesplit.`innserSessionid`,
          |       titlesplit.`times`,
          |       categorys.`number`,
          |       categorys.`category`,
          |       SUM(layer.`VALUE`) svalue,
          |       COUNT(DISTINCT titlesplit.`id`) AS wordscount,
          |       COUNT(DISTINCT categorys.`id`) AS categoryscount,
          |       COUNT(DISTINCT categorys.`id`)/COUNT(DISTINCT titlesplit.`id`) AS rank
          |  FROM titlesplit
          |  LEFT JOIN categorys ON titlesplit.`words`=categorys.`words`
          |  LEFT JOIN `layer` ON titlesplit.`words`=layer.`words`
          | GROUP BY titlesplit.`innserSessionid`,
          |          titlesplit.`times`,
          |          categorys.`number`,
          |          categorys.`category`""".stripMargin
      ).toDF("innserSessionid", "times", "number", "category",
        "svalue", "wordscount", "categoryscount", "rank")

      print(value.count())

      // NOTE(review): `insert` uses the object-level `conn`. With the "local"
      // master configured above the closure presumably runs in this JVM; on a
      // real cluster each executor would need its own connection - confirm
      // before changing the master.
      value.rdd.foreach { row =>
        insert(
          row.getString(0),
          row.getString(1),
          row.getString(2),
          row.getString(3),
          row.getDecimal(4),
          row.getLong(5),
          row.getLong(6),
          row.getDouble(7))
      }
    } finally {
      // Release the JDBC connection and the Spark context even when the job fails.
      try conn.close() finally sc.stop()
    }
  }

  /**
   * Inserts a single aggregated row into the `middle` table through `conn`.
   *
   * Failures are reported with a stack trace and otherwise ignored, so one
   * bad row does not abort the remaining inserts.
   *
   * @param value0 innserSessionid
   * @param value1 times
   * @param value2 number
   * @param value3 category
   * @param value4 svalue
   * @param value5 wordscount
   * @param value6 categoryscount
   * @param value7 rank
   */
  def insert(value0: String, value1: String, value2: String, value3: String,
             value4: BigDecimal, value5: Long, value6: Long, value7: Double): Unit = {
    try {
      val prep = conn.prepareStatement(insertSql)
      try {
        prep.setString(1, value0)
        prep.setString(2, value1)
        prep.setString(3, value2)
        prep.setString(4, value3)
        prep.setBigDecimal(5, value4)
        prep.setLong(6, value5)
        prep.setLong(7, value6)
        prep.setDouble(8, value7)
        prep.executeUpdate()
      } finally {
        // Close the statement on every path so it is not leaked per row.
        prep.close()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
}