JDBC
以MySQL为例
读取
import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Reads rows from a MySQL table into an RDD by means of Spark's `JdbcRDD`
 * and prints them on the driver.
 *
 * Author yangxu
 * Date 2020/5/9 10:23
 */
object JdbcRead {

  /**
   * Entry point: builds a local SparkContext, queries `user` for ids 1..10
   * and prints every `(id, name)` pair.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("JdbcRead").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(sparkConf)

    // JDBC connection settings.
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://hadoop102:3306/rdd"
    val user = "root"
    val pw = "aaaaaa"

    // The two `?` placeholders are bound to the lower and upper id bound.
    val query = "select id, name from user where id >= ? and id <= ?"
    val lowerBound = 1 // value for the first `?`
    val upperBound = 10 // value for the second `?`
    val numPartitions = 2

    val users = new JdbcRDD(
      sc,
      () => {
        Class.forName(driver)
        // Never close the connection here: it is handed over to JdbcRDD,
        // which still needs it to run the query.
        DriverManager.getConnection(url, user, pw)
      },
      query,
      lowerBound,
      upperBound,
      numPartitions,
      // Each row is delivered as a java.sql.ResultSet positioned on that row.
      resultSet => (resultSet.getInt("id"), resultSet.getString("name"))
    )

    users.collect.foreach(println)
    sc.stop()

    /*
     Plain JDBC programming, for comparison:
       1. load the driver:         Class.forName(...)
       2. open a connection:       DriverManager.getConnection(...)
       3. prepare the statement:   conn.prepareStatement(...)
       4. execute it
       5. iterate over the ResultSet
     */
  }
}
写入
import java.sql.DriverManager

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Word-count job whose result is written to MySQL with batched JDBC inserts.
 *
 * Author yangxu
 * Date 2020/5/9 10:43
 */
object JdbcWrite {
  // JDBC connection settings, shared by every partition writer.
  val driver = "com.mysql.jdbc.Driver"
  val url = "jdbc:mysql://hadoop102:3306/rdd"
  val user = "root"
  val pw = "aaaaaa"

  /**
   * Entry point: counts the words of `c:/1128.txt` and inserts one
   * `(word, count)` row per distinct word into `word_count1128`.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("JdbcWrite").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)

    // Word count; the result is what gets written to MySQL.
    val wordCount = sc
      .textFile("c:/1128.txt")
      // The backslash must be escaped: "\W" is not a valid escape sequence
      // in an ordinary Scala string literal.
      .flatMap(_.split("\\W+"))
      .map((_, 1))
      .reduceByKey(_ + _, 3)

    val sql = "insert into word_count1128 values(?, ?)"

    /*
     Naive variant, kept for reference: one statement and one round trip per
     record, which is slow.

       wordCount.foreachPartition(it => {
         Class.forName(driver)
         val conn = DriverManager.getConnection(url, user, pw)
         it.foreach { case (word, count) =>
           val ps = conn.prepareStatement(sql)
           ps.setString(1, word)
           ps.setInt(2, count)
           ps.execute()
           ps.close()
         }
         conn.close()
       })
     */

    // Optimised variant: one connection and one prepared statement per
    // partition, rows sent in batches.
    val batchSize = 10
    wordCount.foreachPartition(it => {
      // `it` iterates over the records of a single partition.
      Class.forName(driver)
      val conn = DriverManager.getConnection(url, user, pw)
      try {
        val ps = conn.prepareStatement(sql)
        try {
          // `grouped` yields full batches followed by one shorter trailing
          // batch, so no separate "flush the remainder" step is needed and
          // an empty partition sends nothing at all.
          it.grouped(batchSize).foreach { batch =>
            batch.foreach { case (word, count) =>
              ps.setString(1, word)
              ps.setInt(2, count)
              ps.addBatch()
            }
            ps.executeBatch()
            ps.clearBatch()
          }
        } finally {
          ps.close()
        }
      } finally {
        // Release the connection even when an insert fails.
        conn.close()
      }
    })

