Spark Python API Docs(part two) - Go语言中文社区

Spark Python API Docs(part two)


pyspark.sql module

Module context

Spark SQL和DataFrames中的重要类:

  • pyspark.sql.SparkSession - DataFrame和SQL功能的主要入口点。
  • pyspark.sql.DataFrame - 分布式数据集合分组到命名的列。
  • pyspark.sql.Column - DataFrame中的列表达式。
  • pyspark.sql.Row - DataFrame中的一行数据。
  • pyspark.sql.GroupedData - 由DataFrame.groupBy()返回的聚合方法。
  • pyspark.sql.DataFrameNaFunctions - 处理缺失数据的方法(空值)。
  • pyspark.sql.DataFrameStatFunctions - 统计功能的方法。
  • pyspark.sql.functions - 可用于DataFrame的内置函数列表。
  • pyspark.sql.types - 可用的数据类型列表。
  • pyspark.sql.Window - 用于处理窗口函数。

class pyspark.sql.SparkSession(sparkContext, jsparkSession=None)

使用Dataset和DataFrame API编程Spark的入口点。
SparkSession可用于创建DataFrame,将DataFrame注册为表格,在表格上执行SQL,缓存表格以及读取parquet文件。 要创建SparkSession,请使用以下构建器模式:

spark = SparkSession.builder.master('spark://cn01:7077').appName("Word Count").getOrCreate()

class Builder

SparkSession的生成器。

  1. appName(name)
    为应用程序设置一个名称,该名称将显示在Spark Web UI中。
    如果没有设置应用程序名称,则会随机生成名称。
  2. config(key=None, value=None, conf=None)
    设置一个配置选项。 使用此方法设置的选项会自动传播到SparkConf和SparkSession自己的配置中。
    对于现有的SparkConf,请使用conf参数。
>>> from pyspark import SparkConf
>>> SparkSession.builder.config(conf=SparkConf())
<pyspark.sql.session.Builder object at 0x2ab7d2ab7650>
  1. enableHiveSupport()
    启用Hive支持,包括连接到持久化的Hive Metastore,支持Hive serdes和Hive用户定义的功能。
  2. getOrCreate()
    获取现有的SparkSession,或者,如果没有现有的SparkSession,则根据此构建器中设置的选项创建一个新的SparkSession。
    此方法首先检查是否存在有效的全局默认SparkSession,如果是,则返回该值。 如果不存在有效的全局默认SparkSession,则该方法创建一个新的SparkSession,并将新创建的SparkSession指定为全局默认值。
  3. master(master)
    设置要连接到的Spark master URL,例如本地运行的“local”,本地运行4核的“local [4]”或运行在Spark独立群集上的“spark:// master:7077”。
  1. SparkSession.builder = <pyspark.sql.session.Builder object at 0x7f51f134a110>

  2. SparkSession.catalog
    用户可以通过它创建,删除,修改或查询底层数据库,表格,函数等的接口

  3. SparkSession.conf
    Spark的运行时配置接口。
    这是用户可以获取并设置与Spark SQL相关的所有Spark和Hadoop配置的接口。 获取配置的值时,默认为基础SparkContext中设置的值(如果有)。

  4. SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
    从RDD,列表或pandas.DataFrame创建一个DataFrame。
    当schema是列名称的列表时,每列的类型将从数据中推断出来。
    当schema为None时,它将尝试从数据中推断出schema(列名和类型),数据应该是Row的RDD,或者是namedtuple或者dict。
    当schema是pyspark.sql.types.DataType或数据类型字符串时,它必须匹配真实数据,否则将在运行时引发异常。 如果给定的schema不是pyspark.sql.types.StructType,它将被封装成一个pyspark.sql.types.StructType作为唯一的字段,字段名称将是“值”,每个记录也将被包装成一个 元组,可以稍后转换为行。
    如果需要(schema)模式推断,则使用samplingRatio来确定用于模式推断的行的比例。 如果samplingRatio为None,则使用第一行。
    Parameters:

  • data - 任何类型的SQL数据表示d的RDD(例如行,元组,int,布尔等)或列表或pandas.DataFrame。
  • schema - 一个pyspark.sql.types.DataType或一个数据类型字符串或列名称列表,默认值为None。 数据类型字符串格式等于pyspark.sql.types.DataType.simpleString,除了顶层结构类型可以省略struct <>,原子类型使用typeName()作为它们的格式。 使用字节而不是tinyint为pyspark.sql.types.ByteType。 我们也可以使用int作为IntegerType的简称。
  • samplingRatio - 用于推断行的样本比例。
  • verifySchema - 根据模式验证每一行的数据类型。
    Returns: DataFrame
# list to DataFrame
>>> l = [("name", 1), ("Bob", 2)]
>>> spark.createDataFrame(l, ["name", "age"]).collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
>>> d = [{'name': 'Alice', 'age': 1}, {"name": "Bob", "age": 2}]
>>> spark.createDataFrame(d, ["name", "age"]).collect()
[Row(name=1, age=u'Alice'), Row(name=2, age=u'Bob')]
#RDD to DataFrame
>>> spark.createDataFrame(sc.parallelize(l), ["name", "age"]).collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
#pandas.DataFrame to DataFrame
>>> spark.createDataFrame(pandas.DataFrame([[1, 2]])).collect()  
[Row(0=1, 1=2)]
>>> from pyspark.sql.types import *
>>> schema = StructType([StructField("name", StringType(), True),StructField("age", IntegerType(), True)])
>>> spark.createDataFrame(l, schema).collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
>>> spark.createDataFrame(l, "name: string, age: int").collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
>>> from pyspark.sql import Row
>>> Person = Row('name', 'age')
>>> spark.createDataFrame(l, Person).collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
  1. SparkSession.newSession()
    以新会话形式返回一个新的SparkSession,它具有单独的SQLConf,注册的临时视图和UDF,但共享SparkContext和表缓存。
  2. SparkSession.range(start, end=None, step=1, numPartitions=None)
>>> spark.range(1,7,2).collect()
[Row(id=1), Row(id=3), Row(id=5)]
  1. SparkSession.read
    返回可用于读取DataFrame中的数据的DataFrameReader。
  2. SparkSession.readStream
    返回一个DataStreamReader,它可以用来读取数据流作为一个数据流DataFrame。
  3. SparkSession.sparkContext
    返回底层的SparkContext。
  4. SparkSession.sql(sqlQuery)
    返回表示给定查询结果的DataFrame。
>>> l
[('name', 1), ('Bob', 2)]
>>> df = spark.createDataFrame(l, ["name", "age"])
#使用dataFrame(df)创建或替换本地临时视图。
>>> df.createOrReplaceTempView("table1")
>>> spark.sql("select * from table1").collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
  1. SparkSession.stop()
    停止底层的SparkContext。
  2. SparkSession.streams
    返回一个StreamingQueryManager,它允许管理所有的StreamingQuery ,在此上下文中激活的StreamingQueries。
  3. SparkSession.table(tableName)
    以DataFrame的形式返回指定的表。
>>> spark.table("table1").collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
  1. SparkSession.udf
    返回UDF注册的UDFRegistration。
  2. SparkSession.version
    运行此应用程序的Spark版本。

class pyspark.sql.SQLContext(sparkContext, sparkSession=None, jsqlContext=None)

