
本文档旨在指导开发者如何使用 PySpark 并行处理多个视频文件,实现大规模视频分析。内容涵盖环境配置、依赖安装、视频元数据提取、帧提取、人脸检测以及目标追踪等关键步骤,并提供可直接运行的 PySpark 代码示例,帮助读者快速上手并应用于实际项目中。
在开始之前,请确保已安装以下软件和库:
接下来,使用 pip 和 conda 安装所需的 Python 库:
pip install ffmpeg-python pip install face-recognition conda install -c conda-forge opencv
以下代码展示了如何使用 PySpark 并行读取视频文件,提取帧,进行人脸检测和目标追踪。
from pyspark import SQLContext, SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 创建 SparkSession
conf = SparkConf().setAppName("myApp").setMaster("local[40]")
spark = SparkSession.builder.master("local[40]").config("spark.driver.memory", "30g").getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
import cv2
import os
import uuid
import ffmpeg
import subprocess
import numpy as np
from scipy.optimize import linear_sum_assignment
import pyspark.sql.functions as F
from pyspark.sql import Row, DataFrame, SparkSession
import pathlib
# 指定视频文件目录
input_dir = "../data/video_files/faces/"
# 获取视频文件列表
pathlist = list(pathlib.Path(input_dir).glob('*.mp4'))
pathlist = [Row(str(ele)) for ele in pathlist]
# 创建 DataFrame
column_name = ["video_uri"]
df = sqlContext.createDataFrame(data=pathlist, schema=column_name)
print("Initial dataframe")
df.show(10, truncate=False)
# 定义视频元数据 Schema
video_metadata = StructType([
    StructField("width", IntegerType(), False),
    StructField("height", IntegerType(), False),
    StructField("num_frames", IntegerType(), False),
    StructField("duration", FloatType(), False)
])
# 定义 Shots Schema
shots_schema = ArrayType(
    StructType([
        StructField("start", FloatType(), False),
        StructField("end", FloatType(), False)
    ]))
# UDF: 视频元数据提取
@F.udf(returnType=video_metadata)
def video_probe(uri):
    probe = ffmpeg.probe(uri, threads=1)
    video_stream = next(
        (
            stream
            for stream in probe["streams"]
            if stream["codec_type"] == "video"
        ),
        None,
    )
    width = int(video_stream["width"])
    height = int(video_stream["height"])
    num_frames = int(video_stream["nb_frames"])
    duration = float(video_stream["duration"])
    return (width, height, num_frames, duration)
# UDF: 视频帧提取
@F.udf(returnType=ArrayType(BinaryType()))
def video2images(uri, width, height,
                 sample_rate: int = 5,
                 start: float = 0.0,
                 end: float = -1.0,
                 n_channels: int = 3):
    """
    Uses FFmpeg filters to extract image byte arrays
    and sampled & localized to a segment of video in time.
    """
    video_data, _ = (
        ffmpeg.input(uri, threads=1)
        .output(
            "pipe:",
            format="rawvideo",
            pix_fmt="rgb24",
            ss=start,
            t=end - start,
            r=1 / sample_rate,
        ).run(capture_stdout=True))
    img_size = height * width * n_channels
    return [video_data[idx:idx + img_size] for idx in range(0, len(video_data), img_size)]
# 添加元数据列
df = df.withColumn("metadata", video_probe(F.col("video_uri")))
print("With Metadata")
df.show(10, truncate=False)
# 提取帧
df = df.withColumn("frame", F.explode(
    video2images(F.col("video_uri"), F.col("metadata.width"), F.col("metadata.height"), F.lit(1), F.lit(0.0),
                 F.lit(5.0))))
import face_recognition
# 定义 Bounding Box Schema
box_struct = StructType(
    [
        StructField("xmin", IntegerType(), False),
        StructField("ymin", IntegerType(), False),
        StructField("xmax", IntegerType(), False),
        StructField("ymax", IntegerType(), False)
    ]
)
# Bounding Box Helper
def bbox_helper(bbox):
    top, right, bottom, left = bbox
    bbox = [top, left, bottom, right]
    return list(map(lambda x: max(x, 0), bbox))
# UDF: 人脸检测
@F.udf(returnType=ArrayType(box_struct))
def face_detector(img_data, width=1920, height=1080, n_channels=3):
    img = np.frombuffer(img_data, np.uint8).reshape(height, width, n_channels)
    faces = face_recognition.face_locations(img)
    return [bbox_helper(f) for f in faces]