    sc.stop()
  }
}
HBase
读取
import java.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization

/**
 * Reads the HBase table `student` into an RDD and stores every row as one
 * JSON string.
 *
 * Author yangxu
 * Date 2020/5/9 13:41
 */
object HbaseRead {

  /**
   * Entry point: scans the table through `TableInputFormat`, turns each row
   * into a JSON object (`rowKey` plus one entry per column qualifier) and
   * saves the result as text files under `./hbase`.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("HbaseRead").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)

    val hbaseConf: Configuration = HBaseConfiguration.create()
    // ZooKeeper quorum used to locate the HBase cluster.
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104")
    // Name of the table to read.
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "student")

    // Generic Hadoop-InputFormat read; the key/value classes are fixed by
    // TableInputFormat.
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
      hbaseConf,
      fClass = classOf[TableInputFormat],
      kClass = classOf[ImmutableBytesWritable],
      vClass = classOf[Result]
    )

    val rdd2: RDD[String] = hbaseRDD.map { case (ibw, result) =>
      // Explicit converters instead of the deprecated implicit
      // scala.collection.JavaConversions.
      import scala.collection.JavaConverters._

      // listCells() may return null when the result holds no cells, so it is
      // wrapped in Option before being traversed.
      val rawCells: util.List[Cell] = result.listCells()
      val cells: Iterable[Cell] = Option(rawCells).map(_.asScala).getOrElse(Nil)

      // One (qualifier -> value) entry per cell.
      val columns = cells.map { cell =>
        Bytes.toString(CellUtil.cloneQualifier(cell)) -> Bytes.toString(CellUtil.cloneValue(cell))
      }

      // The row key goes in first; the columns are added afterwards, in cell order.
      val row: Map[String, String] = Map("rowKey" -> Bytes.toString(ibw.get())) ++ columns

      // Serialise the map to JSON with json4s.
      implicit val d: DefaultFormats = org.json4s.DefaultFormats
      Serialization.write(row)
    }

    // rdd2.collect.foreach(println)
    rdd2.saveAsTextFile("./hbase")
    sc.stop()
  }
}
写入
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes._
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Writes a small in-memory data set into the HBase table `student`.
 *
 * Author yangxu
 * Date 2020/5/9 13:41
 */
object HbaseWrite {

  /**
   * Entry point: turns three sample records into `(rowKey, Put)` pairs and
   * stores them through `TableOutputFormat`.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("HbaseWrite").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(sparkConf)

    // Sample records: (rowKey, name, gender, age).
    val students: List[(String, String, String, String)] = List(
      ("2100", "zs", "male", "10"),
      ("2101", "li", "female", "11"),
      ("2102", "ww", "male", "12")
    )
    val studentRDD: RDD[(String, String, String, String)] = sc.parallelize(students)

    // Shape every record as the key/value pair expected by the output
    // format: the row key wrapper plus a Put carrying the column values.
    val putRDD: RDD[(ImmutableBytesWritable, Put)] = studentRDD.map {
      case (rowKey, name, gender, age) =>
        val key: ImmutableBytesWritable = new ImmutableBytesWritable(toBytes(rowKey))
        // All three columns live in column family "cf".
        val put: Put = new Put(toBytes(rowKey))
          .addColumn(toBytes("cf"), toBytes("name"), toBytes(name))
          .addColumn(toBytes("cf"), toBytes("gender"), toBytes(gender))
          .addColumn(toBytes("cf"), toBytes("age"), toBytes(age))
        key -> put
    }

    val hbaseConf: Configuration = HBaseConfiguration.create()
    // ZooKeeper quorum used to locate the HBase cluster.
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104")
    // Target table.
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "student")

    // The output format and the key/value classes are configured on a Job.
    val job: Job = Job.getInstance(hbaseConf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])

    putRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}