Spark简答题

1. Spark中Shuffle表示什么情况,那些算子能够触发Shuffle发生

Shuffle 是指数据在不同分区之间的重新分配和移动的过程,通常发生在任务阶段之间。触发 Shuffle 的情况包括需要对数据进行重新分区或分组计算,即数据需要被重新分区数据需要被聚合或组合

能够触发 Shuffle 的算子包括:

  • groupByKey0
  • reduceByKey
  • join
  • distinct
  • repartition
  • coalesce (减少分区数且 shuffle=true 时)
  • sortBy

2. Spark中的检查点和缓存什么区别和联系

  • 区别
    • 检查点 (Checkpoint):用于持久化 RDD,将数据写入磁盘以提供容错能力,一旦设置后会清除之前的RDD血缘关系。适合长时间运行的任务。
    • 缓存 (Cache):用于将数据存储在内存或磁盘中以加速访问,主要为提升性能,适用于短期存储。
  • 联系
    • 缓存 提供快速访问,检查点 提供容错能力。
    • 二者都可用于提高容错能力,减少计算开销。
    • 可以先缓存后检查点,提高检查点的效率。

3. Hadoop的基于进程的计算模型,Spark基于线程的计算模型,比较这两种模型的优缺点

  • Hadoop(进程)
    • 优点:隔离性高,适合大规模离线批处理任务。容错性好,进程失败不会影响其他进程。
    • 缺点:进程启动和切换开销大,性能较低。数据需要频繁读写磁盘,导致 I/O 开销高。
  • Spark(线程)
    • 优点:线程间共享内存,计算效率高,适合内存计算。启动和调度开销小,支持实时和迭代计算。
    • 缺点:线程隔离性较弱,可能存在数据共享冲突和内存泄漏风险。对内存管理要求高,容易因资源不足导致失败。

4. 什么是宽依赖和窄依赖,并列举相关的算子

  • 窄依赖:子 RDD 的每个分区只依赖父 RDD 的一个或少量分区,不需要 Shuffle。例如:mapfilterflatMapunion
  • 宽依赖:子 RDD 的一个分区依赖父 RDD 的多个分区,需要进行 Shuffle。例如:reduceByKeygroupByKeyjoindistinct

5. SparkSQL中的临时视图和全局视图是什么,有什么区别,如何创建

  • 临时视图 (Temporary View)
    • 只在当前 SparkSession 生命周期内有效,不能跨 SparkSession。
    • 创建方式:createOrReplaceTempView("viewName")
  • 全局视图 (Global View)
    • 可跨 SparkSession 使用,存储在全局 Catalog 中,生命周期与 Spark 应用程序一致。
    • 创建方式:createGlobalTempView("viewName")

6. RDD、DataFrame、DStream数据抽象之间的联系和区别是什么

  • 联系:都是 Spark 提供的核心抽象,用于处理不同场景的数据。RDD 是底层抽象,DataFrameDStream 基于 RDD 实现。
  • 区别
    • RDD:底层抽象,弹性分布式数据集,提供低级 API,提供了精确的控制,适合处理非结构化数据。
    • DataFrame:类似于关系型数据库的表,支持 SQL 查询,支持优化执行计划,适合处理结构化数据。
    • DStream:处理流式数据的抽象,基于 RDD 实现,适合实时计算,计算时间序列上的 RDD。

7. 比较Spark和MapReduce

  • Spark
    • 速度:内存计算,速度快,适合实时和迭代计算。
    • 编程模型:提供高级 API(RDD、DataFrame、SQL)。
    • 容错性:RDD 提供数据容错机制。
    • 应用场景:批处理、实时计算、流处理和机器学习。
  • MapReduce
    • 速度:基于磁盘 I/O,速度慢,适合离线批处理。
    • 编程模型:基于 mapreduce,开发复杂。
    • 容错性:通过数据冗余实现容错。
    • 应用场景:离线批量数据处理。

8. Spark算子有哪些类型,分别列举其中的算子

  • 转换算子 (Transformation):返回一个新的 RDD,不会触发计算。
    • 如:mapflatMapfiltergroupByKeyreduceByKeyjoin
  • 行动算子 (Action):触发计算,返回结果。
    • 如:collectcountsaveAsTextFilereduce

9. Spark中那些算子能够导致重新分区

能够导致重新分区的算子包括:

  • repartition
  • coalesce(减少分区数且 shuffle=true 时)
  • partitionBy
  • groupByKey
  • reduceByKey
  • aggregateByKey
  • cogroup
  • distinct
  • sortByKey
  • sortBy
  • join

