  • Spark MySQL jar package

    /**
      * Created by songcl on 2016/6/24.
      */
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}

    object insertmysql {

      def main(args: Array[String]): Unit = {
        // JDBC URL for the target MySQL database
        val url = "jdbc:mysql://10.0.73.46:3306/falcon?user=data&password=datass"
        val conf = new SparkConf().setAppName("Simple Application") //.setMaster("spark://10.0.58.21:7077")
        val sc = new SparkContext(conf)

        // Today's date in the yyyy/MM/dd layout used by the HDFS log directories
        val format = new java.text.SimpleDateFormat("yyyy/MM/dd")
        val dat = format.format(new java.util.Date())

        // Load today's log files from HDFS
        val log = sc.textFile("hdfs://10.0.58.21:9000/falcon/" + dat + "/*.log")

        // Extract (created, user_id) pairs from each line and keep only
        // well-formed records: a 10-character date and an 8-character account id
        val rowRDD = log
          .map(line => (
            line.split("\"message\":\"").last.split(" ").head.trim(),
            line.split("account: ").last.split(", args:").head))
          .filter { case (k, _) => k.length == 10 && !k.contains("TypeError:") }
          .filter { case (_, v) => v.length == 8 }
          .distinct

        // The SQLContext must be created before importing its implicits (for toDF)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        // Convert to a DataFrame and append the rows into the MySQL table "userlog"
        val df = rowRDD.toDF("created", "user_id")
        df.insertIntoJDBC(url, "userlog", false)
      }
    }
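
    insertIntoJDBC belongs to the Spark 1.3 DataFrame API and was deprecated in Spark 1.4 in favor of DataFrameWriter. A minimal sketch of the equivalent append, assuming Spark 1.4+ and reusing the df from above, with the credentials moved out of the URL into a java.util.Properties (a hypothetical refactoring, not from the original post):

    // Spark 1.4+ equivalent of df.insertIntoJDBC(url, "userlog", false)
    val props = new java.util.Properties()
    props.setProperty("user", "data")       // same credentials as the JDBC url above
    props.setProperty("password", "datass")
    // mode("append") matches insertIntoJDBC with overwrite = false
    df.write.mode("append").jdbc("jdbc:mysql://10.0.73.46:3306/falcon", "userlog", props)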

       Reference link

     http://www.jianshu.com/p/b1a709a57faa

        To submit the jar, first configure the environment variable so that the MySQL JDBC driver is on Spark's classpath:

    export SPARK_CLASSPATH=$SPARK_CLASSPATH:/usr/local/spark/mysql/mysql.jar

    spark-submit  --class insertmysql /home/deploy/mysqltest.jar
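
    SPARK_CLASSPATH is deprecated in later Spark 1.x releases; an alternative (a sketch assuming the same jar paths) is to hand the driver jar to spark-submit directly, which ships it to the executors and puts it on the driver's classpath:

    spark-submit --class insertmysql --jars /usr/local/spark/mysql/mysql.jar --driver-class-path /usr/local/spark/mysql/mysql.jar /home/deploy/mysqltest.jar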

  • Original post: https://www.cnblogs.com/canyangfeixue/p/5613742.html