菜单
本页目录

SparkSQL使用窗口函数

好的,接下来我将详细全面地讲解PySpark SQL中DSL和SQL风格的窗口函数。两种风格各有优劣,在不同场景中可以选择最合适的一种方式来使用。以下内容涵盖如何定义窗口、使用各种窗口函数,以及两种风格的具体区别和实现。

一、窗口函数概述

窗口函数在分区内对每一行进行聚合计算,并且保持原始行级数据。不同于groupBy聚合,窗口函数并不减少行数,而是增加一个基于窗口的计算列。

窗口函数的典型应用包括:

  • 行号和排名:如 row_number()rank()
  • 累积聚合:如 sum()avg()
  • 滑动窗口:如前n个元素的总和。

二、窗口函数的核心概念

  1. 定义窗口:在PySpark中使用窗口函数需要先定义一个窗口,这个窗口是由Window对象定义的。
  2. 窗口函数:窗口函数是对特定分区和排序的窗口应用计算,比如row_number()rank()sum()等。
  3. 风格
    • DSL风格:使用DataFrame API风格编写窗口函数,更接近Python程序员的习惯。
    • SQL风格:使用SQL查询窗口函数,适合数据分析人员或习惯SQL语法的用户。

三、DSL 风格的窗口函数使用

在DSL风格中,我们使用pyspark.sql.window模块来定义窗口,并使用DataFrame API来应用窗口函数。

1. 准备工作

首先,我们创建一个示例DataFrame来演示窗口函数的使用。

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 初始化SparkSession
spark = SparkSession.builder.appName("DSL Window Functions Example").getOrCreate()

# 创建示例DataFrame
data = [
    ("Alice", "Sales", 5000),
    ("Bob", "Sales", 4000),
    ("Cathy", "Sales", 6000),
    ("David", "HR", 4500),
    ("Eva", "HR", 3500),
    ("Frank", "HR", 5000),
    ("Grace", "IT", 7000),
    ("Henry", "IT", 7500),
    ("Isabel", "IT", 8000)
]
df = spark.createDataFrame(data, ["employee_name", "department", "salary"])

2. 定义窗口

在DSL风格中,使用Window来定义窗口的分区和排序规范。

window_spec = Window.partitionBy("department").orderBy(F.desc("salary"))
  • partitionBy("department"):定义窗口在部门内分区。
  • orderBy(F.desc("salary")):定义窗口内的排序规则,按工资降序。

3. 使用窗口函数

3.1 使用 row_number()

计算每个部门内员工的排名。

df_with_row_number = df.withColumn("row_number", F.row_number().over(window_spec))
df_with_row_number.show()

输出

+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|        Cathy|     Sales|  6000|         1|
|        Alice|     Sales|  5000|         2|
|          Bob|     Sales|  4000|         3|
|       Isabel|        IT|  8000|         1|
|        Henry|        IT|  7500|         2|
|        Grace|        IT|  7000|         3|
|        Frank|        HR|  5000|         1|
|        David|        HR|  4500|         2|
|          Eva|        HR|  3500|         3|
+-------------+----------+------+----------+
3.2 使用 rank()

计算部门内的工资排名,工资相同的员工排名也相同。

df_with_rank = df.withColumn("rank", F.rank().over(window_spec))
df_with_rank.show()