10. Spark中的DStream是什么,它有什么特点,它支持哪些数据源

  • DStream(Discretized Stream)是 Spark Streaming 中的基本抽象,表示连续的数据流,由一系列时间间隔内的 RDD 组成。
  • 特点
    • 高吞吐、低延迟:适合实时数据处理。
    • 分布式:数据流被分成多个小批次(RDD)进行处理。
    • 弹性容错:基于 RDD 的容错机制,支持失败恢复。
    • API 丰富:支持 Transformation 和 Action 操作。
    • 可以与 RDD 转换互操作。
  • 支持的数据源
    • 文件系统:HDFS、S3 等。
    • 消息队列:Kafka、Flume。
    • Socket 数据:TCP Socket。
    • 自定义数据源:通过 receiver 接收数据。

11. Spark Streaming程序的基本步骤

  1. 通过创建输入 DStream 来定义输入源

    • DStream 是 Spark Streaming 中的基本抽象。
  2. 对 DStream 应用转换操作和输出操作来定义流计算

    • 例如使用 mapfilter 等转换算子。
  3. streamingContext.start() 来开始接收数据和处理流程

    • 开启流式计算的执行。
  4. 通过 streamingContext.awaitTermination() 方法来等待处理结束

    • 等待计算完成(可以是手动结束或因错误而结束)。
  5. 通过 streamingContext.stop() 来手动结束流计算进程

    • 手动终止流式计算。

Spark编程题

Spark的输入

RDD输入

  • 从文件读取·
rdd = sc.textFile("path/to/file.txt") # 基本的读法
rdd = sc.wholeTextFiles("path/to/file.txt") # 小文件读取

SparkSql输入

  • 文件读取
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
df = spark.read.json("path/to/file.json")
  • Mysql

    • format( ) 支持Spark支持的文件格式
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/db_name") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "table_name") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

DFStream 输入

  • DStream
from pyspark import SparkContext,SparkConf
from pyspark.streaming import StreamingContext
conf = SparkConf()
conf.setAppName("Text")
conf.setMaster('local[*]')
sc = SparkContext(conf = conf)
ssc = StreamingContext(sc,5)
lines = ssc.textFileStream("path/to/directory")
  • socket
lines = ssc.socketTextStream("localhost", 9999)
  • Kafka
from pyspark.streaming.kafka import KafkaUtils

# 使用 DirectStream 读取 Kafka 数据
kafka_stream = KafkaUtils.createDirectStream(
    ssc,
    topics=["topic_name"],  # Kafka 主题列表
    kafkaParams={
        "metadata.broker.list": "broker1:9092,broker2:9092",  # Kafka broker 列表
        "group.id": "group_id",                              # 消费者组 ID
    }
)

# 处理 Kafka 数据
lines = kafka_stream.map(lambda message: message[1])  # 获取消息的 value
lines.pprint()

Structured Streaming 输入

  • 从文件流读取
df = spark.readStream \
    .format("csv") \
    .option("path", "path/to/directory") \
    .schema(schema) \
    .load()
  • Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic_name") \
    .load()
  • Socket
df = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

Spark的输出

RDD 输出

  • print()
print(rdd.collect())
print(rdd_data)
  • saveAsTextFile
rdd.saveAsTextFile("/")

SparkSQL 输出

  • show
df.show()  # 默认显示前 20 行
df.show(10)  # 显示前 10 行
  • CSV 文件

    • "append":追加数据到目标路径(如果目标路径存在)。

    • "ignore":如果目标路径存在,则跳过写入。

    • "error""errorifexists"(默认):如果目标路径存在,则抛出异常。

    • "overwrite":覆盖目标路径中现有的文件。

df.write \
  .mode("append") \  # 设置文件保存模式
  .options(header=True, delimiter=",") \ #其余参数
  .csv("output/path") # 保存位置
  • MySQL
df.write \
  .format("jdbc")\
  .option("url", "jdbc:mysql://localhost:3306/db_name")\
  .option("driver", "com.mysql.cj.jdbc.Driver")\
  .option("dbtable", "table_name")\
  .option("user", "username")\
  .option("password", "password")\
  .save()

DFStream 输出

  • pprint()
df_stream.pprint()  # 每批次打印到控制台
  • saveAsTextFiles
df_stream.saveAsTextFiles("/")

Structured Streaming 输出

  • console
query = df.writeStream \
    .outputMode("append") \  # 可选:append, complete, update
    .format("console") \
    .start()

query.awaitTermination()
  • 文件系统
query = df.writeStream \
    .outputMode("append") \
    .format("csv") \  # 保存为 CSV 格式
    .option("path", "output/path") \  # 保存路径
    .option("checkpointLocation", "output/checkpoint") \  # 检查点路径
    .start()

query.awaitTermination()

  • kafka
query = df.writeStream \
    .outputMode("append") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "output_topic") \
    .option("checkpointLocation", "output/checkpoint") \
    .start()

query.awaitTermination()

RDD

1. 从1万个数中抽取100个,取这100个数里面奇数的最大3个和偶数最小3个

from pyspark import SparkContext
import random

sc = SparkContext("local" , "SampleAndMaxMin")
rdd = sc.parallelize(data)

# 随机抽取100个数
sampled_data = rdd.takeSample(False, 100)

