在数据科学和工程领域,我们经常需要处理那些无法通过简单的内置方法解决的复杂逻辑。当你使用 Polars 这种高性能 DataFrame 库时,你可能会遇到这样一种情况:你需要根据某一行的多个列(例如年龄和薪水)来计算一个新值,而这个逻辑太过复杂,无法用简单的表达式来概括。
这时,“逐行处理”似乎是一个直观的解决方案。然而,如果你直接在 Polars 中尝试像 pandas 那样的迭代方式,可能会发现它并不那么直接,甚至性能会受到影响。别担心,在这篇文章中,我们将深入探讨如何在 Polars 中优雅、高效地应用自定义函数来处理每一行数据。我们将从基础概念入手,通过多个实战案例,逐步掌握这一高级技巧,并了解如何在使用强大功能的同时保持代码的运行速度。
准备工作
在我们开始编写代码之前,请确保你的 Python 环境中已经安装了 Polars。由于 Polars 的底层是用 Rust 编写的,它在安装时可能会编译一些二进制文件,但这通常只需要几秒钟。你可以通过 pip 快速安装:
pip install polars
为了更好地理解接下来的内容,建议你对 Python 的基本语法和 DataFrame 的概念(如列、行、数据类型)有一定的了解。如果你之前使用过 pandas,那也没关系,我们将一起探索 Polars 独特的思维方式。
理解 Polars 的核心哲学:为什么“逐行”很特殊?
在正式写代码之前,我们需要先达成一个共识:Polars 是基于列式存储设计的。这意味着它在进行加法、过滤或聚合操作时,是按列整块整块处理的,这种“向量化”操作速度极快。
当我们谈论“逐行应用自定义函数”时,实际上是在进行一种反模式或者说是最后手段。因为逐行处理意味着要打破列式存储的优势,强制 CPU 一次只处理一条数据,并且还要在 Python 和 Rust 之间频繁切换上下文。因此,本文的目标不仅是教你“如何做”,更是教你“如何做才不慢”,以及何时应该考虑向量化替代方案。
创建示例数据集
首先,让我们构建一个包含员工信息的 DataFrame,作为我们实验的 playground。我们将包含姓名、年龄、入职年份和当前薪水这几列。
import polars as pl
# 创建一个模拟的员工数据集
data = {
"name": ["张伟", "李娜", "王强", "赵敏", "刘洋"],
"age": [25, 30, 35, 40, 45],
"join_year": [2021, 2018, 2015, 2010, 2005],
"salary": [50000, 60000, 70000, 80000, 90000]
}
df = pl.DataFrame(data)
print("原始 DataFrame:")
print(df)
输出:
shape: (5, 4)
┌────────┬─────┬────────────┬────────┐
│ name ┆ age ┆ join_year ┆ salary │
│ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ i64 ┆ i64 ┆ i64 │
╞════════╪═════╪════════════╪════════╡
│ 张伟 ┆ 25 ┆ 2021 ┆ 50000 │
│ 李娜 ┆ 30 ┆ 2018 ┆ 60000 │
│ 王强 ┆ 35 ┆ 2015 ┆ 70000 │
│ 赵敏 ┆ 40 ┆ 2010 ┆ 80000 │
│ 刘洋 ┆ 45 ┆ 2005 ┆ 90000 │
└────────┴─────┴────────────┴────────┘
方法一:使用 pl.struct().apply() 处理多列逻辑
这是 Polars 中实现逐行处理的标准方法。INLINECODE98cdf24c 将选定的一组列打包成一个结构体对象,然后我们可以调用 INLINECODE722cc270(或 map_elements)将一个 Python 函数应用到每个结构体上。这里的“结构体”可以简单理解为那一行数据的快照。
#### 示例 1:根据年龄和工龄计算复杂的奖金系数
假设我们的业务逻辑非常复杂:奖金不仅取决于年龄,还取决于入职年份。如果员工超过 30 岁且入职超过 5 年,系数是 1.2;如果只有其中一个满足,系数是 1.1;否则是 1.0。这种涉及多列的条件判断,最适合使用自定义函数。
# 定义自定义业务逻辑函数
def calculate_bonus_coefficient(row):
# 从打包的行对象中提取字段
age = row["age"]
years_worked = 2024 - row["join_year"] # 假设当前年份是 2024
# 实施复杂的业务规则
if age > 30 and years_worked > 5:
return 1.20
elif age > 30 or years_worked > 5:
return 1.10
else:
return 1.00
# 应用函数:
# 1. pl.struct 打包了 ‘age‘ 和 ‘join_year‘
# 2. apply 接收我们的 Python 函数
# 3. alias 给新列命名
df_with_bonus = df.with_columns(
pl.struct(["age", "join_year"])
.apply(calculate_bonus_coefficient)
.alias("bonus_coefficient")
)
print(df_with_bonus)
输出:
shape: (5, 5)
┌────────┬─────┬────────────┬────────┬───────────────────┐
│ name ┆ age ┆ join_year ┆ salary ┆ bonus_coefficient │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ i64 ┆ i64 ┆ i64 ┆ f64 │
╞════════╪═════╪════════════╪════════╪═══════════════════╡
│ 张伟 ┆ 25 ┆ 2021 ┆ 50000 ┆ 1.0 │
│ 李娜 ┆ 30 ┆ 2018 ┆ 60000 ┆ 1.1 │
│ 王强 ┆ 35 ┆ 2015 ┆ 70000 ┆ 1.2 │
│ 赵敏 ┆ 40 ┆ 2010 ┆ 80000 ┆ 1.2 │
│ 刘洋 ┆ 45 ┆ 2005 ┆ 90000 ┆ 1.2 │
└────────┴─────┴────────────┴────────┴───────────────────┘
注意: 在上面的代码中,我们没有使用 INLINECODE0f56660d 参数。Polars 通常能自动推断返回类型。但在某些复杂情况下,建议显式指定 INLINECODE29fa23de 以避免类型推断错误。
#### 示例 2:动态生成文本标签(处理字符串与数字的组合)
让我们看一个更直观的例子。我们需要为每个员工生成一个个性化的报告摘要字符串。这种操作是典型的“无法向量化”的任务,因为每行的输出都是基于不同值的文本拼接。
def generate_report_string(row):
name = row["name"]
salary = row["salary"]
age = row["age"]
# 根据 salary 格式化货币字符串
salary_str = f"¥{salary:,}"
# 自定义逻辑
if age < 30:
return f"员工 {name} 年轻有为,目前薪水 {salary_str}。"
else:
return f"资深员工 {name},经验丰富,当前薪水 {salary_str}。"
df_report = df.with_columns(
pl.struct(["name", "salary", "age"])
.apply(generate_report_string)
.alias("report_summary")
)
print(df_report.select(["name", "report_summary"]))
这个例子展示了自定义函数在格式化输出方面的强大能力。我们可以随意调用 Python 的字符串方法,这在纯表达式的 DataFrame 操作中是很难做到的。
进阶技巧:INLINECODE3044b01b 与 INLINECODEe71ae033
从 Polars 的新版本开始,INLINECODEf8d6f629 方法逐渐被重命名为 INLINECODE76831062,以便更清晰地表达“映射元素”的含义(虽然 INLINECODE1dc99100 目前仍然作为别名存在以保持向后兼容)。为了写出更现代、更符合未来规范的代码,让我们看看如何使用 INLINECODEfa6f515c。
#### 示例 3:数据清洗与标准化
在真实世界中,数据往往是脏乱的。比如,我们需要对薪水进行极端的修正:如果某人年龄很大但薪水很低,或者是新手但薪水极高,我们需要标记出来,甚至做修正。这种包含 if/else 和数学计算的逻辑,非常适合这里。
def clean_salary_data(row):
age = row["age"]
salary = row["salary"]
# 异常值检测逻辑
if age 100000:
# 如果是拿高薪的年轻人,可能是数据录入错误,我们截断到 10万
return 100000
elif age > 50 and salary < 20000:
# 如果是年长且低薪,补齐到 2万
return 20000
else:
return salary
# 使用 map_elements 并明确指定返回类型为 Float (以防返回 NaN 或浮点数)
# "return_dtype" 参数可以确保输出列的类型正确
df_cleaned = df.with_columns(
pl.struct(["age", "salary"])
.map_elements(clean_salary_data, return_dtype=pl.Int64)
.alias("cleaned_salary")
)
print(df_cleaned)
在这个例子中,如果你尝试使用 Polars 的 pl.when().then().otherwise() 链式写法,代码会变得非常冗长且难以维护。使用自定义函数,逻辑清晰明了。
性能警告与最佳实践
既然我们已经学会了“怎么做”,我们必须停下来谈谈“代价”。作为负责任的数据开发者,你需要知道以下几点:
- 性能杀手:INLINECODE69d3ab71(或 INLINECODE00bc2a5c)会逐行迭代。对于 100 行的数据,这没问题。但对于 1000 万行的数据,这可能会比原生的向量化操作慢 100 倍甚至更多。这是因为每次函数调用都有 Python 到 Rust 的序列化开销。
- 优先使用表达式 API:在编写自定义函数之前,先问自己:“我能否用 INLINECODE621f7691 来实现?” 如果答案是肯定的,请务必使用表达式。例如,计算“税后薪水”如果是简单的 INLINECODE9ab36d60,就绝对不要用
apply。
- 使用 INLINECODEb7282501:为了避免 Polars 猜测你的返回类型(这会导致它先扫描一遍数据来确定类型),在 INLINECODE56635d32 中显式声明
return_dtype可以带来微小的性能提升并减少潜在 Bug。
替代方案:当 apply 太慢时怎么办?
如果你的自定义函数是为了数学计算,且确实必须逐行进行,但又实在太慢,你可以考虑使用 Polars 的 INLINECODE450454e0 风格,或者使用 INLINECODE70530bac 结合 return_dtype 的优化模式。但对于极度复杂的逻辑,目前的最佳实践仍然是:
- 尽量减少列数:只把 INLINECODE19ca8a5d 中需要的列放入 INLINECODEc383907b。不要打包整个 DataFrame 的所有列,这会增加数据拷贝的开销。
- 使用 Rust 插件(终极方案):如果你是高级用户,可以将你的逻辑编译成 Rust 插件,但这需要 Rust 编程知识。
总结与下一步
在这篇文章中,我们不仅学会了如何在 Polars 中使用 INLINECODE9c762ddd 和 INLINECODEfe9ec414 来处理逐行数据,还深入探讨了为什么这应该被视为一种“特殊手段”而非日常工具。我们通过计算奖金系数、生成报告文本以及清洗脏数据等实战案例,看到了自定义函数在处理复杂、非向量化逻辑时的灵活性。
掌握这一技能后,你将能够应对 Polars 中那些看似棘手的数据转换任务。接下来,建议你在自己的数据集上尝试这些技巧,并观察不同数据量下的处理速度。记住,工具本身没有好坏之分,关键在于如何在正确的场景下使用它。当你发现自己必须使用 apply 时,请确保已经权衡了性能与开发效率。
继续探索 Polars 的强大功能吧,你会发现它不仅能处理大数据,也能优雅地解决细节问题!