菜单
本页目录

SparkSQL读入

在 SparkSQL 中,将数据读取后转为二维表的格式,通常是为了进行结构化的处理或查询。二维表的结构就是常见的表格形式,其中数据被组织成行和列。在 Spark 中,DataFrame 就是一个二维表的表示。每种文件格式(如 Parquet、JSON、ORC、CSV、Text 等)都有不同的读取方式,并且需要根据其特定的结构进行转化为二维表的操作。

下面我们详细讲解从各个数据格式读取数据,并如何将其转为二维表格式,包括必要的规则和常用方法。


1. Parquet 格式

Parquet 是一种列式存储格式,数据天然就是结构化的,因此读取 Parquet 文件时,直接返回的是结构化的二维表(DataFrame)。

读取规则

  • Parquet 文件自带模式(schema),Spark 自动推断数据类型并返回 DataFrame。
  • 对于列式存储的数据,列读取性能较好。
  • 如果多个 Parquet 文件的 schema 不同,可以使用 mergeSchema 选项合并它们。

读取并转为二维表的示例

df = spark.read \
    .format('parquet') \
    .load('hdfs://mycluster/sparkdata/data.parquet')

如何转为二维表

  • 读取后数据已经是 DataFrame(二维表)格式,直接使用即可。
  • 可以进一步使用 select() 选择所需的列,或者使用 filter() 进行数据筛选。
# 选择部分列并展示
df.select("name", "age", "city").show()

注意事项

  • Parquet 文件由于自带模式,转换为二维表无需额外处理,直接读取后即为表格结构。

2. JSON 格式

JSON 数据常用于存储半结构化的数据,JSON 文件中的数据格式可以非常灵活,但不一定是标准的二维表格式。因此,有时需要对嵌套的 JSON 数据进行展平(Flatten)处理,才能得到标准的二维表。

读取规则

  • 默认情况下,Spark 会自动推断 JSON 文件的 schema。
  • 如果 JSON 结构是嵌套的或复杂的,则需要通过 JSON 展平(Flatten)操作将其转换为二维表格式。

读取并转为二维表的示例

df = spark.read \
    .format('json') \
    .load('hdfs://mycluster/sparkdata/data.json')

如何转为二维表

  1. 普通 JSON(没有嵌套结构),直接读取即可视为二维表。
  2. 嵌套 JSON,需要对嵌套字段进行展平(Flatten)操作。
# 假设 JSON 数据结构为嵌套的
# {
#   "name": "Alice",
#   "age": 25,
#   "address": {
#       "city": "New York",
#       "zip": "10001"
#   }
# }

# 展平嵌套字段,转换为二维表格式
df = df.select("name", "age", "address.city", "address.zip")
df.show()

注意事项

  • 使用 select() 方法提取嵌套字段,将其转化为二维表的列。
  • 可以使用 explode() 方法将数组类型的字段展平为多行。

3. ORC 格式

ORC 和 Parquet 类似,是一种列式存储格式,通常用于 Hive 和 Hadoop 生态系统中。ORC 文件自带 schema,读取后直接是结构化的二维表。

读取规则

  • ORC 文件自带模式,Spark 会自动推断数据类型。
  • 数据直接作为 DataFrame 处理,不需要额外的结构化操作。

读取并转为二维表的示例

df = spark.read \
    .format('orc') \
    .load('hdfs://mycluster/sparkdata/data.orc')

如何转为二维表

  • 读取后,数据直接是二维表(DataFrame)形式。
  • 可以使用 select()filter() 操作进一步处理。
df.select("name", "age", "city").show()

注意事项

  • 如果有多个 ORC 文件,且 schema 不一致,可以使用 mergeSchema 选项合并它们。
  • ORC 文件的数据是列式存储的,因此查询列的性能很高。

4. CSV 格式

CSV 是一种简单的文本格式,通常用于存储表格数据。读取 CSV 文件时,需要注意指定分隔符、是否包含头部等参数。

读取规则

  • 可以通过 header 选项指定 CSV 文件是否包含列名。
  • 通过 inferSchema 可以让 Spark 自动推断列的数据类型,否则所有列都将被读取为字符串。
  • 如果 CSV 文件的字段分隔符不是逗号,需要通过 sep 选项指定。

读取并转为二维表的示例

df = spark.read \
    .format('csv') \
    .option('header', True) \
    .option('sep', ',') \
    .option('inferSchema', True) \
    .load('hdfs://mycluster/sparkdata/data.csv')

如何转为二维表

  • 读取 CSV 文件后,数据已经是二维表格式。
  • 可以根据需求选择或筛选列。
df.select("name", "age", "city").show()

注意事项

  • 如果 CSV 文件没有头部,需要手动指定列名(使用 toDF() 方法)。
  • CSV 格式不自带模式(schema),需要通过 inferSchema 或手动指定 schema 来定义列的类型。

5. Text 格式

Text 文件是最简单的文本格式,每一行是一个字符串。Spark 读取文本文件时,默认将每一行作为一个记录,存储在 DataFrame 的 value 列中。要将其转为二维表,需要根据内容进一步处理。

读取规则

  • 每行文本会被作为一条记录,存储在 DataFrame 的 value 列中。
  • 如果每行包含多个字段,需使用分隔符对其进行拆分。

读取并转为二维表的示例

df = spark.read \
    .format('text') \
    .load('hdfs://mycluster/sparkdata/data.txt')

如何转为二维表

  1. 单列文本:直接读取,数据以单列 value 存储。
  2. 多列文本:如果每行数据包含多个字段,需要使用 split() 将其分解为多列。
# 假设文本文件格式如下:
# Alice,25,New York
# Bob,30,Los Angeles

from pyspark.sql.functions import split

# 将每行文本拆分为多个字段
df = df.withColumn('name', split(df['value'], ',')[0]) \
       .withColumn('age', split(df['value'], ',')[1].cast('int')) \
       .withColumn('city', split(df['value'], ',')[2])
df.show()

注意事项

  • 对于多字段的文本文件,需要手动拆分并转换为多列的格式。
  • 使用 split() 方法对每一行数据进行拆分,并转换为合适的数据类型。

总结:读取文件后如何转为二维表

格式读取规则转为二维表的规则
Parquet自动推断模式,列式存储直接是二维表,无需额外处理
JSON自动推断模式,嵌套数据需要展平使用 select 展平嵌套结构,转换为二维表
ORC自动推断模式,列式存储直接是二维表,无需额外处理
CSV通过 header 指定列名,sep 指定分隔符直接是二维表,使用 selectfilter 进一步处理
Text每行作为单个记录,默认以 value 列存储使用 split() 拆分每行数据,转化为多列

不同格式的数据源读取到 Spark 中后,常常需要额外的操作将其标准化为二维表的形式。对于复杂的结构(如 JSON 和文本文件),需要对数据进行展平或拆分。而像 Parquet、ORC 这样的格式天然是结构化的,可以直接视作二维表使用。