RDD Action 算子
- Action 算子触发整个 DAG(有向无环图)的执行,将数据拉取到 Driver 或输出到文件等。
collect 算子 - Action
-
功能:将 RDD 的所有数据收集到 Driver,形成一个 List 对象。
-
语法:
rdd.collect()
-
示例代码:
from pyspark import SparkConf, SparkContext if __name__ == '__main__': conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1") sc = SparkContext(conf=conf) rdd = sc.parallelize([1, 2, 3, 4]) print(rdd.collect())
-
输出:
[1, 2, 3, 4]
count 算子 - Action
-
功能:计算 RDD 的元素条目数。
-
语法:
rdd.count()
-
示例代码:
rdd = sc.parallelize([1, 2, 3, 4]) print(rdd.count())
-
输出:
4
first 算子 - Action
-
功能:获取 RDD 的第一个元素。
-
语法:
rdd.first()
-
示例代码:
rdd = sc.parallelize([1, 2, 3, 4]) print(rdd.first())
-
输出:
1
take 算子 - Action
-
功能:获取 RDD 的前 N 个元素。
-
语法:
rdd.take(n)
n
:获取的元素数量。
-
示例代码:
rdd = sc.parallelize([1, 2, 3, 4]) print(rdd.take(2))
-
输出:
[1, 2]
top 算子 - Action
-
功能:按降序获取 RDD 中的前 N 个最大值。
-
语法:
rdd.top(n)
n
:获取的最大值数量。
-
示例代码:
rdd = sc.parallelize([4, 1, 3, 2]) print(rdd.top(2))
-
输出:
[4, 3]
reduce 算子 - Action
-
功能:对 RDD 数据进行全局聚合。
-
语法:
rdd.reduce(func)
func
:接受两个输入并返回一个聚合值的函数。
-
示例代码:
rdd = sc.parallelize([1, 2, 3, 4]) print(rdd.reduce(lambda x, y: x + y))
-
输出:
10
countByKey 算子 - Action
-
功能:统计每个键的数量(适用于 KV 型 RDD)。
-
语法:
rdd.countByKey()
-
示例代码:
rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)]) print(rdd.countByKey())
-
输出:
{'a': 2, 'b': 1}
takeOrdered 算子 - Action
-
功能:按排序规则获取前 N 个元素。
-
语法:
rdd.takeOrdered(n, key=func)
n
:获取的元素数量。key
:排序规则函数。
-
示例代码:
rdd = sc.parallelize([4, 1, 3, 2]) print(rdd.takeOrdered(3, key=lambda x: -x))
-
输出:
[4, 3, 2]
foreach 算子 - Action
-
功能:对 RDD 的每个元素执行指定操作,但没有返回值。
-
语法:
rdd.foreach(func)
func
:执行操作的函数。
-
示例代码:
rdd = sc.parallelize([1, 2, 3, 4]) rdd.foreach(lambda x: print(x))
saveAsTextFile 算子 - Action
-
功能:将 RDD 数据保存到文件系统中。
-
语法:
rdd.saveAsTextFile(path)
path
:文件保存路径。
-
示例代码:
rdd = sc.parallelize([1, 2, 3, 4]) rdd.saveAsTextFile("/path/to/output")
takeSample 算子 - Action
-
功能:随机采样获取 RDD 数据。
-
语法:
rdd.takeSample(withReplacement, num, seed)
withReplacement
:是否允许重复采样。num
:采样数。seed
:随机数种子。
-
示例代码:
rdd = sc.parallelize([1, 2, 3, 4]) print(rdd.takeSample(False, 2))
fold 算子 - Action
-
功能:带初始值的全局聚合。
-
语法:
rdd.fold(zeroValue, func)
zeroValue
:初始值。func
:聚合函数。
-
示例代码:
rdd = sc.parallelize([1, 2, 3, 4]) print(rdd.fold(0, lambda x, y: x + y))
-
输出:
10
foreachPartition 算子 - Action
-
功能:对每个分区的数据执行指定操作,无返回值。
-
语法:
rdd.foreachPartition(func)
func
:对分区数据执行操作的函数。
-
示例代码:
def process_partition(partition): for x in partition: print(x) rdd = sc.parallelize([1, 2, 3, 4], 2) rdd.foreachPartition(process_partition)
saveAsSequenceFile - Action
- 功能:将 RDD 的内容保存为 Hadoop SequenceFile 格式。
- 引发 Shuffle:否
- 示例:
rdd = sc.parallelize([("a", 1), ("b", 2)]) rdd.saveAsSequenceFile("/path/to/sequence_file")
saveAsObjectFile - Action
- 功能:将 RDD 的内容保存为 Spark 原生的对象文件格式,方便读取。
- 引发 Shuffle:否
- 示例:
rdd = sc.parallelize([("a", 1), ("b", 2)]) rdd.saveAsObjectFile("/path/to/object_file")