from pyspark import SparkContext
from pyspark.sql import SQLContext

if __name__ == "__main__":
    sc = SparkContext(appName="local")
    sqlContext = SQLContext(sc)
    # read the test_customer table from MySQL over JDBC
    df = sqlContext.read.format("jdbc").options(
        url="jdbc:mysql://localhost:3306/test?user=root&password=root",
        dbtable="test_customer").load()
    df.show()
    sc.stop()
If you get a "no suitable driver" error,
copy the MySQL JDBC connector jar into the jars folder of your Spark installation.
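Alternatively, you can point Spark at the connector jar through configuration instead of copying it. A minimal sketch, assuming a hypothetical path to the mysql-connector jar:

# Sketch: supply the MySQL connector jar via SparkConf instead of copying it
# into Spark's jars folder. The jar path below is an assumption -- adjust it
# to wherever your connector jar actually lives.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

if __name__ == "__main__":
    conf = (SparkConf()
            .setAppName("local")
            .set("spark.jars", "/path/to/mysql-connector-java.jar")                 # hypothetical path
            .set("spark.driver.extraClassPath", "/path/to/mysql-connector-java.jar"))
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)
    df = sqlContext.read.format("jdbc").options(
        url="jdbc:mysql://localhost:3306/test?user=root&password=root",
        dbtable="test_customer").load()
    df.show()
    sc.stop()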
Attached: the code for querying with SQL
from pyspark import SparkContext
from pyspark.sql import SQLContext

if __name__ == "__main__":
    sc = SparkContext(appName="local")
    sqlContext = SQLContext(sc)
    df = sqlContext.read.format("jdbc").options(
        url="jdbc:mysql://localhost:3306/test?user=root&password=root",
        dbtable="test_customer").load()
    # register the DataFrame as a temporary table so it can be queried with SQL
    df.registerTempTable("test1")
    ls = sqlContext.sql("select * from test1 where did = 1").collect()
    for it in ls:
        print("1")  # prints a marker for each matching row
    sc.stop()
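For reference, a rough sketch of the same query on Spark 2.x and later, where SparkSession replaces SQLContext and createOrReplaceTempView replaces registerTempTable (an assumption about your Spark version, not part of the original code):

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.appName("local").getOrCreate()
    df = spark.read.format("jdbc").options(
        url="jdbc:mysql://localhost:3306/test?user=root&password=root",
        dbtable="test_customer").load()
    # Spark 2.x+ name for registering a temporary view
    df.createOrReplaceTempView("test1")
    for it in spark.sql("select * from test1 where did = 1").collect():
        print(it)
    spark.stop()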
Also attached: using HiveContext and converting an RDD to a DataFrame:
from pyspark import SparkContext
from pyspark.sql import HiveContext, SQLContext, Row

if __name__ == "__main__":
    sc = SparkContext(appName="local")
    hc = HiveContext(sc)         # HiveContext
    sqlContext = SQLContext(sc)  # SQLContext
    datas = ["1 a 28", "2 b 29", "3 c 30"]
    source = sc.parallelize(datas)                     # load the list as an RDD
    splits = source.map(lambda line: line.split(" "))  # map returns an RDD
    rows = splits.map(lambda words: Row(id=words[0], name=words[1], age=words[2]))
    structType = hc._inferSchema(rows)                 # get the StructType
    # build a DataFrame from the Row RDD and the inferred StructType
    people = sqlContext.createDataFrame(rows, structType)
    people.registerTempTable("people")                 # register the temp table
    results = hc.sql("select name from people").collect()
    # results1 = results.map(lambda row: row.name.upper()).collect()
    for result in results:
        print("name:" + result.name)
    sc.stop()
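Since _inferSchema is a private helper, here is a sketch of the same RDD-to-DataFrame conversion with an explicit StructType instead (column names reused from the example above; the rows are plain tuples, so they match the schema by position):

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType

if __name__ == "__main__":
    sc = SparkContext(appName="local")
    sqlContext = SQLContext(sc)
    # declare the schema explicitly instead of relying on the private _inferSchema
    schema = StructType([
        StructField("id", StringType(), True),
        StructField("name", StringType(), True),
        StructField("age", StringType(), True),
    ])
    rows = sc.parallelize(["1 a 28", "2 b 29", "3 c 30"]) \
             .map(lambda line: line.split(" ")) \
             .map(lambda w: (w[0], w[1], w[2]))  # plain tuples, matched to the schema by position
    people = sqlContext.createDataFrame(rows, schema)
    people.registerTempTable("people")
    for result in sqlContext.sql("select name from people").collect():
        print("name:" + result.name)
    sc.stop()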