在Spark 1.x中使用Spark中结构化数据(行和列)的入口点。
从Spark 2.0开始,它被SparkSession所取代。 但是,为了向后兼容,我们在这里保留这个类。
可以使用SQLContext创建DataFrame,将DataFrame注册为表,在表上执行SQL,缓存表和读取parquet文件。

  1. cacheTable(tableName)
    在内存中缓存指定的表。
  2. clearCache()
    从内存缓存中删除所有缓存的表。
  3. createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
  4. createExternalTable(tableName, path=None, source=None, schema=None, **options)
    根据数据源中的数据集创建外部表。
  5. dropTempTable(tableName)
    从目录中删除临时表。
  6. getConf(key, defaultValue=None)
    返回Spark SQL配置属性中给定键的值。
  7. classmethod getOrCreate(sc)
    获取现有的SQLContext或使用给定的SparkContext创建一个新的SQLContext。
  8. newSession()
    将新的SQLContext作为新会话返回,它具有单独的SQLConf,注册的临时视图和UDF,但是共享SparkContext和表缓存。
  9. range(start, end=None, step=1, numPartitions=None)
  10. read
  11. readStream
  12. registerDataFrameAsTable(df, tableName)
    将给定的DataFrame注册为目录中的临时表。
    临时表仅在此SQLContext实例的生命周期中存在。
  13. registerFunction(name, f, returnType=StringType)
    将一个python函数(包括lambda函数)注册为UDF(自定义函数),以便在SQL语句中使用。
    除了名称和函数本身之外,还可以指定返回类型。 当返回类型没有给出它默认为一个字符串和转换将自动完成。 对于任何其他返回类型,生成的对象必须匹配指定的类型。
    Parameters:
  • name - udf的名字
  • f - python 函数
  • returnType - 一个pyspark.sql.types.DataType对象
>>> from pyspark.sql.types import IntegerType
>>> sqlContext.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
>>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
[Row(stringLengthInt(test)=4)]
  1. registerJavaFunction(name, javaClassName, returnType=None)
    注册一个Java UDF,以便在SQL语句中使用它。
  2. setConf(key, value)
    设置给定的Spark SQL配置属性。
  3. sql(sqlQuery)
  4. streams
  5. table(tableName)
  6. tableNames(dbName=None)
    返回数据库dbName中表的名称列表。
  7. tables(dbName=None)
    返回包含给定数据库中表的名称的DataFrame。
    如果未指定dbName,则将使用当前数据库。
  8. udf
  9. uncacheTable(tableName)
    从内存缓存中删除指定的表。

class pyspark.sql.HiveContext(sparkContext, jhiveContext=None)

Spark SQL的一个变体,与存储在Hive中的数据整合在一起。
Hive配置是从classpath的hive-site.xml中读取的。 它支持同时运行SQL和HiveQL命令。

  1. refreshTable(tableName)

class pyspark.sql.UDFRegistration(sqlContext)

用户自定义函数注册的包装器。

  1. register(name, f, returnType=StringType)
    将一个python函数(包括lambda函数)注册为UDF,以便在SQL语句中使用。

class pyspark.sql.DataFrame(jdf, sql_ctx)

分布式数据集合分组到命名的列。
DataFrame相当于Spark SQL中的关系表,可以使用SQLContext中的各种函数创建:
创建后,可以使用DataFrame,Column中定义的各种domain-specific-language(DSL)函数对其进行操作。

  1. agg(*exprs)
    在没有组的情况下汇总整个DataFrame(df.groupBy.agg()的简写)。
>>> df.agg({"age": "max"}).collect()
[Row(max(age)=2)]
>>> from pyspark.sql import functions as f
>>> df.agg(f.min(df.age)).collect()
[Row(min(age)=1)]
  1. alias(alias)
    返回一个带有别名集的新DataFrame。
  2. approxQuantile(col, probabilities, relativeError)
    计算DataFrame的数值列的近似分位数。
  3. cache()
    使用默认存储级别(MEMORY_AND_DISK)存储DataFrame。
  4. checkpoint(eager=True)
    返回此数据集的检查点版本。 检查点可用于截断此DataFrame的逻辑计划,这在计划可能呈指数增长的迭代算法中特别有用。 它将被保存到使用SparkContext.setCheckpointDir()设置的检查点目录内的文件中。
  5. coalesce(numPartitions)
    返回具有完全numPartitions分区的新DataFrame。
  6. collect()
  7. columns
    以列表形式返回所有列名称。
>>> df.columns
['name', 'age']
  1. corr(col1, col2, method=None)
    以双精度值计算DataFrame的两列的相关性。 目前只支持Pearson Correlation Coefficient。 DataFrame.corr()和DataFrameStatFunctions.corr()是彼此的别名。
    Parameters:
  • col1 - 第一列的名称
  • col2 - 第二列的名称
  • method - 相关方法。 目前只支持“皮尔森”
  1. count()
    返回此DataFrame中的行数。
>>> df.count()
2
  1. cov(col1, col2)
    计算给定列的样本协方差(由它们的名称指定)作为双精度值。 DataFrame.cov()和DataFrameStatFunctions.cov()是别名。
  2. createGlobalTempView(name)
    使用此DataFrame创建全局临时视图。
    这个临时视图的生命周期与这个Spark应用程序有关。 如果视图名称已经存在于目录中,则抛出TempTableAlreadyExistsException。
  3. createOrReplaceGlobalTempView(name)
    使用给定名称创建或替换全局临时视图。
    这个临时视图的生命周期与这个Spark应用程序有关。
  4. createOrReplaceTempView(name)
    使用此DataFrame创建或替换本地临时视图。
    此临时表的生命周期与用于创建此DataFrame的SparkSession绑定。
  5. createTempView(name)
    使用此DataFrame创建本地临时视图。
    此临时表的生命周期与用于创建此DataFrame的SparkSession绑定。如果视图名称已经存在于目录中,抛出TempTableAlreadyExistsException。
  6. crossJoin(other)
    用另一个DataFrame相互作用返回笛卡尔积。
>>> df1 = spark.createDataFrame([("Alice", 1), ("Bob", 5)], ["name", "age"])
>>> df2 = spark.createDataFrame([("Alice", 66), ("Bob", 88)], ["name", "height"])
>>> df1.select(["name", "age"]).collect()
[Row(name=u'Alice', age=1), Row(name=u'Bob', age=5)]
>>> df2.select(["name", "height"]).collect()
[Row(name=u'Alice', height=66), Row(name=u'Bob', height=88)]
>>> df1.crossJoin(df2.select("height")).select("age", "name", "height").collect()
[Row(age=1, name=u'Alice', height=66), Row(age=1, name=u'Alice', height=88), Row(age=5, name=u'Bob', height=66), Row(age=5, name=u'Bob', height=88)]
  1. crosstab(col1, col2)
  2. cube(*cols)
    使用指定的列为当前的DataFrame创建一个多维数据集,所以我们可以对它们进行聚合。
>>> df.cube("name", df.age).count().orderBy("name", "age").show()
+----+----+-----+                                                               
|name| age|count|
+----+----+-----+
|null|null|    2|
|null|   1|    1|
|null|   2|    1|
| Bob|null|    1|
| Bob|   2|    1|
|name|null|    1|
|name|   1|    1|
+----+----+-----+
  1. describe(*cols)
    计算数字和字符串列的统计信息。
    这包括count,mean,stddev,min和max。 如果未给出具体的列名,则此函数计算所有数字或字符串列的统计信息。
