欢迎来到 PySpark 数据处理的世界!作为一名数据工程师或数据科学家,我们在日常工作中经常面临一个常见且基础的任务:从庞大的数据集中清洗数据,识别不重复的值,或者 simply 查看某一列到底有哪些唯一的存在。这不仅能帮助我们理解数据的分布,还是进行数据质量检查的关键步骤。
在这篇文章中,我们将深入探索如何在 Python 中利用 PySpark 强大的 DataFrame API 来显示不重复的列值。我们不仅要学习如何使用 INLINECODEa5b495da 和 INLINECODE470d79b7 这两个核心函数,还会探讨它们背后的区别、性能考量以及在实际生产环境中的最佳实践。
准备好了吗?让我们创建一个示例 DataFrame,然后开始这段探索之旅。
环境准备与示例数据构建
首先,我们需要初始化一个 SparkSession。它是我们所有 PySpark 操作的入口点。为了让你更直观地理解,我们构建了一个包含员工数据的模拟数据集。注意看,我们在数据中故意放入了一些重复的行(比如 ID 为 "1" 和 "3" 的记录出现了两次),这正是我们后续要去重的目标。
# 导入 pyspark 模块
import pyspark
# 从 pyspark.sql 模块中导入 SparkSession
from pyspark.sql import SparkSession
# 创建 SparkSession,并给应用程序命名
# appName 会在 Spark UI 中显示,方便我们追踪任务
spark = SparkSession.builder.appName(‘sparkdf‘).getOrCreate()
# 员工数据列表,包含了一些重复的行值
# 比如 ID "1" 和 "3" 对应的数据出现了两次
data =[["1", "sravan", "company 1"],
["3", "bobby", "company 3"],
["2", "ojaswi", "company 2"],
["1", "sravan", "company 1"], # 重复行
["3", "bobby", "company 3"], # 重复行
["4", "rohith", "company 2"],
["5", "gnanesh", "company 1"]]
# 指定列名,这样生成的 DataFrame 结构会更清晰
columns = [‘Employee ID‘,‘Employee NAME‘,‘Company Name‘]
# 从列表数据创建 DataFrame
dataframe = spark.createDataFrame(data, columns)
# 展示初始数据
# 你可以看到完全相同的行占据了不同的物理行
print("原始数据预览:")
dataframe.show()
原始数据输出:
+-----------+-------------+------------+
|Employee ID|Employee NAME|Company Name|
+-----------+-------------+------------+
| 1| sravan | company 1|
| 3| bobby | company 3|
| 2| ojaswi | company 2|
| 1| sravan | company 1|
| 3| bobby | company 3|
| 4| rohith | company 2|
| 5| gnanesh | company 1|
+-----------+-------------+------------+
看到上面的输出,你可能会想:“如果我想知道公司里到底有哪些唯一的员工 ID,该怎么办?” 或者 “我想获取完全没有任何重复记录的员工名单,该怎么操作?”。别急,我们将通过两种主要方法来解决这些问题。
方法 1:使用 distinct() 获取唯一值
distinct() 是 PySpark 中最直观的去重函数。它的作用是基于 DataFrame 中所有的列(或者你选择的列子集)进行去重,返回一个新的 DataFrame,其中每一行都是唯一的。
#### 核心概念
在 SQL 中,我们经常使用 INLINECODEc990b126。PySpark 的 INLINECODEa95f5a39 与之非常相似。当你在一个 DataFrame 上调用 distinct() 时,Spark 会查看所有列的组合。如果两行的所有列值都完全相同,那么其中一行会被移除。
#### 场景 1:获取单列的唯一值
让我们先从最简单的场景开始:假设我们只关心“员工 ID”这一列,想知道数据集中包含哪些不同的 ID。
# 使用 select() 选择 "Employee ID" 列
# 然后链式调用 distinct() 进行去重
# 最后调用 show() 将结果打印到控制台
print("员工 ID 去重后的结果:")
dataframe.select("Employee ID").distinct().show()
输出分析:
+-----------+
|Employee ID|
+-----------+
| 3|
| 5|
| 1|
| 4|
| 2|
+-----------+
正如你所见,虽然原始数据中 ID "1" 和 "3" 出现了多次,但在结果中它们只出现了一次。注意,结果的顺序可能与原始数据不同,这是因为 Spark 的分布式特性决定的。
#### 场景 2:获取多列组合的唯一值
在实际业务中,我们经常需要根据多个字段的组合来判断唯一性。例如,我们想知道“ID”和“姓名”这两个字段的唯一组合情况。
# 语法:dataframe.select("column_name 1", "column_name 2").distinct().show()
# 这里我们同时选择 ID 和 NAME,只有当这两列的值同时相同时,才会被视为重复行
print("员工 ID 和姓名的组合去重结果:")
dataframe.select(["Employee ID", "Employee NAME"]).distinct().show()
输出分析:
+-----------+-------------+
|Employee ID|Employee NAME|
+-----------+-------------+
| 5| gnanesh|
| 4| rohith|
| 1| sravan|
| 2| ojaswi|
| 3| bobby|
+-----------+-------------+
实用见解: 你可能会问,INLINECODE06fc2c67 和直接对整张表操作有什么区别?其实,当你先 INLINECODEc3f45a0a 再 distinct() 时,你实际上是在告诉 Spark:“我只关心这几列,请基于这几列计算唯一值。” 这通常比先全表去重再 Select 要高效得多,因为参与 Shuffle 的数据量减少了。
方法 2:使用 dropDuplicates() 灵活去重
除了 INLINECODEd377dad9,PySpark 还提供了一个功能更强大的方法:INLINECODEe879a0cc。从功能上讲,INLINECODE27152cce 和 INLINECODE99971379 在处理全表去重时效果几乎一模一样,但前者提供了更精细的控制力。
#### 核心差异:Subset 参数
dropDuplicates() 的强大之处在于它允许你指定一个子集列。这意味着你可以说:“请根据列 A 删除重复行,但保留列 B 和列 C 的数据(即使它们本身在重复行中是不同的)。” 这在处理脏数据时非常有用。
#### 场景 1:单列去重(基础用法)
让我们用 dropDuplicates() 来实现刚才单列去重的效果。
# 我们也可以直接在 select() 后使用 dropDuplicates()
# 这与 select().distinct() 的结果相同
print("使用 dropDuplicates() 对员工 ID 去重:")
dataframe.select("Employee ID").dropDuplicates().show()
输出:
+-----------+
|Employee ID|
+-----------+
| 3|
| 5|
| 1|
| 4|
| 2|
+-----------+
#### 场景 2:多列去重与实战应用
让我们看看多列的情况,并深入探讨一个更高级的用法——去重时保留哪一行的数据。
# 针对多列进行去重
print("使用 dropDuplicates() 对 ID 和 NAME 去重:")
dataframe.select(["Employee ID", "Employee NAME"]).dropDuplicates().show()
进阶示例:解决“保留最新记录”的问题
这是数据工程面试中非常经典的问题:假设数据中包含时间戳(或者版本号),同一个 ID 有多条记录,我们只想保留最新的一条。虽然 INLINECODE38c3eaa0 本身并不直接支持“保留最大/最小值”的逻辑,但它是解决这个问题的关键第一步。通常我们需要结合 Window 函数来使用,但为了演示 INLINECODE04924e14 的能力,我们先看它如何基于特定列去重。
想象一下,如果我们直接在 DataFrame 上调用 dropDuplicates([‘Employee ID‘]),Spark 会保留遇到的第一个该 ID 的记录,删除后续的。
# 直接在整个 DataFrame 上操作,而不只是 select()
# 这将保留每个 ID 出现的第一行数据,并删除其他行(即使其他列的值不同)
print("基于 Employee ID 对整张表进行去重(保留首行):")
dataframe.dropDuplicates([‘Employee ID‘]).show()
输出:
+-----------+-------------+------------+
|Employee ID|Employee NAME|Company Name|
+-----------+-------------+------------+
| 1| sravan | company 1|
| 3| bobby | company 3|
| 5| gnanesh | company 1|
| 4| rohith | company 2|
| 2| ojaswi | company 2|
+-----------+-------------+------------+
注意观察: 结果中每个 ID 只出现了一次。Spark 默认保留了每个重复组中的第一条记录。这听起来简单,但在处理海量数据时,理解“哪一条是第一条”取决于数据如何分区的,这一点非常重要。
深入解析:distinct() 与 dropDuplicates() 的性能与选择
你可能会问:“既然功能差不多,我该选哪一个?” 这里有一个经验法则。
- 代码可读性:如果你只是想要完全唯一的行(所有列都相同),使用
distinct()更符合 SQL 的直觉,代码也更简洁。 - 特定列去重:如果你需要基于部分列去重(例如,“只要用户 ID 重复就删除,不管邮箱是不是不一样”),那么 INLINECODE001c6047 是更清晰、更明确的写法。虽然 INLINECODE1166a449 也能达到类似效果,但
dropDuplicates在逻辑上更准确地描述了“基于这些列丢弃重复项”的意图。
性能优化与最佳实践
在处理大规模数据集时,去重操作可能会导致数据重新分布,这是一个昂贵的操作。以下是一些优化建议:
- 减少 Shuffle 数据量:在使用 INLINECODEb0abb28e 或 INLINECODE5c8ee0f7 之前,先使用
select()只选择你需要的列。正如我们在示例中所做的那样,处理 2 列的数据比处理 10 列的数据要快得多,网络传输的压力也更小。 - 合理设置分区数:去重操作通常会增加或减少分区数。如果你的数据集非常倾斜(某些 Key 特别多),去重可能会导致某些任务运行极慢。你可能需要在去重前调用 INLINECODEfbb3dcad 或 INLINECODE61757827 来优化数据分布。
- 缓存策略:如果你需要对同一个 DataFrame 进行多次去重(例如,先按 ID 去重,再按 Name 去重),并且在内存允许的情况下,可以先调用
dataframe.cache(),避免重复计算原始数据。
常见错误与解决方案
在刚开始使用 PySpark 时,你可能会遇到一些常见的坑。让我们看看如何避免它们。
错误 1:大小写敏感性问题
PySpark 默认是区分大小写的。如果你在 INLINECODE33159380 中写错了列名的大小写(例如 "employee id" 而不是 "Employee ID"),程序会抛出 INLINECODE6c3911df。
# 错误示例:
# dataframe.select("employee id").distinct().show()
# 报错:Column ‘employee id‘ does not exist.
解决方案:始终检查 DataFrame 的 schema,或者使用 dataframe.columns 查看确切的列名。
错误 2:混淆 count() 与 distinct()
有时候我们只想知道有多少个不同的值,而不想把它们都列出来。这是一个典型的性能陷阱。
# 低效写法:先把所有唯一值拉取到 Driver,再数数
# count_val = dataframe.select("Employee ID").distinct().count()
# 更好的写法:
# PySpark 的优化器通常能很好地处理 count(distinct()),但在 Spark SQL 中直接写可能更直观
# dataframe.createOrReplaceTempView("df")
# spark.sql("select count(distinct `Employee ID`) from df").show()
总结与后续步骤
在这篇文章中,我们全面探讨了在 PySpark DataFrame 中显示和获取不重复列值的两种核心方法:INLINECODEcaffa792 和 INLINECODE6665f6c2。我们不仅学习了如何针对单列和多列进行操作,还深入了解了它们在处理实际数据去重任务时的细微差别。
关键要点:
- 使用 INLINECODE0f86ab88 配合 INLINECODE947f3588 是查看特定列唯一值的快速方法。
-
dropDuplicates()提供了更灵活的语义,特别是在处理基于部分列去重的复杂逻辑时。 - 始终关注性能,尽量避免在参与 Shuffle 的 DataFrame 中包含不必要的列。
掌握了这些技巧后,你可以更自信地处理数据清洗和 ETL 任务中的去重环节。接下来,建议你尝试在自己的数据集上应用这些代码,或者探索 PySpark 强大的 Window 函数,它能帮你解决更复杂的“去重并保留最大/最小值”的问题。
感谢阅读!如果你在实践过程中遇到任何问题,或者想分享你的使用心得,欢迎随时交流。祝你在 PySpark 的学习道路上越走越远!