# 转换为RDD以进一步处理
sampled_rdd = sc.parallelize(sampled_data)

# 筛选奇数和偶数
odd_numbers = sampled_rdd.filter(lambda x: x % 2 != 0).sortBy(lambda x: -x).take(3)
even_numbers = sampled_rdd.filter(lambda x: x % 2 == 0).sortBy(lambda x: x).take(3)

print(odd_numbers)
print(even_numbers)

2. 读取文件,提取所有的单词,按照单词中数字的大小降序排序

from pyspark import SparkContext
import re

sc = SparkContext("local", "WordNumberSort")

# 读取文件
file_path = "file.txt"  # 替换成实际的文件路径
lines = sc.textFile(file_path)

# 提取单词并筛选包含数字的单词
def extract_words_with_numbers(line):
    return [word for word in re.findall(r'\b\w*\d+\w*\b', line)]

words_rdd = lines.flatMap(extract_words_with_numbers)

# 提取单词中的数字并进行排序
def extract_number(word):
    return int("".join(filter(str.isdigit, word)))  # 提取单词中的数字

sorted_words = words_rdd.map(lambda word: (word, extract_number(word))) \
                        .sortBy(lambda x: -x[1]) \
                        .map(lambda x: x[0])
print(sorted_words.collect())

3. 读取文件,提取所有的单词,按照单词中数字的大小降序排序

from pyspark import SparkContext
import re

sc = SparkContext("local", "WordNumberSort")

# 读取文件
file_path = "file.txt"  # 替换成实际的文件路径
lines = sc.textFile(file_path)

# 提取单词并筛选包含数字的单词
def extract_words_with_numbers(line):
    return [word for word in re.findall(r'\b\w*\d+\w*\b', line)]

words_rdd = lines.flatMap(extract_words_with_numbers)

# 提取单词中的数字并进行排序
def extract_number(word):
    return int("".join(filter(str.isdigit, word)))  # 提取单词中的数字

sorted_words = words_rdd.map(lambda word: (word, extract_number(word))) \
                        .sortBy(lambda x: x[1]) \
                        .map(lambda x: x[0])
print(sorted_words.collect())

4. 生成一个整数RDD(1000个),随机抽取100个,计算每个分区的数据和,并降序排序

from pyspark import SparkContext
import random

sc = SparkContext("local", "PartitionSumSort")

# 生成一个包含1000个整数的RDD
data = [random.randint(1, 1000) for _ in range(1000)]
rdd = sc.parallelize(data, numSlices=10)  # 设置分区数

# 随机抽取100个数
sampled_rdd = rdd.takeSample(False, 100)

# 转换为RDD并重新分区
sampled_rdd = sc.parallelize(sampled_rdd, numSlices=5)

# 计算每个分区的数据和
partition_sums = sampled_rdd.mapPartitions(lambda partition: [sum(partition)])

# 对分区的和降序排序
sorted_sums = partition_sums.sortBy(lambda x: -x).collect()

print(sorted_sums)

5. 计算平均成绩

  • 计算平均成绩,数据形式包含名字、科目和分数: ('mike',‘math', 96) ...
from pyspark import SparkContext

sc = SparkContext("local", "AverageScore")

# 模拟数据
data = [
    ('mike', 'math', 96),
    ('mike', 'english', 88),
    ('lily', 'math', 78),
    ('lily', 'english', 85),
    ('john', 'math', 90),
    ('john', 'english', 75)
]
rdd = sc.parallelize(data)

# 计算平均成绩
average_rdd = (
    rdd.map(lambda x: (x[0], (x[2], 1)))  # 转换为 (name, (score, 1))
       .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))  # 累加分数和计数
       .mapValues(lambda x: x[0] / x[1])  # 计算平均值
)

print(average_rdd.collect())

6. 计算历史温度最高的两个月

  • 计算历史温度最高的两个月,数据来自文件,文件中每一行内容存放的是日期和温度,用空格分开( 2004-4-1 39)
from pyspark import SparkContext

sc = SparkContext("local", "MaxTemperatureMonths")

# 模拟数据:日期和温度
data = [
    "2004-01-01 15",
    "2004-01-15 30",
    "2004-02-10 25",
    "2004-02-20 40",
    "2004-03-01 45",
    "2004-03-10 50"
]
rdd = sc.parallelize(data)

# 提取月份和温度
def extract_month_temp(line):
    date, temp = line.split()
    month = "-".join(date.split("-")[:2])  # 提取月份 (2004-01)
    return (month, int(temp))

# 计算每个月的最高温度
month_max_temp = (
    rdd.map(extract_month_temp)
       .reduceByKey(max)  # 每个月的最高温度
       .sortBy(lambda x: -x[1])  # 降序排序
)

# 获取最高温度的前两个月
top2_months = month_max_temp.take(2)

print(top2_months)

