  • Connecting PySpark to MySQL

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    if __name__ == "__main__":
        sc = SparkContext(appName="local")
        sqlContext = SQLContext(sc)
        # Read table test_customer from MySQL database test over JDBC;
        # here the credentials are passed in the URL query string
        df = sqlContext.read.format("jdbc").options(
            url="jdbc:mysql://localhost:3306/test?user=root&password=root",
            dbtable="test_customer",
        ).load()
        df.show()  # print the first 20 rows
        sc.stop()
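    The options passed to read.format("jdbc") can also be built separately, which keeps the password out of the URL and names the driver class explicitly. This is a minimal sketch: the helper name mysql_jdbc_options is hypothetical, and the default driver class assumes MySQL Connector/J 8.x (com.mysql.cj.jdbc.Driver; the older 5.x connector uses com.mysql.jdbc.Driver). The keys url, dbtable, user, password and driver are standard options of Spark's JDBC data source.

```python
def mysql_jdbc_options(host, port, database, table, user, password,
                       driver="com.mysql.cj.jdbc.Driver"):
    """Hypothetical helper: build the option dict for Spark's JDBC reader.

    The driver default assumes MySQL Connector/J 8.x; use
    "com.mysql.jdbc.Driver" for the older 5.x connector.
    """
    return {
        "url": "jdbc:mysql://{}:{}/{}".format(host, port, database),
        "dbtable": table,
        "user": user,          # credentials as separate options
        "password": password,  # instead of the URL query string
        "driver": driver,      # makes Spark load this class; the jar must still be on the classpath
    }


if __name__ == "__main__":
    opts = mysql_jdbc_options("localhost", 3306, "test", "test_customer", "root", "root")
    print(opts["url"])  # → jdbc:mysql://localhost:3306/test
    # With a live SQLContext this would be used as:
    # df = sqlContext.read.format("jdbc").options(**opts).load()
```

    Keeping the options in a dict also makes it easy to reuse the same connection settings for several tables by changing only dbtable.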

    If Spark reports "No suitable driver", copy the MySQL JDBC driver jar (MySQL Connector/J) into the jars folder of your Spark installation.
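    Before rerunning the job, a quick check can confirm that the connector jar actually landed in Spark's jars folder. A minimal sketch, assuming SPARK_HOME points at the Spark installation and that the jar keeps Connector/J's usual file name prefix (mysql-connector-java-*.jar in older releases, mysql-connector-j-*.jar in newer ones); find_mysql_connector is a hypothetical helper.

```python
import os
from pathlib import Path


def find_mysql_connector(spark_home):
    """Hypothetical helper: list MySQL connector jars in <spark_home>/jars.

    Assumes the jar name starts with "mysql-connector-", which covers both
    mysql-connector-java-*.jar and mysql-connector-j-*.jar.
    """
    jars_dir = Path(spark_home) / "jars"
    if not jars_dir.is_dir():
        return []
    return sorted(p.name for p in jars_dir.glob("mysql-connector-*.jar"))


if __name__ == "__main__":
    spark_home = os.environ.get("SPARK_HOME", ".")
    found = find_mysql_connector(spark_home)
    if found:
        print("MySQL JDBC driver found:", ", ".join(found))
    else:
        print("No MySQL connector jar in", Path(spark_home) / "jars")
```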

    Appendix: querying the table with SQL

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    if __name__ == "__main__":
        sc = SparkContext(appName="local")
        sqlContext = SQLContext(sc)
        df = sqlContext.read.format("jdbc").options(
            url="jdbc:mysql://localhost:3306/test?user=root&password=root",
            dbtable="test_customer",
        ).load()
        # Register the DataFrame as a temporary view so SQL can refer to it
        # (registerTempTable is deprecated since Spark 2.0 in favour of createOrReplaceTempView)
        df.createOrReplaceTempView("test1")
        rows = sqlContext.sql("select * from test1 where did = 1").collect()
        for row in rows:
            print(row)  # each element is a Row object
        sc.stop()
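    Simple filters such as did = 1 are normally pushed down to MySQL by Spark's JDBC source. For SQL that Spark cannot push down (for example an aggregation over the MySQL table), the dbtable option also accepts a parenthesised subquery with an alias, so MySQL runs that query first. A minimal sketch; the helper name subquery_table, the alias t and the example GROUP BY query are hypothetical.

```python
def subquery_table(sql, alias="t"):
    """Hypothetical helper: wrap a MySQL query so it can be passed as dbtable.

    Spark places the dbtable value in a FROM clause, so a subquery must be
    parenthesised and given an alias.
    """
    sql = sql.strip().rstrip(";")  # a trailing semicolon is not valid inside a subquery
    return "({}) AS {}".format(sql, alias)


if __name__ == "__main__":
    table = subquery_table("select did, count(*) as n from test_customer group by did;")
    print(table)  # → (select did, count(*) as n from test_customer group by did) AS t
    # With a live SQLContext this would be used as:
    # df = sqlContext.read.format("jdbc").options(
    #     url="jdbc:mysql://localhost:3306/test?user=root&password=root",
    #     dbtable=table,
    # ).load()
```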

    Appendix 2: using HiveContext and converting an RDD to a DataFrame:

    from pyspark import SparkContext
    from pyspark.sql import HiveContext, Row

    if __name__ == "__main__":
        sc = SparkContext(appName="local")
        # HiveContext is deprecated since Spark 2.0; newer code uses
        # SparkSession.builder.enableHiveSupport().getOrCreate() instead
        hc = HiveContext(sc)
        datas = ["1 a 28", "2 b 29", "3 c 30"]
        source = sc.parallelize(datas)  # load the list as an RDD
        splits = source.map(lambda line: line.split(" "))  # map returns a new RDD of word lists
        rows = splits.map(lambda words: Row(id=int(words[0]), name=words[1], age=int(words[2])))
        # createDataFrame infers the StructType from the Row objects,
        # so the private hc._inferSchema() call is not needed
        people = hc.createDataFrame(rows)
        # Register and query through the same context so the view is visible to hc.sql
        people.createOrReplaceTempView("people")
        results = hc.sql("select name from people").collect()
        # names = [row.name.upper() for row in results]  # collect() returns a plain list
        for result in results:
            print("name:" + result.name)
        sc.stop()
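    The two map steps above (split each line, then build a Row) can be checked in plain Python before handing the data to sc.parallelize. A minimal sketch that mirrors the same per-line logic with tuples instead of Row objects; parse_person is a hypothetical name, and converting id and age to int is an assumption about the intended column types.

```python
def parse_person(line):
    """Hypothetical helper: mirror the RDD steps line.split(" ") -> Row(...).

    Returns an (id, name, age) tuple; id and age are converted to int,
    an assumption about the intended column types.
    """
    words = line.split(" ")
    if len(words) != 3:
        raise ValueError("expected 'id name age', got {!r}".format(line))
    return int(words[0]), words[1], int(words[2])


if __name__ == "__main__":
    datas = ["1 a 28", "2 b 29", "3 c 30"]
    people = [parse_person(line) for line in datas]
    print(people)  # → [(1, 'a', 28), (2, 'b', 29), (3, 'c', 30)]
    # In Spark the same function can be passed to map, with column names
    # supplied to createDataFrame:
    # rows = sc.parallelize(datas).map(parse_person)
    # people_df = hc.createDataFrame(rows, ["id", "name", "age"])
```

    Testing the parsing function locally first means a malformed line fails with a clear ValueError here rather than inside a Spark task.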
  • Original post: https://www.cnblogs.com/wpcnblog/p/8086505.html