菜单
本页目录

DataFrame通过JDBC读写Mysql

1. 读取 MySQL 数据到 PySpark DataFrame

读取 MySQL 的基本模式

df = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://<host>:<port>/<database>") \
    .option("dbtable", "<table_name>") \
    .option("user", "<username>") \
    .option("password", "<password>") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .load()

详细参数说明:

  • format("jdbc"):指定使用 JDBC 连接器来进行数据库操作。
  • url:MySQL 数据库的 URL,格式为 jdbc:mysql://<host>:<port>/<database>。其中,<host> 是数据库服务器的 IP 地址或主机名,<port> 是 MySQL 的端口号(通常为 3306),<database> 是要连接的数据库名称。
  • dbtable:指定要读取的表或查询,可以是表名(如 "tablename"),也可以是一个 SQL 查询语句(如 "(SELECT * FROM tablename WHERE age > 30) AS temp")。
  • userpassword:指定连接 MySQL 数据库的用户名和密码。
  • driver:指定 MySQL JDBC 驱动类名,MySQL 8.x 版本使用 "com.mysql.cj.jdbc.Driver"

示例:

df = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://47.101.42.60:3306/mybatis") \
    .option("dbtable", "dm_student") \
    .option("user", "") \
    .option("password", "") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .load()

2. 读取 MySQL 表的查询模式

除了读取整个表,还可以指定 SQL 查询语句来进行过滤、选择数据:

df = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://<host>:<port>/<database>") \
    .option("query", "SELECT * FROM <table_name> WHERE age > 30") \
    .option("user", "<username>") \
    .option("password", "<password>") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .load()

示例:

df = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://47.101.42.60:3306/mybatis") \
    .option("query", "SELECT * FROM dm_student WHERE age > 25") \
    .option("user", "") \
    .option("password", "") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .load()

3. 写出 PySpark DataFrame 到 MySQL

在 PySpark 中,写出数据到 MySQL 数据库时,可以使用不同的模式来控制数据如何写入目标表。以下是所有支持的写出模式及其详细说明:

1. overwrite 模式

  • 描述:如果目标表已经存在,则删除该表,然后重新创建一个新表,并将 DataFrame 中的数据写入表中。这个模式会覆盖表中的所有数据。
  • 适用场景:适用于完全替换表中数据的情况,比如需要刷新整个表的数据时。

示例:

df.write.format("jdbc") \
    .option("url", "jdbc:mysql://<host>:<port>/<database>") \
    .option("dbtable", "<table_name>") \
    .option("user", "<username>") \
    .option("password", "<password>") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .mode("overwrite") \
    .save()

2. append 模式

  • 描述:如果目标表已经存在,直接将 DataFrame 中的数据追加到该表的末尾,不会覆盖原有数据。如果目标表不存在,Spark 会自动创建该表。
  • 适用场景:适用于将新数据追加到已有表中的情况,比如每天定时将新记录添加到历史数据表中。

示例:

df.write.format("jdbc") \
    .option("url", "jdbc:mysql://<host>:<port>/<database>") \
    .option("dbtable", "<table_name>") \
    .option("user", "<username>") \
    .option("password", "<password>") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .mode("append") \
    .save()

3. ignore 模式

  • 描述:如果目标表已经存在,则忽略写入操作,不执行任何数据插入。如果表不存在,则创建新表并将数据写入。
  • 适用场景:适用于不想覆盖或追加到已经存在的表中的情况,主要用于防止意外覆盖已有数据。

示例:

df.write.format("jdbc") \
    .option("url", "jdbc:mysql://<host>:<port>/<database>") \
    .option("dbtable", "<table_name>") \
    .option("user", "<username>") \
    .option("password", "<password>") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .mode("ignore") \
    .save()

4. errorerrorifexists 模式

  • 描述:如果目标表已经存在,写入操作将会失败并抛出错误。如果表不存在,则会创建新表并将数据写入。
  • 适用场景:适用于确保不会误覆盖已有表的情况,如果表存在就终止写入并抛出错误,常用于安全性要求较高的场景。

示例:

df.write.format("jdbc") \
    .option("url", "jdbc:mysql://<host>:<port>/<database>") \
    .option("dbtable", "<table_name>") \
    .option("user", "<username>") \
    .option("password", "<password>") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .mode("error") \
    .save()

5. 总结:各模式的适用场景

写出模式描述适用场景
overwrite如果表存在,则删除表并重新创建;否则,创建新表并写入数据替换整个表的数据
append如果表存在,则将数据追加到表中;否则,创建新表并写入数据向已有表追加新数据
ignore如果表存在,则忽略写入操作;否则,创建新表并写入数据确保不会覆盖已有表,防止误操作
error如果表存在,则抛出错误;否则,创建新表并写入数据防止对已有表进行修改,确保数据安全

4. 读取和写出时的分区

为了优化性能,在读取和写出 MySQL 数据时,可以指定分区,特别是处理大数据量时,分区可以显著提高效率。

读取时的分区配置:

df = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://<host>:<port>/<database>") \
    .option("dbtable", "<table_name>") \
    .option("user", "<username>") \
    .option("password", "<password>") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("numPartitions", 10) \
    .option("partitionColumn", "id") \
    .option("lowerBound", "1") \
    .option("upperBound", "1000") \
    .load()
  • numPartitions:指定 Spark 使用的并行读取的分区数量。
  • partitionColumn:指定用于分区的列,通常是整数类型的主键或索引列。
  • lowerBoundupperBound:指定分区列的范围。

示例:按主键分区读取数据

df = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://47.101.42.60:3306/mybatis") \
    .option("dbtable", "dm_student") \
    .option("user", "root") \
    .option("password", "#Alone117") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("numPartitions", 10) \
    .option("partitionColumn", "id") \
    .option("lowerBound", "1") \
    .option("upperBound", "10000") \
    .load()

总结:

  • 读取数据:使用 read.format("jdbc") 结合 MySQL JDBC 驱动进行数据库连接,支持读取整个表或通过 SQL 查询语句读取部分数据。
  • 写出数据:使用 write.format("jdbc") 写出 DataFrame 到 MySQL,支持 appendoverwrite 等模式。
  • 优化:通过分区选项,如 numPartitionspartitionColumnlowerBoundupperBound,可以优化大数据集的读取和写出性能。

通过这些配置,你可以灵活地将 MySQL 数据读入到 PySpark DataFrame 进行处理,并将结果数据写回 MySQL 表。