7. 统计PV(页面访问量)

  • 通过Nginx日志统计站点页面访问量,要求:地址中如果是 /sys/user/list?page=5 形式的去掉?及后面的内容,转换成 /sys/user/list ,并统计访问量最高的10个页面
from pyspark import SparkContext
import re

sc = SparkContext("local", "PageViewStats")

# 模拟Nginx日志数据
data = [
    '123.161.202.231 - - [11/Oct/2024:11:47:47 +0800] "GET /api/crm/customer/list?page=5 HTTP/2.0"',
    '123.161.202.231 - - [11/Oct/2024:11:47:48 +0800] "GET /api/crm/customer/is_online?t=172 HTTP/2.0"',
    '123.161.202.231 - - [11/Oct/2024:11:47:50 +0800] "GET /sys/user/list?page=2 HTTP/2.0"',
    '123.161.202.231 - - [11/Oct/2024:11:47:51 +0800] "GET /sys/user/list?page=3 HTTP/2.0"'
]
rdd = sc.parallelize(data)

# 提取URL并去掉?后的内容
def extract_page(line):
    match = re.search(r'GET\s(\S+)', line)
    if match:
        url = match.group(1).split('?')[0]
        return url
    return None

# 统计访问量
page_counts = (
    rdd.map(extract_page)
       .filter(lambda x: x is not None)
       .map(lambda x: (x, 1))
       .reduceByKey(lambda a, b: a + b)
       .sortBy(lambda x: -x[1])  # 降序排序
)

# 获取访问量最高的10个页面
top_pages = page_counts.take(10)

print(top_pages)

8. 统计UV(用户访问量)

  • 统计UV,通过Nginx日志统计站点用户访问量,用户访问多次相同地址视为1次访问,最终统计放量最多的10个用户
from pyspark import SparkContext
import re

sc = SparkContext("local", "UserVisitStats")

# 模拟Nginx日志数据
data = [
    '123.161.202.231 - - [11/Oct/2024:11:47:47 +0800] "GET /api/crm/customer/is_online?t=172 HTTP/2.0"',
    '123.161.202.231 - - [11/Oct/2024:11:47:48 +0800] "GET /api/crm/customer/is_online?t=173 HTTP/2.0"',
    '456.161.202.232 - - [11/Oct/2024:11:47:50 +0800] "GET /sys/user/list?page=2 HTTP/2.0"',
    '456.161.202.232 - - [11/Oct/2024:11:47:51 +0800] "GET /sys/user/list?page=2 HTTP/2.0"'
]
rdd = sc.parallelize(data)

# 提取用户IP和页面URL
def extract_user_page(line):
    match = re.search(r'^(\S+).+GET\s(\S+)', line)
    if match:
        ip = match.group(1)  # 用户IP
        url = match.group(2).split('?')[0]  # 去掉?后的内容
        return (ip, url)
    return None

# 统计UV
uv_counts = (
    rdd.map(extract_user_page)
       .filter(lambda x: x is not None)
       .distinct()  # 去重,同一用户访问相同页面视为1次
       .map(lambda x: (x[0], 1))  # 以用户IP为key
       .reduceByKey(lambda a, b: a + b)
       .sortBy(lambda x: -x[1])  # 降序排序
)

# 获取访问量最多的10个用户
top_users = uv_counts.take(10)

print(top_users)

9. 访问记录分析

http://math.lynu.edu.cn/zhangsan 其中math表示科目, 张三表示教师

编写Spark程序统计下面的问题:

  • 各个科目所有教师访问量Top5的

  • 每个科目访问量Top3的教师

  • 访问量最高的5个科目

  • 访问量最高的5个教师

from pyspark import SparkContext
import re

sc = SparkContext("local", "AccessLogAnalysis")

# 模拟数据
data = [
    "http://math.lynu.edu.cn/zhangsan",
    "http://math.lynu.edu.cn/lisi",
    "http://physics.lynu.edu.cn/zhangsan",
    "http://math.lynu.edu.cn/zhangsan",
    "http://cs.lynu.edu.cn/wangwu",
    "http://cs.lynu.edu.cn/lisi",
    "http://math.lynu.edu.cn/lisi",
    "http://physics.lynu.edu.cn/wangwu",
    "http://physics.lynu.edu.cn/zhangsan",
    "http://math.lynu.edu.cn/wangwu",
]

rdd = sc.parallelize(data)

# 提取科目和教师
def extract_subject_teacher(url):
    match = re.match(r"http://(\w+)\.lynu\.edu\.cn/(\w+)", url)
    if match:
        subject, teacher = match.groups()
        return (subject, teacher)
    return None

subject_teacher_rdd = rdd.map(extract_subject_teacher).filter(lambda x: x is not None)

# 1. 各个科目所有教师访问量Top5
subject_teacher_count = subject_teacher_rdd.map(lambda x: ((x[0], x[1]), 1)).reduceByKey(lambda a, b: a + b)
subject_teacher_top5 = (
    subject_teacher_count.map(lambda x: (x[0][0], (x[0][1], x[1])))
    .groupByKey()
    .mapValues(lambda teachers: sorted(teachers, key=lambda t: -t[1])[:5])
)

