Spark期末复习
Spark简答题
1. Spark中Shuffle表示什么情况,那些算子能够触发Shuffle发生
Shuffle 是指数据在不同分区之间的重新分配和移动的过程,通常发生在任务阶段之间。触发 Shuffle 的情况包括需要对数据进行重新分区或分组计算,即数据需要被重新分区或数据需要被聚合或组合
能够触发 Shuffle 的算子包括:
- groupByKey0
- reduceByKey
- join
- distinct
- repartition
- coalesce (减少分区数且 shuffle=true 时)
- sortBy
2. Spark中的检查点和缓存什么区别和联系
- 区别:
- 检查点 (Checkpoint):用于持久化 RDD,将数据写入磁盘以提供容错能力,一旦设置后会清除之前的RDD血缘关系。适合长时间运行的任务。
- 缓存 (Cache):用于将数据存储在内存或磁盘中以加速访问,主要为提升性能,适用于短期存储。
- 联系:
- 缓存 提供快速访问,检查点 提供容错能力。
- 二者都可用于提高容错能力,减少计算开销。
- 可以先缓存后检查点,提高检查点的效率。
3. Hadoop的基于进程的计算模型,Spark基于线程的计算模型,比较这两种模型的优缺点
- Hadoop(进程):
- 优点:隔离性高,适合大规模离线批处理任务。容错性好,进程失败不会影响其他进程。
- 缺点:进程启动和切换开销大,性能较低。数据需要频繁读写磁盘,导致 I/O 开销高。
- Spark(线程):
- 优点:线程间共享内存,计算效率高,适合内存计算。启动和调度开销小,支持实时和迭代计算。
- 缺点:线程隔离性较弱,可能存在数据共享冲突和内存泄漏风险。对内存管理要求高,容易因资源不足导致失败。
4. 什么是宽依赖和窄依赖,并列举相关的算子
- 窄依赖:子 RDD 的每个分区只依赖父 RDD 的一个或少量分区,不需要 Shuffle。例如:
map
、filter
、flatMap
、union
。 - 宽依赖:子 RDD 的一个分区依赖父 RDD 的多个分区,需要进行 Shuffle。例如:
reduceByKey
、groupByKey
、join
、distinct
。
5. SparkSQL中的临时视图和全局视图是什么,有什么区别,如何创建
- 临时视图 (Temporary View):
- 只在当前 SparkSession 生命周期内有效,不能跨 SparkSession。
- 创建方式:
createOrReplaceTempView("viewName")
。
- 全局视图 (Global View):
- 可跨 SparkSession 使用,存储在全局 Catalog 中,生命周期与 Spark 应用程序一致。
- 创建方式:
createGlobalTempView("viewName")
。
6. RDD、DataFrame、DStream数据抽象之间的联系和区别是什么
- 联系:都是 Spark 提供的核心抽象,用于处理不同场景的数据。RDD 是底层抽象,DataFrame 和 DStream 基于 RDD 实现。
- 区别:
- RDD:底层抽象,弹性分布式数据集,提供低级 API,提供了精确的控制,适合处理非结构化数据。
- DataFrame:类似于关系型数据库的表,支持 SQL 查询,支持优化执行计划,适合处理结构化数据。
- DStream:处理流式数据的抽象,基于 RDD 实现,适合实时计算,计算时间序列上的 RDD。
7. 比较Spark和MapReduce
- Spark:
- 速度:内存计算,速度快,适合实时和迭代计算。
- 编程模型:提供高级 API(RDD、DataFrame、SQL)。
- 容错性:RDD 提供数据容错机制。
- 应用场景:批处理、实时计算、流处理和机器学习。
- MapReduce:
- 速度:基于磁盘 I/O,速度慢,适合离线批处理。
- 编程模型:基于
map
和reduce
,开发复杂。 - 容错性:通过数据冗余实现容错。
- 应用场景:离线批量数据处理。
8. Spark算子有哪些类型,分别列举其中的算子
- 转换算子 (Transformation):返回一个新的 RDD,不会触发计算。
- 如:
map
、flatMap
、filter
、groupByKey
、reduceByKey
、join
。
- 如:
- 行动算子 (Action):触发计算,返回结果。
- 如:
collect
、count
、saveAsTextFile
、reduce
。
- 如:
9. Spark中那些算子能够导致重新分区
能够导致重新分区的算子包括:
repartition
coalesce
(减少分区数且 shuffle=true 时)partitionBy
groupByKey
reduceByKey
aggregateByKey
cogroup
distinct
sortByKey
sortBy
join
10. Spark中的DStream是什么,它有什么特点,它支持哪些数据源
- DStream(Discretized Stream)是 Spark Streaming 中的基本抽象,表示连续的数据流,由一系列时间间隔内的 RDD 组成。
- 特点:
- 高吞吐、低延迟:适合实时数据处理。
- 分布式:数据流被分成多个小批次(RDD)进行处理。
- 弹性容错:基于 RDD 的容错机制,支持失败恢复。
- API 丰富:支持 Transformation 和 Action 操作。
- 可以与 RDD 转换互操作。
- 支持的数据源:
- 文件系统:HDFS、S3 等。
- 消息队列:Kafka、Flume。
- Socket 数据:TCP Socket。
- 自定义数据源:通过
receiver
接收数据。
11. Spark Streaming程序的基本步骤
-
通过创建输入 DStream 来定义输入源
- DStream 是 Spark Streaming 中的基本抽象。
-
对 DStream 应用转换操作和输出操作来定义流计算
- 例如使用
map
、filter
等转换算子。
- 例如使用
-
用
streamingContext.start()
来开始接收数据和处理流程- 开启流式计算的执行。
-
通过
streamingContext.awaitTermination()
方法来等待处理结束- 等待计算完成(可以是手动结束或因错误而结束)。
-
通过
streamingContext.stop()
来手动结束流计算进程- 手动终止流式计算。
Spark编程题
Spark的输入
RDD输入
从文件读取·
rdd = sc.textFile("path/to/file.txt") # 基本的读法
rdd = sc.wholeTextFiles("path/to/file.txt") # 小文件读取
SparkSql输入
文件读取
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
df = spark.read.json("path/to/file.json")
-
Mysql
- format( ) 支持Spark支持的文件格式
df = spark.read \
.format("jdbc") \
.option("url", "jdbc:mysql://localhost:3306/db_name") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("dbtable", "table_name") \
.option("user", "username") \
.option("password", "password") \
.load()
DFStream 输入
DStream
from pyspark import SparkContext,SparkConf
from pyspark.streaming import StreamingContext
conf = SparkConf()
conf.setAppName("Text")
conf.setMaster('local[*]')
sc = SparkContext(conf = conf)
ssc = StreamingContext(sc,5)
lines = ssc.textFileStream("path/to/directory")
socket
lines = ssc.socketTextStream("localhost", 9999)
Kafka
from pyspark.streaming.kafka import KafkaUtils
# 使用 DirectStream 读取 Kafka 数据
kafka_stream = KafkaUtils.createDirectStream(
ssc,
topics=["topic_name"], # Kafka 主题列表
kafkaParams={
"metadata.broker.list": "broker1:9092,broker2:9092", # Kafka broker 列表
"group.id": "group_id", # 消费者组 ID
}
)
# 处理 Kafka 数据
lines = kafka_stream.map(lambda message: message[1]) # 获取消息的 value
lines.pprint()
Structured Streaming 输入
从文件流读取
df = spark.readStream \
.format("csv") \
.option("path", "path/to/directory") \
.schema(schema) \
.load()
Kafka
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "topic_name") \
.load()
Socket
df = spark.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
Spark的输出
RDD 输出
print()
print(rdd.collect())
print(rdd_data)
saveAsTextFile
rdd.saveAsTextFile("/")
SparkSQL 输出
show
df.show() # 默认显示前 20 行
df.show(10) # 显示前 10 行
-
CSV 文件
-
"append"
:追加数据到目标路径(如果目标路径存在)。 -
"ignore"
:如果目标路径存在,则跳过写入。 -
"error"
或"errorifexists"
(默认):如果目标路径存在,则抛出异常。 -
"overwrite"
:覆盖目标路径中现有的文件。
-
df.write \
.mode("append") \ # 设置文件保存模式
.options(header=True, delimiter=",") \ #其余参数
.csv("output/path") # 保存位置
MySQL
df.write \
.format("jdbc")\
.option("url", "jdbc:mysql://localhost:3306/db_name")\
.option("driver", "com.mysql.cj.jdbc.Driver")\
.option("dbtable", "table_name")\
.option("user", "username")\
.option("password", "password")\
.save()
DFStream 输出
pprint()
df_stream.pprint() # 每批次打印到控制台
saveAsTextFiles
df_stream.saveAsTextFiles("/")
Structured Streaming 输出
console
query = df.writeStream \
.outputMode("append") \ # 可选:append, complete, update
.format("console") \
.start()
query.awaitTermination()
文件系统
query = df.writeStream \
.outputMode("append") \
.format("csv") \ # 保存为 CSV 格式
.option("path", "output/path") \ # 保存路径
.option("checkpointLocation", "output/checkpoint") \ # 检查点路径
.start()
query.awaitTermination()
kafka
query = df.writeStream \
.outputMode("append") \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "output_topic") \
.option("checkpointLocation", "output/checkpoint") \
.start()
query.awaitTermination()
RDD
1. 从1万个数中抽取100个,取这100个数里面奇数的最大3个和偶数最小3个
from pyspark import SparkContext
import random
sc = SparkContext("local" , "SampleAndMaxMin")
rdd = sc.parallelize(data)
# 随机抽取100个数
sampled_data = rdd.takeSample(False, 100)
# 转换为RDD以进一步处理
sampled_rdd = sc.parallelize(sampled_data)
# 筛选奇数和偶数
odd_numbers = sampled_rdd.filter(lambda x: x % 2 != 0).sortBy(lambda x: -x).take(3)
even_numbers = sampled_rdd.filter(lambda x: x % 2 == 0).sortBy(lambda x: x).take(3)
print(odd_numbers)
print(even_numbers)
2. 读取文件,提取所有的单词,按照单词中数字的大小降序排序
from pyspark import SparkContext
import re
sc = SparkContext("local", "WordNumberSort")
# 读取文件
file_path = "file.txt" # 替换成实际的文件路径
lines = sc.textFile(file_path)
# 提取单词并筛选包含数字的单词
def extract_words_with_numbers(line):
return [word for word in re.findall(r'\b\w*\d+\w*\b', line)]
words_rdd = lines.flatMap(extract_words_with_numbers)
# 提取单词中的数字并进行排序
def extract_number(word):
return int("".join(filter(str.isdigit, word))) # 提取单词中的数字
sorted_words = words_rdd.map(lambda word: (word, extract_number(word))) \
.sortBy(lambda x: -x[1]) \
.map(lambda x: x[0])
print(sorted_words.collect())
3. 读取文件,提取所有的单词,按照单词中数字的大小降序排序
from pyspark import SparkContext
import re
sc = SparkContext("local", "WordNumberSort")
# 读取文件
file_path = "file.txt" # 替换成实际的文件路径
lines = sc.textFile(file_path)
# 提取单词并筛选包含数字的单词
def extract_words_with_numbers(line):
return [word for word in re.findall(r'\b\w*\d+\w*\b', line)]
words_rdd = lines.flatMap(extract_words_with_numbers)
# 提取单词中的数字并进行排序
def extract_number(word):
return int("".join(filter(str.isdigit, word))) # 提取单词中的数字
sorted_words = words_rdd.map(lambda word: (word, extract_number(word))) \
.sortBy(lambda x: x[1]) \
.map(lambda x: x[0])
print(sorted_words.collect())
4. 生成一个整数RDD(1000个),随机抽取100个,计算每个分区的数据和,并降序排序
from pyspark import SparkContext
import random
sc = SparkContext("local", "PartitionSumSort")
# 生成一个包含1000个整数的RDD
data = [random.randint(1, 1000) for _ in range(1000)]
rdd = sc.parallelize(data, numSlices=10) # 设置分区数
# 随机抽取100个数
sampled_rdd = rdd.takeSample(False, 100)
# 转换为RDD并重新分区
sampled_rdd = sc.parallelize(sampled_rdd, numSlices=5)
# 计算每个分区的数据和
partition_sums = sampled_rdd.mapPartitions(lambda partition: [sum(partition)])
# 对分区的和降序排序
sorted_sums = partition_sums.sortBy(lambda x: -x).collect()
print(sorted_sums)
5. 计算平均成绩
- 计算平均成绩,数据形式包含名字、科目和分数: ('mike',‘math', 96) ...
from pyspark import SparkContext
sc = SparkContext("local", "AverageScore")
# 模拟数据
data = [
('mike', 'math', 96),
('mike', 'english', 88),
('lily', 'math', 78),
('lily', 'english', 85),
('john', 'math', 90),
('john', 'english', 75)
]
rdd = sc.parallelize(data)
# 计算平均成绩
average_rdd = (
rdd.map(lambda x: (x[0], (x[2], 1))) # 转换为 (name, (score, 1))
.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) # 累加分数和计数
.mapValues(lambda x: x[0] / x[1]) # 计算平均值
)
print(average_rdd.collect())
6. 计算历史温度最高的两个月
- 计算历史温度最高的两个月,数据来自文件,文件中每一行内容存放的是日期和温度,用空格分开( 2004-4-1 39)
from pyspark import SparkContext
sc = SparkContext("local", "MaxTemperatureMonths")
# 模拟数据:日期和温度
data = [
"2004-01-01 15",
"2004-01-15 30",
"2004-02-10 25",
"2004-02-20 40",
"2004-03-01 45",
"2004-03-10 50"
]
rdd = sc.parallelize(data)
# 提取月份和温度
def extract_month_temp(line):
date, temp = line.split()
month = "-".join(date.split("-")[:2]) # 提取月份 (2004-01)
return (month, int(temp))
# 计算每个月的最高温度
month_max_temp = (
rdd.map(extract_month_temp)
.reduceByKey(max) # 每个月的最高温度
.sortBy(lambda x: -x[1]) # 降序排序
)
# 获取最高温度的前两个月
top2_months = month_max_temp.take(2)
print(top2_months)
7. 统计PV(页面访问量)
- 通过Nginx日志统计站点页面访问量,要求:地址中如果是 /sys/user/list?page=5 形式的去掉?及后面的内容,转换成 /sys/user/list ,并统计访问量最高的10个页面
from pyspark import SparkContext
import re
sc = SparkContext("local", "PageViewStats")
# 模拟Nginx日志数据
data = [
'123.161.202.231 - - [11/Oct/2024:11:47:47 +0800] "GET /api/crm/customer/list?page=5 HTTP/2.0"',
'123.161.202.231 - - [11/Oct/2024:11:47:48 +0800] "GET /api/crm/customer/is_online?t=172 HTTP/2.0"',
'123.161.202.231 - - [11/Oct/2024:11:47:50 +0800] "GET /sys/user/list?page=2 HTTP/2.0"',
'123.161.202.231 - - [11/Oct/2024:11:47:51 +0800] "GET /sys/user/list?page=3 HTTP/2.0"'
]
rdd = sc.parallelize(data)
# 提取URL并去掉?后的内容
def extract_page(line):
match = re.search(r'GET\s(\S+)', line)
if match:
url = match.group(1).split('?')[0]
return url
return None
# 统计访问量
page_counts = (
rdd.map(extract_page)
.filter(lambda x: x is not None)
.map(lambda x: (x, 1))
.reduceByKey(lambda a, b: a + b)
.sortBy(lambda x: -x[1]) # 降序排序
)
# 获取访问量最高的10个页面
top_pages = page_counts.take(10)
print(top_pages)
8. 统计UV(用户访问量)
- 统计UV,通过Nginx日志统计站点用户访问量,用户访问多次相同地址视为1次访问,最终统计放量最多的10个用户
from pyspark import SparkContext
import re
sc = SparkContext("local", "UserVisitStats")
# 模拟Nginx日志数据
data = [
'123.161.202.231 - - [11/Oct/2024:11:47:47 +0800] "GET /api/crm/customer/is_online?t=172 HTTP/2.0"',
'123.161.202.231 - - [11/Oct/2024:11:47:48 +0800] "GET /api/crm/customer/is_online?t=173 HTTP/2.0"',
'456.161.202.232 - - [11/Oct/2024:11:47:50 +0800] "GET /sys/user/list?page=2 HTTP/2.0"',
'456.161.202.232 - - [11/Oct/2024:11:47:51 +0800] "GET /sys/user/list?page=2 HTTP/2.0"'
]
rdd = sc.parallelize(data)
# 提取用户IP和页面URL
def extract_user_page(line):
match = re.search(r'^(\S+).+GET\s(\S+)', line)
if match:
ip = match.group(1) # 用户IP
url = match.group(2).split('?')[0] # 去掉?后的内容
return (ip, url)
return None
# 统计UV
uv_counts = (
rdd.map(extract_user_page)
.filter(lambda x: x is not None)
.distinct() # 去重,同一用户访问相同页面视为1次
.map(lambda x: (x[0], 1)) # 以用户IP为key
.reduceByKey(lambda a, b: a + b)
.sortBy(lambda x: -x[1]) # 降序排序
)
# 获取访问量最多的10个用户
top_users = uv_counts.take(10)
print(top_users)
9. 访问记录分析
http://math.lynu.edu.cn/zhangsan 其中math表示科目, 张三表示教师
编写Spark程序统计下面的问题:
-
各个科目所有教师访问量Top5的
-
每个科目访问量Top3的教师
-
访问量最高的5个科目
-
访问量最高的5个教师
from pyspark import SparkContext
import re
sc = SparkContext("local", "AccessLogAnalysis")
# 模拟数据
data = [
"http://math.lynu.edu.cn/zhangsan",
"http://math.lynu.edu.cn/lisi",
"http://physics.lynu.edu.cn/zhangsan",
"http://math.lynu.edu.cn/zhangsan",
"http://cs.lynu.edu.cn/wangwu",
"http://cs.lynu.edu.cn/lisi",
"http://math.lynu.edu.cn/lisi",
"http://physics.lynu.edu.cn/wangwu",
"http://physics.lynu.edu.cn/zhangsan",
"http://math.lynu.edu.cn/wangwu",
]
rdd = sc.parallelize(data)
# 提取科目和教师
def extract_subject_teacher(url):
match = re.match(r"http://(\w+)\.lynu\.edu\.cn/(\w+)", url)
if match:
subject, teacher = match.groups()
return (subject, teacher)
return None
subject_teacher_rdd = rdd.map(extract_subject_teacher).filter(lambda x: x is not None)
# 1. 各个科目所有教师访问量Top5
subject_teacher_count = subject_teacher_rdd.map(lambda x: ((x[0], x[1]), 1)).reduceByKey(lambda a, b: a + b)
subject_teacher_top5 = (
subject_teacher_count.map(lambda x: (x[0][0], (x[0][1], x[1])))
.groupByKey()
.mapValues(lambda teachers: sorted(teachers, key=lambda t: -t[1])[:5])
)
print("各个科目所有教师访问量Top5:")
print(subject_teacher_top5.collect())
# 2. 每个科目访问量Top3的教师
subject_teacher_top3 = (
subject_teacher_count.map(lambda x: (x[0][0], (x[0][1], x[1])))
.groupByKey()
.mapValues(lambda teachers: sorted(teachers, key=lambda t: -t[1])[:3])
)
print("每个科目访问量Top3的教师:")
print(subject_teacher_top3.collect())
# 3. 访问量最高的5个科目
subject_count = subject_teacher_rdd.map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)
top5_subjects = subject_count.sortBy(lambda x: -x[1]).take(5)
print("访问量最高的5个科目:", top5_subjects)
# 4. 访问量最高的5个教师
teacher_count = subject_teacher_rdd.map(lambda x: (x[1], 1)).reduceByKey(lambda a, b: a + b)
top5_teachers = teacher_count.sortBy(lambda x: -x[1]).take(5)
print("访问量最高的5个教师:", top5_teachers)
10. 成绩分析
有文本文件数据源格式如下:(姓名,科目,分数)
Tom,Database,80
Jim,Database,90
Tom,DataStructure,80
Jim,DataStructure,97
编程实现以下统计结果
-
总共有多少名学生
-
该学校开始了多少们课程
-
Tom总成绩平均分
-
每名同学选修的课程门数
-
DataStructure课程有多少人选修
-
各门课程的平均分是多少
-
每位学生的总成绩
from pyspark import SparkContext
sc = SparkContext("local", "ScoreAnalysis")
# 模拟数据 (姓名, 科目, 分数)
data = [
("Tom", "Database", 80),
("Jim", "Database", 90),
("Tom", "DataStructure", 80),
("Jim", "DataStructure", 97),
("Ann", "Database", 85),
("Ann", "DataStructure", 88),
("Tom", "Math", 70),
("Jim", "Math", 95),
("Ann", "Math", 82)
]
rdd = sc.parallelize(data)
# 1. 总共有多少名学生
students = rdd.map(lambda x: x[0]).distinct().count()
print("总共有多少名学生:", students)
# 2. 学校开设了多少门课程
courses = rdd.map(lambda x: x[1]).distinct().count()
print("学校开设了多少门课程:", courses)
# 3. Tom 总成绩和平均分
tom_scores = rdd.filter(lambda x: x[0] == "Tom").map(lambda x: x[2])
tom_total = tom_scores.sum()
tom_avg = tom_total / tom_scores.count()
print("Tom的总成绩:", tom_total, "平均分:", tom_avg)
# 4. 每名同学选修的课程门数
student_courses = rdd.map(lambda x: (x[0], x[1])).distinct().groupByKey().mapValues(len)
print("每名同学选修的课程门数:", student_courses.collect())
# 5. DataStructure课程有多少人选修
ds_students = rdd.filter(lambda x: x[1] == "DataStructure").map(lambda x: x[0]).distinct().count()
print("DataStructure课程选修人数:", ds_students)
# 6. 各门课程的平均分
course_avg = rdd.map(lambda x: (x[1], (x[2], 1))) \
.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
.mapValues(lambda x: x[0] / x[1])
print("各门课程的平均分:", course_avg.collect())
# 7. 每位学生的总成绩
student_total_scores = rdd.map(lambda x: (x[0], x[2])).reduceByKey(lambda a, b: a + b)
print("每位学生的总成绩:", student_total_scores.collect())
SparkSQL
1. DSL风格实现 wordcount程序
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# 初始化SparkSession
spark = SparkSession.builder.master("local").appName("WordCount").getOrCreate()
# 模拟数据
data = ["hello world", "hello spark", "word count example", "hello world spark"]
# 将RDD转换为DataFrame
rdd = spark.sparkContext.parallelize(data)
# 将字符串拆分为单词,并转换成单列元组
words_rdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda word: (word,))
df = spark.createDataFrame(words_rdd, ["word"])
# 使用DSL风格计算单词计数
word_count = (
df.groupBy("word") # 按照 word 分组
.count() # 统计每个单词的数量
.orderBy(col("count").desc()) # 按照count降序排序
)
print("WordCount using DSL:")
word_count.show()
2. SQL风格实现 wordcount程序
from pyspark.sql import SparkSession
# 初始化SparkSession
spark = SparkSession.builder.master("local").appName("WordCountSQL").getOrCreate()
# 模拟数据
data = ["hello world", "hello spark", "word count example", "hello world spark"]
# 将字符串拆分为单词,并转换成单列元组
rdd = spark.sparkContext.parallelize(data)
words_rdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda word: (word,))
df = spark.createDataFrame(words_rdd, ["word"])
# 注册临时视图
df.createOrReplaceTempView("words")
# 使用SQL查询实现WordCount
word_count_sql = spark.sql("""
SELECT word, COUNT(*) as count
FROM words
GROUP BY word
ORDER BY count DESC
""")
print("WordCount using SQL:")
word_count_sql.show()
3. 电影数据分析
数据格式userId::movieId::rate::ts
1::1193::5::978300760
-
电影平均分
-
每个用户评分的平均、最高、最低分
-
查询评分超过100次的电影和平均分和排名top10
from pyspark.sql import SparkSession
import findspark
from pyspark.sql.types import *
from pyspark.sql import functions as F
findspark.init()
spark = SparkSession.builder.\
appName("Sparkwork").\
master("local[*]").\
getOrCreate()
#数据结构
schema = StructType() \
.add("userId", StringType())\
.add("movieId", StringType())\
.add("rate", IntegerType())\
.add("ts", StringType())
#读数据
df = spark.read.format('csv') \
.option("sep", "::") \
.option("header", False)\
.option("encoding", "utf-8")\
.schema(schema=schema)\
.load("./ratings.dat")
df.show(5)
df.createOrReplaceTempView("ratings")
# 1 电影平均分
# SQL风格
spark.sql("""
select movieId ,
round(avg(rate),2) as avg_rate
from ratings
group by movieId
order by movieId
""").show(5)
# DSL风格
df.groupBy("movieId")\
.avg("rate")\
.withColumnRenamed("avg(rate)", "avg_rate")\
.withColumn('avg_rate', F.round("avg_rate",2))\
.orderBy("movieId", ascending=True)\
.show(5)
# 2 每个用户评分的平均、最高、最低分
# SQL风格
spark.sql("""
select userId ,
round(avg(rate),2) as avg_rate ,
max(rate) as max_rate ,
min(rate) as min_rate
from ratings
group by userId
order by userId
""").show(5)
# DSL风格
df.groupBy("userId")\
.agg(
F.round(F.avg("rate"), 2).alias("avg_rate"),
F.max("rate").alias("max_rate"),
F.min("rate").alias("min_rate")
)\
.orderBy("userId", ascending=True)\
.show(5)
# 3 查询评分超过100次的电影和平均分和排名top10
# SQL风格
spark.sql("""
select userId ,
round(avg(rate),2) as avg_rate ,
count(*) as count
from ratings
group by userId
HAVING COUNT(*) > 100
order by avg_rate DESC
""").show(5)
# DSL风格
df.groupBy("userId") \
.agg(
F.round(F.avg("rate"), 2).alias("avg_rate"),
F.count("*").alias("count")
) \
.filter(F.col("count") > 100)\
.orderBy(F.col("avg_rate").desc())\
.show(5)
4. 使用Python语言编写Spark程序,使用DataFrame API实现下列要求,数据来自于一个CSV文件GCB2022v27_MtCO2_flat.csv,统计结果保存到CSV格式文件中。
-
清洗数据,过滤掉某个国家某年数据全为空的行,并将所有空字段替换为0。
-
使用SQL和DSL两种风格API编写程序统计2000年-2023年碳排放总量最高的10个国家。
-
使用SQL和DSL两种风格API编写程序统计2021年碳排放增量同比(2020年)最高的10个国家。
from pyspark.sql import SparkSession
import findspark
from pyspark.sql.types import *
from pyspark.sql import functions as F
findspark.init()
spark = SparkSession.builder.\
appName("Sparkwork").\
master("local[*]").\
getOrCreate()
schema = StructType([
StructField("Country", StringType(), True), # 国家名称
StructField("ISO 3166-1 alpha-3", StringType(), True), # ISO 三字母国家代码
StructField("Year", IntegerType(), True), # 数据记录的年份
StructField("Total", DoubleType(), True), # 总排放量
StructField("Coal", DoubleType(), True), # 煤炭燃烧的排放量
StructField("Oil", DoubleType(), True), # 石油燃烧的排放量
StructField("Gas", DoubleType(), True), # 天然气燃烧的排放量
StructField("Cement", DoubleType(), True), # 水泥工业相关的排放量
StructField("Flaring", DoubleType(), True), # 天然气燃烧(火炬燃烧)的排放量
StructField("Other", DoubleType(), True), # 其他来源的排放量
StructField("Per Capita", DoubleType(), True) # 人均排放量
])
df = spark.read.format('csv') \
.option("sep", ",") \
.option("header", True)\
.option("encoding", "utf-8")\
.schema(schema=schema)\
.load("./GCB2022v27_MtCO2_flat.csv")
df.createOrReplaceTempView("MtCO2")
# 1. 清洗数据,过滤掉某个国家某年数据全为空的行,并将所有空字段替换为0。
# SQL风格
df_MtCO2 = spark.sql(
"""
SELECT
Country,
`ISO 3166-1 alpha-3`,
Year,
COALESCE(Total, 0) AS Total,
COALESCE(Coal, 0) AS Coal,
COALESCE(Oil, 0) AS Oil,
COALESCE(Gas, 0) AS Gas,
COALESCE(Cement, 0) AS Cement,
COALESCE(Flaring, 0) AS Flaring,
COALESCE(Other, 0) AS Other,
COALESCE(`Per Capita`, 0) AS `Per Capita`
FROM emission_data
WHERE NOT (
Total IS NULL AND
Coal IS NULL AND
Oil IS NULL AND
Gas IS NULL AND
Cement IS NULL AND
Flaring IS NULL AND
Other IS NULL AND
`Per Capita` IS NULL
)
"""
)
# DSL风格
df = df.filter(
~(
col("Total").isNull() &
col("Coal").isNull() &
col("Oil").isNull() &
col("Gas").isNull() &
col("Cement").isNull() &
col("Flaring").isNull() &
col("Other").isNull() &
col("Per Capita").isNull()
)
)
df = df.fillna(0)
df_MtCO2.createOrReplaceTempView("MtCO2")
# 2. 使用SQL和DSL两种风格API编写程序统计2000年-2023年碳排放总量最高的10个国家。
# SQL风格
spark.sql("""
select
Country,
max(Total) as Total
from MtCO2
where Year >= 2000 and Year <= 2023
group by Country
order by Total desc
""").show(10)
# DSL风格
df_ = df.where("Year >= 2000 and Year <= 2023 ").\
groupBy("Country").\
agg(F.max("Total").alias("Total")).\
orderBy("Total" , ascending = False)
df_.coalesce(1).write.mode("overwrite").csv("output/result1", header=True)
# 3. 使用SQL和DSL两种风格API编写程序统计2021年碳排放增量同比(2020年)最高的10个国家。
# SQL风格
spark.sql("""
select
mid1.Country as Country_2020,
mid1.Total as Total_2020,
mid2.Country as Country_2021,
mid2.Total as Total_2021
from
(select
Country,
max(Total) as Total,
Year
from MtCO2
where Year = 2020
group by Country, Year
) as mid1
inner join
(select
Country,
max(Total) as Total,
Year
from MtCO2
where Year = 2021
group by Country, Year
) as mid2
on mid1.Country = mid2.Country
order by Total_2020 desc
""").show(10)
# DSL风格
mid1 = df.filter(F.col("Year") == 2020)\
.groupBy("Country", "Year")\
.agg(F.max("Total").alias("Total_2020"))\
.alias("mid1")
mid2 =df.filter(F.col("Year") == 2021)\
.groupBy("Country", "Year")\
.agg(F.max("Total").alias("Total_2021"))\
.alias("mid2")
df_ = mid1.join(mid2, F.col("mid1.Country") == F.col("mid2.Country"), "inner")\
.select(
F.col("mid1.Country").alias("Country_2020"),
F.col("mid1.Total_2020"),
F.col("mid2.Country").alias("Country_2021"),
F.col("mid2.Total_2021"),
)\
.orderBy(F.col("Total_2020").desc())
df_.coalesce(1).write.mode("overwrite").csv("output/result2", header=True)
5. 使用Python语言编写Spark程序,使用DataFrame API实现下列要求,数据来自于一个文本文件movie.txt,统计结果保存到CSV格式文件中。
影评数据表格式说明:
UserID::MovieID::Rating::Timestamp
-
清洗数据,过滤掉某个用户评价数据全为空(MovieID等于0)的行。
-
使用SQL和DSL两种风格API编写程序统计超过平均分的电影的数量。
-
使用SQL和DSL两种风格API编写程序统计给低分电影(分数小于3)打分次数最多的10个用户。
from pyspark.sql import SparkSession
import findspark
from pyspark.sql.types import *
from pyspark.sql import functions as F
findspark.init()
spark = SparkSession.builder.\
appName("Sparkwork").\
master("local[*]").\
getOrCreate()
schema = StructType([
StrutField("UserID", IntegerType(), True),
StructField("MovieID", IntegerType(), True),
StructField("Rating", IntegerType), True),
StructField("Timestamp", IntegerType(), True),
])
df = spark.read.format('csv') \
.option("sep", "::") \
.option("header", False)\
.option("encoding", "utf-8")\
.schema(schema=schema)\
.load("./ratings.dat")\
.alias("rating")
df.createOrReplaceTempView("ratings")
# 1. 清洗数据,过滤掉某个用户评价数据全为空(MovieID等于0)的行
# SQL 风格
df_ = spark.sql("""
SELECT *
FROM ratings
WHERE MovieID != 0
""")
# DSL 风格
df = df.filter(F.col("MovieID") != 0)
df_.createOrReplaceTempView("rating")
# 2. 使用SQL和DSL两种风格API编写程序统计超过平均分的电影的数量。
# SQL 风格
spark.sql("""
select
count(mid2.MovieID) as `超过平均分的电影的数量`
from
(select
MovieID,
round(avg(Rating),2) as Rating
from
rating
group by
MovieID
) as mid2
join
(select
round(avg(Rating),2) as Rating
from
rating
) as mid
on mid2.Rating > mid.Rating
""").show()
# DSL 风格
mid1 = df.select(F.round(F.avg("Rating"), 2)\
.alias("Rating")).alias("mid1")
mid2 = df.select("MovieID","Rating")\
.groupBy("MovieID").agg(F.round(F.avg("Rating"), 2)\
.alias("Rating")).alias("mid2")
df_ = mid1.join(mid2, F.col("mid1.Rating") < F.col("mid2.Rating"), "inner")\
.select(
F.count("mid2.MovieID").alias("超过平均分的电影的数量")
)
df_.coalesce(1).write.mode("overwrite").csv("output/result1", header=True)
# 3. 使用SQL和DSL两种风格API编写程序统计给低分电影(分数小于3)打分次数最多的10个用户。
# SQL 风格
spark.sql("""
select
UserID , count(rating.MovieID) as `低分电影数`
from
rating
join
(select
MovieID,
round(avg(Rating),2) as Rating
from
rating
group by
MovieID
) as mid2
on
mid2.Rating < 3
and
mid2.MovieID = rating.MovieID
group by
UserID
order by `低分电影数` desc
""").show(10)
# DSL 风格
mid1 = df.select("MovieID","Rating")\
.groupBy("MovieID").agg(F.round(F.avg("Rating"),2)\
.alias("Rating")).alias("mid1")
df_ = df.join(
mid1,
(F.col("mid1.Rating") < 3) & (F.col("mid1.MovieID") == F.col("rating.MovieID"))
).groupBy(
F.col("rating.UserID") # 按 UserID 分组
).agg(
F.count(F.col("rating.MovieID")).alias("超过平均分的电影的数量") # 聚合统计
).orderBy(
"超过平均分的电影的数量" , ascending = False
).select(
"UserID" , "超过平均分的电影的数量"
)
df_.coalesce(1).write.mode("overwrite").csv("output/result2", header=True)
Spark Streaming
1. (DStream)使用SparkStream实现词频统计,数据源采用socket方式
import findspark
findspark.init()
from pyspark import SparkContext,SparkConf
from pyspark.streaming import StreamingContext
conf = SparkConf()
conf.setAppName("Text")
conf.setMaster('local[*]')
sc = SparkContext(conf = conf)
ssc = StreamingContext(sc,5)
ssc.checkpoint("./")
lines = ssc.socketTextStream("127.0.0.1",6666)
words = lines.flatMap(lambda x: x.split(' '))\
.map(lambda x:(x,1))\
.reduceByKeyAndWindow(
lambda x,y:x+y,
lambda x,y:x-y,
windowDuration = 20,
slideDuration = 10
)
words.pprint()
ssc.start()
ssc.awaitTermination();
2. (Structured Streaming)使用SparkStream实现词频统计,数据源采用socket方式
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col, window, to_timestamp
# 创建 SparkSession
spark = SparkSession.builder \
.appName("Spark Streaming WordCount") \
.getOrCreate()
# 从 socket 输入读取数据 (假设主机为 localhost,端口为 9999)
socket_stream = spark.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
# 将输入数据按 "time:word" 格式解析
parsed_stream = socket_stream \
.select(split(col("value"), ",").alias("parts")) \
.selectExpr("parts[0] as time", "parts[1] as word") \
.withColumn("timestamp", to_timestamp(col("time"), "yyyy-MM-dd HH:mm:ss"))
# 定义 Watermark 和窗口
windowed_counts = parsed_stream \
.withWatermark("timestamp", "20 seconds") \
.groupBy(
window(col("timestamp"), "1 minute", "30 seconds"), # 窗口大小 1 分钟,滑动间隔 30 秒
col("word")
).count()
# 输出到控制台
query = windowed_counts.writeStream \
.outputMode("update") \
.format("updata") \
.option("truncate", "false") \
.trigger(processingTime = '8 seconds')\
.start()
# 等待查询结束
query.awaitTermination()