DataFrame 的组成
DataFrame 是 Spark 中的一个重要的数据抽象,类似于传统的二维表格结构。DataFrame 中的数据以行和列的形式组织,表格结构包含以下三个关键部分:
- 行:数据的实际记录。
- 列:表示数据的属性。
- 表结构描述:描述表的元数据,比如列的名称、数据类型等。
以 MySQL 中的表为例:
- 表由多个 行 组成。
- 表的数据也被分成多个 列。
- 表有表结构信息,包含列、列名、列类型、列约束等。
DataFrame 组成的具体结构
基于以上前提,DataFrame 的组成分为 结构层面 和 数据层面:
结构层面
- StructType 对象:用于描述整个 DataFrame 的表结构(即表的模式信息)。它包含了所有列的信息。
- StructField 对象:用于描述 DataFrame 中一个列的信息。它包含列的名称、数据类型、是否允许空值等信息。
例如,使用 StructType
和 StructField
定义一个表结构:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
struct_type = StructType() \
.add('id', IntegerType(), False) \
.add('name', StringType(), True) \
.add('age', IntegerType(), False)
上述代码定义了一个 StructType
对象,其中包含了三个 StructField
:
- 列名为
id
,类型为IntegerType
,不允许为空。 - 列名为
name
,类型为StringType
,允许为空。 - 列名为
age
,类型为IntegerType
,不允许为空。
多个 StructField
对象构成一个 StructType
对象,从而描述整个 DataFrame 的表结构,包括列的名称、类型以及每个列是否允许为空。
数据层面
- Row 对象:用于记录 DataFrame 中的一行数据。每个
Row
对象表示一条记录。 例如:Row(1, '张三', 11)
表示一行数据,其中包含id
为 1,name
为'张三'
,age
为 11。 - Column 对象:用于记录 DataFrame 中的一列数据,同时包含列的相关信息,如列名和列的数据类型。
结构与数据的关系
- StructType 和 StructField:用于描述 DataFrame 的结构信息(即元数据),类似于数据库表的表结构。
- Row 和 Column:
Row
表示一行数据,是 DataFrame 中的基本数据单元。Column
表示一列数据,并且包含了列的元数据,例如列名和数据类型等。
Row 和 Column、StructType、StructField 的使用是 DataFrame 编程中的重要组成部分,后续的编程阶段会频繁接触它们。
总结
- DataFrame 是一个类似于关系型数据库中表的数据结构,它有 行、列 和 表结构描述。
- 通过 StructType 和 StructField 可以定义和描述 DataFrame 的表结构(Schema)。
- Row 对象表示一行数据,而 Column 对象表示一列数据。
DataFrame 的代码构建
以下是对 DataFrame 的代码构建 以及将 RDD 转换为 DataFrame 的整理和补充:
DataFrame 的代码构建
RDD 转换为 DataFrame 的三种方式
RDD(Resilient Distributed Dataset)和 DataFrame 都是 Spark 中的分布式数据抽象,但 DataFrame 提供了更高层次的结构化数据操作,类似于关系数据库中的二维表格结构。将 RDD 转换为 DataFrame 是 Spark 处理数据的一种常见需求。以下介绍三种将 RDD 转换为 DataFrame 的方式:
方式 1:使用 SparkSession 的 createDataFrame() 方法
使用 SparkSession 的 createDataFrame()
方法是最常见的方式。这种方法简单直接,适合数据结构不复杂的情况。
示例代码
from pyspark.sql import SparkSession
# 创建 Spark 会话
spark = SparkSession.builder \
.appName("DataFrame Conversion") \
.master("local[*]") \
.getOrCreate()
# 通过 SparkContext 读取数据并转换为 RDD
rdd = spark.sparkContext.textFile("hdfs://mycluster/sparkdata/student_info.txt") \
.map(lambda x: x.split(',')) \
.map(lambda x: (x[1], int(x[2])))
# 使用 SparkSession 的 createDataFrame() 方法将 RDD 转换为 DataFrame
df = spark.createDataFrame(rdd, schema=['name', 'age'])
# 打印表结构
df.printSchema()
# 打印数据
df.show(20, False)
# 将 DataFrame 转为临时视图表,以便使用 SQL 查询
df.createOrReplaceTempView('student')
# 执行 SQL 查询
spark.sql('SELECT * FROM student WHERE name = "Student_11"').show()
方式 2:通过 StructType 对象定义 Schema
这种方式通过 StructType
对象显式定义 DataFrame 的结构(Schema),可以定义列名、数据类型、是否允许为空等属性,灵活控制 DataFrame 的结构。
示例代码
from pyspark.sql.types import StructType, StringType, IntegerType
# 创建 RDD
rdd = spark.sparkContext.textFile("hdfs://mycluster/sparkdata/student_info.txt") \
.map(lambda x: x.split(',')) \
.map(lambda x: (x[1], int(x[2])))
# 定义 Schema(表结构)
schema = StructType() \
.add('name', StringType(), nullable=True) \
.add('age', IntegerType(), nullable=False)
# 使用 SparkSession 的 createDataFrame() 方法将 RDD 转换为 DataFrame,并指定 Schema
df = spark.createDataFrame(rdd, schema=schema)
# 打印表结构
df.printSchema()
方式 3:使用 RDD 的 toDF() 方法
这种方式通过 RDD 的 toDF()
方法快速将 RDD 转换为 DataFrame。需要注意的是,这种方式需要导入 SparkSession
,并且数据类型是从 RDD 中自动推断的。
示例代码
# 创建 RDD
rdd = spark.sparkContext.textFile("hdfs://mycluster/sparkdata/student_info.txt") \
.map(lambda x: x.split(',')) \
.map(lambda x: (x[1], int(x[2])))
# 使用 RDD 的 toDF() 方法将 RDD 转换为 DataFrame,并指定列名
df = rdd.toDF(['name', 'age'])
# 打印表结构
df.printSchema()
# 显示数据
df.show()
总结
-
方式 1:使用 createDataFrame() 方法:
- 通过
spark.createDataFrame()
将 RDD 转换为 DataFrame,传入列名即可,数据类型由 Spark 自动推断。 - 简单快捷,适合数据结构不复杂的场景。
- 通过
-
方式 2:使用 StructType 定义 Schema:
- 使用
StructType
和StructField
显式定义 DataFrame 的表结构。 - 更灵活,适合对数据结构有更严格控制的场景,例如需要明确列的数据类型和约束时。
- 使用
-
方式 3:使用 RDD 的 toDF() 方法:
- 使用 RDD 自带的
toDF()
方法快速将 RDD 转换为 DataFrame。 - 适合列名简单且无需明确定义数据类型的场景。
- 使用 RDD 自带的
选择合适的方式:具体使用哪种方式取决于项目的需求和数据的复杂程度。如果数据结构较为简单,可以直接使用 createDataFrame()
或 toDF()
。如果需要严格控制表结构,建议使用 StructType
来显式定义 Schema。无论哪种方式,DataFrame 的引入使得大规模数据的操作和分析更加直观、高效。
以下是对 将 Pandas DataFrame 转换为 Spark DataFrame 的整理和补充:
DataFrame 的代码构建 - 基于 Pandas 的 DataFrame
在大数据处理场景中,可能需要将 Pandas DataFrame 转换为分布式计算的 Spark SQL DataFrame,以便能够利用集群的计算能力处理大规模数据集。Spark 提供了方法将本地 Pandas 的 DataFrame 转换为 Spark 的 DataFrame。
以下代码示例展示了如何将一个 Pandas DataFrame 对象转换为 Spark DataFrame:
import pandas as pd
from pyspark.sql import SparkSession
# 创建 Spark 会话
spark = SparkSession.builder \
.appName("Pandas to Spark DataFrame") \
.master("local[*]") \
.getOrCreate()
# 创建 Pandas DataFrame
pdf = pd.DataFrame({
'id': [1, 2, 3],
'name': ['xxx', 'yyy', 'zzz'],
'age': [12, 11, 10]
})
# 使用 Spark 的 createDataFrame() 方法将 Pandas DataFrame 转换为 Spark DataFrame
df = spark.createDataFrame(pdf)
# 打印 Spark DataFrame 的 Schema
df.printSchema()
# 打印 Spark DataFrame 的数据
df.show()
以下是对 Spark DataFrame 的外部数据读取 的整理和补充,包括 spark.read.format()
方法支持的多种读取类型:
DataFrame 的代码构建 - 读取外部数据
Spark SQL 提供了统一的 API 来读取和加载各种外部数据源以构建 DataFrame,如 text
、csv
、json
、parquet
等。通过 spark.read.format()
方法可以使用不同的数据格式构建 DataFrame。
1. 读取 Text 文件
可以通过 Spark SQL 读取文本文件,并构建 DataFrame。以下是读取 Text 文件的示例:
from pyspark.sql.types import StructType, StringType
# 定义 Schema
schema = StructType().add('data', StringType(), nullable=True)
# 读取 text 文件
df = spark.read.format('text') \
.schema(schema=schema) \
.load("hdfs://mycluster/sparkdata/student_info.txt")
# 打印表结构和数据
df.printSchema()
df.show()
Note:读取
text
文件时,默认将每一行读取为一条记录,列名默认为value
,数据类型为string
。
2. 读取 CSV 文件
Spark 提供了对 CSV 文件的读取支持,可以指定选项如是否有表头、分隔符等。
df_csv = spark.read.format('csv') \
.option('header', 'true') \
.option('inferSchema', 'true') \
.option('encoding','utf-8')\
.option('name STRING , age INT , job STRING')
.load("hdfs://mycluster/sparkdata/student_info.csv")
df_csv.printSchema()
df_csv.show()
- 选项:
- header:是否包含表头(默认为
'false'
)。 - inferSchema:是否推断数据类型(默认为
'false'
)。
- header:是否包含表头(默认为
3. 读取 JSON 文件
Spark 也可以读取 JSON 格式的文件,适用于半结构化的数据。
df_json = spark.read.format('json') \
.load("hdfs://mycluster/sparkdata/student_info.json")
# JSON 类型一般不用写.schema,jsom自带,json带有列名和列类型(字符串和数字)
df_json.printSchema()
df_json.show()
- 选项:
- 可以使用
.option()
来指定读取的行为,例如是否允许多行(multiline
)等。
- 可以使用
JSON 类型一般不用写.schema,jsom自带,json带有列名和列类型(字符串和数字)
4. 读取 Parquet 文件
Parquet 是 Spark 默认推荐的文件格式,因其压缩性能好且读取效率高,适用于大规模数据处理。
df_parquet = spark.read.format('parquet') \
.load("hdfs://mycluster/sparkdata/student_info.parquet")
df_parquet.printSchema()
df_parquet.show()
- Parquet 文件是列式存储格式,适合进行快速查询和分析。
5. 读取 ORC 文件
ORC 是另一种列式存储格式,与 Parquet 类似,通常用于 Hive 数据仓库。
df_orc = spark.read.format('orc') \
.load("hdfs://mycluster/sparkdata/student_info.orc")
df_orc.printSchema()
df_orc.show()
6. 读取 Avro 文件
Avro 是另一种常用的序列化格式,尤其适用于 Hadoop 生态中的数据交换。
df_avro = spark.read.format('avro') \
.load("hdfs://mycluster/sparkdata/student_info.avro")
df_avro.printSchema()
df_avro.show()
Note:默认情况下,读取 Avro 文件需要添加相应的 Avro 包。
7. 读取 JDBC 数据源
可以通过 JDBC 连接到关系型数据库(如 MySQL、PostgreSQL 等)读取数据。
df_jdbc = spark.read.format('jdbc') \
.option('url', 'jdbc:mysql://localhost:3306/mydb') \
.option('dbtable', 'student_info') \
.option('user', 'root') \
.option('password', 'password') \
.load()
df_jdbc.printSchema()
df_jdbc.show()
- 选项:
- url:数据库连接字符串。
- dbtable:要读取的数据库表。
- user 和 password:数据库用户名和密码。
总结
Spark SQL 提供了统一的 spark.read.format()
API 来读取多种不同格式的数据源,并生成 DataFrame。常见的读取类型包括:
- Text 文件:适合读取纯文本数据。
- CSV 文件:适合结构化的文本数据。
- JSON 文件:适合半结构化的数据。
- Parquet 文件:适合大规模数据的高效存储和查询。
- ORC 文件:通常用于 Hive 数据仓库中的列式存储格式。
- Avro 文件:常用于 Hadoop 环境中的数据交换。
- JDBC 数据源:通过 JDBC 连接数据库进行读取。
DataFrame 的操作
DataFrame 支持两种风格的编程,分别是:
- DSL 风格
- SQL 风格
1. DSL 风格
DSL 称为领域特定语言(Domain-Specific Language),用于特定领域的操作。在 Spark 中,DSL 风格即通过调用 DataFrame 提供的 API 来操作数据。
DSL 风格特点:
- 以调用 API 的方式来处理数据。
- 常用于进行数据的过滤、选择、分组等操作。
示例:
df.where("age > 18").limit(10).show()
2. SQL 风格
SQL 风格是通过编写标准的 SQL 语句来对 DataFrame 进行数据处理,类似于数据库中的 SQL 操作。
SQL 风格特点:
- 使用 SQL 语句处理 DataFrame 中的数据。
- 可以方便地对数据进行查询、聚合、排序等操作。
示例:
spark.sql("SELECT * FROM student WHERE age > 18").show()
总结:
- DSL 风格:通过调用特定 API 操作数据,如
df.where().limit()
。 - SQL 风格:通过编写 SQL 语句来处理 DataFrame,如
spark.sql("SELECT * FROM ...")
。
两种风格各有优劣,开发者可以根据个人习惯和具体的场景选择合适的方式进行数据操作。