>>> df.describe(["age"]).show()
+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|                 2|
|   mean|               1.5|
| stddev|0.7071067811865476|
|    min|                 1|
|    max|                 2|
+-------+------------------+
  1. distinct()
    返回包含此DataFrame中不相同行的新DataFrame。(去除相同的行)
  2. drop(*cols)
    返回删除指定列的新DataFrame。 如果模式不包含给定的列名,这是一个无意义操作。
  3. dropDuplicates(subset=None)
    返回删除重复行的新DataFrame,可选地仅考虑某些列。
>>> df3 = sc.parallelize([Row(name='Alice', age=5, height=80),Row(name='Alice', age=5, height=80),Row(name='Alice', age=10, height=80)]).toDF()
>>> df3.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+
  1. drop_duplicates(subset=None)
    dropDuplicates()的别名。
  2. dropna(how='any', thresh=None, subset=None)
    返回一个新的DataFrame,省略含有空值的行。 DataFrame.dropna()和DataFrameNaFunctions.drop()是彼此的别名。
    Parameters:
  • how - “any”或“all”。 如果“any”,如果它包含任何空值,则删除一行。 如果'all',只有当所有的值都为null时才删除一行。
  • thresh -
  • subset -
  1. dtypes
    以列表形式返回所有列名称及其数据类型。
>>> df3.dtypes
[('age', 'bigint'), ('height', 'bigint'), ('name', 'string')]
  1. explain(extended=False)
    打印(逻辑和物理)计划到控制台进行调试。
    Parameters:
  • extended - 布尔值,默认为False。 如果为False,则仅打印物理计划。
