1、启动spark shell
spark-shell --master spark://s101:7077
2、选择数据库
spark.sql("use spark").show
3、生成DF
scala> val df = spark.sql("select * from orders")
df: org.apache.spark.sql.DataFrame = [oid: int, num: string ... 2 more fields]
4、打印数据
scala> df.show
+---+-----+-----+---+
|oid|  num|price|cid|
+---+-----+-----+---+
|  1|no001| 12.3|  7|
|  2|no002| 18.8|  4|
+---+-----+-----+---+
5.1、输出为json(先切换为local模式,写入本地文件系统)
df.write.json("file:///home/centos/myspark/json")
读取json文件
scala> val dfRead = spark.read.json("file:///home/centos/myspark/json")
dfRead: org.apache.spark.sql.DataFrame = [cid: bigint, num: string ... 2 more fields]

scala> dfRead.show
+---+-----+---+-----+
|cid|  num|oid|price|
+---+-----+---+-----+
|  7|no001|  1| 12.3|
|  4|no002|  2| 18.8|
|  3|no003|  3| 20.0|
|  7|no004|  4| 50.0|
|  2|no005|  5| 23.1|
|  3|no006|  6| 39.0|
|  2|no007|  7|  5.0|
|  1|no008|  8|  6.0|
+---+-----+---+-----+
5.2、保存为parquet格式
df.write.parquet("file:///home/centos/myspark/par")
读取parquet文件
scala> val dfReadPar = spark.read.parquet("file:///home/centos/myspark/par")
dfReadPar: org.apache.spark.sql.DataFrame = [oid: int, num: string ... 2 more fields]

scala> dfReadPar.show
+---+-----+-----+---+
|oid|  num|price|cid|
+---+-----+-----+---+
|  1|no001| 12.3|  7|
|  2|no002| 18.8|  4|
|  3|no003| 20.0|  3|
|  4|no004| 50.0|  7|
|  5|no005| 23.1|  2|
|  6|no006| 39.0|  3|
|  7|no007|  5.0|  2|
|  8|no008|  6.0|  1|
+---+-----+-----+---+
5.3、操纵jdbc数据库
//读取表数据
scala> val prop = new java.util.Properties()
prop: java.util.Properties = {}

scala> prop.put("driver" , "com.mysql.jdbc.Driver")
res5: Object = null

scala> prop.put("user" , "root")
res6: Object = null

scala> prop.put("password" , "root")
res7: Object = null

scala> val df = spark.read.jdbc("jdbc:mysql://s101:3306/lx" , "wc1" ,prop)
scala> df.show
写入到mysql表
scala> val dfRead = spark.read.json("file:///home/centos/myspark/json")
dfRead: org.apache.spark.sql.DataFrame = [cid: bigint, num: string ... 2 more fields]

scala> dfRead.write.
bucketBy   format       jdbc   mode     options   parquet       save          sortBy
csv        insertInto   json   option   orc       partitionBy   saveAsTable   text

scala> dfRead.write.jdbc
def jdbc(url: String,table: String,connectionProperties: java.util.Properties): Unit

scala> dfRead.write.jdbc("jdbc:mysql://s101:3306/lx" , "wuyong" ,prop)
查看
mysql> select * from wuyong;
+------+-------+------+-------+
| cid  | num   | oid  | price |
+------+-------+------+-------+
|    7 | no001 |    1 |  12.3 |
|    4 | no002 |    2 |  18.8 |
|    3 | no003 |    3 |    20 |
|    7 | no004 |    4 |    50 |
|    2 | no005 |    5 |  23.1 |
|    3 | no006 |    6 |    39 |
|    2 | no007 |    7 |     5 |
|    1 | no008 |    8 |     6 |
+------+-------+------+-------+