输出

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|        Cathy|     Sales|  6000|   1|
|        Alice|     Sales|  5000|   2|
|          Bob|     Sales|  4000|   3|
|       Isabel|        IT|  8000|   1|
|        Henry|        IT|  7500|   2|
|        Grace|        IT|  7000|   3|
|        Frank|        HR|  5000|   1|
|        David|        HR|  4500|   2|
|          Eva|        HR|  3500|   3|
+-------------+----------+------+----+
3.3 使用聚合函数(sum()

计算每个部门内工资的累计总和。

df_with_sum = df.withColumn("department_salary_total", F.sum("salary").over(Window.partitionBy("department")))
df_with_sum.show()

输出

+-------------+----------+------+----------------------+
|employee_name|department|salary|department_salary_total|
+-------------+----------+------+----------------------+
|        Alice|     Sales|  5000|                 15000|
|          Bob|     Sales|  4000|                 15000|
|        Cathy|     Sales|  6000|                 15000|
|        David|        HR|  4500|                 13000|
|          Eva|        HR|  3500|                 13000|
|        Frank|        HR|  5000|                 13000|
|        Grace|        IT|  7000|                 22500|
|        Henry|        IT|  7500|                 22500|
|       Isabel|        IT|  8000|                 22500|
+-------------+----------+------+----------------------+

四、SQL 风格的窗口函数使用

PySpark SQL 风格允许你编写类似标准SQL的查询,结合窗口函数来对数据进行操作。这种风格更符合SQL用户的习惯。

1. 创建临时表

首先,将DataFrame注册为临时表,以便使用SQL语句查询。

df.createOrReplaceTempView("employee")

2. 使用 SQL 查询窗口函数

2.1 使用 row_number()

通过SQL查询计算每个部门内的员工排名。

result = spark.sql("""
    SELECT employee_name, department, salary,
           ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) AS row_number
    FROM employee
""")
result.show()

输出

+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|        Cathy|     Sales|  6000|         1|
|        Alice|     Sales|  5000|         2|
|          Bob|     Sales|  4000|         3|
|       Isabel|        IT|  8000|         1|
|        Henry|        IT|  7500|         2|
|        Grace|        IT|  7000|         3|
|        Frank|        HR|  5000|         1|
|        David|        HR|  4500|         2|
|          Eva|        HR|  3500|         3|
+-------------+----------+------+----------+
2.2 使用 rank()

通过SQL查询计算部门内的工资排名。

result = spark.sql("""
    SELECT employee_name, department, salary,
           RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS rank
    FROM employee
""")
result.show()

输出

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|        Cathy|     Sales|  6000|   1|
|        Alice|     Sales|  5000|   2|
|          Bob|     Sales|  4000|   3|
|       Isabel|        IT|  8000|   1|
|        Henry|        IT|  7500|   2|
|        Grace|        IT|  7000|   3|
|        Frank|        HR|  5000|   1|
|        David|        HR|  4500|   2|
|          Eva|        HR|  3500|   3|
+-------------+----------+------+----+
2.3 使用聚合函数(sum()

计算每个部门的工资总和。

result = spark.sql("""
    SELECT employee_name, department, salary,
           SUM(salary) OVER (PARTITION BY department) AS department_salary_total
    FROM employee
""")
result.show()

输出

+-------------+----------+------+----------------------+
|employee_name|department|salary|department_salary_total|
+-------------+----------+------+----------------------+
|        Alice|     Sales|  5000|                 15000|
|          Bob|     Sales|  4000|                 15000|
|        Cathy|     Sales|  6000|                 15000|
|        David|        HR|  4500|                 13000|
|          Eva|        HR|  3500|                 13000|
|        Frank|        HR|  5000|                 13000|
|        Grace|        IT|  7000|                 22500|
|        Henry|        IT|  7500|                 22500|
|       Isabel|        IT|  8000|                 22500|
+-------------+----------+------+----------------------+

五、DSL 风格与 SQL 风格的对比

特性DSL 风格SQL 风格
风格更接近Python,函数调用链式处理类似标准SQL语法,更适合SQL用户
灵活性更适合程序化数据处理,便于与Python代码结合更适合复杂查询和数据分析任务,易读性好
窗口定义通过Window对象定义通过SQL语法直接定义
使用场景适合与其他Spark DataFrame API一起使用适合需要使用复杂SQL查询的场景

六、总结

  • 窗口函数可以让你对每一行保留原始数据并增加一个基于窗口的计算列。
  • PySpark提供了DSL风格SQL风格两种方式来使用窗口函数。
    • DSL风格:使用DataFrame API和Window模块来定义和应用窗口函数。
    • SQL风格:通过SQL查询和窗口函数直接操作数据,语法和传统SQL类似。
  • 在选择风格时,可以根据具体的业务需求、团队成员的背景以及代码的可读性来决定使用哪种风格。

希望这些内容能够帮助你全面理解和使用PySpark SQL中的窗口函数。根据你的实际需求,可以灵活选择DSL或SQL风格来操作数据。