在本文中,我们将讨论如何将包含 JSON 字符串的列解析为各自独立的列。在这里,我们将读取 CSV 文件中存在的 JSON 字符串,并使用 Python Pyspark 将其转换为多个 DataFrame 列。
示例 1:使用 pyspark.sql.functions.from_json 解析 JSON 字符串列
为了解析 JSON 字符串,我们将使用 from_json() SQL 函数,将包含 JSON 字符串的列解析为具有指定模式的 StructType。如果字符串无法解析,它将返回 null。
<a href=" https://github.com/rahulkriplani/gfgparseacolumnofjsonstring/blob/873ac3d26429a0133a685db591c65155e3db4149/data/movieinput.csv "> movieinput.csv 文件包含 15 条记录,其中包含以 JSON 字符串形式存在的电影详细信息(title、rating、releaseYear 和 genre)。我们想要读取这个文件并解析 JSON 字符串,以将电影细节提取到各自独立的列 title、rating、releaseYear 和 genre 中。
此函数需要两个必需参数:
- col: 包含 JSON 字符串的列名。
- schema: 解析 JSON 列时使用的 StructType 或 StructType 的 ArrayType。
Python3
CODEBLOCK_15fbc54b
输出:
一旦我们将数据读入 DataFrame,现在让我们使用 from_json() 将 JSON 列转换为多列。如上所述,此函数将带有 JSON 字符串的列名和 JSON 模式作为参数,因此让我们创建代表我们数据的模式。
Python3
CODEBLOCK_547967ce
现在,让我们使用 from_json() 函数,它返回包含所有 JSON 列的 Column 结构体。
Python3
CODEBLOCK_89dd08ab
输出:
最后,我们使用 select 方法展开(explode) json 结构体以将其展平。我们可以选择 movie.title、movie.rating 等列。但更好的方法是使用 * 通配符,它将选择所有具有 movie. 前缀的列。
Python3
CODEBLOCK_5498a32a
输出:
示例 2:使用 tojson() 和 jsontuple() 解析 JSON 字符串列
为此,除了上述方法之外,我们还将使用 tojson() 和 jsontuple()。
- 首先,我们将借助 from_json() 函数将 JSON 字符串列映射为 MapType。
- 然后我们将 MapType 列转换为 JSON 字符串。如果 JSON 数据不正确,该函数将抛出异常。
- 最后,我们根据传递的字段名称为 JSON 列创建新列。
Python3
“
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
if name == "main":
spark = SparkSession.builder.appName(‘Parse a\
column of json strings‘).getOrCreate()
df = spark.createDataFrame(
[
["1","{‘color‘: ‘red‘, ‘value‘: ‘#f00‘}"],
["2","{‘color‘: ‘green‘, ‘value‘: ‘#0f0‘}"],
["3","{‘color‘: ‘blue‘, ‘value‘: ‘#00f‘}"],
["4","{‘color‘: ‘cyan‘, ‘value‘: ‘#0ff‘}"],
["5","{‘color‘: ‘magenta‘, ‘value‘: ‘#f0f‘}"],
["6","{‘color‘: ‘yellow‘, ‘value‘: ‘#ff0‘}"],
["7","{‘color‘: ‘black‘, ‘value‘: ‘#000‘}"],
]
).toDF(‘id‘, ‘colors‘)
df.show(truncate=False)
df.printSchema()
df = df.withColumn("colors",
F.from_json(df.colors,
T.MapType(T.StringType(),
T.StringType())))
df.show(truncate=False)
df.printSchema()
df = df.withColumn("colors", F.to_json(df.colors))
df.show(truncate=False)
df.printSchema()
df = df.select(‘id‘, F.json_tuple(F.col("colors"),
"color", "value")
).toDF(‘id‘, ‘color‘,