zoukankan      html  css  js  c++  java
  • pyspark操作数据库

    使用pyspark连接数据库

    # spark操作数据库
    from pyspark.sql import SQLContext, SparkSession
    from pyspark import SparkConf, SparkContext
    
    # spark地址
    master_url = "spark://hdp-100:7777"
    conf = SparkConf().setAppName("mainProject").setMaster(master_url).set("spark.sql.execution.arrow.enabled", "true")
    sc = SparkContext.getOrCreate(conf)
    sql_context = SQLContext(sc)
    url = "jdbc:mysql://192.168.130.77:3306/paixin?useSSL=false&useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&allowPublicKeyRetrieval=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "123456"
    table = "paixin"
    properties = {"user": user, "password": password}
    df = sql_context.read.jdbc(url=url, table=table, properties=properties)
    print(df)
    # print(df.select('id'))
    print(df.show(10))
    sql = "select * from paixin where id < 5000"
    print(sql)
    df.createOrReplaceTempView(table)
    df2 = sql_context.sql(sql)
    
    print(df2.collect())
    
    

    DataFrame 常用操作示例

    # Demo: common PySpark DataFrame operations against a MySQL-backed table.
    from pyspark.sql import SQLContext
    from pyspark import SparkConf, SparkContext
    import pandas as pd
    
    # Spark master address
    master_url = "spark://hdp-100:7777"
    conf = SparkConf().setAppName("paixin").setMaster(master_url).set("spark.sql.execution.arrow.enabled", "true")
    sc = SparkContext.getOrCreate(conf)
    sql_context = SQLContext(sc)
    url = "jdbc:mysql://192.168.130.77:3306/paixin?useSSL=false&useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&allowPublicKeyRetrieval=true"
    user = "root"
    password = "123456"
    table = "paixin"
    properties = {"user": user, "password": password}
    df = sql_context.read.jdbc(url=url, table=table, properties=properties)
    print(df)
    # FIX throughout this script: DataFrame.show() and DataFrame.printSchema()
    # print their output themselves and return None; wrapping them in print()
    # appended a spurious "None" line after every table. The print() wrappers
    # are removed; all display behavior is otherwise unchanged.
    df.show(10)
    print("*" * 100)
    print(df.select('store_path').count())
    print(df.select("store_path").distinct().count())
    print("*" * 100)
    print(df)
    print(type(df))
    df.filter(df.store_path == "D:图片拍信/交通运输/公路交通在日落.jpg").show(truncate=False)
    df.orderBy(df.img_id.desc()).show(truncate=False, n=100)
    # show() prints 20 rows by default; n= overrides that, and truncate=False
    # displays full column contents instead of cutting them at 20 characters.
    df.show(truncate=False)
    print("*" * 20, "1.show", "*" * 20)
    df.show(truncate=False)
    print("*" * 20, "2.以树的形式打印概要", "*" * 20)
    df.printSchema()
    print("*" * 20, "3.获取头几行到本地", "*" * 20)
    # head()/take() collect a small row prefix to the driver as a list of Rows.
    print(df.head(3))
    print(df.take(5))
    print("*" * 20, "4.查询总行数", "*" * 20)
    print(df.count())
    print("*" * 20, "5.取别名", "*" * 20)
    df.select(df.img_id.alias("图片id"), 'img_url').show()
    print("*" * 20, "6.查询某列为null的行", "*" * 20)
    from pyspark.sql.functions import isnull
    df.filter(isnull('img_id')).show()
    print("*" * 20, "7.收集到本地,耗费资源", "*" * 20)
    # collect() pulls the full dataset to the driver — expensive, kept disabled.
    # print(df.collect())
    print("*" * 20, "8.查询概况", "*" * 20)
    df.describe().show()
    print("*" * 20, "9.去重set操作", "*" * 20)
    df.select("img_id").distinct().show()
    print("*" * 20, "10.选择一列或多列", "*" * 20)
    # df['img_id'] / df.img_id yield Column objects, which have no show() — only
    # DataFrames (from select()) can be displayed.
    # print(df['img_id'].show()) x
    # print(df.img_id.show()) x
    df.select('img_id').show()
    df.select('img_id', 'img_url').show()
    print("*" * 20, "11.可以用where按条件选择", "*" * 20)
    df.where("store_path like '%交通运输%'").show(truncate=False)
    print("*" * 20, "12.排序,默认按升序orderBy和sort", "*" * 20)
    df.orderBy(df.id.desc()).show(truncate=False)
    print("*" * 20, "13.抽样", "*" * 20)
    # sample(withReplacement, fraction, seed) — same seed gives a repeatable sample.
    df.sample(False, 0.2, 42).show()
    df.sample(False, 0.2, 43).show()
    print("*" * 20, "14.when满足条件和不满足条件应该怎么赋值", "*" * 20)
    from pyspark.sql import functions as F
    df.select(df.store_path, F.when(df.id < 3400, 1).when(df.id > 3500, -1).otherwise(0)).show(300)
    print("*" * 20, "15.between筛选出某个范围的值,返回的市TRUE or FALSE", "*" * 20)
    df.select(df.store_path, df.id.between(3400, 3456)).show(100)
    print("*" * 20, "16.筛选特定行数", "*" * 20)
    # 1. First attach a monotonically increasing (not necessarily consecutive) index column.
    from pyspark.sql.functions import monotonically_increasing_id
    dfWithIndex = df.withColumn("id", monotonically_increasing_id())
    dfWithIndex.select(dfWithIndex.store_path, dfWithIndex.id.between(50, 100)).show()
    print("*" * 20, "17.apply函数", "*" * 20)
    
    
  • 相关阅读:
    手把手教你创建ASP.NET MVC Dashboard应用
    DevExpress ASP.NET v20.2版本亮点放送:甘特图控件全面升级
    .NET 6已到来?Telerik WinForm率先支持
    手把手教你创建一个Vue Dashboard应用
    Kendo UI for jQuery数据管理使用教程:更改PivotGrid字段名称
    现代应用的启动屏幕如何更美观?这款第三方控件你使用了吗?
    VS插件CodeRush v20.2.8正式发布,支持新的代码模板
    这个三方控件,让你的ASP.NET应用图表界面更酷炫
    nginx负载均衡技术基础
    面向过程的代码请不要拆分成多个脚本
  • 原文地址:https://www.cnblogs.com/hziwei/p/12837326.html
Copyright © 2011-2022 走看看