print("各个科目所有教师访问量Top5:")
print(subject_teacher_top5.collect())

# 2. 每个科目访问量Top3的教师
subject_teacher_top3 = (
    subject_teacher_count.map(lambda x: (x[0][0], (x[0][1], x[1])))
    .groupByKey()
    .mapValues(lambda teachers: sorted(teachers, key=lambda t: -t[1])[:3])
)

print("每个科目访问量Top3的教师:")
print(subject_teacher_top3.collect())

# 3. 访问量最高的5个科目
subject_count = subject_teacher_rdd.map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)
top5_subjects = subject_count.sortBy(lambda x: -x[1]).take(5)

print("访问量最高的5个科目:", top5_subjects)

# 4. 访问量最高的5个教师
teacher_count = subject_teacher_rdd.map(lambda x: (x[1], 1)).reduceByKey(lambda a, b: a + b)
top5_teachers = teacher_count.sortBy(lambda x: -x[1]).take(5)

print("访问量最高的5个教师:", top5_teachers)

10. 成绩分析

有文本文件数据源格式如下:(姓名,科目,分数)
Tom,Database,80
Jim,Database,90
Tom,DataStructure,80
Jim,DataStructure,97

编程实现以下统计结果

  • 总共有多少名学生

  • 该学校开始了多少们课程

  • Tom总成绩平均分

  • 每名同学选修的课程门数

  • DataStructure课程有多少人选修

  • 各门课程的平均分是多少

  • 每位学生的总成绩

from pyspark import SparkContext

sc = SparkContext("local", "ScoreAnalysis")

# 模拟数据 (姓名, 科目, 分数)
data = [
    ("Tom", "Database", 80),
    ("Jim", "Database", 90),
    ("Tom", "DataStructure", 80),
    ("Jim", "DataStructure", 97),
    ("Ann", "Database", 85),
    ("Ann", "DataStructure", 88),
    ("Tom", "Math", 70),
    ("Jim", "Math", 95),
    ("Ann", "Math", 82)
]
rdd = sc.parallelize(data)

# 1. 总共有多少名学生
students = rdd.map(lambda x: x[0]).distinct().count()
print("总共有多少名学生:", students)

# 2. 学校开设了多少门课程
courses = rdd.map(lambda x: x[1]).distinct().count()
print("学校开设了多少门课程:", courses)

# 3. Tom 总成绩和平均分
tom_scores = rdd.filter(lambda x: x[0] == "Tom").map(lambda x: x[2])
tom_total = tom_scores.sum()
tom_avg = tom_total / tom_scores.count()
print("Tom的总成绩:", tom_total, "平均分:", tom_avg)

# 4. 每名同学选修的课程门数
student_courses = rdd.map(lambda x: (x[0], x[1])).distinct().groupByKey().mapValues(len)
print("每名同学选修的课程门数:", student_courses.collect())

# 5. DataStructure课程有多少人选修
ds_students = rdd.filter(lambda x: x[1] == "DataStructure").map(lambda x: x[0]).distinct().count()
print("DataStructure课程选修人数:", ds_students)

# 6. 各门课程的平均分
course_avg = rdd.map(lambda x: (x[1], (x[2], 1))) \
                .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
                .mapValues(lambda x: x[0] / x[1])
print("各门课程的平均分:", course_avg.collect())

# 7. 每位学生的总成绩
student_total_scores = rdd.map(lambda x: (x[0], x[2])).reduceByKey(lambda a, b: a + b)
print("每位学生的总成绩:", student_total_scores.collect())

SparkSQL

1. DSL风格实现 wordcount程序

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# 初始化SparkSession
spark = SparkSession.builder.master("local").appName("WordCount").getOrCreate()

# 模拟数据
data = ["hello world", "hello spark", "word count example", "hello world spark"]

# 将RDD转换为DataFrame
rdd = spark.sparkContext.parallelize(data)

# 将字符串拆分为单词,并转换成单列元组
words_rdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda word: (word,))
df = spark.createDataFrame(words_rdd, ["word"])

# 使用DSL风格计算单词计数
word_count = (
    df.groupBy("word")  # 按照 word 分组
      .count()          # 统计每个单词的数量
      .orderBy(col("count").desc())  # 按照count降序排序
)
print("WordCount using DSL:")
word_count.show()

2. SQL风格实现 wordcount程序

from pyspark.sql import SparkSession

# 初始化SparkSession
spark = SparkSession.builder.master("local").appName("WordCountSQL").getOrCreate()

# 模拟数据
data = ["hello world", "hello spark", "word count example", "hello world spark"]

# 将字符串拆分为单词,并转换成单列元组
rdd = spark.sparkContext.parallelize(data)
words_rdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda word: (word,))
df = spark.createDataFrame(words_rdd, ["word"])