# 添加人脸检测列
df = df.withColumn("faces", face_detector(F.col("frame"), F.col("metadata.width"), F.col("metadata.height")))
# 定义 Annotation Schema
annot_schema = ArrayType(
    StructType(
        [
            StructField("bbox", box_struct, False),
            StructField("tracker_id", StringType(), False),
        ]
    )
)
# Bounding Box IoU 计算
def bbox_iou(b1, b2):
    L = list(zip(b1, b2))
    left, top = np.max(L, axis=1)[:2]
    right, bottom = np.min(L, axis=1)[2:]
    if right < left or bottom < top:
        return 0
    b_area = lambda b: (b[2] - b[0]) * (b[3] - b[1])
    inter_area = b_area([left, top, right, bottom])
    b1_area, b2_area = b_area(b1), b_area(b2)
    iou = inter_area / float(b1_area + b2_area - inter_area)
    return iou
# UDF: 目标匹配
@F.udf(returnType=MapType(IntegerType(), IntegerType()))
def tracker_match(trackers, detections, bbox_col="bbox", threshold=0.3):
    """
    Match Bounding Boxes across successive image frames.
    Parameters
        ----------
        trackers : List of Box2dType with str identifier
            A column of tracked objects.
        detections: List of Box2dType without tracker id matching
            The list of unmatched detections.
        bbox_col: str
                A string to name the column of bounding boxes.
        threshold : Float
                IOU of Box2d objects exceeding threshold will be matched.
        Return
        ------
        MapType
            Returns a MapType matching indices of trackers and detections.
    """
    from scipy.optimize import linear_sum_assignment
    similarity = bbox_iou  # lambda a, b: a.iou(b)
    if not trackers or not detections:
        return {}
    if len(trackers) == len(detections) == 1:
        if (
                similarity(trackers[0][bbox_col], detections[0][bbox_col])
                >= threshold
        ):
            return {0: 0}
    sim_mat = np.array(
        [
            [
                similarity(tracked[bbox_col], detection[bbox_col])
                for tracked in trackers
            ]
            for detection in detections
        ],
        dtype=np.float32,
    )
    matched_idx = linear_sum_assignment(-sim_mat)
    matches = []
    for m in matched_idx:
        try:
            if sim_mat[m[0], m[1]] >= threshold:
                matches.append(m.reshape(1, 2))
        except:
            pass
    if len(matches) == 0:
        return {}
    else:
        matches = np.concatenate(matches, axis=0, dtype=int)
    rows, cols = zip(*np.where(matches))
    idx_map = {cols[idx]: rows[idx] for idx in range(len(rows))}
    return idx_map
# UDF: 光流运动模型
@F.udf(returnType=ArrayType(box_struct))
def OFMotionModel(frame, prev_frame, bboxes, height, width):
    if not prev_frame:
        prev_frame = frame
    gray = cv2.cvtColor(np.frombuffer(frame, np.uint8).reshape(height, width, 3), cv2.COLOR_BGR2GRAY)
    prev_gray = cv2.cvtColor(np.frombuffer(prev_frame, np.uint8).reshape(height, width, 3), cv2.COLOR_BGR2GRAY)
    inst = cv2.DISOpticalFlow.create(cv2.DISOPTICAL_FLOW_PRESET_MEDIUM)
    inst.setUseSpatialPropagation(False)
    flow = inst.calc(prev_gray, gray, None)
    h, w = flow.shape[:2]
    shifted_boxes = []
    for box in bboxes:
        xmin, ymin, xmax, ymax = box
        avg_y = np.mean(flow[int(ymin):int(ymax), int(xmin):int(xmax), 0])
        avg_x = np.mean(flow[int(ymin):int(ymax), int(xmin):int(xmax), 1])
        shifted_boxes.append(
            {"xmin": int(max(0, xmin + avg_x)), "ymin": int(max(0, ymin + avg_y)), "xmax": int(min(w, xmax + avg_x)),
             "ymax": int(min(h, ymax + avg_y))})
    return shifted_boxes
