SparkSQL读入
在 SparkSQL 中,将数据读取后转为二维表的格式,通常是为了进行结构化的处理或查询。二维表的结构就是常见的表格形式,其中数据被组织成行和列。在 Spark 中,DataFrame 就是一个二维表的表示。每种文件格式(如 Parquet、JSON、ORC、CSV、Text 等)都有不同的读取方式,并且需要根据其特定的结构进行转化为二维表的操作。
下面我们详细讲解从各个数据格式读取数据,并如何将其转为二维表格式,包括必要的规则和常用方法。
1. Parquet 格式
Parquet 是一种列式存储格式,数据天然就是结构化的,因此读取 Parquet 文件时,直接返回的是结构化的二维表(DataFrame)。
读取规则:
- Parquet 文件自带模式(schema),Spark 自动推断数据类型并返回 DataFrame。
- 对于列式存储的数据,列读取性能较好。
- 如果多个 Parquet 文件的 schema 不同,可以使用
mergeSchema
选项合并它们。
读取并转为二维表的示例:
df = spark.read \
.format('parquet') \
.load('hdfs://mycluster/sparkdata/data.parquet')
如何转为二维表:
- 读取后数据已经是 DataFrame(二维表)格式,直接使用即可。
- 可以进一步使用
select()
选择所需的列,或者使用filter()
进行数据筛选。
# 选择部分列并展示
df.select("name", "age", "city").show()
注意事项:
- Parquet 文件由于自带模式,转换为二维表无需额外处理,直接读取后即为表格结构。
2. JSON 格式
JSON 数据常用于存储半结构化的数据,JSON 文件中的数据格式可以非常灵活,但不一定是标准的二维表格式。因此,有时需要对嵌套的 JSON 数据进行展平(Flatten)处理,才能得到标准的二维表。
读取规则:
- 默认情况下,Spark 会自动推断 JSON 文件的 schema。
- 如果 JSON 结构是嵌套的或复杂的,则需要通过 JSON 展平(Flatten)操作将其转换为二维表格式。
读取并转为二维表的示例:
df = spark.read \
.format('json') \
.load('hdfs://mycluster/sparkdata/data.json')
如何转为二维表:
- 普通 JSON(没有嵌套结构),直接读取即可视为二维表。
- 嵌套 JSON,需要对嵌套字段进行展平(Flatten)操作。
# 假设 JSON 数据结构为嵌套的
# {
# "name": "Alice",
# "age": 25,
# "address": {
# "city": "New York",
# "zip": "10001"
# }
# }
# 展平嵌套字段,转换为二维表格式
df = df.select("name", "age", "address.city", "address.zip")
df.show()
注意事项:
- 使用
select()
方法提取嵌套字段,将其转化为二维表的列。 - 可以使用
explode()
方法将数组类型的字段展平为多行。
3. ORC 格式
ORC 和 Parquet 类似,是一种列式存储格式,通常用于 Hive 和 Hadoop 生态系统中。ORC 文件自带 schema,读取后直接是结构化的二维表。
读取规则:
- ORC 文件自带模式,Spark 会自动推断数据类型。
- 数据直接作为 DataFrame 处理,不需要额外的结构化操作。
读取并转为二维表的示例:
df = spark.read \
.format('orc') \
.load('hdfs://mycluster/sparkdata/data.orc')
如何转为二维表:
- 读取后,数据直接是二维表(DataFrame)形式。
- 可以使用
select()
和filter()
操作进一步处理。
df.select("name", "age", "city").show()
注意事项:
- 如果有多个 ORC 文件,且 schema 不一致,可以使用
mergeSchema
选项合并它们。 - ORC 文件的数据是列式存储的,因此查询列的性能很高。
4. CSV 格式
CSV 是一种简单的文本格式,通常用于存储表格数据。读取 CSV 文件时,需要注意指定分隔符、是否包含头部等参数。
读取规则:
- 可以通过
header
选项指定 CSV 文件是否包含列名。 - 通过
inferSchema
可以让 Spark 自动推断列的数据类型,否则所有列都将被读取为字符串。 - 如果 CSV 文件的字段分隔符不是逗号,需要通过
sep
选项指定。
读取并转为二维表的示例:
df = spark.read \
.format('csv') \
.option('header', True) \
.option('sep', ',') \
.option('inferSchema', True) \
.load('hdfs://mycluster/sparkdata/data.csv')
如何转为二维表:
- 读取 CSV 文件后,数据已经是二维表格式。
- 可以根据需求选择或筛选列。
df.select("name", "age", "city").show()
注意事项:
- 如果 CSV 文件没有头部,需要手动指定列名(使用
toDF()
方法)。 - CSV 格式不自带模式(schema),需要通过
inferSchema
或手动指定 schema 来定义列的类型。
5. Text 格式
Text 文件是最简单的文本格式,每一行是一个字符串。Spark 读取文本文件时,默认将每一行作为一个记录,存储在 DataFrame 的 value
列中。要将其转为二维表,需要根据内容进一步处理。
读取规则:
- 每行文本会被作为一条记录,存储在 DataFrame 的
value
列中。 - 如果每行包含多个字段,需使用分隔符对其进行拆分。
读取并转为二维表的示例:
df = spark.read \
.format('text') \
.load('hdfs://mycluster/sparkdata/data.txt')
如何转为二维表:
- 单列文本:直接读取,数据以单列
value
存储。 - 多列文本:如果每行数据包含多个字段,需要使用
split()
将其分解为多列。
# 假设文本文件格式如下:
# Alice,25,New York
# Bob,30,Los Angeles
from pyspark.sql.functions import split
# 将每行文本拆分为多个字段
df = df.withColumn('name', split(df['value'], ',')[0]) \
.withColumn('age', split(df['value'], ',')[1].cast('int')) \
.withColumn('city', split(df['value'], ',')[2])
df.show()
注意事项:
- 对于多字段的文本文件,需要手动拆分并转换为多列的格式。
- 使用
split()
方法对每一行数据进行拆分,并转换为合适的数据类型。
总结:读取文件后如何转为二维表
格式 | 读取规则 | 转为二维表的规则 |
---|---|---|
Parquet | 自动推断模式,列式存储 | 直接是二维表,无需额外处理 |
JSON | 自动推断模式,嵌套数据需要展平 | 使用 select 展平嵌套结构,转换为二维表 |
ORC | 自动推断模式,列式存储 | 直接是二维表,无需额外处理 |
CSV | 通过 header 指定列名,sep 指定分隔符 | 直接是二维表,使用 select 和 filter 进一步处理 |
Text | 每行作为单个记录,默认以 value 列存储 | 使用 split() 拆分每行数据,转化为多列 |
不同格式的数据源读取到 Spark 中后,常常需要额外的操作将其标准化为二维表的形式。对于复杂的结构(如 JSON 和文本文件),需要对数据进行展平或拆分。而像 Parquet、ORC 这样的格式天然是结构化的,可以直接视作二维表使用。