使用 Pyspark 解析 JSON 字符串列

在本文中,我们将讨论如何将包含 JSON 字符串的列解析为各自独立的列。在这里,我们将读取 CSV 文件中存在的 JSON 字符串,并使用 Python Pyspark 将其转换为多个 DataFrame 列。

示例 1:使用 pyspark.sql.functions.from_json 解析 JSON 字符串列

为了解析 JSON 字符串,我们将使用 from_json() SQL 函数,将包含 JSON 字符串的列解析为具有指定模式的 StructType。如果字符串无法解析,它将返回 null。

<a href=" https://github.com/rahulkriplani/gfgparseacolumnofjsonstring/blob/873ac3d26429a0133a685db591c65155e3db4149/data/movieinput.csv "> movieinput.csv 文件包含 15 条记录,其中包含以 JSON 字符串形式存在的电影详细信息(title、rating、releaseYear 和 genre)。我们想要读取这个文件并解析 JSON 字符串,以将电影细节提取到各自独立的列 title、rating、releaseYear 和 genre 中。

此函数需要两个必需参数:

  • col: 包含 JSON 字符串的列名。
  • schema: 解析 JSON 列时使用的 StructType 或 StructType 的 ArrayType。

Python3


CODEBLOCK_15fbc54b

输出:

!image

一旦我们将数据读入 DataFrame,现在让我们使用 from_json() 将 JSON 列转换为多列。如上所述,此函数将带有 JSON 字符串的列名和 JSON 模式作为参数,因此让我们创建代表我们数据的模式。

Python3


CODEBLOCK_547967ce

现在,让我们使用 from_json() 函数,它返回包含所有 JSON 列的 Column 结构体。

Python3


CODEBLOCK_89dd08ab

输出:

!image

最后,我们使用 select 方法展开(explode) json 结构体以将其展平。我们可以选择 movie.title、movie.rating 等列。但更好的方法是使用 * 通配符,它将选择所有具有 movie. 前缀的列。

Python3


CODEBLOCK_5498a32a

输出:

!image

示例 2:使用 tojson()jsontuple() 解析 JSON 字符串列

为此,除了上述方法之外,我们还将使用 tojson()jsontuple()

  • 首先,我们将借助 from_json() 函数将 JSON 字符串列映射为 MapType。
  • 然后我们将 MapType 列转换为 JSON 字符串。如果 JSON 数据不正确,该函数将抛出异常。
  • 最后,我们根据传递的字段名称为 JSON 列创建新列。

Python3


from pyspark.sql import SparkSession

import pyspark.sql.functions as F

import pyspark.sql.types as T

if name == "main":

spark = SparkSession.builder.appName(‘Parse a\

column of json strings‘).getOrCreate()

df = spark.createDataFrame(

[

["1","{‘color‘: ‘red‘, ‘value‘: ‘#f00‘}"],

["2","{‘color‘: ‘green‘, ‘value‘: ‘#0f0‘}"],

["3","{‘color‘: ‘blue‘, ‘value‘: ‘#00f‘}"],

["4","{‘color‘: ‘cyan‘, ‘value‘: ‘#0ff‘}"],

["5","{‘color‘: ‘magenta‘, ‘value‘: ‘#f0f‘}"],

["6","{‘color‘: ‘yellow‘, ‘value‘: ‘#ff0‘}"],

["7","{‘color‘: ‘black‘, ‘value‘: ‘#000‘}"],

]

).toDF(‘id‘, ‘colors‘)

df.show(truncate=False)

df.printSchema()

df = df.withColumn("colors",

F.from_json(df.colors,

T.MapType(T.StringType(),

T.StringType())))

df.show(truncate=False)

df.printSchema()

df = df.withColumn("colors", F.to_json(df.colors))

df.show(truncate=False)

df.printSchema()

df = df.select(‘id‘, F.json_tuple(F.col("colors"),

"color", "value")

).toDF(‘id‘, ‘color‘,

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。如需转载,请注明文章出处豆丁博客和来源网址。https://shluqu.cn/21611.html
点赞
0.00 平均评分 (0% 分数) - 0