# 注册临时视图
df.createOrReplaceTempView("words")

# 使用SQL查询实现WordCount
word_count_sql = spark.sql("""
    SELECT word, COUNT(*) as count
    FROM words
    GROUP BY word
    ORDER BY count DESC
""")

print("WordCount using SQL:")
word_count_sql.show()

3. 电影数据分析

数据格式userId::movieId::rate::ts

1::1193::5::978300760

  • 电影平均分

  • 每个用户评分的平均、最高、最低分

  • 查询评分超过100次的电影和平均分和排名top10

from pyspark.sql import SparkSession
import findspark
from pyspark.sql.types import *
from pyspark.sql import functions as F
findspark.init()
spark = SparkSession.builder.\
            appName("Sparkwork").\
            master("local[*]").\
            getOrCreate()
#数据结构
schema = StructType() \
    .add("userId", StringType())\
    .add("movieId", StringType())\
    .add("rate", IntegerType())\
    .add("ts", StringType())
#读数据
df = spark.read.format('csv') \
    .option("sep", "::") \
    .option("header", False)\
    .option("encoding", "utf-8")\
    .schema(schema=schema)\
    .load("./ratings.dat")
df.show(5)
df.createOrReplaceTempView("ratings")
# 1 电影平均分
# SQL风格
spark.sql("""
select movieId , 
       round(avg(rate),2) as avg_rate 
from ratings 
group by movieId
order by movieId
""").show(5)
# DSL风格
df.groupBy("movieId")\
    .avg("rate")\
    .withColumnRenamed("avg(rate)", "avg_rate")\
    .withColumn('avg_rate', F.round("avg_rate",2))\
    .orderBy("movieId", ascending=True)\
    .show(5)
# 2 每个用户评分的平均、最高、最低分
# SQL风格
spark.sql("""
select userId , 
       round(avg(rate),2) as avg_rate ,
       max(rate) as max_rate ,
       min(rate) as min_rate
from ratings 
group by userId
order by userId
""").show(5)
# DSL风格
df.groupBy("userId")\
    .agg(
        F.round(F.avg("rate"), 2).alias("avg_rate"),
        F.max("rate").alias("max_rate"),
        F.min("rate").alias("min_rate")
    )\
    .orderBy("userId", ascending=True)\
    .show(5)
# 3 查询评分超过100次的电影和平均分和排名top10
# SQL风格
spark.sql("""
select userId , 
       round(avg(rate),2) as avg_rate ,
       count(*) as count
from ratings 
group by userId
HAVING COUNT(*) > 100
order by avg_rate DESC
""").show(5)
# DSL风格
df.groupBy("userId") \
    .agg(
        F.round(F.avg("rate"), 2).alias("avg_rate"),
        F.count("*").alias("count")
    ) \
    .filter(F.col("count") > 100)\
    .orderBy(F.col("avg_rate").desc())\
    .show(5)

4. 使用Python语言编写Spark程序,使用DataFrame API实现下列要求,数据来自于一个CSV文件GCB2022v27_MtCO2_flat.csv,统计结果保存到CSV格式文件中。

  • 清洗数据,过滤掉某个国家某年数据全为空的行,并将所有空字段替换为0。

  • 使用SQL和DSL两种风格API编写程序统计2000年-2023年碳排放总量最高的10个国家。

  • 使用SQL和DSL两种风格API编写程序统计2021年碳排放增量同比(2020年)最高的10个国家。

from pyspark.sql import SparkSession
import findspark
from pyspark.sql.types import *
from pyspark.sql import functions as F
findspark.init()
spark = SparkSession.builder.\
            appName("Sparkwork").\
            master("local[*]").\
            getOrCreate()
schema = StructType([
    StructField("Country", StringType(), True),         # 国家名称
    StructField("ISO 3166-1 alpha-3", StringType(), True),  # ISO 三字母国家代码
    StructField("Year", IntegerType(), True),           # 数据记录的年份
    StructField("Total", DoubleType(), True),           # 总排放量
    StructField("Coal", DoubleType(), True),            # 煤炭燃烧的排放量
    StructField("Oil", DoubleType(), True),             # 石油燃烧的排放量
    StructField("Gas", DoubleType(), True),             # 天然气燃烧的排放量
    StructField("Cement", DoubleType(), True),          # 水泥工业相关的排放量
    StructField("Flaring", DoubleType(), True),         # 天然气燃烧(火炬燃烧)的排放量
    StructField("Other", DoubleType(), True),           # 其他来源的排放量
    StructField("Per Capita", DoubleType(), True)       # 人均排放量
])
df = spark.read.format('csv') \
    .option("sep", ",") \
    .option("header", True)\
    .option("encoding", "utf-8")\
    .schema(schema=schema)\
    .load("./GCB2022v27_MtCO2_flat.csv")
