Transformation 算子

Transformation 算子 (不会引发 Shuffle )

Transformation 算子返回新的 RDD

这些算子在原始数据的本地分区中完成,不涉及跨节点的数据传输。

map 算子 - Transformation

  • 功能:对 RDD 的每个元素应用指定的函数,并返回一个新的 RDD。

  • 语法

    rdd.map(func)
    
    • func:函数,接受 RDD 中的每个元素并返回新的值。
  • 示例代码

    from pyspark import SparkConf, SparkContext
    if __name__ == '__main__':
    
        conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1")
        sc = SparkContext(conf=conf)
    
        rdd = sc.parallelize([1,2,3,4,5])
        rdd2 = rdd.map(lambda x: x * 10)
        
        print(rdd2.collect())
    
  • 输出

    [10, 20, 30, 40, 50]
    

flatMap 算子 - Transformation

  • 功能:对 RDD 执行 map 操作后,再对结果进行解除嵌套操作。

  • 解除嵌套:假设有一个嵌套的列表 [[1,2,3],[4,5,6],[7,8,9]],flatMap 解除嵌套后得到 [1,2,3,4,5,6,7,8,9]

  • 语法

    rdd.flatMap(func)
    
    • func:函数,接受 RDD 中的每个元素并返回一个可迭代对象。
  • 示例代码

    from pyspark import SparkConf, SparkContext
    if __name__ == '__main__':
    
        conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1")
        sc = SparkContext(conf=conf)
    
        rdd = sc.parallelize(['aa aa aa', 'bb bb bb', 'cc cc cc'])
        rdd2 = rdd.flatMap(lambda strs: strs.split())
    
        print(rdd2.collect())
    
  • 输出

    ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc']
    

