DSL风格API
1. 列操作 (Column)
列操作可以用于选择、计算或者创建新列。可以通过 select()
、alias()
、withColumn()
等方法对列进行操作。
from pyspark.sql.functions import col
# 列的选择
student_id_column = df['student_id'] # 使用 [] 操作符选择列
grade_column = col('grade') # 使用 col() 函数选择列
# 创建新列
df = df.withColumn('total_score', df['math_score'] + df['english_score'] + df['science_score'])
# 修改列名
df = df.withColumnRenamed('student_id', 'id')
2. 过滤操作 (Filter)
filter()
和 where()
方法等价,用于根据条件过滤数据。
# 使用字符串表达式
df.filter('age > 18').show(5)
# 使用 Column 对象
df.filter(df['age'] > 18).show(5)
# 使用多个条件 (注意使用 & | 操作符,且条件需用括号)
df.filter((df['age'] > 18) & (df['grade'] == 'A')).show(5)
3. 聚合操作 (Group By)
groupBy()
用于对指定列进行分组,返回 GroupedData
对象,然后可以调用聚合函数,如 sum()
、avg()
、count()
、min()
、max()
。
# 分组并计算每个年级的学生数量
df.groupBy('grade').count().show()
# 分组并计算每个年级的平均分
df.groupBy('grade').avg('total_score').show()
# 分组并计算每个年级的总分
df.groupBy('grade').sum('total_score').show()
4. 排序操作 (Sort)
orderBy()
用于对 DataFrame 按指定列进行排序,升序或降序。
# 按 'age' 升序排序
df.orderBy('age').show(5)
# 按 'age' 降序排序
df.orderBy(df['age'].desc()).show(5)
5. 去重操作 (Distinct)
distinct()
用于从 DataFrame 中去除重复的行。
# 去除重复的行
df.select('name', 'age').distinct().show()
# 按特定列去重 (dropDuplicates())
df.dropDuplicates(['name']).show()
6. 聚合函数 (Aggregate Functions)
可以使用聚合函数对数据进行计算,如 sum()
、avg()
、count()
等。可以通过 agg()
方法结合聚合函数对数据进行多列、多条件的聚合。
from pyspark.sql import functions as F
# 计算某列的总和
df.select(F.sum('total_score')).show()
# 多列聚合
df.groupBy('grade').agg(
F.avg('total_score').alias('avg_score'),
F.max('total_score').alias('max_score')
).show()
7. 连接操作 (Join)
join()
用于将两个 DataFrame 按指定列连接,支持内连接、左连接、右连接、全连接等。
# 假设有两个 DataFrame: students 和 scores
# 按 'student_id' 进行内连接
joined_df = students.join(scores, on='student_id', how='inner')
joined_df.show()
8. 条件操作 (When and Otherwise)
when()
和 otherwise()
用于基于条件创建新列,类似于 SQL 中的 CASE WHEN
。
from pyspark.sql.functions import when
# 创建一个新列 'pass',如果 'total_score' 大于 200,则为 'Yes',否则为 'No'
df = df.withColumn('pass', when(df['total_score'] > 200, 'Yes').otherwise('No'))
df.show()
9. 空值处理 (Handling Nulls)
na
子模块用于处理空值,如 drop()
、fill()
、replace()
。
# 删除包含空值的行
df.na.drop().show()
# 将空值填充为特定值
df.na.fill({'age': 0, 'grade': 'Unknown'}).show()
# 替换特定值
df.na.replace('NA', None).show()
10. 其他常用函数
limit(n)
: 获取前 n 行sample(fraction)
: 以指定比例随机抽样count()
: 获取 DataFrame 的总行数
# 获取前 10 行
df.limit(10).show()
# 随机抽样 20% 的数据
df.sample(0.2).show()
# 统计总行数
print(f"Total rows: {df.count()}")