PySpark 实战指南:如何高效合并两个 DataFrame

在大数据处理领域,数据的整合往往是最基础也是最关键的步骤之一。当你面对分散在不同数据源中的信息时,如何将它们无缝拼接在一起,往往决定了后续分析的效率。今天,我们将深入探讨 PySpark 中拼接两个 DataFrame 的多种方法。我们将不仅限于简单的语法演示,还会剖析每种方法背后的工作原理、适用场景以及在实际开发中可能遇到的“坑”。准备好了吗?让我们通过实战案例,一起掌握这些核心技能。

环境准备与数据初始化

为了让大家更直观地理解,我们首先构建一个 Spark 环境,并准备两份模拟数据。在这个过程中,我会穿插讲解初始化的细节,确保即使是初学者也能跟上节奏。

在 PySpark 中,一切操作的入口都是 SparkSession。它是我们与 Spark 交互的句柄。

# 导入必要的库
from pyspark.sql import SparkSession

# 创建一个 Spark 会话
# appName 用于在 Spark UI 中标识应用,方便调试
spark = SparkSession.builder \
    .appName(‘PySpark Concatenation Example‘) \
    .getOrCreate()

接下来,我们创建第一个 DataFrame(df1)。这里我们模拟了一份员工基本信息的简化版数据。

# 数据以元组列表的形式定义
# 注意:日期是以字符串形式存储的,这在实际 ETL 中非常常见
data1 = [
    (‘Ram‘, ‘1991-04-01‘, ‘M‘, 3000),
    (‘Mike‘, ‘2000-05-19‘, ‘M‘, 4000),
    (‘Rohini‘, ‘1978-09-05‘, ‘M‘, 4000),
    (‘Maria‘, ‘1967-12-01‘, ‘F‘, 4000),
    (‘Jenis‘, ‘1980-02-17‘, ‘F‘, 1200)
]

# 定义 Schema(架构),明确列名
# 这一步比让 Spark 自动推断类型更加高效且稳定
columns = ["Name", "DOB", "Gender", "salary"]

# 创建 DataFrame
df1 = spark.createDataFrame(data=data1, schema=columns)

# 打印查看数据
df1.show()

输出结果:

+------+----------+------+------+
|  Name|       DOB|Gender|salary|
+------+----------+------+------+
|   Ram|1991-04-01|     M|  3000|
|  Mike|2000-05-19|     M|  4000|
|Rohini|1978-09-05|     M|  4000|
| Maria|1967-12-01|     F|  4000|
| Jenis|1980-02-17|     F|  1200|
+------+----------+------+------+

紧接着,我们创建第二个 DataFrame(INLINECODEbf21af3e)。为了展示拼接效果,INLINECODEe3f6282f 包含了不同的员工数据,且在定义时,我特意稍微调整了列的顺序(虽然在这个例子中看起来一样,但在实际开发中列顺序错位是常见的报错原因)。

# 第二份数据,假设这是新进员工或另一个分部的数据
data2 = [
    (‘Mohi‘, ‘1991-04-01‘, ‘M‘, 3000),
    (‘Ani‘, ‘2000-05-19‘, ‘F‘, 4300),
    (‘Shipta‘, ‘1978-09-05‘, ‘F‘, 4200),
    (‘Jessy‘, ‘1967-12-01‘, ‘F‘, 4010),
    (‘kanne‘, ‘1980-02-17‘, ‘F‘, 1200)
]

# 注意:这里我们使用相同的列名,但记住,列的数据类型必须兼容
df2 = spark.createDataFrame(data=data2, schema=columns)

# 查看第二份数据
df2.show()

输出结果:

+------+----------+------+------+
|  Name|       DOB|Gender|salary|
+------+----------+------+------+
|  Mohi|1991-04-01|     M|  3000|
|   Ani|2000-05-19|     F|  4300|
|Shipta|1978-09-05|     F|  4200|
| Jessy|1967-12-01|     F|  4010|
| kanne|1980-02-17|     F|  1200|
+------+----------+------+------+

现在,数据准备就绪,让我们正式进入拼接环节。

方法一:使用 union() 进行基于位置的合并

这是最直接的方法,类似于 SQL 中的 INLINECODE85c21a75。INLINECODE4d8840c4 函数会将两个 DataFrame 的数据简单地上下拼接在一起。

核心原理:

union() 不会去除重复行,也不会进行数据类型的隐式强制转换(如果类型不兼容会报错)。最重要的是,它是基于位置(Position-based)合并的。这意味着它不会检查列名是否一致,而是直接将第一个 DataFrame 的第 N 列与第二个 DataFrame 的第 N 列拼在一起。

> ⚠️ 严正警告: 如果两个 DataFrame 的列顺序不同,但列名相同,直接使用 union() 会导致数据错位。这是新手最容易犯的错误——把“工资”数据拼到了“性别”列里,而代码却不会报错!

代码示例:

# 使用 union 拼接 df1 和 df2
# 这里的操作仅仅是把 df2 的行追加到 df1 下面
result_union = df1.union(df2)

# 显示合并后的结果
print("使用 Union 合并后的结果:")
result_union.show()

输出分析:

+------+----------+------+------+
|  Name|       DOB|Gender|salary|
+------+----------+------+------+
|   Ram|1991-04-01|     M|  3000|
|  Mike|2000-05-19|     M|  4000|
|Rohini|1978-09-05|     M|  4000|
| Maria|1967-12-01|     F|  4000|
| Jenis|1980-02-17|     F|  1200|
|  Mohi|1991-04-01|     M|  3000|
|   Ani|2000-05-19|     F|  4300|
|Shipta|1978-09-05|     F|  4200|
| Jessy|1967-12-01|     F|  4010|
| kanne|1980-02-17|     F|  1200|
+------+----------+------+------+

可以看到,行数增加了,数据顺序保留了原始顺序。如果 INLINECODEf46da1bd 和 INLINECODEfde704f0 有完全相同的行,union() 也会全部保留。

方法二:使用 unionByName() 进行基于名称的合并(推荐)

为了解决 INLINECODE962573b5 可能导致的列顺序错位问题,Spark 引入了 INLINECODE912969a8(从 Spark 2.3 开始可用,并在 Spark 3.1 中进一步完善)。

核心优势:

unionByName()基于列名进行匹配的。这意味着,只要两个 DataFrame 中有相同名称的列,它就会将它们对齐,而不管它们在 Schema 中的原始位置如何。这大大提高了代码的安全性和可读性。

代码示例:

# 使用 unionByName 进行合并
# 即使 df2 的列定义顺序与 df1 不同(只要列名存在),也能正确对齐
result_union_by_name = df1.unionByName(df2)

# 显示结果
print("使用 unionByName 合并后的结果(安全对齐):")
result_union_by_name.show()

输出分析:

输出结果与方法一在视觉上是一样的(因为在这个例子中列顺序是一致的),但在内部处理上,unionByName 执行了列名匹配检查。

进阶技巧:处理列缺失的情况

在某些 Spark 版本(3.1+)中,INLINECODEdd251bdd 允许两个 DataFrame 拥有不同的列,通过设置 INLINECODE923c7875,缺失的位置将填充 null。这对于处理变更缓慢的数据流非常有用。

# 假设我们有一个只有 Name 和 salary 的 df3
data3 = [(‘NewGuy‘, 5000)]
df3 = spark.createDataFrame(data3, ["Name", "salary"])

# 尝试合并 df1 和 df3,允许缺失列
# 这在 ETL 补全数据场景下非常强大
try:
    # 注意:这需要 Spark 3.1 或更高版本
    result_missing = df1.unionByName(df3, allowMissingColumns=True)
    print("包含缺失列的合并结果:")
    result_missing.show()
except Exception as e:
    print(f"当前 Spark 版本可能不支持 allowMissingColumns: {e}")

方法三:使用 functools 处理多 DataFrame 合并

在实际的生产环境中,我们往往不是只合并两个 DataFrame,而是需要将一个列表中的十几个、甚至上百个 DataFrame 合并成一个。这时候,写几十个 .union() 既不优雅,维护起来也是噩梦。

Python 的 INLINECODE3ed83d7d 模块中的 INLINECODE18dfaccd 函数提供了完美的解决方案。它可以帮我们将一个合并操作迭代地应用到一个列表上。

代码示例:

import functools

# 定义一个通用的合并函数
def unionAll(*dfs):
    # reduce 会依次将列表中的 df 累加
    # lambda df1, df2: df1.union(df2) 定义了累加的逻辑
    return functools.reduce(lambda df1, df2: df1.union(df2), dfs)

# 或者,如果你需要处理列顺序可能不一致的多个 DF:
def unionAllByName(*dfs):
    return functools.reduce(lambda df1, df2: df1.unionByName(df2), dfs)