>>> df3.explain()
== Physical Plan ==
Scan ExistingRDD[age#277L,height#278L,name#279]
  1. fillna(value, subset=None)
    替换空值,na.fill()的别名。 DataFrame.fillna()和DataFrameNaFunctions.fill()是彼此的别名。
  2. filter(condition)
    使用给定的条件过滤行。
    where()是filter()的别名。
>>> df.filter(df.age > 1).collect()
[Row(name=u'Bob', age=2)]
>>> df.filter("age > 1").collect()
[Row(name=u'Bob', age=2)]
  1. first()
    将第一行作为Row返回。
>>> df.first()
Row(name=u'name', age=1)
  1. foreach(f)
    将f函数应用于此DataFrame的所有行。
    这是df.rdd.foreach()的简写。
  2. foreachPartition(f)
    将f函数应用于此DataFrame的每个分区。
    这是df.rdd.foreachPartition()的简写。
  3. freqItems(cols, support=None)
    找到列的频繁项,可能有误报。
  4. groupBy(*cols)
    使用指定的列对DataFrame进行分组,所以我们可以对它们进行聚合。 有关所有可用的聚合函数,请参阅GroupedData。groupby()是groupBy()的别名。
>>> df.groupBy("name").agg({"age":"mean"}).collect()
[Row(name=u'name', avg(age)=1.0), Row(name=u'Bob', avg(age)=2.0)]
>>> df.groupBy(["name",df.age]).count().collect()
[Row(name=u'Bob', age=2, count=1), Row(name=u'name', age=1, count=1)]           
  1. groupby(*cols)
  2. head(n=None)
    返回前n行。
  3. hint(name, *parameters)
    在当前的DataFrame上指定一些提示。
  4. intersect(other)
    仅返回包含此frame和另一frame中的行的新DataFrame。(两者的交集)
  5. isLocal()
    如果collect()和take()方法可以在本地运行(没有任何Spark执行器),则返回True。
  6. isStreaming
    如果此Dataset包含一个或多个在到达时连续返回数据的源,则返回true。 从流源读取数据的数据集必须使用DataStreamWriter中的start()方法作为StreamingQuery执行。 返回单个答案的方法(例如,count()或collect())将在存在流式源时引发AnalysisException。
  7. join(other, on=None, how=None)
    使用给定的连接表达式与另一个DataFrame进行连接。
    Parameters:
  • other -
  • on - 连接列名称的字符串,列名称列表,连接表达式(列)或列的列表。 如果on是一个字符串或者一个表示连接列名的字符串列表,那么这个列必须存在于两边,并且执行一个等连接。
  • how - str, default inner. Must be one of: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti.
  1. limit(num)
    将结果计数限制为指定的数字。
  2. na
    返回一个DataFrameNaFunctions来处理缺失的值。
  3. orderBy(*cols, **kwargs)
    返回按指定列排序的新DataFrame。
>>> df1.orderBy(["name","age"],ascending=[0,1]).collect()
[Row(name=u'Bob', age=5), Row(name=u'Alice', age=1)]
  1. persist(storageLevel=StorageLevel(True, True, False, False, 1))
  2. printSchema()
    以树形结构打印schema。
>>> df1.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
  1. randomSplit(weights, seed=None)
    用提供的权重随机分割这个DataFrame。
  2. rdd
    将内容作为行的pyspark.RDD返回。
  3. registerTempTable(name)
    使用给定名称将此RDD注册为临时表。
    此临时表的生命周期与用于创建此DataFrame的SQLContext相关联。
>>> df1.registerTempTable("people")
>>> spark.sql("select * from people").collect()
[Row(name=u'Alice', age=1), Row(name=u'Bob', age=5)]
>>> spark.catalog.dropTampView("people")
  1. repartition(numPartitions, *cols)
    返回由给定分区表达式分区的新DataFrame。 生成的DataFrame是hash分区的。
    numPartitions可以是一个int来指定目标分区数量或一个Column。 如果它是一个列,它将被用作第一个分区列。 如果未指定,则使用默认的分区数量。
  2. replace(to_replace, value=None, subset=None)
    返回一个新的DataFrame,用另一个值替换一个值。 DataFrame.replace()和DataFrameNaFunctions.replace()是彼此的别名。
>>> df1.replace(["Alice", "Bob"], ["A", "B"]).show()
+----+---+
|name|age|
+----+---+
|   A|  1|
|   B|  5|
+----+---+
  1. rollup(*cols)
    使用指定的列为当前的DataFrame创建一个多维汇总,所以我们可以在它上运行聚合函数。
  2. sample(withReplacement, fraction, seed=None)
  3. sampleBy(col, fractions, seed=None)
  4. schema
    以pyspark.sql.types.StructType的形式返回此DataFrame的schema。
>>> df1.schema
StructType(List(StructField(name,StringType,true),StructField(age,LongType,true)))
  1. select(*cols)
    投影一组表达式并返回一个新的DataFrame。
>>> df.select(df.name, (df.age + 10).alias("height")).show()
+----+------+
|name|height|
+----+------+
|name|    11|
| Bob|    12|
+----+------+
  1. selectExpr(*expr)
    这是接受SQL表达式的select()的变体。
  2. show(n=20, truncate=True)
    将前n行打印到控制台。
  3. sort(*cols, **kwargs)
    返回按指定列排序的新DataFrame。
  4. sortWithinPartitions(*cols, **kwargs)
    返回一个新的DataFrame,每个分区按指定的列排序。
  5. stat
    为统计函数返回一个DataFrameStatFunctions。
  6. storageLevel
    获取DataFrame的当前存储级别。
>>> df1.storageLevel
StorageLevel(False, False, False, False, 1)
>>> df1.cache().storageLevel
StorageLevel(True, True, False, True, 1)
  1. subtract(other)
    返回一个新的DataFrame,它包含这个frame中的行,但不包含在另一个frame中。
  2. take(num)
  3. toDF(*cols)
    返回一个新的类:带有新指定列名的DataFrame。
  4. toJSON(use_unicode=True)
    将DataFrame转换为字符串的RDD。
    每行都被转换成一个JSON文档作为返回的RDD中的一个元素。
>>> df1.toJSON().collect()
[u'{"name":"Alice","age":1}', u'{"name":"Bob","age":5}']
  1. toLocalIterator()
    返回包含此DataFrame中所有行的迭代器。 迭代器将占用与此DataFrame中最大分区一样多的内存。
  2. toPandas()
    以Pandas中的pandas.DataFrame的形式返回此DataFrame的内容。
  3. union(other)
    在这个和另一个frame中返回一个包含行联合的新DataFrame。
  4. unpersist(blocking=False)
    将DataFrame标记为非持久性,并从内存和磁盘中删除所有的块。
  5. where(condition)
    与filter()相同。
  6. withColumn(colName, col)
    通过添加列或替换具有相同名称的现有列来返回新的DataFrame。
>>> df1.withColumn("height", df1.age + 50).show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  1|    51|
|  Bob|  5|    55|
+-----+---+------+
  1. withColumnRenamed(existing, new)
    通过重命名现有列来返回新的DataFrame。 如果模式不包含给定的列名,则这是一个无意义操作。
  2. withWatermark(eventTime, delayThreshold)
    为此DataFrame定义事件时间水印。 一个水印跟踪一个时间点,在这个时间点之前,我们假设没有更晚的数据将要到达。
  3. write
    用于将非流式DataFrame的内容保存到外部存储器的接口。
  4. writeStream
    用于将流式DataFrame的内容保存到外部存储的接口。

class pyspark.sql.GroupedData(jgd, sql_ctx)

由DataFrame.groupBy()创建的DataFrame上的一组聚合方法。

  • Note: 实验阶段
  1. agg(*exprs)
    计算聚合并将结果作为DataFrame返回。
    可用的集合函数是avg,max,min,sum,count。
    如果exprs是从字符串到字符串的单个字典映射,则key是要执行聚合的列名,并且该value是聚合函数名。
    或者,exprs也可以是聚合列表达式的列表。
  2. avg(*cols)
    计算每个组的每个数字列的平均值。
    mean()是avg()的别名。
  3. count()
    统计每个组的记录数。
  4. max(*cols)
    计算每个组的每个数字列的最大值。
  5. mean(*cols)
    计算每个组的每个数字列的平均值。
  6. min(*cols)
    计算每个组的每个数字列的最小值。
  7. pivot(pivot_col, values=None)
    旋转当前[[DataFrame]]的列并执行指定的聚合。 有两个版本的透视函数:一个需要调用者指定不同值的列表以进行透视,另一个不支持。 后者更简洁但效率更低,因为Spark需要首先在内部计算不同值的列表。
    Parameters:
  • pivot_col - 要转移的列的名称。
  • values - 将被转换为输出DataFrame中的列的值的列表。
  1. sum(*cols)
    计算每个组的每个数字列的总和。

class pyspark.sql.Column(jc)

DataFrame中的一列。

  1. alias(*alias, **kwargs)
    使用新名称返回此列的别名。
  2. asc()
    基于给定列名称的升序返回一个排序表达式。
  3. astype(dataType)
    astype()是cast()的别名。
  4. between(lowerBound, upperBound)
    一个布尔表达式,如果此表达式的值位于给定列之间,则该表达式的值为true。
>>> df1.select(d1.name, df1.age.between(2, 4)).show()
+-----+---------------------------+
| name|((age >= 2) AND (age <= 4))|
+-----+---------------------------+
|Alice|                      false|
|  Bob|                      false|
+-----+---------------------------+

5 .bitwiseAND(other)
二元运算符

  1. bitwiseOR(other)
    二元运算符
  2. bitwiseXOR(other)
    二元运算符
  3. cast(dataType)
    将列转换为dataType类型。(转换某列的类型)
>>> df.select(df.name, df.age.cast("string").alias("ages")).collect()
[Row(name=u'Alice', ages=u'1'), Row(name=u'Bob', ages=u'5')]
  1. contains(other)
    二元运算符
  2. desc()
    基于给定列名称的降序返回一个排序表达式。
  3. endswith(other)
    根据匹配的字符串结尾返回一个布尔列。
>>> df.filter(df.name.endswith("ce")).collect()
[Row(name=u'Alice', age=1)]
  1. getField(name)
    在StructField中通过名称获取字段的表达式。
  2. getItem(key)
    从列表中获取位置序号的项,或者通过字典获取项的表达式。
  3. isNotNull()
    如果当前表达式为null,则为真。 通常结合DataFrame.filter()来选择具有非空值的行。
>>> df2 = sc.parallelize([Row(name=u'Tom', height=80), Row(name=u'Alice', height=None)]).toDF()
>>> df2.filter(df2.height.isNotNull()).collect()
[Row(height=80, name=u'Tom')]
  1. isNull()
    如果当前表达式为null,则为真。 通常与DataFrame.filter()结合来选择具有空值的行。
  2. isin(*cols)
    一个布尔表达式,如果此表达式的值由参数的评估值包含,则该值被评估为true。
>>> df[df.age.isin([1,2,3])].collect()
[Row(name=u'Alice', age=1)]
  1. like(other)
    返回基于SQL LIKE匹配的布尔列。
>>> df.filter(df.name.like("Al%")).collect()
[Row(name=u'Alice', age=1)]
  1. name(*alias, **kwargs)
    name()是alias()的别名。
  2. otherwise(value)
    评估条件列表并返回多个可能的结果表达式之一。 如果不调用Column.otherwise(),则不匹配条件返回None。
>>> from pyspark.sql import functions as f
>>> df.select(df.name, f.when(df.age > 3, 1).otherwise(0)).show()
+-----+-------------------------------------+
| name|CASE WHEN (age > 3) THEN 1 ELSE 0 END|
+-----+-------------------------------------+
|Alice|                                    0|
|  Bob|                                    1|
+-----+-------------------------------------+
  1. over(window)
    定义一个窗口列。
  2. rlike(other)
    基于正则表达式匹配返回一个布尔列。
>>> df.filter(df.name.rlike('ice$')).collect()
[Row(age=2, name=u'Alice')]
  1. startswith(other)
    根据字符串匹配返回一个布尔列。
  2. substr(startPos, length)
    返回一个列,它是该列的一个子字符串。
  3. when(condition, value)
    评估条件列表并返回多个可能的结果表达式之一。 如果不调用Column.otherwise(),则不匹配条件返回None。
>>> df.select(df.name, f.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()
+-----+------------------------------------------------------------+
| name|CASE WHEN (age > 4) THEN 1 WHEN (age < 3) THEN -1 ELSE 0 END|
+-----+------------------------------------------------------------+
|Alice|                                                          -1|
|  Bob|                                                           1|
+-----+------------------------------------------------------------+

class pyspark.sql.Row

DataFrame中的一行。 其中的字段可以被访问:row.key或者row[key]

>>> from pyspark.sql import Row
>>> row = Row(name="Alice", age=1)
>>> row.name
'Alice'
>>> row["age"]
1
>>> row
Row(age=1, name='Alice')
>>> "name" in row
True
  1. asDict(recursive=False)
    recursive - 将嵌套的行转换为字典(默认为False)。
>>> row = Row(name="Alice",value=Row(age=1, height=88))
>>> row.asDict()
{'name': 'Alice', 'value': Row(age=1, height=88)}
>>> row.asDict(True)
{'name': 'Alice', 'value': {'age': 1, 'height': 88}}

class pyspark.sql.DataFrameNaFunctions(df)

在DataFrame中处理丢失的数据的功能。

  1. drop(how='any', thresh=None, subset=None)
    返回一个新的DataFrame,省略含有空值的行。 DataFrame.dropna()和DataFrameNaFunctions.drop()是彼此的别名。
  2. fill(value, subset=None)
    替换空值,na.fill()的别名。 DataFrame.fillna()和DataFrameNaFunctions.fill()是彼此的别名。
  3. replace(to_replace, value, subset=None)

class pyspark.sql.DataFrameStatFunctions(df)

DataFrame的统计函数的功能。

  1. approxQuantile(col, probabilities, relativeError)
    计算DataFrame的数值列的近似分位数。
  2. corr(col1, col2, method=None)
    以双精度值计算DataFrame的两列的相关性。 目前只支持Pearson Correlation Coefficient。 DataFrame.corr()和DataFrameStatFunctions.corr()是彼此的别名。
  3. cov(col1, col2)
    计算给定列的样本协方差(由它们的名称指定)作为双精度值。 DataFrame.cov()和DataFrameStatFunctions.cov()是别名。
  4. crosstab(col1, col2)
    计算给定列的成对频率表。 也被称为应急表。
  5. freqItems(cols, support=None)
    找到列的频繁项,可能有误报。
  6. sampleBy(col, fractions, seed=None)
    根据每层上给出的分数返回一个没有更换的分层样本。

class pyspark.sql.Window

用于在DataFrame中定义窗口的实用函数。

>>> # ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
>>> window = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
>>> # PARTITION BY country ORDER BY date RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING
>>> window = Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3)
  • Note: 实验阶段
    currentRow = 0
    static orderBy(cols) - 用定义的顺序创建一个WindowSpec。
    static partitionBy(
    cols) - 用定义的分区创建一个WindowSpec。
    static rangeBetween(start, end) -
    static rowsBetween(start, end) -
    unboundedFollowing = 9223372036854775807L
    unboundedPreceding = -9223372036854775808L

class pyspark.sql.WindowSpec(jspec)

定义 partitioning, ordering, and frame的窗口规范。
使用Window中的静态方法创建一个WindowSpec

  1. orderBy(*cols)
    定义WindowSpec中的排序列。
  2. partitionBy(*cols)
    定义WindowSpec中的分区列。
  3. rangeBetween(start, end)
    定义从开始(包含)到结束(包含)的框架边界。
  4. rowsBetween(start, end)
    定义从开始(包含)到结束(包含)的框架边界。

class pyspark.sql.DataFrameReader(spark)

用于从外部存储系统(例如文件系统,键值存储等)加载DataFrame的接口。 使用spark.read()来访问它。

  1. csv(path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None)
    加载CSV文件并将结果作为DataFrame返回。
>>> df = spark.read.csv("file:/home/spark_sql_test.csv",header=True)
>>> df.show()
+---+-----+---+------+
| id| name|age|salary|
+---+-----+---+------+
|  1|Alice| 11|   111|
|  2|  Bob| 22|   222|
+---+-----+---+------+
  1. format(source)
    指定输入数据源格式。
>>> df = spark.read.format('json').load('python/test_support/sql/people.json')
>>> df.dtypes
[('age', 'bigint'), ('name', 'string')]
  1. jdbc(url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None)
    Parameters:
  • url – a JDBC URL of the form jdbc:subprotocol:subname
  • table – the name of the table
  1. json(path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, multiLine=None)
    加载JSON文件并将结果作为DataFrame返回。
>>> df1 = spark.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
[('age', 'bigint'), ('name', 'string')]
  1. load(path=None, format=None, schema=None, **options)
    从数据源加载数据并将其作为:class DataFrame返回。
  2. option(key, value)
    为基础数据源添加一个输入选项。
    您可以设置以下选项来读取文件:
  • timeZone: 设置指示用于分析时间戳的时区的字符串
    在JSON / CSV数据源或分区值。 如果没有设置,它使用默认值,会话本地时区。
  1. options(**options)
  2. orc(path)
    加载ORC文件,将结果作为DataFrame返回。
  3. parquet(*paths)
    加载Parquet文件,将结果作为DataFrame返回。
  4. schema(schema)
    指定输入模式。
  5. table(tableName)
    以DataFrame的形式返回指定的表。
  6. text(paths)
    加载文本文件并返回一个DataFrame,该DataFrame的架构以名为“value”的字符串列开头,如果有的话,后跟分区列。

class pyspark.sql.DataFrameWriter(df)

用于将DataFrame写入外部存储系统(例如文件系统,键值存储等)的接口。 使用DataFrame.write()来访问这个。

  1. csv(path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None)
    以指定的路径以CSV格式保存DataFrame的内容。
>>> df1 = spark.createDataFrame([(3,"Tom",33,333),],["id","name","age","salary"])
>>> df1.show()
+---+----+---+------+
| id|name|age|salary|
+---+----+---+------+
|  3| Tom| 33|   333|
+---+----+---+------+
>>> df1.write.csv("file:/home/spark_sql_test",mode="overwrite",header=True)
  1. format(source)
    指定基础输出数据源。
>>> df.write.format('json').save(os.path.join(tempfile.mkdtemp(), 'data'))
  1. insertInto(tableName, overwrite=False)
    将DataFrame的内容插入到指定的表中。
    它要求类的架构:DataFrame与表的架构相同。
    可以覆盖任何现有的数据。
  2. jdbc(url, table, mode=None, properties=None)
    将DataFrame的内容通过JDBC保存到外部数据库表中。
  3. json(path, mode=None, compression=None, dateFormat=None, timestampFormat=None)
    将DataFrame的内容以JSON格式保存在指定的路径中。
  4. mode(saveMode)
    指定数据或表已经存在的行为。
    选项包括:
  • append:将此DataFrame的内容附加到现有数据。
  • overwrite:覆盖现有数据。
  • ignore: 如果数据已经存在,静默地忽略这个操作。
  • error:如果数据已经存在,则抛出异常。
  1. option(key, value)
  2. options(**options)
  3. orc(path, mode=None, partitionBy=None, compression=None)
    以指定的路径以ORC格式保存DataFrame的内容。
  4. parquet(path, mode=None, partitionBy=None, compression=None)
    将DataFrame的内容以Parquet格式保存在指定的路径中。
  5. partitionBy(*cols)
    按文件系统上的给定列对输出进行分区。
    如果指定,则输出将在文件系统上进行布局,类似于Hive的分区方案。
  6. save(path=None, format=None, mode=None, partitionBy=None, **options)
    将DataFrame的内容保存到数据源。
    数据源由格式和一组选项指定。 如果未指定format,则将使用由spark.sql.sources.default配置的缺省数据源。
  7. saveAsTable(name, format=None, mode=None, partitionBy=None, **options)
    将DataFrame的内容保存为指定的表格。
  8. text(path, compression=None)
    将DataFrame的内容保存在指定路径的文本文件中。

pyspark.sql.types module

class pyspark.sql.types.DataType

数据类型的基类。

  1. fromInternal(obj)
    将内部SQL对象转换为本地Python对象。
  2. json()
  3. jsonValue()
  4. needConversion()
    这种类型是否需要在Python对象和内部SQL对象之间进行转换?
    这用于避免ArrayType / MapType / StructType的不必要的转换。
  5. simpleString()
  6. toInternal(obj)
    将Python对象转换为内部SQL对象。
  7. classmethod typeName()

class pyspark.sql.types.NullType

空类型。
表示None的数据类型,用于无法推断的类型。

class pyspark.sql.types.StringType

字符串数据类型。

class pyspark.sql.types.BinaryType

二进制(字节数组)数据类型。

class pyspark.sql.types.BooleanType

布尔数据类型。

class pyspark.sql.types.DateType

Date(datetime.date)数据类型。
EPOCH_ORDINAL = 719163

  1. fromInternal(v)
  2. needConversion()
  3. toInternal(d)

class pyspark.sql.types.TimestampType

时间戳(datetime.datetime)数据类型。

  1. fromInternal(ts)
  2. needConversion()
  3. toInternal(dt)

class pyspark.sql.types.DecimalType(precision=10, scale=0)

十进制(decimal.Decimal)数据类型。

  1. jsonValue()
  2. simpleString()

class pyspark.sql.types.DoubleType

双数据类型,表示双精度浮点数。

class pyspark.sql.types.FloatType

浮点数据类型,表示单精度浮点数。

class pyspark.sql.types.ByteType

字节数据类型,即单个字节中的有符号整数。

  1. simpleString()

class pyspark.sql.types.IntegerType

Int数据类型,即有符号的32位整数。

  1. simpleString()

class pyspark.sql.types.LongType

长数据类型,即有符号的64位整数。

  1. simpleString()

class pyspark.sql.types.ShortType

短数据类型,即有符号的16位整数。

  1. simpleString()

class pyspark.sql.types.ArrayType(elementType, containsNull=True)

数组数据类型。

  1. fromInternal(obj)
  2. classmethod fromJson(json)
  3. jsonValue()
  4. needConversion()
  5. simpleString()
  6. toInternal(obj)

class pyspark.sql.types.MapType(keyType, valueType, valueContainsNull=True)

Map数据类型。

  1. fromInternal(obj)
  2. classmethod fromJson(json)
  3. jsonValue()
  4. needConversion()
  5. simpleString()
  6. toInternal(obj)

class pyspark.sql.types.StructField(name, dataType, nullable=True, metadata=None)

StructType中的一个字段。

  1. fromInternal(obj)
  2. classmethod fromJson(json)
  3. jsonValue()
  4. needConversion()
  5. simpleString()
  6. toInternal(obj)

class pyspark.sql.types.StructType(fields=None)

结构类型,由StructField的列表组成。
这是表示一个行的数据类型。

  1. add(field, data_type=None, nullable=True, metadata=None)
    通过添加新元素来构造一个StructType来定义模式。 该方法接受:
    一个参数是一个StructField对象;介于2到4之间的参数(name,data_type,nullable(可选),metadata(可选))。data_type参数可以是String或DataType对象。
  2. fromInternal(obj)
  3. classmethod fromJson(json)
  4. jsonValue()
  5. needConversion()
  6. simpleString()
  7. toInternal(obj)

pyspark.sql.functions module

内建函数的集合

  1. pyspark.sql.functions.abs(col)
    计算绝对值。
  2. pyspark.sql.functions.acos(col)
    计算给定值的余弦逆; 返回的角度在0到π的范围内。
  3. pyspark.sql.functions.add_months(start, months)
    返回开始后几个月的日期
>>> from pyspark.sql import functions as f
>>> df = spark.createDataFrame([("2017-12-25",)],["d"])
>>> df.select(f.add_months(df.d,1).alias("d")).collect()
[Row(d=datetime.date(2018, 1, 25))]
  1. pyspark.sql.functions.approx_count_distinct(col, rsd=None)
    返回col的近似不同计数的新列。
  2. pyspark.sql.functions.array(*cols)
    创建一个新的数组列。
  3. pyspark.sql.functions.array_contains(col, value)
    集合函数:如果数组为null,则返回null;如果数组包含给定值,则返回true;否则返回false。
  4. pyspark.sql.functions.asc(col)
    基于给定列名称的升序返回一个排序表达式。
  5. pyspark.sql.functions.ascii(col)
    计算字符串列的第一个字符的数值。
  6. pyspark.sql.functions.asin(col)
    计算给定值的正弦倒数; 返回的角度在- π/ 2到π/ 2的范围内。
  7. pyspark.sql.functions.atan(col)
    计算给定值的正切倒数。
  8. pyspark.sql.functions.atan2(col1, col2)
    返回直角坐标(x,y)到极坐标(r,theta)转换的角度theta。
  9. pyspark.sql.functions.avg(col)
    聚合函数:返回组中的值的平均值。
  10. pyspark.sql.functions.base64(col)
    计算二进制列的BASE64编码并将其作为字符串列返回。
  11. pyspark.sql.functions.bin(col)
    返回给定列的二进制值的字符串表示形式。
  12. pyspark.sql.functions.bitwiseNOT(col)
    不按位计算。
  13. pyspark.sql.functions.broadcast(df)
    将DataFrame标记为足够小以用于广播连接。
  14. pyspark.sql.functions.bround(col, scale=0)
    如果scale> = 0,则使用HALF_EVEN舍入模式对给定值进行四舍五入以缩放小数点;如果scale <0,则将其舍入到整数部分。
  15. pyspark.sql.functions.cbrt(col)
    计算给定值的立方根。
  16. pyspark.sql.functions.ceil(col)
    计算给定值的上限。
  17. pyspark.sql.functions.coalesce(*cols)
    返回不为空的第一列。
  18. pyspark.sql.functions.col(col)
    根据给定的列名返回一个列。
  19. pyspark.sql.functions.collect_list(col)
    聚合函数:返回重复对象的列表。
  20. pyspark.sql.functions.collect_set(col)
    聚合函数:返回一组消除重复元素的对象。
  21. pyspark.sql.functions.column(col)
    根据给定的列名返回一个列。
  22. pyspark.sql.functions.concat(*cols)
    将多个输入字符串列连接成一个字符串列。
>>> df = spark.createDataFrame([('abcd','123')], ['s', 'd'])
>>> df.select(f.concat(df.s, df.d).alias('s')).collect()
[Row(s=u'abcd123')]
  1. pyspark.sql.functions.concat_ws(sep, *cols)
    使用给定的分隔符将多个输入字符串列连接到一个字符串列中。
>>> df = spark.createDataFrame([('abcd','123')], ['s', 'd'])
>>> df.select(concat_ws('-', df.s, df.d).alias('s')).collect()
[Row(s=u'abcd-123')]
  1. pyspark.sql.functions.conv(col, fromBase, toBase)
    将字符串列中的数字从一个进制转换为另一个进制。
>>> df = spark.createDataFrame([("010101",)], ['n'])
>>> df.select(conv(df.n, 2, 16).alias('hex')).collect()
[Row(hex=u'15')]
  1. pyspark.sql.functions.corr(col1, col2)
    返回col1和col2的Pearson相关系数的新列。
  2. pyspark.sql.functions.cos(col)
    计算给定值的余弦。
  3. pyspark.sql.functions.cosh(col)
    计算给定值的双曲余弦。
  4. pyspark.sql.functions.count(col)
    聚合函数:返回组中的项目数量。
  5. pyspark.sql.functions.countDistinct(col, *cols)
    返回col或col的不同计数的新列。
  6. pyspark.sql.functions.covar_pop(col1, col2)
    返回col1和col2的总体协方差的新列。
  7. pyspark.sql.functions.covar_samp(col1, col2)
    返回col1和col2的样本协方差的新列。
  8. pyspark.sql.functions.crc32(col)
    计算二进制列的循环冗余校验值(CRC32),并将该值作为bigint返回。
  9. pyspark.sql.functions.create_map(*cols)
    创建一个新的地图列。
  10. pyspark.sql.functions.cume_dist()
    窗口函数:返回窗口分区内值的累积分布,即在当前行下面的行的分数。
  11. pyspark.sql.functions.current_date()
    以日期列的形式返回当前日期。
  12. pyspark.sql.functions.current_timestamp()
    将当前时间戳作为时间戳列返回。
  13. pyspark.sql.functions.date_add(start, days)
    返回开始后几天的日期
>>> df = spark.createDataFrame([('2015-04-08',)], ['d'])
>>> df.select(date_add(df.d, 1).alias('d')).collect()
[Row(d=datetime.date(2015, 4, 9))]
  1. pyspark.sql.functions.date_format(date, format)
    将日期/时间戳/字符串转换为由第二个参数给定日期格式指定格式的字符串值。
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(date_format('a', 'MM/dd/yyy').alias('date')).collect()
[Row(date=u'04/08/2015')]
  1. pyspark.sql.functions.date_sub(start, days)
    返回开始前几天的日期
>>> df = spark.createDataFrame([('2015-04-08',)], ['d'])
>>> df.select(date_sub(df.d, 1).alias('d')).collect()
[Row(d=datetime.date(2015, 4, 7))]
  1. pyspark.sql.functions.datediff(end, start)
    返回从开始到结束的天数。
>>> df = spark.createDataFrame([('2015-04-08','2015-05-10')], ['d1', 'd2'])
>>> df.select(datediff(df.d2, df.d1).alias('diff')).collect()
[Row(diff=32)]
  1. pyspark.sql.functions.dayofmonth(col)
    将给定日期的月份的日期解压为整数。(一月中第几天)
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(dayofmonth('a').alias('day')).collect()
[Row(day=8)]
  1. pyspark.sql.functions.dayofyear(col)
    将给定日期的年份中的某一天提取为整数。(一年中第几天)
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(dayofyear('a').alias('day')).collect()
[Row(day=98)]
  1. pyspark.sql.functions.decode(col, charset)
    Computes the first argument into a string from a binary using the provided character set (one of ‘US-ASCII’, ‘ISO-8859-1’, ‘UTF-8’, ‘UTF-16BE’, ‘UTF-16LE’, ‘UTF-16’).
  2. pyspark.sql.functions.degrees(col)
    将以弧度度量的角度转换为以度数度量的近似等效角度。
  3. pyspark.sql.functions.dense_rank()
    窗口函数:返回窗口分区内的行的等级,没有任何间隙
  4. pyspark.sql.functions.desc(col)
    基于给定列名称的降序返回一个排序表达式。
  5. pyspark.sql.functions.encode(col, charset)
    Computes the first argument into a binary from a string using the provided character set (one of ‘US-ASCII’, ‘ISO-8859-1’, ‘UTF-8’, ‘UTF-16BE’, ‘UTF-16LE’, ‘UTF-16’).
  6. pyspark.sql.functions.exp(col)
    计算给定值的指数。
  7. pyspark.sql.functions.explode(col)
    返回给定数组或映射中每个元素的新行。
  8. pyspark.sql.functions.expm1(col)
    计算给定值的指数减1。
  9. pyspark.sql.functions.expr(str)
    将表达式字符串解析到它表示的列中
  10. pyspark.sql.functions.factorial(col)
    计算给定值的阶乘。
>>> df = spark.createDataFrame([(5,)], ['n'])
>>> df.select(factorial(df.n).alias('f')).collect()
[Row(f=120)]
  1. pyspark.sql.functions.first(col, ignorenulls=False)
    聚合函数:返回组中的第一个值。
  2. pyspark.sql.functions.floor(col)
    计算给定值的floor。
  3. pyspark.sql.functions.format_number(col, d)
    将数字X格式化为像'#, - #, - #.-'这样的格式,用HALF_EVEN舍入模式四舍五入到小数点后的位置,然后以字符串形式返回结果。
  4. pyspark.sql.functions.format_string(format, *cols)
    以printf-style格式化参数,并将结果作为字符串列返回。
  5. pyspark.sql.functions.from_json(col, schema, options={})
    使用指定的模式将包含JSON字符串的列解析为[[StructType]]的[[StructType]]或[[ArrayType]]。 在不可解析的字符串的情况下返回null。
  6. pyspark.sql.functions.from_unixtime(timestamp, format='yyyy-MM-dd HH:mm:ss')
    将来自unix时期(1970-01-01 00:00:00 UTC)的秒数转换为以给定格式表示当前系统时区中该时刻的时间戳的字符串。
  7. pyspark.sql.functions.from_utc_timestamp(timestamp, tz)
    给定一个时间戳,对应于UTC中的某个特定时间,返回对应于给定时区中同一时间的另一个时间戳。
  8. pyspark.sql.functions.get_json_object(col, path)
    从基于指定的json路径的json字符串中提取json对象,并返回提取的json对象的json字符串。 如果输入的json字符串无效,它将返回null。
  9. pyspark.sql.functions.greatest(*cols)
    返回列名称列表的最大值,跳过空值。 该功能至少需要2个参数。 如果所有参数都为空,它将返回null。
  10. pyspark.sql.functions.grouping(col)
    聚合函数:指示GROUP BY列表中的指定列是否被聚合,在结果集中返回1表示聚合或0表示未聚合。
  11. pyspark.sql.functions.grouping_id(*cols)
    聚合函数:返回分组的级别,等于(grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn)
  12. pyspark.sql.functions.hash(*cols)
    计算给定列的哈希码,并将结果作为int列返回。
>>> spark.createDataFrame([('ABC',)], ['a']).select(hash('a').alias('hash')).collect()
[Row(hash=-757602832)]
  1. pyspark.sql.functions.hex(col)
    计算给定列的十六进制值,可能是pyspark.sql.types.StringType,pyspark.sql.types.BinaryType,pyspark.sql.types.IntegerType或pyspark.sql.types.LongType。
>>> spark.createDataFrame([('ABC', 3)], ['a', 'b']).select(hex('a'), hex('b')).collect()
[Row(hex(a)=u'414243', hex(b)=u'3')]
  1. pyspark.sql.functions.hour(col)
    将给定日期的小时数提取为整数。
>>> df = spark.createDataFrame([('2015-04-08 13:08:15',)], ['a'])
>>> df.select(hour('a').alias('hour')).collect()
[Row(hour=13)]
  1. pyspark.sql.functions.hypot(col1, col2)
    计算sqrt(a ^ 2 + b ^ 2),无中间上溢或下溢。
  2. pyspark.sql.functions.initcap(col)
    在句子中将每个单词的第一个字母翻译成大写。
>>> spark.createDataFrame([('ab cd',)], ['a']).select(initcap("a").alias('v')).collect()
[Row(v=u'Ab Cd')]
  1. pyspark.sql.functions.input_file_name()
    为当前Spark任务的文件名创建一个字符串列。
  2. pyspark.sql.functions.instr(str, substr)
    找到给定字符串中第一次出现substr列的位置。 如果其中任一参数为null,则返回null。
>>> df = spark.createDataFrame([('abcd',)], ['s',])
>>> df.select(instr(df.s, 'b').alias('s')).collect()
[Row(s=2)]
  1. pyspark.sql.functions.isnan(col)
    如果列是NaN,则返回true的表达式。
  2. pyspark.sql.functions.isnull(col)
    如果列为空,则返回true的表达式。
  3. pyspark.sql.functions.json_tuple(col, *fields)
    根据给定的字段名称为json列创建一个新行。
    Parameters:
  • col - json格式的字符串列
  • fields - 要提取的字段列表
  1. pyspark.sql.functions.kurtosis(col)
    聚合函数:返回组中值的峰度。
  2. pyspark.sql.functions.lag(col, count=1, default=None)
    窗口函数:返回当前行之前的偏移行值;如果当前行之前的行数小于偏移量,则返回defaultValue。例如,若偏移量为1,将返回窗口分区中任何给定点的前一行。
    这相当于SQL中的LAG函数。
    Parameters:
  • col - 列名或表达式的名称
  • count - 要扩展的行数
  • default - 默认值
  1. pyspark.sql.functions.last(col, ignorenulls=False)
    聚合函数:返回组中的最后一个值。
    该函数默认返回它看到的最后一个值。 当ignoreNulls设置为true时,它将返回它看到的最后一个非null值。 如果所有值都为空,则返回null。
  2. pyspark.sql.functions.last_day(date)
    返回给定日期所属月份的最后一天。
>>> df = spark.createDataFrame([('1997-02-10',)], ['d'])
>>> df.select(last_day(df.d).alias('date')).collect()
[Row(date=datetime.date(1997, 2, 28))]
  1. pyspark.sql.functions.lead(col, count=1, default=None)
    Window函数:返回当前行之后的偏移行值;如果当前行之后的行数小于偏移行,则返回defaultValue。 例如,偏移量为1,将返回窗口分区中任意给定点的下一行。
    这相当于SQL中的LEAD函数。
  2. pyspark.sql.functions.least(*cols)
    返回多列中的最小值,跳过空值。 该功能至少需要2个参数,及最少需要两个列名。 如果所有参数都为空,它将返回null。
>>> df = spark.createDataFrame([(1, 4, 3)], ['a', 'b', 'c'])
>>> df.select(least(df.a, df.b, df.c).alias("least")).collect()
[Row(least=1)]
  1. pyspark.sql.functions.length(col)
    计算字符串或二进制表达式的长度。
>>> spark.createDataFrame([('ABC',)], ['a']).select(length('a').alias('length')).collect()
[Row(length=3)]
  1. pyspark.sql.functions.levenshtein(left, right)
    计算两个给定字符串的Levenshtein距离。
    Levenshtein距离(编辑距离),是指两个字串之间,由一个转成另一个所需的最少编辑操作次数。具体可自行百度。
>>> df0 = spark.createDataFrame([('kitten', 'sitting',)], ['l', 'r'])
>>> df0.select(levenshtein('l', 'r').alias('d')).collect()
[Row(d=3)]
  1. pyspark.sql.functions.lit(col)
    创建一个字面值的列。
  2. pyspark.sql.functions.locate(substr, str, pos=1)
    在str字符串列中找到在pos位置后面第一个出现substr的位置。
  • Note: 该位置不是从零开始的,而是从1开始的。 如果在str中找不到substr,则返回0。
    Parameters:
  • substr - 要查找的字符串
  • str - pyspark.sql.types.StringType的列
  • pos - 起始位置
>>> df = spark.createDataFrame([('abcd',)], ['s',])
>>> df.select(locate('b', df.s, 1).alias('s')).collect()
[Row(s=2)]
  1. pyspark.sql.functions.log(arg1, arg2=None)
    返回第二个参数的基于第一个参数的对数。
    如果只有一个参数,那么这个参数就是自然对数。
>>> df.select(log(10.0, df.age).alias('ten')).rdd.map(lambda l: str(l.ten)[:7]).collect()
['0.30102', '0.69897']
>>> df.select(log(df.age).alias('e')).rdd.map(lambda l: str(l.e)[:7]).collect()
['0.69314', '1.60943']
  1. pyspark.sql.functions.log10(col)
    计算给定一个数以10为底的对数。
  2. pyspark.sql.functions.log1p(col)
    Computes the natural logarithm of the given value plus one.
  3. pyspark.sql.functions.log2(col)
    返回参数的基数为2的对数。
>>> spark.createDataFrame([(4,)], ['a']).select(log2('a').alias('log2')).collect()
[Row(log2=2.0)]
  1. pyspark.sql.functions.lower(col)
    将字符串列转换为小写。
  2. pyspark.sql.functions.lpad(col, len, pad)
    左填充到指定长度。
>>> df = spark.createDataFrame([('abcd',)], ['s',])
>>> df.select(lpad(df.s, 6, '#').alias('s')).collect()
[Row(s=u'##abcd')]
  1. pyspark.sql.functions.ltrim(col)
    去掉字符串左边的空格。
  2. pyspark.sql.functions.max(col)
    聚合函数:返回组中表达式的最大值。
  3. pyspark.sql.functions.md5(col)
    计算某给定值的MD5值,将该值作为32个字符的十六进制字符串返回。
>>> spark.createDataFrame([('ABC',)], ['a']).select(md5('a').alias('hash')).collect()
[Row(hash=u'902fbdd2b1df0c4f70b4a5d23525e932')]
  1. pyspark.sql.functions.mean(col)
    聚合函数:返回组中所有值的平均值。
  2. pyspark.sql.functions.min(col)
    聚合函数:返回组中表达式的最小值。
  3. pyspark.sql.functions.minute(col)
    提取给定日期的分钟数为整数。
>>> df = spark.createDataFrame([('2015-04-08 13:08:15',)], ['a'])
>>> df.select(minute('a').alias('minute')).collect()
[Row(minute=8)]
  1. pyspark.sql.functions.monotonically_increasing_id()
    生成单调递增的64位整数的列。
    生成的ID保证是单调递增和唯一的,但不是连续的。 当前的实现将分区ID放在高31位,并将每个分区内的记录号放在低33位。 假设数据框的分区少于10亿个,每个分区少于80亿条记录。
    作为一个例子,考虑一个带有两个分区的DataFrame,每个分区有三个记录。 该表达式将返回以下ID:0,1,2,8589934592(1L << 33),8589934593,8589934594。
>>> df0 = sc.parallelize(range(2), 2).mapPartitions(lambda x: [(1,), (2,), (3,)]).toDF(['col1'])
>>> df0.select(monotonically_increasing_id().alias('id')).collect()
[Row(id=0), Row(id=1), Row(id=2), Row(id=8589934592), Row(id=8589934593), Row(id=8589934594)]
  1. pyspark.sql.functions.month(col)
    将给定日期的月份提取为整数。
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(month('a').alias('month')).collect()
[Row(month=4)]
  1. pyspark.sql.functions.months_between(date1, date2)
    返回date1和date2之间的月数。
  2. pyspark.sql.functions.nanvl(col1, col2)
    如果col1不是NaN,则返回col1;如果col1是NaN,则返回col2。
    两个输入都应该是浮点列(DoubleType或FloatType)。
>>> df = spark.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b"))
>>> df.select(nanvl("a", "b").alias("r1"), nanvl(df.a, df.b).alias("r2
                        
版权声明:本文来源简书,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://www.jianshu.com/p/7e95b9804d93
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-01-08 06:59:16
  • 阅读 ( 1084 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