菜单
本页目录

SQL风格的API

本质就是创建一个视图用正常的SQL去查询

在 PySpark 中,除了使用 DSL 风格的 API 进行操作,还可以使用类似 SQL 的语法来进行数据查询和处理。这种 SQL 风格的 API 主要通过 sparkSession.sql() 来完成。为了使用 SQL 风格的查询,需要先将 DataFrame 注册为一个临时视图,然后就可以用 SQL 语句对数据进行查询和处理。下面是一些示例,帮助您更好地了解如何在 PySpark 中使用 SQL 风格的 API。

1. 注册临时视图 (Temporary View)

为了对 DataFrame 使用 SQL 查询,首先需要将 DataFrame 注册为一个临时视图。使用 createOrReplaceTempView()createGlobalTempView() 方法。

# 将 DataFrame 注册为临时视图
df.createOrReplaceTempView('students')

# 全局视图的使用,适用于跨 session 的查询
df.createGlobalTempView('global_students')

2. 基本查询 (Basic Query)

使用 spark.sql() 来执行 SQL 语句并返回一个 DataFrame。

# 查询所有列,相当于 SELECT * FROM students
result_df = spark.sql("SELECT * FROM students")
result_df.show()

# 查询部分列
result_df = spark.sql("SELECT name, age, grade FROM students")
result_df.show()

3. 条件查询 (Conditional Query)

通过 WHERE 子句对数据进行条件过滤。

# 查询年龄小于 20 的学生
result_df = spark.sql("SELECT * FROM students WHERE age < 20")
result_df.show()

# 查询年级为 'A' 且年龄大于 18 的学生
result_df = spark.sql("SELECT * FROM students WHERE grade = 'A' AND age > 18")
result_df.show()

4. 分组与聚合 (Group By and Aggregation)

使用 GROUP BY 进行分组,并结合聚合函数 COUNT()SUM()AVG()MIN()MAX() 等来聚合数据。

# 按年级统计每个年级的学生数量
result_df = spark.sql("SELECT grade, COUNT(*) AS student_count FROM students GROUP BY grade")
result_df.show()

# 按年级统计每个年级的学生总分和平均分
result_df = spark.sql("""
    SELECT grade, SUM(total_score) AS total_score_sum, AVG(total_score) AS avg_score
    FROM students
    GROUP BY grade
""")
result_df.show()

5. 排序 (Order By)

使用 ORDER BY 对查询结果进行排序,默认为升序,可以使用 DESC 实现降序。

# 按年龄升序排序
result_df = spark.sql("SELECT * FROM students ORDER BY age")
result_df.show()

# 按总分降序排序
result_df = spark.sql("SELECT * FROM students ORDER BY total_score DESC")
result_df.show()

6. 去重 (Distinct)

使用 DISTINCT 去除重复的记录。

# 查询所有不同的年级
result_df = spark.sql("SELECT DISTINCT grade FROM students")
result_df.show()

# 查询去重后的学生姓名和年龄
result_df = spark.sql("SELECT DISTINCT name, age FROM students")
result_df.show()

7. 连接 (Join)

通过 JOIN 语句连接多个 DataFrame,例如内连接、左连接、右连接、全连接等。

# 假设有两个 DataFrame:students 和 scores
students.createOrReplaceTempView('students')
scores.createOrReplaceTempView('scores')

# 内连接:查询学生和他们的成绩
result_df = spark.sql("""
    SELECT students.student_id, students.name, scores.subject, scores.score
    FROM students
    JOIN scores ON students.student_id = scores.student_id
""")
result_df.show()

# 左连接:保留所有学生,即使他们没有对应的成绩
result_df = spark.sql("""
    SELECT students.student_id, students.name, scores.subject, scores.score
    FROM students
    LEFT JOIN scores ON students.student_id = scores.student_id
""")
result_df.show()

8. 条件逻辑 (CASE WHEN)

使用 CASE WHEN 来实现类似条件分支的逻辑。

# 创建一个新的列,标记是否及格,如果 total_score 大于 200,则标记为 'Pass',否则为 'Fail'
result_df = spark.sql("""
    SELECT *,
           CASE
               WHEN total_score > 200 THEN 'Pass'
               ELSE 'Fail'
           END AS pass_status
    FROM students
""")
result_df.show()

9. 处理空值 (Handling Nulls)

通过 SQL 语法处理空值,例如 IS NULLIS NOT NULL 等。

# 查询成绩为空的学生
result_df = spark.sql("SELECT * FROM students WHERE total_score IS NULL")
result_df.show()

# 查询成绩不为空的学生
result_df = spark.sql("SELECT * FROM students WHERE total_score IS NOT NULL")
result_df.show()

10. 分页查询 (Limit)

使用 LIMIT 对查询结果进行分页,获取前 n 行。

# 查询前 10 个学生
result_df = spark.sql("SELECT * FROM students LIMIT 10")
result_df.show()

11. 多表联合查询 (Union)

通过 UNION 将两个表的数据合并,但要求两个表的列数和列类型相同。

# 假设有两个相同结构的 DataFrame:students_1 和 students_2
students_1.createOrReplaceTempView('students_1')
students_2.createOrReplaceTempView('students_2')

# 合并两个表的数据
result_df = spark.sql("SELECT * FROM students_1 UNION SELECT * FROM students_2")
result_df.show()

# 如果允许重复行,使用 UNION ALL
result_df = spark.sql("SELECT * FROM students_1 UNION ALL SELECT * FROM students_2")
result_df.show()

12. 聚合函数 (Aggregate Functions)

通过 SQL 查询中的聚合函数,如 SUM()AVG() 等,进行数据汇总分析。

# 计算学生总分的平均值
result_df = spark.sql("SELECT AVG(total_score) AS avg_total_score FROM students")
result_df.show()

# 统计总学生数
result_df = spark.sql("SELECT COUNT(*) AS total_students FROM students")
result_df.show()