zoukankan      html  css  js  c++  java
  • Spark Streaming 插入 HBase

    import java.sql.{DriverManager, ResultSet}
    
    import org.apache.spark._
    import org.apache.spark.streaming._
    
    import scala.util.Random
    
    import org.apache.hadoop.hbase.{HTableDescriptor,HColumnDescriptor,HBaseConfiguration,TableName}
    import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put,Table}
    
    
    
    /**
     * Spark Streaming job that reads '|'-separated text lines from a local socket
     * and writes each record into an HBase table.
     *
     * For every input line `qualifier|value`, the first field is used as the column
     * qualifier and the second as the cell value, stored in column family [[cf]] of
     * table [[tablename]].
     *
     * NOTE(review): `main` never calls [[CreatTableIfNotFind]], so the target table
     * presumably has to exist before the job starts -- confirm.
     */
    object Pi {

      // Imported locally so this object does not depend on edits to the file-level imports.
      import java.nio.charset.StandardCharsets.UTF_8
      import scala.util.control.NonFatal

      // MySQL settings; only referenced by the disabled JDBC sample at the end of `main`.
      // NOTE(review): credentials are hard-coded in source -- move to configuration before real use.
      val user="root"
      val password = "root"
      val host="10.8.8.123"
      val database="db_1"
      val port=3306
      val conn_str = "jdbc:mysql://"+host +":"+port+"/"+database

      // HBase target: table name, column family and a default qualifier name.
      // `qulified` is not read by `main`, which takes the qualifier from each input record.
      val tablename="achi"
      val cf="a"
      val qulified="name"

      /**
       * Creates `userTable` with the single column family [[cf]] unless it already exists.
       *
       * The `Admin` handle obtained from the connection is always closed, including when
       * the existence check or the creation throws.
       *
       * @param conn      open HBase connection; it is not closed by this method
       * @param userTable name of the table to look up or create
       */
      def CreatTableIfNotFind(conn:Connection,userTable:TableName): Unit ={
        // Obtain the Admin object from the Connection (the replacement for the old HAdmin).
        val admin=conn.getAdmin
        try{
          if(admin.tableExists(userTable)){
            println("Table exists!")
            //admin.disableTable(userTable)
            //admin.deleteTable(userTable)
            //exit()
          }else{
            val tableDesc=new HTableDescriptor(userTable)
            tableDesc.addFamily(new HColumnDescriptor(cf.getBytes(UTF_8)))
            admin.createTable(tableDesc)
            println("Create table success!")
          }
        }finally{
          // Admin holds its own resources and must be released by the caller that opened it.
          admin.close()
        }
      }

      /**
       * Writes a single cell to `table`.
       *
       * All strings are encoded as UTF-8 explicitly so the stored bytes do not depend on
       * the JVM's platform default charset.
       *
       * @param table    open HBase table to write to; it is not closed by this method
       * @param cf       column family name
       * @param qulified column qualifier name
       * @param value    cell value
       * @param rowKey   row to write to; defaults to "id001", the key that was previously
       *                 hard-coded, so existing callers keep their behaviour. With the
       *                 default, every call targets the same row.
       */
      def InsertHbase(table:Table,cf:String,qulified:String,value:String,rowKey:String="id001"): Unit ={
        val p=new Put(rowKey.getBytes(UTF_8))
        p.addColumn(cf.getBytes(UTF_8),qulified.getBytes(UTF_8),value.getBytes(UTF_8))
        table.put(p)
      }

      /**
       * Entry point: starts a local streaming context with 3-second batches, reads lines
       * from localhost:9999 and inserts each record into HBase.
       *
       * @param args command-line arguments (unused)
       */
      def main(args: Array[String]): Unit = {
        val conf=new SparkConf().setAppName("Spark Streaming").setMaster("local[2]")
        val ssc=new StreamingContext(conf,Seconds(3))

        val lines=ssc.socketTextStream("localhost",9999)
        val words=lines.map(_.split('|'))

        words.print()

        words.foreachRDD{ rdd =>
          rdd.foreachPartition{ pa =>
            // Opening an HBase connection is expensive; skip partitions with nothing to write.
            if(pa.nonEmpty){
              // One connection per partition: it is created where the partition is processed
              // and is never shipped inside the closure.
              val conn=ConnectionFactory.createConnection(HBaseConfiguration.create())
              try{
                val table=conn.getTable(TableName.valueOf(tablename))
                try{
                  pa.foreach{ w =>
                    try{
                      val beg = System.currentTimeMillis()
                      // w(1) throws for lines without a '|' separator; such records are
                      // reported by the handler below and skipped.
                      println(w(0)+w(1))
                      InsertHbase(table,cf,w(0),w(1))
                      println("***************************************************************")
                      println(" 耗时: " + (System.currentTimeMillis() - beg)+"ms")
                      println("***************************************************************")
                    }catch{
                      // A bad record must not abort the rest of the partition. NonFatal lets
                      // interrupts and VM errors propagate instead of being swallowed.
                      case NonFatal(e)=>println("raw error! " + e)
                    }
                  }
                }finally{
                  table.close()
                }
              }finally{
                // Always release the connection, even if opening the table or iterating failed.
                conn.close()
              }
            }
          }
        }

        ssc.start()
        ssc.awaitTermination()

      /*
        Class.forName("com.mysql.jdbc.Driver").newInstance();
        val conn1 = DriverManager.getConnection(conn_str,user,password)

        try {
          val statement = conn1.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
          val rs = statement.executeQuery("select * from achi limit 10")
          while (rs.next) {
            println(rs.getString(1))
          }
        }
        catch {
          case _ : Exception => println("===>")
        }
        finally {
          conn1.close
        }

        */
      }
    }
    // sbt build definition for the Spark Streaming -> HBase sample.

    // Project identity.
    name := "untitled"

    version := "1.0"

    scalaVersion := "2.10.6"

    // Extra repository consulted when resolving dependencies.
    resolvers += "OS China" at "http://maven.oschina.net/content/groups/public/"

    // JDBC driver (plain Java artifact, hence `%`).
    libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.38"

    // Spark modules are cross-built per Scala version, hence `%%`.
    libraryDependencies ++= Seq("spark-core", "spark-streaming").map { artifact =>
      "org.apache.spark" %% artifact % "1.5.2"
    }

    // HBase modules, all pinned to the same release.
    libraryDependencies ++= Seq("hbase-client", "hbase-common", "hbase-server").map { artifact =>
      "org.apache.hbase" % artifact % "1.1.3"
    }
  • 相关阅读:
    Python Semaphore
    Python 互斥锁
    Python 递归锁
    Python GIL锁
    Python 线程调用
    进程与线程
    Python paramiko模块
    Python SocketServer模块
    MonoDevelop with Visual Studio to Linux and Mac OSX maintaining a single code base for all platforms.
    mime大全收集
  • 原文地址:https://www.cnblogs.com/ggzone/p/10121138.html
Copyright © 2011-2022 走看看