mapValues 算子 - Transformation

  • 功能:对每个 (key, value) 形式的 RDD 元素,应用函数到其 value 部分。

  • 语法

    rdd.mapValues(func)
    
    • func:函数,作用于二元元组的 value 值。
  • 示例代码

    from pyspark import SparkConf, SparkContext
    if __name__ == '__main__':
    
        conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1")
        sc = SparkContext(conf=conf)
    
        rdd = sc.parallelize([('a', 1), ('b', 2), ('c', 3)])
        rdd2 = rdd.mapValues(lambda x: x * 10)
    
        print(rdd2.collect())
    
  • 输出

    [1. **aggregate**
     - **功能**:跨分区聚合,初始值和分区间操作都可自定义。
     - **引发 Shuffle**:否
     - **示例**:
       ```python
       rdd = sc.parallelize([1, 2, 3, 4])
       result = rdd.aggregate((0, 0),
                              lambda acc, value: (acc[0] + value, acc[1] + 1),
                              lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))
       print(result)
       # 输出:(10, 4)
    

treeAggregate - Transformation

  • 功能:以树的方式进行聚合以提高性能,适用于大规模数据集。
  • 引发 Shuffle:否
  • 示例
    rdd = sc.parallelize([1, 2, 3, 4])
    result = rdd.treeAggregate(0, lambda acc, x: acc + x, lambda acc1, acc2: acc1 + acc2)
    print(result)
    

treeReduce - Transformation

  • 功能:以树的方式进行全局聚合以提高性能。
  • 引发 Shuffle:否
  • 示例
    rdd = sc.parallelize([1, 2, 3, 4])
    result = rdd.treeReduce(lambda x, y: x + y)
    print(result)#[('a', 10), ('b', 20), ('c', 30)]
    

filter 算子 - Transformation

  • 功能:通过指定条件过滤 RDD 元素,仅保留符合条件的元素。

  • 语法

    rdd.filter(func)
    
    • func:过滤条件函数,返回 True 保留元素,False 则丢弃。
  • 示例代码

    from pyspark import SparkConf, SparkContext
    if __name__ == '__main__':
    
        conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1")
        sc = SparkContext(conf=conf)
    
        rdd = sc.parallelize([1,2,3,4,5,6])
        rdd2 = rdd.filter(lambda x: x % 2 == 0)
    
        print(rdd2.collect())
    
  • 输出

    [2, 4, 6]
    

union 算子 - Transformation

  • 功能:合并两个 RDD,不去重。

  • 语法

    rdd.union(other_rdd)
    
    • other_rdd:要合并的另一个 RDD。
  • 示例代码

    from pyspark import SparkConf, SparkContext
    if __name__ == '__main__':
    
        conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1")
        sc = SparkContext(conf=conf)
    
        rdd1 = sc.parallelize([1, 2, 3])
        rdd2 = sc.parallelize([4, 5, 6])
        rdd_union = rdd1.union(rdd2)
    
        print(rdd_union.collect())
    
  • 输出

    [1, 2, 3, 4, 5, 6]
    

glom 算子 - Transformation

  • 功能:将 RDD 的每个分区内容合并为一个列表。

  • 语法

    rdd.glom()
    
  • 示例代码

    from pyspark import SparkConf, SparkContext
    if __name__ == '__main__':
    
        conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1")
        sc = SparkContext(conf=conf)
    
        rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
        rdd_glom = rdd.glom()
    
        print(rdd_glom.collect())
    
  • 输出

    [[1, 2], [3, 4], [5, 6]]
    

mapPartitions 算子 - Transformation

  • 功能:对 RDD 的每个分区数据应用指定的函数,返回一个新的 RDD。

  • 语法

    rdd.mapPartitions(func)
    
    • func:分区级别的处理函数,接受一个分区的迭代器作为输入。
  • 示例代码

    from pyspark import SparkConf, SparkContext
    if __name__ == '__main__':
    
        conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1")
        sc = SparkContext(conf=conf)
    
        rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 2)
        
        def multiply_partition(iterator):
            return [x * 10 for x in iterator]
        
        rdd2 = rdd.mapPartitions(multiply_partition)
    
        print(rdd2.collect())
    
  • 输出

    [10, 20, 30, 40, 50, 60]
    

sample - Transformation

  • 功能:按比例对 RDD 数据进行采样。
  • 示例
    rdd = sc.parallelize([1, 2, 3, 4, 5])
    sampled_rdd = rdd.sample(False, 0.6)
    print(sampled_rdd.collect())
    

zip - Transformation

  • 功能:将两个 RDD 按相同分区的索引位置配对,生成一个新的 RDD。
  • 引发 Shuffle:否(要求两个 RDD 分区数和元素数量一致)
  • 示例
    rdd1 = sc.parallelize([1, 2, 3])
    rdd2 = sc.parallelize(["a", "b", "c"])
    print(rdd1.zip(rdd2).collect())
    
  • 输出
    [(1, 'a'), (2, 'b'), (3, 'c')]
    

coalesce - Transformation

  • 功能:减少分区数,用于优化减少分区和减少 shuffle。
  • 引发 Shuffle:否(默认为不引发,如果 shuffle=True 则会引发)
  • 示例
    rdd = sc.parallelize([1, 2, 3, 4, 5], numSlices=5)
    coalesced_rdd = rdd.coalesce(2)
    print(coalesced_rdd.getNumPartitions())
    
  • 输出
    2
    

aggregate - Transformation

  • 功能:跨分区聚合,初始值和分区间操作都可自定义。

  • 引发 Shuffle:否

  • 示例

    rdd = sc.parallelize([1, 2, 3, 4])
    result = rdd.aggregate((0, 0),
                           lambda acc, value: (acc[0] + value, acc[1] + 1),
                           lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))
    print(result)
    
  • 输出

    (10, 4)
    

treeAggregate - Transformation

  • 功能:以树的方式进行聚合以提高性能,适用于大规模数据集。
  • 引发 Shuffle:否
  • 示例
    rdd = sc.parallelize([1, 2, 3, 4])
    result = rdd.treeAggregate(0, lambda acc, x: acc + x, lambda acc1, acc2: acc1 + acc2)
    print(result)
    

treeReduce - Transformation

  • 功能:以树的方式进行全局聚合以提高性能。
  • 引发 Shuffle:否
  • 示例
    rdd = sc.parallelize([1, 2, 3, 4])
    result = rdd.treeReduce(lambda x, y: x + y)
    print(result)
    

RDD Transformation 算子(会引发 Shuffle)

这些算子会导致数据在分区间或节点间进行数据传输和重新分布。

reduceByKey 算子 - Transformation

  • 功能:基于键对值进行聚合。

  • 语法

    rdd.reduceByKey(func)
    
    • func:接受两个值作为输入并返回一个聚合值。
  • 示例代码

    from pyspark import SparkConf, SparkContext
    if __name__ == '__main__':
    
        conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1")
        sc = SparkContext(conf=conf)
    
        rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3), ('b', 4)])
        rdd2 = rdd.reduceByKey(lambda x, y: x + y)
    
        print(rdd2.collect())
    
  • 输出

    [('a', 4), ('b', 6)]
    

groupByKey 算子 - Transformation

  • 功能:将相同键的值聚合为一个集合。

  • 语法

    rdd.groupByKey()
    
  • 示例代码

    from pyspark import SparkConf, SparkContext
    if __name__ == '__main__':
    
        conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1")
        sc = SparkContext(conf=conf)
    
        rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3), ('b', 4)])
        rdd2 = rdd.groupByKey()
    
        print([(key, list(values)) for key, values in rdd2.collect()])
    
  • 输出

    [('a', [1, 3]), ('b', [2, 4])]
    

sortByKey 算子 - Transformation

  • 功能:对键进行排序。

  • 语法

    rdd.sortByKey(ascending=True)
    
    • ascending:是否升序,默认为 True
  • 示例代码

    from pyspark import SparkConf, SparkContext
    if __name__ == '__main__':
    
        conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1")
        sc = SparkContext(conf=conf)
    
        rdd = sc.parallelize([('b', 2), ('a', 1), ('c', 3)])
        rdd2 = rdd.sortByKey()
    
        print(rdd2.collect())
    
  • 输出

    [('a', 1), ('b', 2), ('c', 3)]
    

distinct 算子 - Transformation

  • 功能:对 RDD 进行去重操作。

  • 语法

    rdd.distinct()
    
  • 示例代码

    from pyspark import SparkConf, SparkContext
    if __name__ == '__main__':
    
        conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1")
        sc = SparkContext(conf=conf)
    
        rdd = sc.parallelize([1, 2, 2, 3, 4, 4, 5])
        rdd2 = rdd.distinct()
    
        print(rdd2.collect())
    
  • 输出

    [1, 2, 3, 4, 5]
    

join 算子 - Transformation

  • 功能:基于键的内连接。

  • 语法

    rdd1.join(rdd2)
    
  • 示例代码

    from pyspark import SparkConf, SparkContext
    if __name__ == '__main__':
    
        conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1")
        sc = SparkContext(conf=conf)
    
        rdd1 = sc.parallelize([('a', 1), ('b', 2)])
        rdd2 = sc.parallelize([('a', 3), ('b', 4)])
        rdd3 = rdd1.join(rdd2)
    
        print(rdd3.collect())
    
  • 输出

    [('a', (1, 3)), ('b', (2, 4))]
    

leftOuterJoin 和 rightOuterJoin 算子 - Transformation

  • 功能:对 RDD 执行左外连接或右外连接。

  • 语法

    rdd1.leftOuterJoin(rdd2)
    rdd1.rightOuterJoin(rdd2)
    
  • 示例代码

    from pyspark import SparkConf, SparkContext
    if __name__ == '__main__':
    
        conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1")
        sc = SparkContext(conf=conf)
    
        rdd1 = sc.parallelize([('a', 1), ('b', 2)])
        rdd2 = sc.parallelize([('a', 3), ('c', 4)])
        
        left_join = rdd1.leftOuterJoin(rdd2)
        right_join = rdd1.rightOuterJoin(rdd2)
    
        print("Left Outer Join:", left_join.collect())
        print("Right Outer Join:", right_join.collect())
    
  • 输出

    Left Outer Join: [('a', (1, 3)), ('b', (2, None))]
    Right Outer Join: [('a', (1, 3)), ('c', (None, 4))]
    

cogroup 算子 - Transformation

  • 功能:对两个 RDD 以键为基础进行分组。

  • 语法

    rdd1.cogroup(rdd2)
    
  • 示例代码

    from pyspark import SparkConf, SparkContext
    if __name__ == '__main__':
    
        conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1")
        sc = SparkContext(conf=conf)
    
        rdd1 = sc.parallelize([('a', 1), ('b', 2)])
        rdd2 = sc.parallelize([('a', 3), ('b', 4), ('c', 5)])
        rdd3 = rdd1.cogroup(rdd2)
    
        print([(key, (list(values1), list(values2))) for key, (values1, values2) in rdd3.collect()])
    
  • 输出

    [('a', ([1], [3])), ('b', ([2], [4])), ('c', ([], [5]))]
    

repartition 算子 - Transformation

  • 功能:重新设置 RDD 的分区数目。

  • 语法

    rdd.repartition(numPartitions)
    
    • numPartitions:新的分区数。
  • 示例代码

    from pyspark import SparkConf, SparkContext
    if __name__ == '__main__':
    
        conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1")
        sc = SparkContext(conf=conf)
    
        rdd = sc.parallelize([1, 2, 3, 4, 5], 2)
        print("Before repartition:", rdd.getNumPartitions())
    
        rdd2 = rdd.repartition(4)
        print("After repartition:", rdd2.getNumPartitions())
    
  • 输出

    Before repartition: 2
    After repartition: 4
    

partitionBy 算子 - Transformation

  • 功能:对 RDD 进行分区(仅适用于键值对 RDD)。

  • 语法

    rdd.partitionBy(numPartitions, partitionFunc)
    
    • numPartitions:分区数。
    • partitionFunc:自定义分区函数。
  • 示例代码

    from pyspark import SparkConf, SparkContext
    if __name__ == '__main__':
    
        conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1")
        sc = SparkContext(conf=conf)
    
        rdd = sc.parallelize([('a', 1), ('b', 2), ('c', 3)])
        rdd2 = rdd.partitionBy(2, lambda x: ord(x[0]) % 2)
    
        print(rdd2.glom().collect())
    
  • 输出

    
    
    [[('a', 1), ('c', 3)], [('b', 2)]]
    

groupBy 算子 - Transformation

  • 功能:根据指定函数分组。

  • 语法

    rdd.groupBy(func)
    
    • func:分组依据的函数。
  • 示例代码

    from pyspark import SparkConf, SparkContext
    if __name__ == '__main__':
    
        conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1")
        sc = SparkContext(conf=conf)
    
        rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
        rdd2 = rdd.groupBy(lambda x: x % 2)
    
        print([(key, list(values)) for key, values in rdd2.collect()])
    
  • 输出

    [(0, [2, 4, 6]), (1, [1, 3, 5])]
    

cartesian - Transformation

  • 功能:返回两个 RDD 的笛卡尔积,形成每对元素的组合。

  • 示例

    rdd1 = sc.parallelize([1, 2])
    rdd2 = sc.parallelize(["a", "b"])
    print(rdd1.cartesian(rdd2).collect())
    
  • 输出

    [(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]
    

aggregateByKey - Transformation

  • 功能:按 key 聚合值,可定义初始值和聚合逻辑,支持在分区内和分区间分别定义不同的聚合规则。

  • 示例

    rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("b", 4)])
    result = rdd.aggregateByKey(0, lambda x, y: x + y, lambda x, y: x + y).collect()
    print(result)
    
  • 输出

    [('a', 3), ('b', 7)]
    

combineByKey - Transformation

  • 功能:一种更灵活的按 key 聚合方式,可以定义创建组合器、合并和聚合操作。

  • 示例

    rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("b", 4)])
    result = rdd.combineByKey(lambda x: (x, 1),
                              lambda x, y: (x[0] + y, x[1] + 1),
                              lambda x, y: (x[0] + y[0], x[1] + y[1])).collect()
    print(result)
    
  • 输出

    [('a', (3, 2)), ('b', (7, 2))]
    

RDD Action 算子

  • Action 算子触发整个 DAG(有向无环图)的执行,将数据拉取到 Driver 或输出到文件等。

collect 算子 - Action

  • 功能:将 RDD 的所有数据收集到 Driver,形成一个 List 对象。

  • 语法

    rdd.collect()
    
  • 示例代码

    from pyspark import SparkConf, SparkContext
    if __name__ == '__main__':
        conf = SparkConf().setAppName("test").set("spark.driver.bindAddress", "127.0.0.1")
        sc = SparkContext(conf=conf)
    
        rdd = sc.parallelize([1, 2, 3, 4])
        print(rdd.collect())
    
  • 输出

    [1, 2, 3, 4]
    

count 算子 - Action

  • 功能:计算 RDD 的元素条目数。

  • 语法

    rdd.count()
    
  • 示例代码

    rdd = sc.parallelize([1, 2, 3, 4])
    print(rdd.count())
    
  • 输出

    4
    

first 算子 - Action

  • 功能:获取 RDD 的第一个元素。

  • 语法

    rdd.first()
    
  • 示例代码

    rdd = sc.parallelize([1, 2, 3, 4])
    print(rdd.first())
    
  • 输出

    1
    

take 算子 - Action

  • 功能:获取 RDD 的前 N 个元素。

  • 语法

    rdd.take(n)
    
    • n:获取的元素数量。
  • 示例代码

    rdd = sc.parallelize([1, 2, 3, 4])
    print(rdd.take(2))
    
  • 输出

    [1, 2]
    

top 算子 - Action

  • 功能:按降序获取 RDD 中的前 N 个最大值。

  • 语法

    rdd.top(n)
    
    • n:获取的最大值数量。
  • 示例代码

    rdd = sc.parallelize([4, 1, 3, 2])
    print(rdd.top(2))
    
  • 输出

    [4, 3]
    

reduce 算子 - Action

  • 功能:对 RDD 数据进行全局聚合。

  • 语法

    rdd.reduce(func)
    
    • func:接受两个输入并返回一个聚合值的函数。
  • 示例代码

    rdd = sc.parallelize([1, 2, 3, 4])
    print(rdd.reduce(lambda x, y: x + y))
    
  • 输出

    10
    

countByKey 算子 - Action

  • 功能:统计每个键的数量(适用于 KV 型 RDD)。

  • 语法

    rdd.countByKey()
    
  • 示例代码

    rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)])
    print(rdd.countByKey())
    
  • 输出

    {'a': 2, 'b': 1}
    

takeOrdered 算子 - Action

  • 功能:按排序规则获取前 N 个元素。

  • 语法

    rdd.takeOrdered(n, key=func)
    
    • n:获取的元素数量。
    • key:排序规则函数。
  • 示例代码

    rdd = sc.parallelize([4, 1, 3, 2])
    print(rdd.takeOrdered(3, key=lambda x: -x))
    
  • 输出

    [4, 3, 2]
    

foreach 算子 - Action

  • 功能:对 RDD 的每个元素执行指定操作,但没有返回值。

  • 语法

    rdd.foreach(func)
    
    • func:执行操作的函数。
  • 示例代码

    rdd = sc.parallelize([1, 2, 3, 4])
    rdd.foreach(lambda x: print(x))
    

saveAsTextFile 算子 - Action

  • 功能:将 RDD 数据保存到文件系统中。

  • 语法

    rdd.saveAsTextFile(path)
    
    • path:文件保存路径。
  • 示例代码

    rdd = sc.parallelize([1, 2, 3, 4])
    rdd.saveAsTextFile("/path/to/output")
    

takeSample 算子 - Action

  • 功能:随机采样获取 RDD 数据。

  • 语法

    rdd.takeSample(withReplacement, num, seed)
    
    • withReplacement:是否允许重复采样。
    • num:采样数。
    • seed:随机数种子。
  • 示例代码

    rdd = sc.parallelize([1, 2, 3, 4])
    print(rdd.takeSample(False, 2))
    

fold 算子 - Action

  • 功能:带初始值的全局聚合。

  • 语法

    rdd.fold(zeroValue, func)
    
    • zeroValue:初始值。
    • func:聚合函数。
  • 示例代码

    rdd = sc.parallelize([1, 2, 3, 4])
    print(rdd.fold(0, lambda x, y: x + y))
    
  • 输出

    10
    

foreachPartition 算子 - Action

  • 功能:对每个分区的数据执行指定操作,无返回值。

  • 语法

    rdd.foreachPartition(func)
    
    • func:对分区数据执行操作的函数。
  • 示例代码

    def process_partition(partition):
        for x in partition:
            print(x)
    
    rdd = sc.parallelize([1, 2, 3, 4], 2)
    rdd.foreachPartition(process_partition)
    

saveAsSequenceFile - Action

  • 功能:将 RDD 的内容保存为 Hadoop SequenceFile 格式。
  • 引发 Shuffle:否
  • 示例
    rdd = sc.parallelize([("a", 1), ("b", 2)])
    rdd.saveAsSequenceFile("/path/to/sequence_file")
    

saveAsObjectFile - Action

  • 功能:将 RDD 的内容保存为 Spark 原生的对象文件格式,方便读取。
  • 引发 Shuffle:否
  • 示例
    rdd = sc.parallelize([("a", 1), ("b", 2)])
    rdd.saveAsObjectFile("/path/to/object_file")