zoukankan      html  css  js  c++  java
  • spark MySQL jar 包

    /**
      * Created by songcl on 2016/6/24.
      */
    import java.sql.DriverManager
    //val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import  org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}
    object insertmysql {
    
      def main(args:Array[String]): Unit = {
        //classOf[com.mysql.jdbc.Driver]
        // Class.forName("com.mysql.jdbc.Driver").newInstance();
        val url = "jdbc:mysql://10.0.73.46:3306/falcon?user=data&password=datass"
        val conf = new SparkConf().setAppName("Simple Application")//.setMaster("spark://10.0.58.21:7077")
        val sc= new SparkContext(conf)
        // val conn = DriverManager.getConnection(url)
        // val conn2=DriverManager.getConnection(url)
        val format = new java.text.SimpleDateFormat("yyyy/MM/dd")
        val dat = format.format(new java.util.Date())
        val log1 = sc.textFile("hdfs://10.0.58.21:9000/falcon/" + dat + "/*.log")
        //val log1 =sc.textFile( "hdfs://10.0.58.21:9000/falcon/"+dat+"/*.log")
        // println(log1.count())
        //val sqlContext = new org.apache.spark.sql.SQLContext(sc1)
        //val conf = new SparkConf().setAppName("Simple Application").setMaster("spark://10.0.58.21:7077")
        // val sc = new SparkContext(conf)
        // val log=sc.textFile(logFile)
        //val t= log.take(2)
        //val log1=sc.textFile("hdfs://10.0.58.21:9000/falcon/2016/*/*/*.log")
        val format2 = new java.text.SimpleDateFormat("yyyyMMdd")
        val dat2 = format2.format(new java.util.Date())
        //val log2=sc1.textFile("hdfs://10.0.58.21:9000/user/yejin/logstash."+dat2+".log")
        val log2 = sc.textFile("hdfs://10.0.58.21:9000/falcon/" + dat + "/*.log")
        import sqlContext.implicits
        val rowRDD=log2.map(line=>(line.split(""message":"").last.split(" ").head.trim(),line.split("account: ").last.split(", args:").head)).filter({case(k,v) =>k.length==10 && !k.contains("TypeError:")}).filter({case(k,v)=>v.length==8}).distinct
        //import sqlContext.implicits
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        import sqlContext.implicits._
        val df=rowRDD.toDF("created","user_id")
        df.insertIntoJDBC(url,"userlog",false)
        //println(log2.count())
      }}

       参考链接

     http://www.jianshu.com/p/b1a709a57faa

        提交包,前提是要配置环境变量

    export SPARK_CLASSPATH=$SPARK_CLASSPATH:/usr/local/spark/mysql/mysql.jar

    spark-submit  --class insertmysql /home/deploy/mysqltest.jar

  • 相关阅读:
    使用Node.js和Redis实现push服务--转载
    phpredis中文手册——《redis中文手册》 php版--引用他人
    nginx path_info问题解决
    mysql 获取一个表中缺失的最小编号
    mysql row number的实现
    Redis应用场景-转载
    从MySQL到Redis 提升数据迁移的效率
    hdu2057
    进制转换
    有关C语言指针访问问题
  • 原文地址:https://www.cnblogs.com/canyangfeixue/p/5613742.html
Copyright © 2011-2022 走看看