# 让我们创建第三个 DataFrame 来演示多表合并
data_extra = [(‘Extra‘, ‘2001-01-01‘, ‘M‘, 1000)]
df_extra = spark.createDataFrame(data_extra, columns)

# 将 df1, df2, df_extra 放入一个列表
# 调用我们的自定义函数
dfs_to_merge = [df1, df2, df_extra]
result_multiple = unionAll(*dfs_to_merge)

# 或者使用 unionAllByName 以确保安全
# result_multiple = unionAllByName(*dfs_to_merge) 

print("合并多个 DataFrame 的结果:")
result_multiple.show()

输出分析:

+------+----------+------+------+
|  Name|       DOB|Gender|salary|
+------+----------+------+------+
|   Ram|1991-04-01|     M|  3000|
... (df1 数据) ...
|  Mohi|1991-04-01|     M|  3000|
... (df2 数据) ...
| Extra|2001-01-01|     M|  1000|
+------+----------+------+------+

这种方法在处理循环产生的 DataFrame 或动态读取多个文件时极为实用。

实战中的性能优化与最佳实践

掌握了基本语法后,让我们聊聊怎么让代码跑得更快、更稳。

#### 1. 去重与缓存

如果你需要的是类似 SQL INLINECODE05c06c8b(去重)的效果,而不是 INLINECODE1ade0d88(保留所有行),你需要显式地调用 .distinct()

# 合并并去除完全相同的行
result_distinct = df1.union(df2).distinct()

性能提示: INLINECODEa0011b63 操作非常昂贵,因为它会引发 INLINECODE978099a2(所有数据在网络中重新分发以进行比较)。如果数据量巨大,且你确定源数据没有重叠,请尽量避免使用 INLINECODE6f01a0f6,或者直接使用 INLINECODEafb2880f 而不是 union().distinct()

#### 2. 分区策略

当你合并两个巨大的 DataFrame 时,考虑 INLINECODE1d681265 或 INLINECODEa81f908f。

# 合并后,如果数据倾斜严重或分区过多导致小文件问题,可以进行合并
# coalesce 用于减少分区,不触发 shuffle,性能较好
result_union = df1.union(df2).coalesce(4) 

#### 3. Schema 统一的重要性

在合并之前,强制进行类型对齐是一个好习惯。例如,INLINECODEce3520f9 的 salary 是 INLINECODEf69ad73b,而 INLINECODEf567c828 的 salary 是 INLINECODE8743f63d,直接 union 会报错。

解决方案:

from pyspark.sql.functions import col

# 假设 df2 的 salary 列被误读为字符串类型
# df2 = df2.withColumn("salary", col("salary").cast("int"))

# 确保类型一致后再合并
# result = df1.union(df2)

常见错误与排查清单

  • AnalysisException: Union can only be performed on tables with compatible column types:

* 原因: 数据类型不匹配(如 String vs Int)。

* 解决: 使用 .cast() 统一类型。

  • 数据错位(不报错,但结果错误):

* 原因: 使用了 union(),但两个 DataFrame 的列顺序定义不同。

* 解决: 改用 INLINECODE848f05a1,或者在创建 DataFrame 时严格保持 INLINECODEaaef0c7c 列表顺序一致。

  • NoSuchColumnException:

* 原因: 使用了 unionByName(),但列名拼写不一致(如 "Gender" vs "gender")。

* 解决: 在合并前使用 .withColumnRenamed() 统一列名。

总结

在 PySpark 中合并数据看似简单,实则暗藏玄机。

  • 如果追求简单且列顺序完全一致,使用 union() 是没问题的。
  • 如果你想让代码更健壮、防止因列顺序变化导致的 Bug,请始终优先使用 unionByName()
  • 如果是动态列表合并,别忘了 Python 强大的 functools.reduce 工具。

希望通过这篇文章,你不仅能学会“怎么写代码”,更能理解“怎么写好代码”。数据处理不仅仅是把数据凑在一起,更是为了保证数据流的准确和高效。下一次当你面对 Spark 拼接任务时,相信你能自信地选择最合适的方案!

如果你在实践过程中遇到任何问题,或者想了解更多关于 Spark 优化的细节,欢迎继续探讨。Happy Coding!

# 清理环境,释放资源
spark.stop()
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。如需转载,请注明文章出处豆丁博客和来源网址。https://shluqu.cn/22483.html
点赞
0.00 平均评分 (0% 分数) - 0