zoukankan      html  css  js  c++  java
  • Spark 与 JDBC、Hbase之间的交互

    JDBC

    以MySQL为例

    读取

    import java.sql.DriverManager
    
    import org.apache.spark.rdd.JdbcRDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * Author yangxu
     * Date 2020/5/9 10:23
     */
    object JdbcRead {
        def main(args: Array[String]): Unit = {
            val conf: SparkConf = new SparkConf().setAppName("JdbcRead").setMaster("local[2]")
            val sc: SparkContext = new SparkContext(conf)
            
            val driver = "com.mysql.jdbc.Driver"
            val url = "jdbc:mysql://hadoop102:3306/rdd"
            val user = "root"
            val pw = "aaaaaa"
            val rdd = new JdbcRDD(
                sc,
                () => {
                    Class.forName(driver)
                    DriverManager.getConnection(url, user, pw)
                    // 千万不要关闭连接
                },
                "select id, name from user where id >= ? and id <= ?",//?是占位符
                1,//对应第一个 ?
                10,//对应第二个 ?
                2,
                row => {
                    (row.getInt("id"), row.getString("name"))
                }//读到的数据类型是Set集合
            )
            
            rdd.collect.foreach(println)
            
            sc.stop()
            /*
            jdbc编程:
                加载启动
                class.forName(..)
                DiverManager.get...
                conn.prestat..
                    ...
                    pre.ex
                    resultSet
             */
            
        }
    }

    写入

    import java.sql.DriverManager
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * Author yangxu
     * Date 2020/5/9 10:43
     */
    object JdbcWrite {
        val driver = "com.mysql.jdbc.Driver"
        val url = "jdbc:mysql://hadoop102:3306/rdd"
        val user = "root"
        val pw = "aaaaaa"
        
        def main(args: Array[String]): Unit = {
            // 把rdd的数据写入到mysql
            val conf: SparkConf = new SparkConf().setAppName("JdbcWrite").setMaster("local[2]")
            val sc: SparkContext = new SparkContext(conf)
            // wordCount, 然后把wordCount的数据写入到mysql
            val wordCount = sc
                .textFile("c:/1128.txt")
                .flatMap(_.split("\W+"))
                .map((_, 1))
                .reduceByKey(_ + _, 3)
    
            val sql = "insert into word_count1128 values(?, ?)"
            //一次写入一条,效率较低
            /*wordCount.foreachPartition( it => {
                // it就是存储的每个分区数据
                // 建立到mysql的连接
                Class.forName(driver)
                // 获取连接
                val conn = DriverManager.getConnection(url, user, pw)
                
                it.foreach{
                    case (word, count) =>
                        val ps = conn.prepareStatement(sql)
                        ps.setString(1, word)
                        ps.setInt(2, count)
                        ps.execute()
                        ps.close()
                }
                conn.close()
            } )*/
            //最终优化版本
            wordCount.foreachPartition(it => {
                // it就是存储的每个分区数据
                // 建立到mysql的连接
                Class.forName(driver)
                // 获取连接
                val conn = DriverManager.getConnection(url, user, pw)
                val ps = conn.prepareStatement(sql)
                var max = 0 // 最大批次
                it.foreach {
                    case (word, count) =>
                        ps.setString(1, word)
                        ps.setInt(2, count)
                        //批处理
                        ps.addBatch()
                        max += 1
                        if (max >= 10) {
                            ps.executeBatch()
                            ps.clearBatch()
                            max = 0
                        }
                }
                // 最后一次不到提交的上限,  做收尾
                ps.executeBatch()
                conn.close()
            })
            
            
            sc.stop()
            
        }
    }

    Hbase

    读取

    import java.util
    
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    import org.json4s.DefaultFormats
    import org.json4s.jackson.Serialization
    
    /**
     * Author yangxu
     * Date 2020/5/9 13:41
     **/
    object HbaseRead {
        def main(args: Array[String]): Unit = {
            val conf: SparkConf = new SparkConf().setAppName("HbaseRead").setMaster("local[2]")
            val sc: SparkContext = new SparkContext(conf)
    
            val hbaseConf: Configuration = HBaseConfiguration.create()
          //配置参数
          hbaseConf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104") // zookeeper配置
            //要读取的表名
            hbaseConf.set(TableInputFormat.INPUT_TABLE, "student")
    
            // 通用的读法 noSql key-value cf
            val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
                hbaseConf,
                fClass = classOf[TableInputFormat], // InputFormat的类型
            //k v  类型,基本固定的格式
                kClass = classOf[ImmutableBytesWritable],
                vClass = classOf[Result]
            )
    
            val rdd2: RDD[String] = hbaseRDD.map {
                case (ibw, result) =>
                    //                Bytes.toString(ibw.get())
                    // 把每一行所有的列都读出来, 然后放在一个map中, 组成一个json字符串
                    var map: Map[String, String] = Map[String, String]()
                    // 先把row放进去
                    map += "rowKey" -> Bytes.toString(ibw.get())
                    // 拿出来所有的列
                    val cells: util.List[Cell] = result.listCells()
                    // 导入里面的一些隐式转换函数, 可以自动把java的集合转成scala的集合
                    import scala.collection.JavaConversions._
                    for (cell <- cells) { // for循环, 只支持scala的集合
                        val key: String = Bytes.toString(CellUtil.cloneQualifier(cell))
                        val value: String = Bytes.toString(CellUtil.cloneValue(cell))
                        map += key -> value
                    }
                    // 把map序列化成json字符串
                    // json4s 专门为scala准备的json工具
                    implicit val d: DefaultFormats = org.json4s.DefaultFormats
                    Serialization.write(map)
            }
            //        rdd2.collect.foreach(println)
            rdd2.saveAsTextFile("./hbase")
            sc.stop()
    
        }
    }

    写入

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
    import org.apache.hadoop.hbase.util.Bytes._
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * Author yangxu
     * Date 2020/5/9 13:41
     */
    object HbaseWrite {
        def main(args: Array[String]): Unit = {
            val conf: SparkConf = new SparkConf().setAppName("HbaseWrite").setMaster("local[2]")
            val sc: SparkContext = new SparkContext(conf)
            
            val list: List[(String, String, String, String)] = List(
                ("2100", "zs", "male", "10"),
                ("2101", "li", "female", "11"),
                ("2102", "ww", "male", "12"))
            val rdd1: RDD[(String, String, String, String)] = sc.parallelize(list)
            
            // 把数据写入到Hbase
            // rdd1做成kv形式
            val resultRDD: RDD[(ImmutableBytesWritable, Put)] = rdd1.map {
                case (rowKey, name, gender, age) =>
                    val rk: ImmutableBytesWritable = new ImmutableBytesWritable()
                    //为rowkey赋值
                    rk.set(toBytes(rowKey))
                    val put: Put = new Put(toBytes(rowKey))
                    //为各个列赋值
                    put.addColumn(toBytes("cf"), toBytes("name"), toBytes(name))
                    put.addColumn(toBytes("cf"), toBytes("gender"), toBytes(gender))
                    put.addColumn(toBytes("cf"), toBytes("age"), toBytes(age))
                    (rk, put)
            }
            
            val hbaseConf: Configuration = HBaseConfiguration.create()
            hbaseConf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104") // zookeeper配置
            hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "student")  // 输出表
            // 通过job来设置输出的格式的类
            val job: Job = Job.getInstance(hbaseConf)
            job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
            job.setOutputKeyClass(classOf[ImmutableBytesWritable])
            job.setOutputValueClass(classOf[Put])
            
            resultRDD.saveAsNewAPIHadoopDataset(conf = job.getConfiguration)
            
            sc.stop()
            
        }
    }
  • 相关阅读:
    netcore 报错 502 缺少运行时
    简单工厂模式
    net之-------状态模式
    pc端字体正常, 缩放浏览器正常,手机模式查看出问题
    我的后续情况
    [wip]Berty
    利用FileReader对象回显图片
    测试
    CMP云管平台竞标产品
    nacos spring cloud
  • 原文地址:https://www.cnblogs.com/yangxusun9/p/12860359.html
Copyright © 2011-2022 走看看