菜单
本页目录

DSL风格API

1. 列操作 (Column)

列操作可以用于选择、计算或者创建新列。可以通过 select()alias()withColumn() 等方法对列进行操作。

from pyspark.sql.functions import col

# 列的选择
student_id_column = df['student_id']  # 使用 [] 操作符选择列
grade_column = col('grade')           # 使用 col() 函数选择列

# 创建新列
df = df.withColumn('total_score', df['math_score'] + df['english_score'] + df['science_score'])

# 修改列名
df = df.withColumnRenamed('student_id', 'id')

2. 过滤操作 (Filter)

filter()where() 方法等价,用于根据条件过滤数据。

# 使用字符串表达式
df.filter('age > 18').show(5)

# 使用 Column 对象
df.filter(df['age'] > 18).show(5)

# 使用多个条件 (注意使用 & | 操作符,且条件需用括号)
df.filter((df['age'] > 18) & (df['grade'] == 'A')).show(5)

3. 聚合操作 (Group By)

groupBy() 用于对指定列进行分组,返回 GroupedData 对象,然后可以调用聚合函数,如 sum()avg()count()min()max()

# 分组并计算每个年级的学生数量
df.groupBy('grade').count().show()

# 分组并计算每个年级的平均分
df.groupBy('grade').avg('total_score').show()

# 分组并计算每个年级的总分
df.groupBy('grade').sum('total_score').show()

4. 排序操作 (Sort)

orderBy() 用于对 DataFrame 按指定列进行排序,升序或降序。

# 按 'age' 升序排序
df.orderBy('age').show(5)

# 按 'age' 降序排序
df.orderBy(df['age'].desc()).show(5)

5. 去重操作 (Distinct)

distinct() 用于从 DataFrame 中去除重复的行。

# 去除重复的行
df.select('name', 'age').distinct().show()

# 按特定列去重 (dropDuplicates())
df.dropDuplicates(['name']).show()

6. 聚合函数 (Aggregate Functions)

可以使用聚合函数对数据进行计算,如 sum()avg()count() 等。可以通过 agg() 方法结合聚合函数对数据进行多列、多条件的聚合。

from pyspark.sql import functions as F

# 计算某列的总和
df.select(F.sum('total_score')).show()

# 多列聚合
df.groupBy('grade').agg(
    F.avg('total_score').alias('avg_score'),
    F.max('total_score').alias('max_score')
).show()

7. 连接操作 (Join)

join() 用于将两个 DataFrame 按指定列连接,支持内连接、左连接、右连接、全连接等。

# 假设有两个 DataFrame: students 和 scores
# 按 'student_id' 进行内连接
joined_df = students.join(scores, on='student_id', how='inner')
joined_df.show()

8. 条件操作 (When and Otherwise)

when()otherwise() 用于基于条件创建新列,类似于 SQL 中的 CASE WHEN

from pyspark.sql.functions import when

# 创建一个新列 'pass',如果 'total_score' 大于 200,则为 'Yes',否则为 'No'
df = df.withColumn('pass', when(df['total_score'] > 200, 'Yes').otherwise('No'))
df.show()

9. 空值处理 (Handling Nulls)

na 子模块用于处理空值,如 drop()fill()replace()

# 删除包含空值的行
df.na.drop().show()

# 将空值填充为特定值
df.na.fill({'age': 0, 'grade': 'Unknown'}).show()

# 替换特定值
df.na.replace('NA', None).show()

10. 其他常用函数

  • limit(n): 获取前 n 行
  • sample(fraction): 以指定比例随机抽样
  • count(): 获取 DataFrame 的总行数
# 获取前 10 行
df.limit(10).show()

# 随机抽样 20% 的数据
df.sample(0.2).show()

# 统计总行数
print(f"Total rows: {df.count()}")