# 匹配 annotations
def match_annotations(iterator, segment_id="video_uri", id_col="tracker_id"):
    """
    Used by mapPartitions to iterate over the small chunks of our hierarchically-organized data.
    """
    matched_annots = []
    for idx, data in enumerate(iterator):
        data = data[1]
        if not idx:
            old_row = {idx: uuid.uuid4() for idx in range(len(data[1]))}
            old_row[segment_id] = data[0]
            pass
        annots = []
        curr_row = {segment_id: data[0]}
        if old_row[segment_id] != curr_row[segment_id]:
            old_row = {}
        if data[2] is not None:
            for ky, vl in data[2].items():
                detection = data[1][vl].asDict()
                detection[id_col] = old_row.get(ky, uuid.uuid4())
                curr_row[vl] = detection[id_col]
                annots.append(Row(**detection))
        matched_annots.append(annots)
        old_row = curr_row
    return matched_annots
# 追踪 detections
def track_detections(df, segment_id="video_uri", frames="frame", detections="faces", optical_flow=True):
    id_col = "tracker_id"
    frame_window = Window().orderBy(frames)
    value_window = Window().orderBy("value")
    annot_window = Window.partitionBy(segment_id).orderBy(segment_id, frames)
    indexer = StringIndexer(inputCol=segment_id, outputCol="vidIndex")
    # adjust detections w/ optical flow
    if optical_flow:
        df = (
            df.withColumn("prev_frames", F.lag(F.col(frames)).over(annot_window))
            .withColumn(detections, OFMotionModel(F.col(frames), F.col("prev_frames"), F.col(detections), F.col("metadata.height"), F.col("metadata.width")))
        )
    df = (
        df.select(segment_id, frames, detections)
        .withColumn("bbox", F.explode(detections))
        .withColumn(id_col, F.lit(""))
        .withColumn("trackables", F.struct([F.col("bbox"), F.col(id_col)]))
        .groupBy(segment_id, frames, detections)
        .agg(F.collect_list("trackables").alias("trackables"))
        .withColumn(
            "old_trackables", F.lag(F.col("trackables")).over(annot_window)
        )
        .withColumn(
            "matched",
            tracker_match(F.col("trackables"), F.col("old_trackables")),
        )
        .withColumn("frame_index", F.row_number().over(frame_window))
    )
    df = (
        indexer.fit(df)
        .transform(df)
        .withColumn("vidIndex", F.col("vidIndex").cast(StringType()))
    )
    unique_ids = df.select("vidIndex").distinct().count()
    matched = (
        df.select("vidIndex", segment_id, "trackables", "matched")
        .rdd.map(lambda x: (x[0], x[1:]))
        .partitionBy(unique_ids, lambda x: int(x[0]))
        .mapPartitions(match_annotations)
    )
    matched_annotations = sqlContext.createDataFrame(matched, annot_schema).withColumn("value_index",
                                                                                       F.row_number().over(
                                                                                           value_window))
    return (
        df.join(matched_annotations, F.col("value_index") == F.col("frame_index"))
        .withColumnRenamed("value", "trackers_matched")
        .withColumn("tracked", F.explode(F.col("trackers_matched")))
        .select(
            segment_id,
            frames,
            detections,
            F.col("tracked.{}".format("bbox")).alias("bbox"),
            F.col("tracked.{}".format(id_col)).alias(id_col),
        )
        .withColumn(id_col, F.sha2(F.concat(F.col(segment_id), F.col(id_col)), 256))
        .withColumn("tracked_detections", F.struct([F.col("bbox"), F.col(id_col)]))
        .groupBy(segment_id, frames, detections)
        .agg(F.collect_list("tracked_detections").alias("tracked_detections"))
        .orderBy(segment_id, frames, detections)
    )
