Transformation 算子
Transformation 算子 (不会引发 Shuffle )
Transformation 算子返回新的 RDD
这些算子在原始数据的本地分区中完成,不涉及跨节点的数据传输。
map 算子 - Transformation
-
功能:对 RDD 的每个元素应用指定的函数,并返回一个新的 RDD。
-
语法:
rdd.map(func)
func
:函数,接受 RDD 中的每个元素并返回新的值。
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([1,2,3,4,5]) rdd2 = rdd.map(lambda x: x * 10) print(rdd2.collect())
-
输出:
[10, 20, 30, 40, 50]
flatMap 算子 - Transformation
-
功能:对 RDD 执行
map
操作后,再对结果进行解除嵌套操作。 -
解除嵌套:假设有一个嵌套的列表
[[1,2,3],[4,5,6],[7,8,9]]
,flatMap 解除嵌套后得到[1,2,3,4,5,6,7,8,9]
。 -
语法:
rdd.flatMap(func)
func
:函数,接受 RDD 中的每个元素并返回一个可迭代对象。
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize(['aa aa aa', 'bb bb bb', 'cc cc cc']) rdd2 = rdd.flatMap(lambda strs: strs.split()) print(rdd2.collect())
-
输出:
['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc']
mapValues 算子 - Transformation
-
功能:对每个
(key, value)
形式的 RDD 元素,应用函数到其value
部分。 -
语法:
rdd.mapValues(func)
func
:函数,作用于二元元组的value
值。
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([('a', 1), ('b', 2), ('c', 3)]) rdd2 = rdd.mapValues(lambda x: x * 10) print(rdd2.collect())
-
输出:
[1. **aggregate** - **功能**:跨分区聚合,初始值和分区间操作都可自定义。 - **引发 Shuffle**:否 - **示例**: ```python rdd = sc.parallelize([1, 2, 3, 4]) result = rdd.aggregate((0, 0), lambda acc, value: (acc[0] + value, acc[1] + 1), lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) print(result) # 输出:(10, 4)
treeAggregate - Transformation
- 功能:以树的方式进行聚合以提高性能,适用于大规模数据集。
- 引发 Shuffle:否
- 示例:
rdd = sc.parallelize([1, 2, 3, 4]) result = rdd.treeAggregate(0, lambda acc, x: acc + x, lambda acc1, acc2: acc1 + acc2) print(result)
treeReduce - Transformation
- 功能:以树的方式进行全局聚合以提高性能。
- 引发 Shuffle:否
- 示例:
rdd = sc.parallelize([1, 2, 3, 4]) result = rdd.treeReduce(lambda x, y: x + y) print(result)#[('a', 10), ('b', 20), ('c', 30)]
filter 算子 - Transformation
-
功能:通过指定条件过滤 RDD 元素,仅保留符合条件的元素。
-
语法:
rdd.filter(func)
func
:过滤条件函数,返回True
保留元素,False
则丢弃。
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([1,2,3,4,5,6]) rdd2 = rdd.filter(lambda x: x % 2 == 0) print(rdd2.collect())
-
输出:
[2, 4, 6]
union 算子 - Transformation
-
功能:合并两个 RDD,不去重。
-
语法:
rdd.union(other_rdd)
other_rdd
:要合并的另一个 RDD。
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd1 = sc.parallelize([1, 2, 3]) rdd2 = sc.parallelize([4, 5, 6]) rdd_union = rdd1.union(rdd2) print(rdd_union.collect())
-
输出:
[1, 2, 3, 4, 5, 6]
glom 算子 - Transformation
-
功能:将 RDD 的每个分区内容合并为一个列表。
-
语法:
rdd.glom()
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3) rdd_glom = rdd.glom() print(rdd_glom.collect())
-
输出:
[[1, 2], [3, 4], [5, 6]]
mapPartitions 算子 - Transformation
-
功能:对 RDD 的每个分区数据应用指定的函数,返回一个新的 RDD。
-
语法:
rdd.mapPartitions(func)
func
:分区级别的处理函数,接受一个分区的迭代器作为输入。
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 2) def multiply_partition(iterator): return [x * 10 for x in iterator] rdd2 = rdd.mapPartitions(multiply_partition) print(rdd2.collect())
-
输出:
[10, 20, 30, 40, 50, 60]
sample - Transformation
- 功能:按比例对 RDD 数据进行采样。
- 示例:
rdd = sc.parallelize([1, 2, 3, 4, 5]) sampled_rdd = rdd.sample(False, 0.6) print(sampled_rdd.collect())
zip - Transformation
- 功能:将两个 RDD 按相同分区的索引位置配对,生成一个新的 RDD。
- 引发 Shuffle:否(要求两个 RDD 分区数和元素数量一致)
- 示例:
rdd1 = sc.parallelize([1, 2, 3]) rdd2 = sc.parallelize(["a", "b", "c"]) print(rdd1.zip(rdd2).collect())
- 输出:
[(1, 'a'), (2, 'b'), (3, 'c')]
coalesce - Transformation
- 功能:减少分区数,用于优化减少分区和减少 shuffle。
- 引发 Shuffle:否(默认为不引发,如果
shuffle=True
则会引发) - 示例:
rdd = sc.parallelize([1, 2, 3, 4, 5], numSlices=5) coalesced_rdd = rdd.coalesce(2) print(coalesced_rdd.getNumPartitions())
- 输出:
2
aggregate - Transformation
-
功能:跨分区聚合,初始值和分区间操作都可自定义。
-
引发 Shuffle:否
-
示例:
rdd = sc.parallelize([1, 2, 3, 4]) result = rdd.aggregate((0, 0), lambda acc, value: (acc[0] + value, acc[1] + 1), lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) print(result)
-
输出:
(10, 4)
treeAggregate - Transformation
- 功能:以树的方式进行聚合以提高性能,适用于大规模数据集。
- 引发 Shuffle:否
- 示例:
rdd = sc.parallelize([1, 2, 3, 4]) result = rdd.treeAggregate(0, lambda acc, x: acc + x, lambda acc1, acc2: acc1 + acc2) print(result)
treeReduce - Transformation
- 功能:以树的方式进行全局聚合以提高性能。
- 引发 Shuffle:否
- 示例:
rdd = sc.parallelize([1, 2, 3, 4]) result = rdd.treeReduce(lambda x, y: x + y) print(result)
RDD Transformation 算子(会引发 Shuffle)
这些算子会导致数据在分区间或节点间进行数据传输和重新分布。
reduceByKey 算子 - Transformation
-
功能:基于键对值进行聚合。
-
语法:
rdd.reduceByKey(func)
func
:接受两个值作为输入并返回一个聚合值。
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3), ('b', 4)]) rdd2 = rdd.reduceByKey(lambda x, y: x + y) print(rdd2.collect())
-
输出:
[('a', 4), ('b', 6)]
groupByKey 算子 - Transformation
-
功能:将相同键的值聚合为一个集合。
-
语法:
rdd.groupByKey()
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3), ('b', 4)]) rdd2 = rdd.groupByKey() print([(key, list(values)) for key, values in rdd2.collect()])
-
输出:
[('a', [1, 3]), ('b', [2, 4])]
sortByKey 算子 - Transformation
-
功能:对键进行排序。
-
语法:
rdd.sortByKey(ascending=True)
ascending
:是否升序,默认为True
。
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([('b', 2), ('a', 1), ('c', 3)]) rdd2 = rdd.sortByKey() print(rdd2.collect())
-
输出:
[('a', 1), ('b', 2), ('c', 3)]
distinct 算子 - Transformation
-
功能:对 RDD 进行去重操作。
-
语法:
rdd.distinct()
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([1, 2, 2, 3, 4, 4, 5]) rdd2 = rdd.distinct() print(rdd2.collect())
-
输出:
[1, 2, 3, 4, 5]
join 算子 - Transformation
-
功能:基于键的内连接。
-
语法:
rdd1.join(rdd2)
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd1 = sc.parallelize([('a', 1), ('b', 2)]) rdd2 = sc.parallelize([('a', 3), ('b', 4)]) rdd3 = rdd1.join(rdd2) print(rdd3.collect())
-
输出:
[('a', (1, 3)), ('b', (2, 4))]
leftOuterJoin 和 rightOuterJoin 算子 - Transformation
-
功能:对 RDD 执行左外连接或右外连接。
-
语法:
rdd1.leftOuterJoin(rdd2) rdd1.rightOuterJoin(rdd2)
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd1 = sc.parallelize([('a', 1), ('b', 2)]) rdd2 = sc.parallelize([('a', 3), ('c', 4)]) left_join = rdd1.leftOuterJoin(rdd2) right_join = rdd1.rightOuterJoin(rdd2) print("Left Outer Join:", left_join.collect()) print("Right Outer Join:", right_join.collect())
-
输出:
Left Outer Join: [('a', (1, 3)), ('b', (2, None))] Right Outer Join: [('a', (1, 3)), ('c', (None, 4))]
cogroup 算子 - Transformation
-
功能:对两个 RDD 以键为基础进行分组。
-
语法:
rdd1.cogroup(rdd2)
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd1 = sc.parallelize([('a', 1), ('b', 2)]) rdd2 = sc.parallelize([('a', 3), ('b', 4), ('c', 5)]) rdd3 = rdd1.cogroup(rdd2) print([(key, (list(values1), list(values2))) for key, (values1, values2) in rdd3.collect()])
-
输出:
[('a', ([1], [3])), ('b', ([2], [4])), ('c', ([], [5]))]
repartition 算子 - Transformation
-
功能:重新设置 RDD 的分区数目。
-
语法:
rdd.repartition(numPartitions)
numPartitions
:新的分区数。
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([1, 2, 3, 4, 5], 2) print("Before repartition:", rdd.getNumPartitions()) rdd2 = rdd.repartition(4) print("After repartition:", rdd2.getNumPartitions())
-
输出:
Before repartition: 2 After repartition: 4
partitionBy 算子 - Transformation
-
功能:对 RDD 进行分区(仅适用于键值对 RDD)。
-
语法:
rdd.partitionBy(numPartitions, partitionFunc)
numPartitions
:分区数。partitionFunc
:自定义分区函数。
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([('a', 1), ('b', 2), ('c', 3)]) rdd2 = rdd.partitionBy(2, lambda x: ord(x[0]) % 2) print(rdd2.glom().collect())
-
输出:
[[('a', 1), ('c', 3)], [('b', 2)]]
groupBy 算子 - Transformation
-
功能:根据指定函数分组。
-
语法:
rdd.groupBy(func)
func
:分组依据的函数。
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([1, 2, 3, 4, 5, 6]) rdd2 = rdd.groupBy(lambda x: x % 2) print([(key, list(values)) for key, values in rdd2.collect()])
-
输出:
[(0, [2, 4, 6]), (1, [1, 3, 5])]
cartesian - Transformation
-
功能:返回两个 RDD 的笛卡尔积,形成每对元素的组合。
-
示例:
rdd1 = sc.parallelize([1, 2]) rdd2 = sc.parallelize(["a", "b"]) print(rdd1.cartesian(rdd2).collect())
-
输出:
[(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]
aggregateByKey - Transformation
-
功能:按 key 聚合值,可定义初始值和聚合逻辑,支持在分区内和分区间分别定义不同的聚合规则。
-
示例:
rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("b", 4)]) result = rdd.aggregateByKey(0, lambda x, y: x + y, lambda x, y: x + y).collect() print(result)
-
输出:
[('a', 3), ('b', 7)]
combineByKey - Transformation
-
功能:一种更灵活的按 key 聚合方式,可以定义创建组合器、合并和聚合操作。
-
示例:
rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("b", 4)]) result = rdd.combineByKey(lambda x: (x, 1), lambda x, y: (x[0] + y, x[1] + 1), lambda x, y: (x[0] + y[0], x[1] + y[1])).collect() print(result)
-
输出:
[('a', (3, 2)), ('b', (7, 2))]
sortBy
sortBy
适合用于非键值对类型的 RDD,或需要自定义排序规则的情况。
rdd = sc.parallelize([5, 3, 1, 4, 2])
sorted_rdd = rdd.sortBy(lambda x: x, ascending=True) # 升序排序
print(sorted_rdd.collect())
输出:
[1, 2, 3, 4, 5]
reduce
如果你的 RDD 不是键值对格式,而是一个简单的数值列表,例如 [1, 2, 3, 4, 5]
,你可以使用 reduce
方法直接对整个 RDD 进行聚合:
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
# 使用 reduce 求和
result = rdd.reduce(lambda a, b: a + b)
print(result) # 输出 15