菜单
本页目录

RDD Action 算子

  • Action 算子触发整个 DAG(有向无环图)的执行,将数据拉取到 Driver 或输出到文件等。

collect 算子 - Action

  • 功能:将 RDD 的所有数据收集到 Driver,形成一个 List 对象。

  • 语法

    rdd.collect()
    
  • 示例代码

    from pyspark import SparkConf, SparkContext
    if __name__ == '__main__':
        conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1")
        sc = SparkContext(conf=conf)
    
        rdd = sc.parallelize([1, 2, 3, 4])
        print(rdd.collect())
    
  • 输出

    [1, 2, 3, 4]
    

count 算子 - Action

  • 功能:计算 RDD 的元素条目数。

  • 语法

    rdd.count()
    
  • 示例代码

    rdd = sc.parallelize([1, 2, 3, 4])
    print(rdd.count())
    
  • 输出

    4
    

first 算子 - Action

  • 功能:获取 RDD 的第一个元素。

  • 语法

    rdd.first()
    
  • 示例代码

    rdd = sc.parallelize([1, 2, 3, 4])
    print(rdd.first())
    
  • 输出

    1
    

take 算子 - Action

  • 功能:获取 RDD 的前 N 个元素。

  • 语法

    rdd.take(n)
    
    • n:获取的元素数量。
  • 示例代码

    rdd = sc.parallelize([1, 2, 3, 4])
    print(rdd.take(2))
    
  • 输出

    [1, 2]
    

top 算子 - Action

  • 功能:按降序获取 RDD 中的前 N 个最大值。

  • 语法

    rdd.top(n)
    
    • n:获取的最大值数量。
  • 示例代码

    rdd = sc.parallelize([4, 1, 3, 2])
    print(rdd.top(2))
    
  • 输出

    [4, 3]
    

reduce 算子 - Action

  • 功能:对 RDD 数据进行全局聚合。

  • 语法

    rdd.reduce(func)
    
    • func:接受两个输入并返回一个聚合值的函数。
  • 示例代码

    rdd = sc.parallelize([1, 2, 3, 4])
    print(rdd.reduce(lambda x, y: x + y))
    
  • 输出

    10
    

countByKey 算子 - Action

  • 功能:统计每个键的数量(适用于 KV 型 RDD)。

  • 语法

    rdd.countByKey()
    
  • 示例代码

    rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)])
    print(rdd.countByKey())
    
  • 输出

    {'a': 2, 'b': 1}
    

takeOrdered 算子 - Action

  • 功能:按排序规则获取前 N 个元素。

  • 语法

    rdd.takeOrdered(n, key=func)
    
    • n:获取的元素数量。
    • key:排序规则函数。
  • 示例代码

    rdd = sc.parallelize([4, 1, 3, 2])
    print(rdd.takeOrdered(3, key=lambda x: -x))
    
  • 输出

    [4, 3, 2]
    

foreach 算子 - Action

  • 功能:对 RDD 的每个元素执行指定操作,但没有返回值。

  • 语法

    rdd.foreach(func)
    
    • func:执行操作的函数。
  • 示例代码

    rdd = sc.parallelize([1, 2, 3, 4])
    rdd.foreach(lambda x: print(x))
    

saveAsTextFile 算子 - Action

  • 功能:将 RDD 数据保存到文件系统中。

  • 语法

    rdd.saveAsTextFile(path)
    
    • path:文件保存路径。
  • 示例代码

    rdd = sc.parallelize([1, 2, 3, 4])
    rdd.saveAsTextFile("/path/to/output")
    

takeSample 算子 - Action

  • 功能:随机采样获取 RDD 数据。

  • 语法

    rdd.takeSample(withReplacement, num, seed)
    
    • withReplacement:是否允许重复采样。
    • num:采样数。
    • seed:随机数种子。
  • 示例代码

    rdd = sc.parallelize([1, 2, 3, 4])
    print(rdd.takeSample(False, 2))
    

fold 算子 - Action

  • 功能:带初始值的全局聚合。

  • 语法

    rdd.fold(zeroValue, func)
    
    • zeroValue:初始值。
    • func:聚合函数。
  • 示例代码

    rdd = sc.parallelize([1, 2, 3, 4])
    print(rdd.fold(0, lambda x, y: x + y))
    
  • 输出

    10
    

foreachPartition 算子 - Action

  • 功能:对每个分区的数据执行指定操作,无返回值。

  • 语法

    rdd.foreachPartition(func)
    
    • func:对分区数据执行操作的函数。
  • 示例代码

    def process_partition(partition):
        for x in partition:
            print(x)
    
    rdd = sc.parallelize([1, 2, 3, 4], 2)
    rdd.foreachPartition(process_partition)
    

saveAsSequenceFile - Action

  • 功能:将 RDD 的内容保存为 Hadoop SequenceFile 格式。
  • 引发 Shuffle:否
  • 示例
    rdd = sc.parallelize([("a", 1), ("b", 2)])
    rdd.saveAsSequenceFile("/path/to/sequence_file")
    

saveAsObjectFile - Action

  • 功能:将 RDD 的内容保存为 Spark 原生的对象文件格式,方便读取。
  • 引发 Shuffle:否
  • 示例
    rdd = sc.parallelize([("a", 1), ("b", 2)])
    rdd.saveAsObjectFile("/path/to/object_file")