df.createOrReplaceTempView("MtCO2")
# 1. 清洗数据,过滤掉某个国家某年数据全为空的行,并将所有空字段替换为0。
# SQL风格
df_MtCO2 = spark.sql(
"""
SELECT 
    Country,
    `ISO 3166-1 alpha-3`,
    Year,
    COALESCE(Total, 0) AS Total,
    COALESCE(Coal, 0) AS Coal,
    COALESCE(Oil, 0) AS Oil,
    COALESCE(Gas, 0) AS Gas,
    COALESCE(Cement, 0) AS Cement,
    COALESCE(Flaring, 0) AS Flaring,
    COALESCE(Other, 0) AS Other,
    COALESCE(`Per Capita`, 0) AS `Per Capita`
FROM emission_data
WHERE NOT (
    Total IS NULL AND
    Coal IS NULL AND
    Oil IS NULL AND
    Gas IS NULL AND
    Cement IS NULL AND
    Flaring IS NULL AND
    Other IS NULL AND
    `Per Capita` IS NULL
)
"""
)
# DSL风格
df = df.filter(
    ~(
        col("Total").isNull() &
        col("Coal").isNull() &
        col("Oil").isNull() &
        col("Gas").isNull() &
        col("Cement").isNull() &
        col("Flaring").isNull() &
        col("Other").isNull() &
        col("Per Capita").isNull()
    )
)
df = df.fillna(0)
df_MtCO2.createOrReplaceTempView("MtCO2")
# 2. 使用SQL和DSL两种风格API编写程序统计2000年-2023年碳排放总量最高的10个国家。
# SQL风格
spark.sql("""
select 
    Country, 
    max(Total) as Total
from MtCO2 
where Year >= 2000 and Year <= 2023 
group by Country
order by Total desc
""").show(10)
# DSL风格
df_ = df.where("Year >= 2000 and Year <= 2023 ").\
    groupBy("Country").\
    agg(F.max("Total").alias("Total")).\
    orderBy("Total" , ascending = False)
df_.coalesce(1).write.mode("overwrite").csv("output/result1", header=True)
# 3. 使用SQL和DSL两种风格API编写程序统计2021年碳排放增量同比(2020年)最高的10个国家。
# SQL风格
spark.sql("""
select 
    mid1.Country as Country_2020, 
    mid1.Total as Total_2020, 
    mid2.Country as Country_2021, 
    mid2.Total as Total_2021
from 
    (select 
        Country, 
        max(Total) as Total,
        Year
     from MtCO2 
     where Year = 2020
     group by Country, Year
    ) as mid1
inner join 
    (select 
        Country, 
        max(Total) as Total,
        Year
     from MtCO2 
     where Year = 2021
     group by Country, Year
    ) as mid2
on mid1.Country = mid2.Country
order by Total_2020 desc
""").show(10)
# DSL风格
mid1 = df.filter(F.col("Year") == 2020)\
    .groupBy("Country", "Year")\
    .agg(F.max("Total").alias("Total_2020"))\
    .alias("mid1")
mid2 =df.filter(F.col("Year") == 2021)\
    .groupBy("Country", "Year")\
    .agg(F.max("Total").alias("Total_2021"))\
    .alias("mid2")
df_ = mid1.join(mid2, F.col("mid1.Country") == F.col("mid2.Country"), "inner")\
.select(
    F.col("mid1.Country").alias("Country_2020"),
    F.col("mid1.Total_2020"),
    F.col("mid2.Country").alias("Country_2021"),
    F.col("mid2.Total_2021"),
)\
.orderBy(F.col("Total_2020").desc())
df_.coalesce(1).write.mode("overwrite").csv("output/result2", header=True)

5. 使用Python语言编写Spark程序,使用DataFrame API实现下列要求,数据来自于一个文本文件movie.txt,统计结果保存到CSV格式文件中。

影评数据表格式说明:

UserID::MovieID::Rating::Timestamp

  • 清洗数据,过滤掉某个用户评价数据全为空(MovieID等于0)的行。

  • 使用SQL和DSL两种风格API编写程序统计超过平均分的电影的数量。

  • 使用SQL和DSL两种风格API编写程序统计给低分电影(分数小于3)打分次数最多的10个用户。

from pyspark.sql import SparkSession
import findspark
from pyspark.sql.types import *
from pyspark.sql import functions as F
findspark.init()
spark = SparkSession.builder.\
            appName("Sparkwork").\
            master("local[*]").\
            getOrCreate()
schema = StructType([
    StrutField("UserID", IntegerType(), True),
    StructField("MovieID", IntegerType(), True),
    StructField("Rating", IntegerType), True), 
    StructField("Timestamp", IntegerType(), True),
])
df = spark.read.format('csv') \
    .option("sep", "::") \
    .option("header", False)\
    .option("encoding", "utf-8")\
    .schema(schema=schema)\
    .load("./ratings.dat")\
    .alias("rating")