# 定义 DetectionTracker Transformer
from pyspark import keyword_only
from pyspark.ml.pipeline import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
class DetectionTracker(Transformer, HasInputCol, HasOutputCol):
    """Detect and track."""
    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, framesCol=None, detectionsCol=None, optical_flow=None):
        """Initialize."""
        super(DetectionTracker, self).__init__()
        self.framesCol = Param(self, "framesCol", "Column containing frames.")
        self.detectionsCol = Param(self, "detectionsCol", "Column containing detections.")
        self.optical_flow = Param(self, "optical_flow", "Use optical flow for tracker correction. Default is False")
        self._setDefault(framesCol="frame", detectionsCol="faces", optical_flow=False)
        kwargs = self._input_kwargs
        self.setParams(**kwargs)
    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, framesCol=None, detectionsCol=None, optical_flow=None):
        """Get params."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)
    def setFramesCol(self, value):
        """Set framesCol."""
        return self._set(framesCol=value)
    def getFramesCol(self):
        """Get framesCol."""
        return self.getOrDefault(self.framesCol)
    def setDetectionsCol(self, value):
        """Set detectionsCol."""
        return self._set(detectionsCol=value)
    def getDetectionsCol(self):
        """Get detectionsCol."""
        return self.getOrDefault(self.detectionsCol)
    def setOpticalflow(self, value):
        """Set optical_flow."""
        return self._set(optical_flow=value)
    def getOpticalflow(self):
        """Get optical_flow."""
        return self.getOrDefault(self.optical_flow)
    def _transform(self, dataframe):
        """Do transformation."""
        input_col = self.getInputCol()
        output_col = self.getOutputCol()
        frames_col = self.getFramesCol()
        detections_col = self.getDetectionsCol()
        optical_flow = self.getOpticalflow()
        id_col = "tracker_id"
        frame_window = Window().orderBy(frames_col)
        value_window = Window().orderBy("value")
        annot_window = Window.partitionBy(input_col).orderBy(input_col, frames_col)
        indexer = StringIndexer(inputCol=input_col, outputCol="vidIndex")
        # adjust detections w/ optical flow
        if optical_flow:
            dataframe = (
                dataframe.withColumn("prev_frames", F.lag(F.col(frames_col)).over(annot_window))
                .withColumn(detections_col,
                            OFMotionModel(F.col(frames_col), F.col("prev_frames"), F.col(detections_col)))
            )
        dataframe = (
            dataframe.select(input_col, frames_col, detections_col)
            .withColumn("bbox", F.explode(detections_col))
            .withColumn(id_col, F.lit(""))
            .withColumn("trackables", F.struct([F.col("bbox"), F.col(id_col)]))
            .groupBy(input_col, frames_col, detections_col)
            .agg(F.collect_list("trackables").alias("trackables"))
            .withColumn(
                "old_trackables", F.lag(F.col("trackables")).over(annot_window)
            )
            .withColumn(
                "matched",
                tracker_match(F.col("trackables"), F.col("old_trackables")),
            )
            .withColumn("frame_index", F.row_number().over(frame_window))
        )
        dataframe = (
            indexer.fit(dataframe)
            .transform(dataframe)
            .withColumn("vidIndex", F.col("vidIndex").cast(StringType()))
        )
        unique_ids = dataframe.select("vidIndex").distinct().count()
        matched = (
            dataframe.select("vidIndex", input_col, "trackables", "matched")
            .rdd.map(lambda x: (x[0], x[1:]))
            .partitionBy(unique_ids, lambda x: int(x[0]))
            .mapPartitions(match_annotations)
        )
        matched_annotations = sqlContext.createDataFrame(matched, annot_schema).withColumn("value_index",
                                                                                           F.row_number().over(
                                                                                               value_window))
        return (
            dataframe.join(matched_annotations, F.col("value_index") == F.col("frame_index"))
            .withColumnRenamed("value", "trackers_matched")
            .withColumn("tracked", F.explode(F.col("trackers_matched")))
            .select(
                input_col,
                frames_col,
                detections_col,
                F.col("tracked.{}".format("bbox")).alias("bbox"),
                F.col("tracked.{}".format(id_col)).alias(id_col),
            )
            .withColumn(id_col, F.sha2(F.concat(F.col(input_col), F.col(id_col)), 256))
            .withColumn(output_col, F.struct([F.col("bbox"), F.col(id_col)]))
            .groupBy(input_col, frames_col, detections_col)
            .agg(F.collect_list(output_col).alias(output_col))
            .orderBy(input_col, frames_col, detections_col)
        )
# 创建 DetectionTracker 实例
detectTracker = DetectionTracker(inputCol="video_uri", outputCol="tracked_detections")
print(type(detectTracker))
# 应用 Transformer
detectTracker.transform(df)
final = track_detections(df)
print("Final dataframe")
final.select("tracked_detections").show(100, truncate=False)本文提供了一个使用 PySpark 并行处理视频文件的完整示例,涵盖了视频分析的多个关键步骤,包括元数据提取、帧提取、人脸检测和目标追踪。 通过学习和实践本文档,开发者可以掌握使用 PySpark 进行大规模视频分析的基本技能,并将其应用于实际项目中。
以上就是并行处理视频:使用 PySpark 实现大规模视频分析的详细内容,更多请关注php中文网其它相关文章!
                        
                        每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
                Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号