RDD算子基础
Transformation 算子
Transformation 算子 (不会引发 Shuffle )
Transformation 算子返回新的 RDD
这些算子在原始数据的本地分区中完成,不涉及跨节点的数据传输。
map 算子 - Transformation
-
功能:对 RDD 的每个元素应用指定的函数,并返回一个新的 RDD。
-
语法:
rdd.map(func)
func
:函数,接受 RDD 中的每个元素并返回新的值。
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([1,2,3,4,5]) rdd2 = rdd.map(lambda x: x * 10) print(rdd2.collect())
-
输出:
[10, 20, 30, 40, 50]
flatMap 算子 - Transformation
-
功能:对 RDD 执行
map
操作后,再对结果进行解除嵌套操作。 -
解除嵌套:假设有一个嵌套的列表
[[1,2,3],[4,5,6],[7,8,9]]
,flatMap 解除嵌套后得到[1,2,3,4,5,6,7,8,9]
。 -
语法:
rdd.flatMap(func)
func
:函数,接受 RDD 中的每个元素并返回一个可迭代对象。
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize(['aa aa aa', 'bb bb bb', 'cc cc cc']) rdd2 = rdd.flatMap(lambda strs: strs.split()) print(rdd2.collect())
-
输出:
['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc']
mapValues 算子 - Transformation
-
功能:对每个
(key, value)
形式的 RDD 元素,应用函数到其value
部分。 -
语法:
rdd.mapValues(func)
func
:函数,作用于二元元组的value
值。
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([('a', 1), ('b', 2), ('c', 3)]) rdd2 = rdd.mapValues(lambda x: x * 10) print(rdd2.collect())
-
输出:
[1. **aggregate** - **功能**:跨分区聚合,初始值和分区间操作都可自定义。 - **引发 Shuffle**:否 - **示例**: ```python rdd = sc.parallelize([1, 2, 3, 4]) result = rdd.aggregate((0, 0), lambda acc, value: (acc[0] + value, acc[1] + 1), lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) print(result) # 输出:(10, 4)
treeAggregate - Transformation
- 功能:以树的方式进行聚合以提高性能,适用于大规模数据集。
- 引发 Shuffle:否
- 示例:
rdd = sc.parallelize([1, 2, 3, 4]) result = rdd.treeAggregate(0, lambda acc, x: acc + x, lambda acc1, acc2: acc1 + acc2) print(result)
treeReduce - Transformation
- 功能:以树的方式进行全局聚合以提高性能。
- 引发 Shuffle:否
- 示例:
rdd = sc.parallelize([1, 2, 3, 4]) result = rdd.treeReduce(lambda x, y: x + y) print(result)#[('a', 10), ('b', 20), ('c', 30)]
filter 算子 - Transformation
-
功能:通过指定条件过滤 RDD 元素,仅保留符合条件的元素。
-
语法:
rdd.filter(func)
func
:过滤条件函数,返回True
保留元素,False
则丢弃。
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([1,2,3,4,5,6]) rdd2 = rdd.filter(lambda x: x % 2 == 0) print(rdd2.collect())
-
输出:
[2, 4, 6]
union 算子 - Transformation
-
功能:合并两个 RDD,不去重。
-
语法:
rdd.union(other_rdd)
other_rdd
:要合并的另一个 RDD。
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd1 = sc.parallelize([1, 2, 3]) rdd2 = sc.parallelize([4, 5, 6]) rdd_union = rdd1.union(rdd2) print(rdd_union.collect())
-
输出:
[1, 2, 3, 4, 5, 6]
glom 算子 - Transformation
-
功能:将 RDD 的每个分区内容合并为一个列表。
-
语法:
rdd.glom()
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3) rdd_glom = rdd.glom() print(rdd_glom.collect())
-
输出:
[[1, 2], [3, 4], [5, 6]]
mapPartitions 算子 - Transformation
-
功能:对 RDD 的每个分区数据应用指定的函数,返回一个新的 RDD。
-
语法:
rdd.mapPartitions(func)
func
:分区级别的处理函数,接受一个分区的迭代器作为输入。
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 2) def multiply_partition(iterator): return [x * 10 for x in iterator] rdd2 = rdd.mapPartitions(multiply_partition) print(rdd2.collect())
-
输出:
[10, 20, 30, 40, 50, 60]
sample - Transformation
- 功能:按比例对 RDD 数据进行采样。
- 示例:
rdd = sc.parallelize([1, 2, 3, 4, 5]) sampled_rdd = rdd.sample(False, 0.6) print(sampled_rdd.collect())
zip - Transformation
- 功能:将两个 RDD 按相同分区的索引位置配对,生成一个新的 RDD。
- 引发 Shuffle:否(要求两个 RDD 分区数和元素数量一致)
- 示例:
rdd1 = sc.parallelize([1, 2, 3]) rdd2 = sc.parallelize(["a", "b", "c"]) print(rdd1.zip(rdd2).collect())
- 输出:
[(1, 'a'), (2, 'b'), (3, 'c')]
coalesce - Transformation
- 功能:减少分区数,用于优化减少分区和减少 shuffle。
- 引发 Shuffle:否(默认为不引发,如果
shuffle=True
则会引发) - 示例:
rdd = sc.parallelize([1, 2, 3, 4, 5], numSlices=5) coalesced_rdd = rdd.coalesce(2) print(coalesced_rdd.getNumPartitions())
- 输出:
2
aggregate - Transformation
-
功能:跨分区聚合,初始值和分区间操作都可自定义。
-
引发 Shuffle:否
-
示例:
rdd = sc.parallelize([1, 2, 3, 4]) result = rdd.aggregate((0, 0), lambda acc, value: (acc[0] + value, acc[1] + 1), lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) print(result)
-
输出:
(10, 4)
treeAggregate - Transformation
- 功能:以树的方式进行聚合以提高性能,适用于大规模数据集。
- 引发 Shuffle:否
- 示例:
rdd = sc.parallelize([1, 2, 3, 4]) result = rdd.treeAggregate(0, lambda acc, x: acc + x, lambda acc1, acc2: acc1 + acc2) print(result)
treeReduce - Transformation
- 功能:以树的方式进行全局聚合以提高性能。
- 引发 Shuffle:否
- 示例:
rdd = sc.parallelize([1, 2, 3, 4]) result = rdd.treeReduce(lambda x, y: x + y) print(result)
RDD Transformation 算子(会引发 Shuffle)
这些算子会导致数据在分区间或节点间进行数据传输和重新分布。
reduceByKey 算子 - Transformation
-
功能:基于键对值进行聚合。
-
语法:
rdd.reduceByKey(func)
func
:接受两个值作为输入并返回一个聚合值。
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3), ('b', 4)]) rdd2 = rdd.reduceByKey(lambda x, y: x + y) print(rdd2.collect())
-
输出:
[('a', 4), ('b', 6)]
groupByKey 算子 - Transformation
-
功能:将相同键的值聚合为一个集合。
-
语法:
rdd.groupByKey()
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3), ('b', 4)]) rdd2 = rdd.groupByKey() print([(key, list(values)) for key, values in rdd2.collect()])
-
输出:
[('a', [1, 3]), ('b', [2, 4])]
sortByKey 算子 - Transformation
-
功能:对键进行排序。
-
语法:
rdd.sortByKey(ascending=True)
ascending
:是否升序,默认为True
。
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([('b', 2), ('a', 1), ('c', 3)]) rdd2 = rdd.sortByKey() print(rdd2.collect())
-
输出:
[('a', 1), ('b', 2), ('c', 3)]
distinct 算子 - Transformation
-
功能:对 RDD 进行去重操作。
-
语法:
rdd.distinct()
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([1, 2, 2, 3, 4, 4, 5]) rdd2 = rdd.distinct() print(rdd2.collect())
-
输出:
[1, 2, 3, 4, 5]
join 算子 - Transformation
-
功能:基于键的内连接。
-
语法:
rdd1.join(rdd2)
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd1 = sc.parallelize([('a', 1), ('b', 2)]) rdd2 = sc.parallelize([('a', 3), ('b', 4)]) rdd3 = rdd1.join(rdd2) print(rdd3.collect())
-
输出:
[('a', (1, 3)), ('b', (2, 4))]
leftOuterJoin 和 rightOuterJoin 算子 - Transformation
-
功能:对 RDD 执行左外连接或右外连接。
-
语法:
rdd1.leftOuterJoin(rdd2) rdd1.rightOuterJoin(rdd2)
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd1 = sc.parallelize([('a', 1), ('b', 2)]) rdd2 = sc.parallelize([('a', 3), ('c', 4)]) left_join = rdd1.leftOuterJoin(rdd2) right_join = rdd1.rightOuterJoin(rdd2) print("Left Outer Join:", left_join.collect()) print("Right Outer Join:", right_join.collect())
-
输出:
Left Outer Join: [('a', (1, 3)), ('b', (2, None))] Right Outer Join: [('a', (1, 3)), ('c', (None, 4))]
cogroup 算子 - Transformation
-
功能:对两个 RDD 以键为基础进行分组。
-
语法:
rdd1.cogroup(rdd2)
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd1 = sc.parallelize([('a', 1), ('b', 2)]) rdd2 = sc.parallelize([('a', 3), ('b', 4), ('c', 5)]) rdd3 = rdd1.cogroup(rdd2) print([(key, (list(values1), list(values2))) for key, (values1, values2) in rdd3.collect()])
-
输出:
[('a', ([1], [3])), ('b', ([2], [4])), ('c', ([], [5]))]
repartition 算子 - Transformation
-
功能:重新设置 RDD 的分区数目。
-
语法:
rdd.repartition(numPartitions)
numPartitions
:新的分区数。
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([1, 2, 3, 4, 5], 2) print("Before repartition:", rdd.getNumPartitions()) rdd2 = rdd.repartition(4) print("After repartition:", rdd2.getNumPartitions())
-
输出:
Before repartition: 2 After repartition: 4
partitionBy 算子 - Transformation
-
功能:对 RDD 进行分区(仅适用于键值对 RDD)。
-
语法:
rdd.partitionBy(numPartitions, partitionFunc)
numPartitions
:分区数。partitionFunc
:自定义分区函数。
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([('a', 1), ('b', 2), ('c', 3)]) rdd2 = rdd.partitionBy(2, lambda x: ord(x[0]) % 2) print(rdd2.glom().collect())
-
输出:
[[('a', 1), ('c', 3)], [('b', 2)]]
groupBy 算子 - Transformation
-
功能:根据指定函数分组。
-
语法:
rdd.groupBy(func)
func
:分组依据的函数。
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([1, 2, 3, 4, 5, 6]) rdd2 = rdd.groupBy(lambda x: x % 2) print([(key, list(values)) for key, values in rdd2.collect()])
-
输出:
[(0, [2, 4, 6]), (1, [1, 3, 5])]
cartesian - Transformation
-
功能:返回两个 RDD 的笛卡尔积,形成每对元素的组合。
-
示例:
rdd1 = sc.parallelize([1, 2]) rdd2 = sc.parallelize(["a", "b"]) print(rdd1.cartesian(rdd2).collect())
-
输出:
[(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]
aggregateByKey - Transformation
-
功能:按 key 聚合值,可定义初始值和聚合逻辑,支持在分区内和分区间分别定义不同的聚合规则。
-
示例:
rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("b", 4)]) result = rdd.aggregateByKey(0, lambda x, y: x + y, lambda x, y: x + y).collect() print(result)
-
输出:
[('a', 3), ('b', 7)]
combineByKey - Transformation
-
功能:一种更灵活的按 key 聚合方式,可以定义创建组合器、合并和聚合操作。
-
示例:
rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("b", 4)]) result = rdd.combineByKey(lambda x: (x, 1), lambda x, y: (x[0] + y, x[1] + 1), lambda x, y: (x[0] + y[0], x[1] + y[1])).collect() print(result)
-
输出:
[('a', (3, 2)), ('b', (7, 2))]
RDD Action 算子
- Action 算子触发整个 DAG(有向无环图)的执行,将数据拉取到 Driver 或输出到文件等。
collect 算子 - Action
-
功能:将 RDD 的所有数据收集到 Driver,形成一个 List 对象。
-
语法:
rdd.collect()
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([1, 2, 3, 4]) print(rdd.collect())
-
输出:
[1, 2, 3, 4]
count 算子 - Action
-
功能:计算 RDD 的元素条目数。
-
语法:
rdd.count()
-
示例代码:
rdd = sc.parallelize([1, 2, 3, 4]) print(rdd.count())
-
输出:
4
first 算子 - Action
-
功能:获取 RDD 的第一个元素。
-
语法:
rdd.first()
-
示例代码:
rdd = sc.parallelize([1, 2, 3, 4]) print(rdd.first())
-
输出:
1
take 算子 - Action
-
功能:获取 RDD 的前 N 个元素。
-
语法:
rdd.take(n)
n
:获取的元素数量。
-
示例代码:
rdd = sc.parallelize([1, 2, 3, 4]) print(rdd.take(2))
-
输出:
[1, 2]
top 算子 - Action
-
功能:按降序获取 RDD 中的前 N 个最大值。
-
语法:
rdd.top(n)
n
:获取的最大值数量。
-
示例代码:
rdd = sc.parallelize([4, 1, 3, 2]) print(rdd.top(2))
-
输出:
[4, 3]
reduce 算子 - Action
-
功能:对 RDD 数据进行全局聚合。
-
语法:
rdd.reduce(func)
func
:接受两个输入并返回一个聚合值的函数。
-
示例代码:
rdd = sc.parallelize([1, 2, 3, 4]) print(rdd.reduce(lambda x, y: x + y))
-
输出:
10
countByKey 算子 - Action
-
功能:统计每个键的数量(适用于 KV 型 RDD)。
-
语法:
rdd.countByKey()
-
示例代码:
rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)]) print(rdd.countByKey())
-
输出:
{'a': 2, 'b': 1}
takeOrdered 算子 - Action
-
功能:按排序规则获取前 N 个元素。
-
语法:
rdd.takeOrdered(n, key=func)
n
:获取的元素数量。key
:排序规则函数。
-
示例代码:
rdd = sc.parallelize([4, 1, 3, 2]) print(rdd.takeOrdered(3, key=lambda x: -x))
-
输出:
[4, 3, 2]
foreach 算子 - Action
-
功能:对 RDD 的每个元素执行指定操作,但没有返回值。
-
语法:
rdd.foreach(func)
func
:执行操作的函数。
-
示例代码:
rdd = sc.parallelize([1, 2, 3, 4]) rdd.foreach(lambda x: print(x))
saveAsTextFile 算子 - Action
-
功能:将 RDD 数据保存到文件系统中。
-
语法:
rdd.saveAsTextFile(path)
path
:文件保存路径。
-
示例代码:
rdd = sc.parallelize([1, 2, 3, 4]) rdd.saveAsTextFile("/path/to/output")
takeSample 算子 - Action
-
功能:随机采样获取 RDD 数据。
-
语法:
rdd.takeSample(withReplacement, num, seed)
withReplacement
:是否允许重复采样。num
:采样数。seed
:随机数种子。
-
示例代码:
rdd = sc.parallelize([1, 2, 3, 4]) print(rdd.takeSample(False, 2))
fold 算子 - Action
-
功能:带初始值的全局聚合。
-
语法:
rdd.fold(zeroValue, func)
zeroValue
:初始值。func
:聚合函数。
-
示例代码:
rdd = sc.parallelize([1, 2, 3, 4]) print(rdd.fold(0, lambda x, y: x + y))
-
输出:
10
foreachPartition 算子 - Action
-
功能:对每个分区的数据执行指定操作,无返回值。
-
语法:
rdd.foreachPartition(func)
func
:对分区数据执行操作的函数。
-
示例代码:
def process_partition(partition): for x in partition: print(x) rdd = sc.parallelize([1, 2, 3, 4], 2) rdd.foreachPartition(process_partition)
saveAsSequenceFile - Action
- 功能:将 RDD 的内容保存为 Hadoop SequenceFile 格式。
- 引发 Shuffle:否
- 示例:
rdd = sc.parallelize([("a", 1), ("b", 2)]) rdd.saveAsSequenceFile("/path/to/sequence_file")
saveAsObjectFile - Action
- 功能:将 RDD 的内容保存为 Spark 原生的对象文件格式,方便读取。
- 引发 Shuffle:否
- 示例:
rdd = sc.parallelize([("a", 1), ("b", 2)]) rdd.saveAsObjectFile("/path/to/object_file")