DataFrame通过JDBC读写Mysql
1. 读取 MySQL 数据到 PySpark DataFrame
读取 MySQL 的基本模式
df = spark.read.format("jdbc") \
.option("url", "jdbc:mysql://<host>:<port>/<database>") \
.option("dbtable", "<table_name>") \
.option("user", "<username>") \
.option("password", "<password>") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.load()
详细参数说明:
format("jdbc")
:指定使用JDBC
连接器来进行数据库操作。url
:MySQL 数据库的 URL,格式为jdbc:mysql://<host>:<port>/<database>
。其中,<host>
是数据库服务器的 IP 地址或主机名,<port>
是 MySQL 的端口号(通常为 3306),<database>
是要连接的数据库名称。dbtable
:指定要读取的表或查询,可以是表名(如"tablename"
),也可以是一个 SQL 查询语句(如"(SELECT * FROM tablename WHERE age > 30) AS temp"
)。user
和password
:指定连接 MySQL 数据库的用户名和密码。driver
:指定 MySQL JDBC 驱动类名,MySQL 8.x 版本使用"com.mysql.cj.jdbc.Driver"
。
示例:
df = spark.read.format("jdbc") \
.option("url", "jdbc:mysql://47.101.42.60:3306/mybatis") \
.option("dbtable", "dm_student") \
.option("user", "") \
.option("password", "") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.load()
2. 读取 MySQL 表的查询模式
除了读取整个表,还可以指定 SQL 查询语句来进行过滤、选择数据:
df = spark.read.format("jdbc") \
.option("url", "jdbc:mysql://<host>:<port>/<database>") \
.option("query", "SELECT * FROM <table_name> WHERE age > 30") \
.option("user", "<username>") \
.option("password", "<password>") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.load()
示例:
df = spark.read.format("jdbc") \
.option("url", "jdbc:mysql://47.101.42.60:3306/mybatis") \
.option("query", "SELECT * FROM dm_student WHERE age > 25") \
.option("user", "") \
.option("password", "") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.load()
3. 写出 PySpark DataFrame 到 MySQL
在 PySpark 中,写出数据到 MySQL 数据库时,可以使用不同的模式来控制数据如何写入目标表。以下是所有支持的写出模式及其详细说明:
1. overwrite
模式
- 描述:如果目标表已经存在,则删除该表,然后重新创建一个新表,并将 DataFrame 中的数据写入表中。这个模式会覆盖表中的所有数据。
- 适用场景:适用于完全替换表中数据的情况,比如需要刷新整个表的数据时。
示例:
df.write.format("jdbc") \
.option("url", "jdbc:mysql://<host>:<port>/<database>") \
.option("dbtable", "<table_name>") \
.option("user", "<username>") \
.option("password", "<password>") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.mode("overwrite") \
.save()
2. append
模式
- 描述:如果目标表已经存在,直接将 DataFrame 中的数据追加到该表的末尾,不会覆盖原有数据。如果目标表不存在,Spark 会自动创建该表。
- 适用场景:适用于将新数据追加到已有表中的情况,比如每天定时将新记录添加到历史数据表中。
示例:
df.write.format("jdbc") \
.option("url", "jdbc:mysql://<host>:<port>/<database>") \
.option("dbtable", "<table_name>") \
.option("user", "<username>") \
.option("password", "<password>") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.mode("append") \
.save()
3. ignore
模式
- 描述:如果目标表已经存在,则忽略写入操作,不执行任何数据插入。如果表不存在,则创建新表并将数据写入。
- 适用场景:适用于不想覆盖或追加到已经存在的表中的情况,主要用于防止意外覆盖已有数据。
示例:
df.write.format("jdbc") \
.option("url", "jdbc:mysql://<host>:<port>/<database>") \
.option("dbtable", "<table_name>") \
.option("user", "<username>") \
.option("password", "<password>") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.mode("ignore") \
.save()
4. error
或 errorifexists
模式
- 描述:如果目标表已经存在,写入操作将会失败并抛出错误。如果表不存在,则会创建新表并将数据写入。
- 适用场景:适用于确保不会误覆盖已有表的情况,如果表存在就终止写入并抛出错误,常用于安全性要求较高的场景。
示例:
df.write.format("jdbc") \
.option("url", "jdbc:mysql://<host>:<port>/<database>") \
.option("dbtable", "<table_name>") \
.option("user", "<username>") \
.option("password", "<password>") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.mode("error") \
.save()
5. 总结:各模式的适用场景
写出模式 | 描述 | 适用场景 |
---|---|---|
overwrite | 如果表存在,则删除表并重新创建;否则,创建新表并写入数据 | 替换整个表的数据 |
append | 如果表存在,则将数据追加到表中;否则,创建新表并写入数据 | 向已有表追加新数据 |
ignore | 如果表存在,则忽略写入操作;否则,创建新表并写入数据 | 确保不会覆盖已有表,防止误操作 |
error | 如果表存在,则抛出错误;否则,创建新表并写入数据 | 防止对已有表进行修改,确保数据安全 |
4. 读取和写出时的分区
为了优化性能,在读取和写出 MySQL 数据时,可以指定分区,特别是处理大数据量时,分区可以显著提高效率。
读取时的分区配置:
df = spark.read.format("jdbc") \
.option("url", "jdbc:mysql://<host>:<port>/<database>") \
.option("dbtable", "<table_name>") \
.option("user", "<username>") \
.option("password", "<password>") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("numPartitions", 10) \
.option("partitionColumn", "id") \
.option("lowerBound", "1") \
.option("upperBound", "1000") \
.load()
numPartitions
:指定 Spark 使用的并行读取的分区数量。partitionColumn
:指定用于分区的列,通常是整数类型的主键或索引列。lowerBound
和upperBound
:指定分区列的范围。
示例:按主键分区读取数据
df = spark.read.format("jdbc") \
.option("url", "jdbc:mysql://47.101.42.60:3306/mybatis") \
.option("dbtable", "dm_student") \
.option("user", "root") \
.option("password", "#Alone117") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("numPartitions", 10) \
.option("partitionColumn", "id") \
.option("lowerBound", "1") \
.option("upperBound", "10000") \
.load()
总结:
- 读取数据:使用
read.format("jdbc")
结合 MySQL JDBC 驱动进行数据库连接,支持读取整个表或通过 SQL 查询语句读取部分数据。 - 写出数据:使用
write.format("jdbc")
写出 DataFrame 到 MySQL,支持append
、overwrite
等模式。 - 优化:通过分区选项,如
numPartitions
、partitionColumn
、lowerBound
和upperBound
,可以优化大数据集的读取和写出性能。
通过这些配置,你可以灵活地将 MySQL 数据读入到 PySpark DataFrame 进行处理,并将结果数据写回 MySQL 表。