使用 PySpark 从 JSON 对象中选择并透视数据

霞舞
发布: 2025-08-19 15:22:01
原创
173人浏览过

使用 pyspark 从 json 对象中选择并透视数据

本文档介绍了如何使用 PySpark 从包含属性和值的 JSON 对象中提取特定列,并将其透视为所需格式。通过创建 DataFrame 和使用 Spark SQL,我们可以灵活地选择和转换数据,最终得到以指定属性名作为列名的结果。本文提供详细步骤和示例代码,帮助你轻松完成数据提取和转换任务。

使用 PySpark 处理 JSON 数据并进行透视

在数据处理中,经常需要从 JSON 数据中提取特定字段,并将其转换为更易于分析的格式。当 JSON 数据包含具有属性和值的对象数组时,例如 Oracle REST API 的响应,我们可以使用 PySpark 来选择所需的列,并将其透视为以属性名作为列名的形式。

以下是如何使用 PySpark 实现此目标的步骤:

1. 创建 DataFrame

首先,你需要使用 JSON 数据创建一个 DataFrame。假设你已经将 JSON 数据存储在变量 json_data 中,可以使用以下代码创建 DataFrame:

from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder.appName("JSONPivot").getOrCreate()

df = spark.read.json(spark.sparkContext.parallelize([json_data]))

# 示例 JSON 数据 (替换为你实际的数据)
json_data = """
[
    {
        "attributeId": 300000000227671,
        "attributeName": "BUSINESS_UNIT",
        "attributeType": "Number",
        "attributeValue": "300000207138371",
        "timeBuildingBlockId": 300000300319699,
        "timeBuildingBlockVersion": 1
    },
    {
        "attributeId": 300000000227689,
        "attributeName": "LOG_ID",
        "attributeType": "Number",
        "attributeValue": "300000001228038",
        "timeBuildingBlockId": 300000300319699,
        "timeBuildingBlockVersion": 1
    }
]
"""

df = spark.read.json(spark.sparkContext.parallelize([json_data]))

df.printSchema()
df.show()
登录后复制

这段代码首先创建了一个 SparkSession,这是与 Spark 集群交互的入口点。然后,它使用 spark.read.json() 方法从 json_data 读取 JSON 数据,并将其转换为 DataFrame。spark.sparkContext.parallelize([json_data]) 用于将 JSON 数据转换为 RDD,然后 spark.read.json() 可以从 RDD 读取数据。 df.printSchema() 打印 DataFrame 的结构,df.show() 显示 DataFrame 的内容。

2. 创建临时视图

为了能够使用 Spark SQL 查询 DataFrame,需要创建一个临时视图:

df.createOrReplaceTempView("myTable")
登录后复制

这将创建一个名为 "myTable" 的临时视图,你可以使用 Spark SQL 查询它。

Find JSON Path Online
Find JSON Path Online

Easily find JSON paths within JSON objects using our intuitive Json Path Finder

Find JSON Path Online 30
查看详情 Find JSON Path Online

3. 使用 Spark SQL 进行透视

现在,可以使用 Spark SQL 查询临时视图,以提取所需的列并进行透视。以下是一个示例查询,用于提取 "LOG_ID" 和 "BUSINESS_UNIT" 的 attributeValue:

result = spark.sql("""
    SELECT
        MAX(CASE WHEN attributeName = 'LOG_ID' THEN attributeValue END) AS LOG_ID,
        MAX(CASE WHEN attributeName = 'BUSINESS_UNIT' THEN attributeValue END) AS BUSINESS_UNIT
    FROM myTable
""")

result.show()
登录后复制

这个 SQL 查询使用 CASE WHEN 语句来根据 attributeName 的值选择相应的 attributeValue。 MAX() 函数用于处理可能存在多个具有相同 attributeName 的情况,并确保每个属性只有一个值。AS 关键字用于为结果列指定别名。

完整代码示例

from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder.appName("JSONPivot").getOrCreate()

# 示例 JSON 数据 (替换为你实际的数据)
json_data = """
[
    {
        "attributeId": 300000000227671,
        "attributeName": "BUSINESS_UNIT",
        "attributeType": "Number",
        "attributeValue": "300000207138371",
        "timeBuildingBlockId": 300000300319699,
        "timeBuildingBlockVersion": 1
    },
    {
        "attributeId": 300000000227689,
        "attributeName": "LOG_ID",
        "attributeType": "Number",
        "attributeValue": "300000001228038",
        "timeBuildingBlockId": 300000300319699,
        "timeBuildingBlockVersion": 1
    }
]
"""

# 创建 DataFrame
df = spark.read.json(spark.sparkContext.parallelize([json_data]))

# 创建临时视图
df.createOrReplaceTempView("myTable")

# 使用 Spark SQL 进行透视
result = spark.sql("""
    SELECT
        MAX(CASE WHEN attributeName = 'LOG_ID' THEN attributeValue END) AS LOG_ID,
        MAX(CASE WHEN attributeName = 'BUSINESS_UNIT' THEN attributeValue END) AS BUSINESS_UNIT
    FROM myTable
""")

# 显示结果
result.show()

# 停止 SparkSession
spark.stop()
登录后复制

注意事项

  • 确保你的 JSON 数据格式正确,并且包含所需的 attributeName 和 attributeValue 字段。
  • 根据你的实际需求修改 SQL 查询,以提取所需的列和进行透视。
  • 如果 JSON 数据非常大,可以考虑使用分区来提高查询性能。
  • 在实际应用中,可能需要处理缺失值或错误数据。可以使用 fillna() 或 filter() 方法来处理这些情况。
  • 记得在完成操作后停止 SparkSession,释放资源。

总结

通过使用 PySpark 创建 DataFrame 和使用 Spark SQL,我们可以轻松地从 JSON 对象中选择和透视数据。这种方法非常灵活,可以根据你的实际需求进行定制。希望本文档能够帮助你解决数据提取和转换问题。

以上就是使用 PySpark 从 JSON 对象中选择并透视数据的详细内容,更多请关注php中文网其它相关文章!

相关标签:
最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
热门推荐
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号