df.createOrReplaceTempView("ratings")
# 1. 清洗数据,过滤掉某个用户评价数据全为空(MovieID等于0)的行
# SQL 风格
df_ = spark.sql("""
    SELECT * 
    FROM ratings 
    WHERE MovieID != 0
""")
# DSL 风格
df = df.filter(F.col("MovieID") != 0)
df_.createOrReplaceTempView("rating")
# 2. 使用SQL和DSL两种风格API编写程序统计超过平均分的电影的数量。
# SQL 风格
spark.sql("""
select 
    count(mid2.MovieID) as `超过平均分的电影的数量`
from
    (select 
        MovieID, 
        round(avg(Rating),2) as Rating 
    from 
        rating
    group by 
        MovieID
    ) as mid2
join
    (select 
        round(avg(Rating),2) as Rating 
    from 
        rating
    ) as mid 
on mid2.Rating > mid.Rating
""").show()
# DSL 风格
mid1 = df.select(F.round(F.avg("Rating"), 2)\
    .alias("Rating")).alias("mid1")
mid2 = df.select("MovieID","Rating")\
    .groupBy("MovieID").agg(F.round(F.avg("Rating"), 2)\
    .alias("Rating")).alias("mid2")
df_ = mid1.join(mid2, F.col("mid1.Rating") < F.col("mid2.Rating"), "inner")\
.select(
    F.count("mid2.MovieID").alias("超过平均分的电影的数量")
)
df_.coalesce(1).write.mode("overwrite").csv("output/result1", header=True)
# 3. 使用SQL和DSL两种风格API编写程序统计给低分电影(分数小于3)打分次数最多的10个用户。
# SQL 风格
spark.sql("""
select 
    UserID , count(rating.MovieID) as `低分电影数`
from 
    rating
join
    (select 
        MovieID, 
        round(avg(Rating),2) as Rating 
    from 
        rating
    group by 
        MovieID
    ) as mid2
on 
    mid2.Rating < 3 
and 
    mid2.MovieID = rating.MovieID
group by 
    UserID
order by `低分电影数` desc
""").show(10)
# DSL 风格
mid1 = df.select("MovieID","Rating")\
    .groupBy("MovieID").agg(F.round(F.avg("Rating"),2)\
    .alias("Rating")).alias("mid1")
df_ = df.join(
    mid1,
(F.col("mid1.Rating") < 3) & (F.col("mid1.MovieID") == F.col("rating.MovieID"))
).groupBy(
    F.col("rating.UserID")  # 按 UserID 分组
).agg(
    F.count(F.col("rating.MovieID")).alias("超过平均分的电影的数量")  # 聚合统计
).orderBy(
    "超过平均分的电影的数量" , ascending = False
).select(
    "UserID" , "超过平均分的电影的数量"
)
df_.coalesce(1).write.mode("overwrite").csv("output/result2", header=True)

Spark Streaming

1. (DStream)使用SparkStream实现词频统计,数据源采用socket方式

import findspark
findspark.init()

from pyspark import SparkContext,SparkConf
from pyspark.streaming import StreamingContext
conf = SparkConf()
conf.setAppName("Text")
conf.setMaster('local[*]')
sc = SparkContext(conf = conf)
ssc = StreamingContext(sc,5)
ssc.checkpoint("./")
lines = ssc.socketTextStream("127.0.0.1",6666)
words = lines.flatMap(lambda x: x.split(' '))\
    .map(lambda x:(x,1))\
    .reduceByKeyAndWindow(
            lambda x,y:x+y,
            lambda x,y:x-y,
            windowDuration = 20,
            slideDuration = 10
        )
words.pprint()
ssc.start()
ssc.awaitTermination();

2. (Structured Streaming)使用SparkStream实现词频统计,数据源采用socket方式

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col, window, to_timestamp

# 创建 SparkSession
spark = SparkSession.builder \
    .appName("Spark Streaming WordCount") \
    .getOrCreate()

# 从 socket 输入读取数据 (假设主机为 localhost,端口为 9999)
socket_stream = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# 将输入数据按 "time:word" 格式解析
parsed_stream = socket_stream \
    .select(split(col("value"), ",").alias("parts")) \
    .selectExpr("parts[0] as time", "parts[1] as word") \
    .withColumn("timestamp", to_timestamp(col("time"), "yyyy-MM-dd HH:mm:ss"))

# 定义 Watermark 和窗口
windowed_counts = parsed_stream \
    .withWatermark("timestamp", "20 seconds") \
    .groupBy(
        window(col("timestamp"), "1 minute", "30 seconds"),  # 窗口大小 1 分钟,滑动间隔 30 秒
        col("word")
    ).count()

# 输出到控制台
query = windowed_counts.writeStream \
    .outputMode("update") \
    .format("updata") \
    .option("truncate", "false") \
    .trigger(processingTime = '8 seconds')\
    .start()

# 等待查询结束
query.awaitTermination()