官网地址:http://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html
pyspark.sql module
pyspark.sql.types module
Spark SQL和DataFrames重要的类有:
pyspark.sql.SQLContext DataFrame和SQL方法的主入口
pyspark.sql.DataFrame 将分布式数据集分组到指定列名的数据框中
pyspark.sql.Column DataFrame中的列
pyspark.sql.Row DataFrame数据的行
pyspark.sql.HiveContext 访问Hive数据的主入口
pyspark.sql.GroupedData 由DataFrame.groupBy()创建的聚合方法集
pyspark.sql.DataFrameNaFunctions 处理丢失数据(空数据)的方法
pyspark.sql.DataFrameStatFunctions 统计功能的方法
pyspark.sql.functions DataFrame可用的内置函数
pyspark.sql.types 可用的数据类型列表
pyspark.sql.Window 用于处理窗口函数
1.class pyspark.sql.types.DataType
数据类型的基类。
1.1 fromInternal(obj)
将内部SQL对象转换为本机Python对象。
1.2 json()
1.3 jsonValue()
1.4 needConversion()
这种类型是否需要在Python对象和内部SQL对象之间进行转换?
这用于避免ArrayType / MapType / StructType的不必要的转换。
1.5 simpleString()
1.6 toInternal(obj)
将Python对象转换为内部SQL对象。
2.class pyspark.sql.types.NullType
Null类型
表示无的数据类型,用于无法推断的类型。
3.class pyspark.sql.types.StringType
String 数据类型。
4.class pyspark.sql.types.BinaryType
二进制(字节数组)数据类型。
5.class pyspark.sql.types.BooleanType
Boolean 数据类型。
6.class pyspark.sql.types.DateType
Date (datetime.date) 数据类型。
7.class pyspark.sql.types.TimestampType
Timestamp (datetime.datetime) 数据类型。
8.class pyspark.sql.types.DecimalType(precision=10, scale=0)
Decimal (decimal.Decimal) 数据类型。
DecimalType必须具有固定的精度(最大总位数)和比例(点右边的位数)。 例如,(5,2)可以支持[-999.99至999.99]之间的值。
precision可以达到38,scale要小于或等于precision。
创建DecimalType时,默认的precision和scale是(10,0)。 当从十进制对象中推断模式时,它将是DecimalType(38,18)。
参数:● precision – 最大的总位数 (默认: 10)
● scale – 点右侧的位数 (默认: 0)
9.class pyspark.sql.types.DoubleType
Double 数据类型,表示双精度浮点数。
10.class pyspark.sql.types.FloatType
Float数据类型,表示单精度浮点数。
11.class pyspark.sql.types.ByteType
Byte 数据类型,即单个字节中的有符号整数。
12.class pyspark.sql.types.IntegerType
Int数据类型,即有符号的32位整数。
13.class pyspark.sql.types.LongType
Long数据类型,即有符号的64位整数。
如果数值超出[-9223372036854775808,9223372036854775807]的范围,请使用DecimalType。
14.class pyspark.sql.types.ShortType
Short数据类型,即有符号的16位整数。
15.class pyspark.sql.types.ArrayType(elementType, containsNull=True)
数组数据类型。
参数:● elementType – 数组中每个元素的DataType。
● containsNull – 布尔值,数组是否可以包含null(None)值。
16.class pyspark.sql.types.MapType(keyType, valueType, valueContainsNull=True)
Map数据类型。
参数:● keyType – map中key的数据类型。
● valueType – map中value的数据类型。
● valueContainsNull – 指示values是否可以包含null(无)值。
map数据类型中的键不允许为null(无)。
17.class pyspark.sql.types.StructField(name, dataType, nullable=True, metadata=None)
StructType中的一个字段。
参数:● name – 字符串,字段的名称。
● dataType – 字段的数据类型。
● nullable – boolean,该字段是否可以为null(None)。
● metadata – 从字符串到简单类型的字典,可以自动内部转到JSON
18.class pyspark.sql.types.StructType(fields=None)
结构类型,由StructField的列表组成。
这是表示一个行的数据类型。
18.1 add(field, data_type=None, nullable=True, metadata=None)
通过添加新元素来构造一个StructType来定义schema。 该方法接受:
a:一个参数是一个StructField对象。
b:介于2到4之间的参数(name,data_type,nullable(可选),metadata(可选))。data_type参数可以是String或DataType对象。
>>> from pyspark.sql.types import * >>> struct1 = StructType().add("f1", StringType(), True).add("f2", StringType(), True, None) >>> struct2 = StructType([StructField("f1", StringType(), True), StructField("f2", StringType(), True, None)]) >>> struct1 == struct2 True >>> struct1 = StructType().add(StructField("f1", StringType(), True)) >>> struct2 = StructType([StructField("f1", StringType(), True)]) >>> struct1 == struct2 True >>> struct1 = StructType().add("f1", "string", True) >>> struct2 = StructType([StructField("f1", StringType(), True)]) >>> struct1 == struct2 True
参数:● field – 字段的名称或者StructField对象
● data_type – 如果存在,则创建StructField的DataType
● nullable – 要添加的字段是否可以是nullable (默认True)
● metadata – 任何其他元数据(默认无)
返回:一个新的更新的StructType
pyspark.sql.functions module
内建函数的集合
1.pyspark.sql.functions.abs(col)
计算绝对值。
2.pyspark.sql.functions.acos(col)
计算给定值的余弦逆; 返回的角度在0到π的范围内。
3.pyspark.sql.functions.add_months(start, months)
返回start后months个月的日期
>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['d']) >>> df.select(add_months(df.d, 1).alias('d')).collect() [Row(d=datetime.date(2015, 5, 8))]
4.pyspark.sql.functions.approxCountDistinct(col, rsd=None)
返回col的近似不同计数的新列。
>>> l=[('Alice',2),('Bob',5)] >>> df = sqlContext.createDataFrame(l,['name','age']) >>> df.agg(approxCountDistinct(df.age).alias('c')).collect() [Row(c=2)]
5.pyspark.sql.functions.array(*cols)
创建一个新的数组列。
参数:● cols – 列名(字符串)列表或具有相同数据类型的列表达式列表。
>>> df.select(array('age', 'age').alias("arr")).collect() [Row(arr=[2, 2]), Row(arr=[5, 5])] >>> df.select(array([df.age, df.age]).alias("arr")).collect() [Row(arr=[2, 2]), Row(arr=[5, 5])]
6.pyspark.sql.functions.array_contains(col, value)
集合函数:如果数组包含给定值,则返回True。 收集元素和值必须是相同的类型。
参数:● col – 数组中的列名称
● value – 数组中要检查的值
>>> df = sqlContext.createDataFrame([(["a", "b", "c"],), ([],)], ['data']) >>> df.select(array_contains(df.data, "a")).collect() [Row(array_contains(data,a)=True), Row(array_contains(data,a)=False)]
7.pyspark.sql.functions.asc(col)
基于给定列名称的升序返回一个排序表达式。
8.pyspark.sql.functions.ascii(col)
计算字符串列的第一个字符的数值。
9.pyspark.sql.functions.asin(col)
计算给定值的正弦倒数; 返回的角度在负π/ 2到π/ 2的范围内。
10.pyspark.sql.functions.atan(col)
计算给定值的正切倒数。
11.pyspark.sql.functions.atan2(col1, col2)
返回直角坐标(x,y)到极坐标(r,theta)转换的角度theta。
12.pyspark.sql.functions.avg(col)
聚合函数:返回组中的值的平均值。
13.pyspark.sql.functions.base64(col)
计算二进制列的BASE64编码,并将其作为字符串列返回。
14.pyspark.sql.functions.bin(col)
返回给定列的二进制值的字符串表示形式
>>> l=[('Alice',2),('Bob',5)] >>> df = sqlContext.createDataFrame(l,['name','age']) >>> df.select(bin(df.age).alias('c')).collect() [Row(c=u'10'), Row(c=u'101')]
15.pyspark.sql.functions.bitwiseNOT(col)
不按位计算。
16.pyspark.sql.functions.broadcast(df)
将DataFrame标记为足够小以用于广播连接。
17.pyspark.sql.functions.cbrt(col)
计算给定值的立方根。
18.pyspark.sql.functions.ceil(col)
计算给定值的上限。
19.pyspark.sql.functions.coalesce(*cols)
返回不为空的第一列。
>>> cDf = sqlContext.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b")) >>> cDf.show() +----+----+ | a| b| +----+----+ |null|null| | 1|null| |null| 2| +----+----+
>>> cDf.select(coalesce(cDf["a"], cDf["b"])).show() +-------------+ |coalesce(a,b)| +-------------+ | null| | 1| | 2| +-------------+
注:使用coalesce需先引用,from pyspark.sql.functions import *
>>> cDf.select('*', coalesce(cDf["a"], lit(0.0))).show() +----+----+---------------+ | a| b|coalesce(a,0.0)| +----+----+---------------+ |null|null| 0.0| | 1|null| 1.0| |null| 2| 0.0| +----+----+---------------+
20.pyspark.sql.functions.col(col)
根据给定的列名返回一个列。
21.pyspark.sql.functions.collect_list(col)
聚合函数:返回重复对象的列表。
22.pyspark.sql.functions.collect_set(col)
聚合函数:返回一组消除重复元素的对象。
23.pyspark.sql.functions.column(col)
根据给定的列名返回一个列。
24.pyspark.sql.functions.concat(*cols)[source]
将多个输入字符串列连接成一个字符串列。
>>> df = sqlContext.createDataFrame([('abcd','123')], ['s', 'd']) >>> df.select(concat(df.s, df.d).alias('s')).collect() [Row(s=u'abcd123')]
25.pyspark.sql.functions.concat_ws(sep, *cols)[source]
使用给定的分隔符将多个输入字符串列连接到一个字符串列中。
>>> df = sqlContext.createDataFrame([('abcd','123')], ['s', 'd']) >>> df.select(concat_ws('-', df.s, df.d).alias('s')).collect() [Row(s=u'abcd-123')]
26.pyspark.sql.functions.conv(col, fromBase, toBase)[source]
将字符串列中的数字从一个基数转换为另一个基数。
>>> df = sqlContext.createDataFrame([("010101",)], ['n']) >>> df.select(conv(df.n, 2, 16).alias('hex')).collect() [Row(hex=u'15')]
27.pyspark.sql.functions.corr(col1, col2)
返回col1和col2的皮尔森相关系数的新列。
>>> from pyspark.sql.functions import * >>> a = [x * x - 2 * x + 3.5 for x in range(20)] >>> b = range(20) >>> corrDf = sqlContext.createDataFrame(zip(a, b)) >>> corrDf = corrDf.agg(corr(corrDf._1, corrDf._2).alias('c')) >>> corrDf.selectExpr('abs(c - 0.9572339139475857) < 1e-16 as t').collect() [Row(t=True)]
28.pyspark.sql.functions.cos(col)
计算给定值的余弦。
29.pyspark.sql.functions.cosh(col)
计算给定值的双曲余弦。
30.pyspark.sql.functions.count(col)
聚合函数:返回组中的项数量。
31.pyspark.sql.functions.countDistinct(col, *cols)
返回一列或多列的去重计数的新列。
>>> l=[('Alice',2),('Bob',5)] >>> df = sqlContext.createDataFrame(l,['name','age']) >>> df.agg(countDistinct(df.age, df.name).alias('c')).collect() [Row(c=2)] >>> df.agg(countDistinct("age", "name").alias('c')).collect() [Row(c=2)]
32.pyspark.sql.functions.crc32(col)
计算二进制列的循环冗余校验值(CRC32),并将该值作为bigint返回。
>>> from pyspark.sql.functions import * >>> sqlContext.createDataFrame([('ABC',)], ['a']).select(crc32('a').alias('crc32')).collect() [Row(crc32=2743272264)]
33.pyspark.sql.functions.cumeDist()
窗口函数:.. note ::在1.6中不推荐使用,而是使用cume_dist。
34.pyspark.sql.functions.cume_dist()
窗口函数:返回窗口分区内值的累积分布,即在当前行下面的行的分数。
35.pyspark.sql.functions.current_date()
以日期列的形式返回当前日期。
36.pyspark.sql.functions.current_timestamp()
将当前时间戳作为时间戳列返回。
37.pyspark.sql.functions.date_add(start, days)
返回start后days天的日期
>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['d']) >>> df.select(date_add(df.d, 1).alias('d')).collect() [Row(d=datetime.date(2015, 4, 9))]
38.pyspark.sql.functions.date_format(date, format)
将日期/时间戳/字符串转换为由第二个参数给定日期格式指定格式的字符串值。
一个模式可能是例如dd.MM.yyyy,可能会返回一个字符串,如“18 .03.1993”。 可以使用Java类java.text.SimpleDateFormat的所有模式字母。
注意:尽可能使用像年份这样的专业功能。 这些受益于专门的实施。
>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a']) >>> df.select(date_format('a', 'MM/dd/yyy').alias('date')).collect() [Row(date=u'04/08/2015')]
39.pyspark.sql.functions.date_sub(start, days)
返回start前days天的日期
>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['d']) >>> df.select(date_sub(df.d, 1).alias('d')).collect() [Row(d=datetime.date(2015, 4, 7))]
40.pyspark.sql.functions.datediff(end, start)
返回从start到end的天数。
>>> df = sqlContext.createDataFrame([('2015-04-08','2015-05-10')], ['d1', 'd2']) >>> df.select(datediff(df.d2, df.d1).alias('diff')).collect() [Row(diff=32)]
41.pyspark.sql.functions.dayofmonth(col)
将给定日期的月份的天解压为整数。
>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a']) >>> df.select(dayofmonth('a').alias('day')).collect() [Row(day=8)]
42.pyspark.sql.functions.dayofyear(col)
将给定日期的年份中的某一天提取为整数。
>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a']) >>> df.select(dayofyear('a').alias('day')).collect() [Row(day=98)]
43.pyspark.sql.functions.decode(col, charset)
使用提供的字符集(“US-ASCII”,“ISO-8859-1”,“UTF-8”,“UTF-16BE”,“UTF-16LE”,“UTF-16”之一)从二进制计算第一个参数到字符串中
44.pyspark.sql.functions.denseRank()
窗口函数:.. note ::在1.6中不推荐使用,而是使用dense_rank。
45.pyspark.sql.functions.dense_rank()
窗口函数:返回窗口分区内的行的等级,没有任何间隙。
rank和denseRank的区别在于,当有关系时,denseRank在排序顺序上没有差距。 也就是说,如果你使用密集排名进行比赛,并且有三个人排在第二位,那么你会说所有三个排在第二位,下一个排在第三位。
46.pyspark.sql.functions.desc(col)
基于给定列名称的降序返回一个排序表达式。
47.pyspark.sql.functions.encode(col, charset)
使用提供的字符集(‘US-ASCII’, ‘ISO-8859-1’, ‘UTF-8’, ‘UTF-16BE’, ‘UTF-16LE’, ‘UTF-16’之一)将第一个参数从字符串计算为二进制
48.pyspark.sql.functions.exp(col)
计算给定值的指数。
49.pyspark.sql.functions.explode(col)
返回给定数组或映射中每个元素的新行。
>>> from pyspark.sql import Row >>> eDF = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})]) >>> eDF.select(explode(eDF.intlist).alias("anInt")).collect() [Row(anInt=1), Row(anInt=2), Row(anInt=3)]
>>> eDF.select(explode(eDF.mapfield).alias("key", "value")).show() +---+-----+ |key|value| +---+-----+ | a| b| +---+-----+
50.pyspark.sql.functions.expm1(col)
计算给定值的指数减1。
51.pyspark.sql.functions.expr(str)
将表达式字符串分析到它表示的列中
>>> l=[('Alice',2),('Bob',5)] >>> df = sqlContext.createDataFrame(l,['name','age']) >>> df.select(expr("length(name)")).collect() [Row(length(name)=5), Row(length(name)=3)]
52.pyspark.sql.functions.factorial(col)
计算给定值的阶乘。
>>> df = sqlContext.createDataFrame([(5,)], ['n']) >>> df.select(factorial(df.n).alias('f')).collect() [Row(f=120)]
53.pyspark.sql.functions.first(col)
聚合函数:返回组中的第一个值。
54.pyspark.sql.functions.floor(col)
计算给定值的最小。
55.pyspark.sql.functions.format_number(col, d)
将数字X格式化为像'#, - #, - #.-'这样的格式,四舍五入到小数点后的位置,并以字符串形式返回结果。
参数:● col – 要格式化的数值的列名称
● d – N小数位
>>> from pyspark.sql.functions import * >>> sqlContext.createDataFrame([(5,)], ['a']).select(format_number('a',4).alias('v')).collect() [Row(v=u'5.0000')]
56.pyspark.sql.functions.format_string(format, *cols)
以printf样式格式化参数,并将结果作为字符串列返回。
参数:● format – 要格式化的格式
● cols - 要格式化的列
p.s.这里官网可能有误,参数与format_number一样了。
>>> from pyspark.sql.functions import * >>> df = sqlContext.createDataFrame([(5, "hello")], ['a', 'b']) >>> df.select(format_string('%d %s', df.a, df.b).alias('v')).collect() [Row(v=u'5 hello')]
57.pyspark.sql.functions.from_unixtime(timestamp, format='yyyy-MM-dd HH:mm:ss')
将来自unix时期(1970-01-01 00:00:00 UTC)的秒数转换为以给定格式表示当前系统时区中该时刻的时间戳的字符串。
58.pyspark.sql.functions.from_utc_timestamp(timestamp, tz)
假设时间戳是UTC,并转换为给定的时区
>>> df = sqlContext.createDataFrame([('1997-02-28 10:30:00',)], ['t']) >>> df.select(from_utc_timestamp(df.t, "PST").alias('t')).collect() [Row(t=datetime.datetime(1997, 2, 28, 2, 30))]
59.pyspark.sql.functions.get_json_object(col, path)
从基于指定的json路径的json字符串中提取json对象,并返回提取的json对象的json字符串。 如果输入的json字符串无效,它将返回null。
参数:● col – json格式的字符串列
● path – 提取json对象的路径
>>> data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')] >>> df = sqlContext.createDataFrame(data, ("key", "jstring")) >>> df.select(df.key, get_json_object(df.jstring, '$.f1').alias("c0"),get_json_object(df.jstring, '$.f2').alias("c1") ).collect() [Row(key=u'1', c0=u'value1', c1=u'value2'), Row(key=u'2', c0=u'value12', c1=None)]
60.pyspark.sql.functions.greatest(*cols)
返回列名称列表的最大值,跳过空值。 该功能至少需要2个参数。 如果所有参数都为空,它将返回null
>>> df = sqlContext.createDataFrame([(1, 4, 3)], ['a', 'b', 'c']) >>> df.select(greatest(df.a, df.b, df.c).alias("greatest")).collect() [Row(greatest=4)]
61.pyspark.sql.functions.hex(col)
计算给定列的十六进制值,可以是StringType,BinaryType,IntegerType或LongType
>>> sqlContext.createDataFrame([('ABC', 3)], ['a', 'b']).select(hex('a'), hex('b')).collect() [Row(hex(a)=u'414243', hex(b)=u'3')]
62.pyspark.sql.functions.hour(col)
将给定日期的小时数提取为整数。
>>> df = sqlContext.createDataFrame([('2015-04-08 13:08:15',)], ['a']) >>> df.select(hour('a').alias('hour')).collect() [Row(hour=13)]
63.pyspark.sql.functions.hypot(col1, col2)
计算sqrt(a ^ 2 ^ + b ^ 2 ^),无中间上溢或下溢。
64.pyspark.sql.functions.initcap(col)
在句子中将每个单词的第一个字母翻译成大写。
>>> sqlContext.createDataFrame([('ab cd',)], ['a']).select(initcap("a").alias('v')).collect() [Row(v=u'Ab Cd')]
65.pyspark.sql.functions.input_file_name()
为当前Spark任务的文件名创建一个字符串列。
66.pyspark.sql.functions.instr(str, substr)
找到给定字符串中第一次出现substr列的位置。 如果其中任一参数为null,则返回null。
注:位置不是从零开始的,但是基于1的索引,如果在str中找不到substr,则返回0。
>>> df = sqlContext.createDataFrame([('abcd',)], ['s',]) >>> df.select(instr(df.s, 'b').alias('s')).collect() [Row(s=2)]
67.pyspark.sql.functions.isnan(col)
如果列是NaN,则返回true的表达式。
>>> df = sqlContext.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b")) >>> df.select(isnan("a").alias("r1"), isnan(df.a).alias("r2")).collect() [Row(r1=False, r2=False), Row(r1=True, r2=True)]
68.pyspark.sql.functions.isnull(col)
如果列为null,则返回true的表达式
>>> df = sqlContext.createDataFrame([(1, None), (None, 2)], ("a", "b")) >>> df.select(isnull("a").alias("r1"), isnull(df.a).alias("r2")).collect() [Row(r1=False, r2=False), Row(r1=True, r2=True)]
69.pyspark.sql.functions.json_tuple(col, *fields)
根据给定的字段名称为json列创建一个新行。
参数:● col – json格式的字符串列
● fields – 要提取的字段列表
>>> data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')] >>> df = sqlContext.createDataFrame(data, ("key", "jstring")) >>> df.select(df.key, json_tuple(df.jstring, 'f1', 'f2')).collect() [Row(key=u'1', c0=u'value1', c1=u'value2'), Row(key=u'2', c0=u'value12', c1=None)]
70.pyspark.sql.functions.kurtosis(col)
聚合函数:返回组中的值的峰度。
71.pyspark.sql.functions.lag(col, count=1, default=None)
窗口函数:返回当前行之前偏移行的值;如果当前行之前的行数小于偏移量,则返回defaultValue。 例如,一个偏移量将返回窗口分区中任何给定点的前一行。
这相当于SQL中的LAG函数。
参数:● col – 列或表达式的名称
● count – 要延伸的行数
● default – 默认值
72.pyspark.sql.functions.last(col)
聚合函数:返回组中的最后一个值。
73.pyspark.sql.functions.last_day(date)
返回给定日期所属月份的最后一天。
>>> df = sqlContext.createDataFrame([('1997-02-10',)], ['d']) >>> df.select(last_day(df.d).alias('date')).collect() [Row(date=datetime.date(1997, 2, 28))]
74.pyspark.sql.functions.lead(col, count=1, default=None)
Window函数:返回当前行之后偏移行的值;如果当前行之后的行数小于偏移行,则返回defaultValue。 例如,一个偏移量将返回窗口分区中任意给定点的下一行。
这相当于SQL中的LEAD函数。
参数:● col – 列或表达式的名称
● count – 要延伸的行数
● default – 默认值
75.pyspark.sql.functions.least(*cols)
返回列名称列表的最小值,跳过空值。 该功能至少需要2个参数。 如果所有参数都为空,它将返回null
>>> df = sqlContext.createDataFrame([(1, 4, 3)], ['a', 'b', 'c']) >>> df.select(least(df.a, df.b, df.c).alias("least")).collect() [Row(least=1)]
76.pyspark.sql.functions.length(col)
计算字符串或二进制表达式的长度
>>> sqlContext.createDataFrame([('ABC',)], ['a']).select(length('a').alias('length')).collect() [Row(length=3)]
77.pyspark.sql.functions.levenshtein(left, right)
计算两个给定字符串的Levenshtein距离。
>>> from pyspark.sql.functions import * >>> df0 = sqlContext.createDataFrame([('kitten', 'sitting',)], ['l', 'r']) >>> df0.select(levenshtein('l', 'r').alias('d')).collect() [Row(d=3)]
78.pyspark.sql.functions.lit(col)
创建一个文字值的列
79.pyspark.sql.functions.locate(substr, str, pos=0)
找到第一个出现的位置在位置pos后面的字符串列中。
注:位置不是从零开始,而是从1开始。 如果在str中找不到substr,则返回0。
参数: substr – 一个字符串
str – 一个StringType的列
pos – 起始位置(基于零)
>>> df = sqlContext.createDataFrame([('abcd',)], ['s',]) >>> df.select(locate('b', df.s, 1).alias('s')).collect() [Row(s=2)]
80.pyspark.sql.functions.log(arg1, arg2=None)
返回第二个参数的第一个基于参数的对数。
如果只有一个参数,那么这个参数就是自然对数。
>>> df.select(log(10.0, df.age).alias('ten')).map(lambda l: str(l.ten)[:7]).collect() ['0.30102', '0.69897'] >>> df.select(log(df.age).alias('e')).map(lambda l: str(l.e)[:7]).collect() ['0.69314', '1.60943']
81.pyspark.sql.functions.log10(col)
计算Base 10中给定值的对数。
82.pyspark.sql.functions.log1p(col)
计算给定值的自然对数加1。
83.pyspark.sql.functions.log2(col)
返回参数的基数为2的对数。
>>> sqlContext.createDataFrame([(4,)], ['a']).select(log2('a').alias('log2')).collect()
[Row(log2=2.0)]
84.pyspark.sql.functions.lower(col)
将字符串列转换为小写。
85.pyspark.sql.functions.lpad(col, len, pad)
用pad填充字符串列的宽度len
>>> df = sqlContext.createDataFrame([('abcd',)], ['s',]) >>> df.select(lpad(df.s, 6, '#').alias('s')).collect() [Row(s=u'##abcd')]
86.pyspark.sql.functions.ltrim(col)
从左端修剪指定字符串值的空格。
87.pyspark.sql.functions.max(col)
聚合函数:返回组中表达式的最大值。
88.pyspark.sql.functions.md5(col)
计算MD5摘要并以32个字符的十六进制字符串的形式返回值。
>>> sqlContext.createDataFrame([('ABC',)], ['a']).select(md5('a').alias('hash')).collect()
[Row(hash=u'902fbdd2b1df0c4f70b4a5d23525e932')]
89.pyspark.sql.functions.mean(col)
聚合函数:返回组中的值的平均值
90.pyspark.sql.functions.min(col)
聚合函数:返回组中表达式的最小值。
91.pyspark.sql.functions.minute(col)
提取给定日期的分钟数为整数
>>> df = sqlContext.createDataFrame([('2015-04-08 13:08:15',)], ['a']) >>> df.select(minute('a').alias('minute')).collect() [Row(minute=8)]
92.pyspark.sql.functions.monotonicallyIncreasingId()
注意在1.6中不推荐使用monotonically_increasing_id
93.pyspark.sql.functions.monotonically_increasing_id()
生成单调递增的64位整数的列。
生成的ID保证是单调递增和唯一的,但不是连续的。 当前的实现将分区ID放在高31位,并将每个分区内的记录号放在低33位。 假设
数据帧的分区少于10亿个,每个分区少于80亿条记录
例如,考虑一个DataFrame有两个分区,每个分区有三个记录。 该表达式将返回以下ID:0,1,2,8589934592(1L << 33),
8589934593,8589934594
>>> df0 = sc.parallelize(range(2), 2).mapPartitions(lambda x: [(1,), (2,), (3,)]).toDF(['col1']) >>> df0.select(monotonically_increasing_id().alias('id')).collect() [Row(id=0), Row(id=1), Row(id=2), Row(id=8589934592), Row(id=8589934593), Row(id=8589934594)]
94.pyspark.sql.functions.month(col)
将给定日期的月份提取为整数
>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a']) >>> df.select(month('a').alias('month')).collect() [Row(month=4)]
95.pyspark.sql.functions.months_between(date1, date2)
返回date1和date2之间的月数。
>>> df = sqlContext.createDataFrame([('1997-02-28 10:30:00', '1996-10-30')], ['t', 'd']) >>> df.select(months_between(df.t, df.d).alias('months')).collect() [Row(months=3.9495967...)]
96.pyspark.sql.functions.nanvl(col1, col2)
如果不是NaN,则返回col1;如果col1是NaN,则返回col2
两个输入都应该是浮点列(DoubleType或FloatType)
>>> df = sqlContext.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b")) >>> df.select(nanvl("a", "b").alias("r1"), nanvl(df.a, df.b).alias("r2")).collect() [Row(r1=1.0, r2=1.0), Row(r1=2.0, r2=2.0)]
97.pyspark.sql.functions.next_day(date, dayOfWeek)
返回晚于日期列值的第一个日期
星期几参数不区分大小写,并接受:“Mon”, “Tue”, “Wed”, “Thu”, “Fri”, “Sat”, “Sun”.
>>> df = sqlContext.createDataFrame([('2015-07-27',)], ['d']) >>> df.select(next_day(df.d, 'Sun').alias('date')).collect() [Row(date=datetime.date(2015, 8, 2))]
98.pyspark.sql.functions.ntile(n)
窗口函数:在有序的窗口分区中返回ntile组ID(从1到n)。 例如,如果n是4,则第一季度行将得到值1,第二季度将得到2,第三季
度将得到3,并且最后一个季度将得到4。
这相当于SQL中的NTILE函数。
99.pyspark.sql.functions.percentRank()
窗口函数:.. note ::在1.6中不推荐使用,而是使用percent_rank
100.pyspark.sql.functions.percent_rank()
窗口函数:返回窗口分区内行的相对等级(即百分位数)
101.pyspark.sql.functions.pow(col1, col2)
返回引发第二个参数的第一个参数的值。
102.pyspark.sql.functions.quarter(col)
提取给定日期的四分之一整数
>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a']) >>> df.select(quarter('a').alias('quarter')).collect() [Row(quarter=2)]
103.pyspark.sql.functions.rand(seed=None)
用i.i.d生成一个随机列 来自U的样本[0.0,1.0]。
104.pyspark.sql.functions.randn(seed=None)
用i.i.d生成一列 来自标准正态分布的样本。
105.pyspark.sql.functions.rank()
窗口函数:返回窗口分区内的行的等级
rank和denseRank的区别在于,当有关系时,denseRank在排序顺序上没有差距。 也就是说,如果你使用密集排名进行比赛,并且有
三个人排在第二位,那么你会说所有三个排在第二位,下一个排在第三位。
这相当于SQL中的RANK函数。
106.pyspark.sql.functions.regexp_extract(str, pattern, idx)
从指定的字符串列中提取由java正则表达式标识的特定(idx)组
>>> df = sqlContext.createDataFrame([('100-200',)], ['str']) >>> df.select(regexp_extract('str', '(d+)-(d+)', 1).alias('d')).collect() [Row(d=u'100')]
107.pyspark.sql.functions.regexp_replace(str, pattern, replacement)
将与regexp匹配的指定字符串值的所有子字符串替换为rep
>>> df = sqlContext.createDataFrame([('100-200',)], ['str']) >>> df.select(regexp_replace('str', '(d+)', '--').alias('d')).collect() [Row(d=u'-----')]
108.pyspark.sql.functions.repeat(col, n)
重复一个字符串列n次,并将其作为新的字符串列返回
>>> df = sqlContext.createDataFrame([('ab',)], ['s',]) >>> df.select(repeat(df.s, 3).alias('s')).collect() [Row(s=u'ababab')]
109.pyspark.sql.functions.reverse(col)
反转字符串列并将其作为新的字符串列返回
110.pyspark.sql.functions.rint(col)
返回值最接近参数的double值,等于一个数学整数。
111.pyspark.sql.functions.round(col, scale=0)
如果scale> = 0,将e的值舍入为小数点的位数,或者在scale <0的时候将其舍入到整数部分。
>>> sqlContext.createDataFrame([(2.546,)], ['a']).select(round('a', 1).alias('r')).collect()
[Row(r=2.5)]
112.pyspark.sql.functions.rowNumber()
窗口函数:.. note:1.6中不推荐使用,而是使用row_number
113.pyspark.sql.functions.row_number()
窗口函数:返回窗口分区内从1开始的连续编号。
114.pyspark.sql.functions.rpad(col, len, pad)
右键将字符串列填充到宽度为len的pad
>>> df = sqlContext.createDataFrame([('abcd',)], ['s',]) >>> df.select(rpad(df.s, 6, '#').alias('s')).collect() [Row(s=u'abcd##')]
115.pyspark.sql.functions.rtrim(col)
从右端修剪指定字符串值的空格
116.pyspark.sql.functions.second(col)
将给定日期的秒数提取为整数
>>> df = sqlContext.createDataFrame([('2015-04-08 13:08:15',)], ['a']) >>> df.select(second('a').alias('second')).collect() [Row(second=15)]
117.pyspark.sql.functions.sha1(col)
返回SHA-1的十六进制字符串结果
>>> sqlContext.createDataFrame([('ABC',)], ['a']).select(sha1('a').alias('hash')).collect()
[Row(hash=u'3c01bdbb26f358bab27f267924aa2c9a03fcfdb8')]
118.pyspark.sql.functions.sha2(col, numBits)
返回SHA-2系列散列函数(SHA-224,SHA-256,SHA-384和SHA-512)的十六进制字符串结果。 numBits表示结果的所需位长度,其值
必须为224,256,384,512或0(相当于256)
>>> digests = df.select(sha2(df.name, 256).alias('s')).collect() >>> digests[0] Row(s=u'3bc51062973c458d5a6f2d8d64a023246354ad7e064b1e4e009ec8a0699a3043') >>> digests[1] Row(s=u'cd9fb1e148ccd8442e5aa74904cc73bf6fb54d1d54d333bd596aa9bb4bb4e961')
119.pyspark.sql.functions.shiftLeft(col, numBits)
移动给定值numBits左侧
>>> sqlContext.createDataFrame([(21,)], ['a']).select(shiftLeft('a', 1).alias('r')).collect()
[Row(r=42)]
120.pyspark.sql.functions.shiftRight(col, numBits)
将给定值numBits右移
>>> sqlContext.createDataFrame([(42,)], ['a']).select(shiftRight('a', 1).alias('r')).collect()
[Row(r=21)]
121.pyspark.sql.functions.shiftRightUnsigned(col, numBits)
无符号移位给定值numBits的权利
>>> df = sqlContext.createDataFrame([(-42,)], ['a']) >>> df.select(shiftRightUnsigned('a', 1).alias('r')).collect() [Row(r=9223372036854775787)]
122.pyspark.sql.functions.signum(col)
计算给定值的符号
123.pyspark.sql.functions.sin(col)
计算给定值的正弦值
124.pyspark.sql.functions.sinh(col)
计算给定值的双曲正弦值
125.pyspark.sql.functions.size(col)
集合函数:返回存储在列中的数组或映射的长度
参数:col – 列或表达式名称
>>> df = sqlContext.createDataFrame([([1, 2, 3],),([1],),([],)], ['data']) >>> df.select(size(df.data)).collect() [Row(size(data)=3), Row(size(data)=1), Row(size(data)=0)]
126.pyspark.sql.functions.skewness(col)
聚合函数:返回组中值的偏度
127.pyspark.sql.functions.sort_array(col, asc=True)
集合函数:按升序对给定列的输入数组进行排序。
参数:col – 列或表达式名称
>>> df = sqlContext.createDataFrame([([2, 1, 3],),([1],),([],)], ['data']) >>> df.select(sort_array(df.data).alias('r')).collect() [Row(r=[1, 2, 3]), Row(r=[1]), Row(r=[])] >>> df.select(sort_array(df.data, asc=False).alias('r')).collect() [Row(r=[3, 2, 1]), Row(r=[1]), Row(r=[])]
128.pyspark.sql.functions.soundex(col)
返回字符串的SoundEx编码
>>> df = sqlContext.createDataFrame([("Peters",),("Uhrbach",)], ['name']) >>> df.select(soundex(df.name).alias("soundex")).collect() [Row(soundex=u'P362'), Row(soundex=u'U612')]
129.pyspark.sql.functions.sparkPartitionId()
注意在1.6中不推荐使用spark_partition_id。
130.pyspark.sql.functions.spark_partition_id()
Spark任务的分区ID列
请注意,这是不确定的,因为它取决于数据分区和任务调度
>>> df.repartition(1).select(spark_partition_id().alias("pid")).collect()
[Row(pid=0), Row(pid=0)]
131.pyspark.sql.functions.split(str, pattern)
将模式分割(模式是正则表达式)。
注:pattern是一个字符串表示正则表达式。
>>> df = sqlContext.createDataFrame([('ab12cd',)], ['s',]) >>> df.select(split(df.s, '[0-9]+').alias('s')).collect() [Row(s=[u'ab', u'cd'])]
132.pyspark.sql.functions.sqrt(col)
计算指定浮点值的平方根
133.pyspark.sql.functions.stddev(col)
聚合函数:返回组中表达式的无偏样本标准差
134.pyspark.sql.functions.stddev_pop(col)
聚合函数:返回一个组中表达式的总体标准差
135.pyspark.sql.functions.stddev_samp(col)
聚合函数:返回组中表达式的无偏样本标准差
136.pyspark.sql.functions.struct(*cols)
创建一个新的结构列。
列:cols – 列名称(字符串)列表或列表达式列表
>>> df.select(struct('age', 'name').alias("struct")).collect() [Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))] >>> df.select(struct([df.age, df.name]).alias("struct")).collect() [Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))]
137.pyspark.sql.functions.substring(str, pos, len)
子字符串从pos开始,长度为len,当str是字符串类型时,或者返回从字节pos开始的字节数组的片段,当str是二进制类型时,长度
为len
>>> df = sqlContext.createDataFrame([('abcd',)], ['s',]) >>> df.select(substring(df.s, 1, 2).alias('s')).collect() [Row(s=u'ab')]
138.pyspark.sql.functions.substring_index(str, delim, count)
在计数定界符delimiter之前,返回字符串str的子串。 如果count是正数,则返回最后一个分隔符左边的数字(从左数起)。 如果
计数为负数,则返回最后一个分隔符右边的数字(从右数起)。 substring_index搜索delim时执行区分大小写的匹配
>>> df = sqlContext.createDataFrame([('a.b.c.d',)], ['s']) >>> df.select(substring_index(df.s, '.', 2).alias('s')).collect() [Row(s=u'a.b')] >>> df.select(substring_index(df.s, '.', -3).alias('s')).collect() [Row(s=u'b.c.d')]
139.pyspark.sql.functions.sum(col)
聚合函数:返回表达式中所有值的总和。
140.pyspark.sql.functions.sumDistinct(col)
聚合函数:返回表达式中不同值的总和
141.pyspark.sql.functions.tan(col)
计算给定值的正切值
142.pyspark.sql.functions.tanh(col)
计算给定值的双曲正切
143.pyspark.sql.functions.toDegrees(col)
将以弧度度量的角度转换为以度数度量的近似等效角度。
144.pyspark.sql.functions.toRadians(col)
将以度数度量的角度转换为以弧度测量的近似等效角度
145.pyspark.sql.functions.to_date(col)
将StringType或TimestampType的列转换为DateType
>>> df = sqlContext.createDataFrame([('1997-02-28 10:30:00',)], ['t']) >>> df.select(to_date(df.t).alias('date')).collect() [Row(date=datetime.date(1997, 2, 28))]
146.pyspark.sql.functions.to_utc_timestamp(timestamp, tz)
假定给定的时间戳在给定的时区并转换为UTC
>>> df = sqlContext.createDataFrame([('1997-02-28 10:30:00',)], ['t']) >>> df.select(to_utc_timestamp(df.t, "PST").alias('t')).collect() [Row(t=datetime.datetime(1997, 2, 28, 18, 30))]
147.pyspark.sql.functions.translate(srcCol, matching, replace)
一个函数通过匹配的字符转换srcCol中的任何字符。 替换中的字符对应于匹配的字符。当字符串中的任何字符与匹配中的字符匹配
时,翻译将发生
>>> sqlContext.createDataFrame([('translate',)], ['a']).select(translate('a', "rnlt", "123").alias('r')).collect()
[Row(r=u'1a2s3ae')]
148.pyspark.sql.functions.trim(col)
修剪指定字符串列的两端空格。
149.pyspark.sql.functions.trunc(date, format)
返回截断到格式指定单位的日期
参数: format – ‘year’, ‘YYYY’, ‘yy’ or ‘month’, ‘mon’, ‘mm’
>>> df = sqlContext.createDataFrame([('1997-02-28',)], ['d']) >>> df.select(trunc(df.d, 'year').alias('year')).collect() [Row(year=datetime.date(1997, 1, 1))] >>> df.select(trunc(df.d, 'mon').alias('month')).collect() [Row(month=datetime.date(1997, 2, 1))]
150.pyspark.sql.functions.udf(f, returnType=StringType)
创建一个表示用户定义函数(UDF)的列表达式。
>>> from pyspark.sql.types import IntegerType >>> slen = udf(lambda s: len(s), IntegerType()) >>> df.select(slen(df.name).alias('slen')).collect() [Row(slen=5), Row(slen=3)]
151.pyspark.sql.functions.unbase64(col)
解码BASE64编码的字符串列并将其作为二进制列返回
152.pyspark.sql.functions.unhex
十六进制的反转。 将每对字符解释为十六进制数字,并转换为数字的字节表示形式
>>> sqlContext.createDataFrame([('414243',)], ['a']).select(unhex('a')).collect()
[Row(unhex(a)=bytearray(b'ABC'))]
153.pyspark.sql.functions.unix_timestamp(timestamp=None, format='yyyy-MM-dd HH:mm:ss')
使用默认时区和默认语言环境,将具有给定模式的时间字符串(默认为'yyyy-MM-dd HH:mm:ss')转换为Unix时间戳(以秒为单位
),如果失败则返回null。
如果时间戳记为“无”,则返回当前时间戳。
154.pyspark.sql.functions.upper(col)
将字符串列转换为大写
155.pyspark.sql.functions.var_pop(col)
聚合函数:返回组中值的总体方差
156.pyspark.sql.functions.var_samp(col)
聚合函数:返回组中值的无偏差
157.pyspark.sql.functions.variance(col)
聚合函数:返回组中值的总体方差
158.pyspark.sql.functions.weekofyear(col)
将一个给定日期的星期数解压为整数。
>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a']) >>> df.select(weekofyear(df.a).alias('week')).collect() [Row(week=15)]
159.pyspark.sql.functions.when(condition, value)
评估条件列表并返回多个可能的结果表达式之一。 如果不调用Column.otherwise(),则不匹配条件返回None
参数:condition – 一个布尔的列表达式.
value – 一个文字值或一个Column表达式
>>> df.select(when(df['age'] == 2, 3).otherwise(4).alias("age")).collect()
[Row(age=3), Row(age=4)]
>>> df.select(when(df.age == 2, df.age + 1).alias("age")).collect()
[Row(age=3), Row(age=None)]
160.pyspark.sql.functions.year(col)
将给定日期的年份提取为整数
>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a']) >>> df.select(year('a').alias('year')).collect() [Row(year=2015)]