如何使用PySpark对多组数据执行K-Means聚类分析

聖光之護
发布: 2025-10-26 09:45:22
原创
814人浏览过

如何使用pyspark对多组数据执行k-means聚类分析

本文旨在解决PySpark中对不同类别数据独立执行K-Means聚类时遇到的`SparkSession`序列化错误。我们将深入探讨Spark的驱动器-执行器架构,解释为何不能在执行器中调用`createDataFrame`等`SparkSession`操作。文章将提供一个基于Spark ML库的解决方案,通过迭代方式在驱动器上为每个类别独立运行K-Means,并给出详细的代码示例和注意事项,帮助读者正确高效地实现分类数据聚类任务。

在PySpark中,对数据进行K-Means聚类是常见的机器学习任务。当需要针对数据集中的不同类别(或分组)独立执行K-Means时,开发者可能会遇到一些挑战,尤其是涉及到Spark的分布式执行模型和对象序列化问题。一个常见的错误是尝试在Spark执行器(executor)中调用SparkSession相关的方法,例如createDataFrame,这会导致pickle.PicklingError。

理解Spark的分布式执行与序列化

Spark采用驱动器-执行器(Driver-Executor)架构。

  • 驱动器(Driver):负责运行应用程序的main函数,创建SparkSession,调度任务,并协调执行器的工作。所有SparkSession对象都存在于驱动器上。
  • 执行器(Executor):运行在工作节点上,负责执行由驱动器分配的任务。当驱动器将任务发送给执行器时,任务中的所有对象(包括函数、变量等)都必须能够被序列化(pickled),以便通过网络传输到执行器。

SparkSession是一个复杂的、与JVM紧密关联的驱动器端对象。它无法被序列化并发送到执行器。因此,任何尝试在执行器中(例如,在一个RDD的map或foreach转换中)直接引用或使用SparkSession对象来创建新的DataFrame,都将导致序列化错误。

为什么sparkSession.createDataFrame在执行器中会失败?

在您提供的原始代码片段中,kmeans函数被设计为在RDD的map操作中执行:

groupedData.rdd.map(lambda row: kmeans(row.point_list, row.category))

def kmeans(points, category):
  # ...
  df = sparkSession.createDataFrame([(Vectors.dense(x),) for x in points], ["features"])
  # ...
登录后复制

这里的kmeans函数会在执行器上运行。当它尝试调用sparkSession.createDataFrame时,执行器会发现它没有一个可用的sparkSession实例,或者更准确地说,它无法反序列化从驱动器传递过来的sparkSession引用。这就是导致pickle.PicklingError和Py4JError的根本原因。createDataFrame需要一个活动的SparkSession实例来构建DataFrame,而这个实例只能在驱动器上访问。

使用Spark MLlib/ML实现按类别K-Means聚类

为了正确地在PySpark中实现按类别K-Means聚类,同时避免上述序列化问题,我们应该将SparkSession相关的操作保留在驱动器上。以下是一种推荐的实现方法,它利用Spark ML库的K-Means算法,并在驱动器上迭代处理每个类别。

聚好用AI
聚好用AI

可免费AI绘图、AI音乐、AI视频创作,聚集全球顶级AI,一站式创意平台

聚好用AI 115
查看详情 聚好用AI

1. 初始化Spark会话并加载数据

首先,确保您的Spark会话已正确初始化,并且能够访问Hive表。

from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, DoubleType

# 初始化SparkSession并启用Hive支持
spark = SparkSession.builder \
    .appName("PerCategoryKMeans") \
    .enableHiveSupport() \
    .getOrCreate()

# 从Hive表加载原始数据
# 假设您的Hive表 'my_table' 包含 'category' 字符串列和 'point' 数组(或列表)列
# 'point' 列的每个元素代表一个数据点的特征向量,例如 [1.0, 2.0, 3.0]
rawData = spark.sql('select category, point from my_table')

# 打印数据模式以确认 'point' 列的类型
rawData.printSchema()
# 示例:
# root
#  |-- category: string (nullable = true)
#  |-- point: array (nullable = true)
#  |    |-- element: double (containsNull = true)
登录后复制

2. 数据预处理:将特征转换为Vector类型

Spark ML库的K-Means算法要求输入DataFrame包含一个features列,其类型为VectorUDT(即pyspark.ml.linalg.Vector)。如果您的point列已经是数值数组类型(ArrayType(DoubleType)),我们需要将其转换为VectorUDT。

# 定义一个UDF,将Python列表(或ArrayType)转换为Spark的VectorUDT
# VectorUDT 是pyspark.ml.linalg.Vector的内部表示类型
array_to_vector_udf = udf(lambda arr: Vectors.dense(arr), VectorUDT())

# 将 'point' 列转换为 'features' 列,类型为VectorUDT
preparedData = rawData.withColumn("features", array_to_vector_udf(col("point")))

preparedData.printSchema()
# 示例:
# root
#  |-- category: string (nullable = true)
#  |-- point: array (nullable = true)
#  |    |-- element: double (containsNull = true)
#  |-- features: vector (nullable = true)
登录后复制

如果point列是一个单一的数值列,或者有多个独立的数值列需要组合成特征向量,则应使用VectorAssembler:

# 假设 'point_x', 'point_y' 是独立的数值列
# assembler = VectorAssembler(inputCols=["point_x", "point_y"], outputCol="features")
# preparedData = assembler.transform(rawData)
登录后复制

请根据您的实际数据结构选择合适的特征转换方法。

3. 迭代执行K-Means聚类

接下来,我们将在驱动器上迭代处理每个类别。这种方法虽然在驱动器上循环,但每次K-Means的fit和transform操作仍然会利用Spark集群的分布式能力。

# 获取所有不重复的类别
categories = preparedData.select("category").distinct().collect()

all_results = {} # 用于存储所有类别的聚类结果

# 遍历每个类别
for row in categories:
    category = row.category
    print(f"--- 正在处理类别: {category} ---")

    # 过滤出当前类别的数据
    category_df = preparedData.filter(col("category") == category)

    # 检查当前类别是否有足够的数据进行聚类
    # K-Means通常需要至少k个点,或者更多,以获得有意义
登录后复制

以上就是如何使用PySpark对多组数据执行K-Means聚类分析的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号