
This article describes how to use PySpark to process multiple video files in parallel and perform video-analysis tasks such as face recognition. We look at how Spark's distributed computing capabilities can be used to efficiently extract frames from videos, detect faces, and track them across frames. Detailed code examples and step-by-step instructions are provided to help readers understand and apply PySpark to large-scale video processing.
First, make sure the required Python libraries and FFmpeg are installed in your environment.
Install the Python libraries:
pip install ffmpeg-python
pip install face-recognition
conda install -c conda-forge opencv
Install FFmpeg:
Make sure FFmpeg is installed on your system. The exact steps depend on your operating system; on Ubuntu, for example, you can run:
sudo apt-get update
sudo apt-get install ffmpeg
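Before running the pipeline, it is worth confirming that the FFmpeg binary and the Python bindings are reachable from the environment that will run Spark. The snippet below is a minimal sanity check, not part of the pipeline itself; "sample.mp4" is a placeholder for any local video file.

import subprocess
import ffmpeg

# Is the ffmpeg binary on PATH?
print(subprocess.run(["ffmpeg", "-version"], capture_output=True, text=True).stdout.splitlines()[0])
# Can ffmpeg-python probe a file? (replace "sample.mp4" with any local video)
print(ffmpeg.probe("sample.mp4")["format"]["duration"])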
The following code shows how to use PySpark to read video files in parallel, extract frames, and run face detection and tracking.
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext
import pyspark.sql.functions as F

# Configure Spark: local mode with 40 cores and 30 GB of driver memory
conf = SparkConf().setAppName("myApp").setMaster("local[40]")
spark = (SparkSession.builder
         .config(conf=conf)
         .config("spark.driver.memory", "30g")
         .getOrCreate())
sc = spark.sparkContext
sqlContext = SQLContext(sc)
import os
import uuid
import subprocess
import pathlib

import cv2
import ffmpeg
import face_recognition
import numpy as np
from scipy.optimize import linear_sum_assignment

from pyspark.sql import Row, DataFrame
from pyspark.sql.types import (StructType, StructField,
                               IntegerType, FloatType,
                               ArrayType, BinaryType,
                               MapType, DoubleType, StringType)
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer
# Directory containing the input video files
input_dir = "../data/video_files/faces/"
pathlist = list(pathlib.Path(input_dir).glob('*.mp4'))
pathlist = [Row(str(ele)) for ele in pathlist]
# Create the initial DataFrame of video URIs
column_name = ["video_uri"]
df = sqlContext.createDataFrame(data=pathlist, schema=column_name)
print("Initial dataframe")
df.show(10, truncate=False)
# Schema for video metadata
video_metadata = StructType([
StructField("width", IntegerType(), False),
StructField("height", IntegerType(), False),
StructField("num_frames", IntegerType(), False),
StructField("duration", FloatType(), False)
])
# Schema for shot boundaries (start/end in seconds); defined here but not used further in this example
shots_schema = ArrayType(
StructType([
StructField("start", FloatType(), False),
StructField("end", FloatType(), False)
]))
# UDF: extract video metadata with ffprobe
@F.udf(returnType=video_metadata)
def video_probe(uri):
probe = ffmpeg.probe(uri, threads=1)
video_stream = next(
(
stream
for stream in probe["streams"]
if stream["codec_type"] == "video"
),
None,
)
width = int(video_stream["width"])
height = int(video_stream["height"])
num_frames = int(video_stream["nb_frames"])
duration = float(video_stream["duration"])
return (width, height, num_frames, duration)
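# Optional local sanity check (not part of the pipeline): probe the first video
# directly, outside Spark, to confirm ffprobe can read it and to preview the
# fields used above.
if pathlist:
    _sample_uri = pathlist[0][0]   # pathlist holds Row objects; index 0 is the path string
    _video_streams = [s for s in ffmpeg.probe(_sample_uri)["streams"] if s["codec_type"] == "video"]
    print("Sample video stream:", {k: _video_streams[0].get(k) for k in ("width", "height", "nb_frames", "duration")})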
# UDF: extract raw video frames as byte arrays
@F.udf(returnType=ArrayType(BinaryType()))
def video2images(uri, width, height,
sample_rate: int = 5,
start: float = 0.0,
end: float = -1.0,
n_channels: int = 3):
"""
    Use FFmpeg to extract raw image byte arrays, sampled at a fixed rate
    and restricted to a time segment of the video.
"""
video_data, _ = (
ffmpeg.input(uri, threads=1)
.output(
"pipe:",
format="rawvideo",
pix_fmt="rgb24",
ss=start,
t=end - start,
r=1 / sample_rate,
).run(capture_stdout=True))
img_size = height * width * n_channels
return [video_data[idx:idx + img_size] for idx in range(0, len(video_data), img_size)]
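# Note on the byte arithmetic above: rawvideo/rgb24 output is a flat byte stream,
# so each frame occupies height * width * 3 bytes. For a 1920x1080 video that is
# 1920 * 1080 * 3 = 6,220,800 bytes (~6 MB) per frame, which is why frames are
# sampled (r = 1 / sample_rate frames per second) rather than decoded exhaustively.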
# Add the video metadata column
df = df.withColumn("metadata", video_probe(F.col("video_uri")))
print("With Metadata")
df.show(10, truncate=False)
# Extract frames: one frame per second between 0.0 s and 5.0 s, one row per frame
df = df.withColumn("frame", F.explode(
    video2images(F.col("video_uri"), F.col("metadata.width"), F.col("metadata.height"),
                 F.lit(1), F.lit(0.0), F.lit(5.0))))
# Schema and UDFs for face detection
box_struct = StructType(
[
StructField("xmin", IntegerType(), False),
StructField("ymin", IntegerType(), False),
StructField("xmax", IntegerType(), False),
StructField("ymax", IntegerType(), False)
]
)
def bbox_helper(bbox):
    # face_recognition returns boxes as (top, right, bottom, left);
    # reorder to (xmin, ymin, xmax, ymax) and clamp negative values to 0
    top, right, bottom, left = bbox
    bbox = [left, top, right, bottom]
    return list(map(lambda x: max(x, 0), bbox))
@F.udf(returnType=ArrayType(box_struct))
def face_detector(img_data, width=1920, height=1080, n_channels=3):
img = np.frombuffer(img_data, np.uint8).reshape(height, width, n_channels)
faces = face_recognition.face_locations(img)
return [bbox_helper(f) for f in faces]
# Run face detection on every frame
df = df.withColumn("faces", face_detector(F.col("frame"), F.col("metadata.width"), F.col("metadata.height")))
# Schemas and UDFs for face tracking
annot_schema = ArrayType(
StructType(
[
StructField("bbox", box_struct, False),
StructField("tracker_id", StringType(), False),
]
)
)
def bbox_iou(b1, b2):
L = list(zip(b1, b2))
left, top = np.max(L, axis=1)[:2]
right, bottom = np.min(L, axis=1)[2:]
if right < left or bottom < top:
return 0
b_area = lambda b: (b[2] - b[0]) * (b[3] - b[1])
inter_area = b_area([left, top, right, bottom])
b1_area, b2_area = b_area(b1), b_area(b2)
iou = inter_area / float(b1_area + b2_area - inter_area)
return iou
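# Worked example of the IoU computation: boxes [0, 0, 10, 10] and [5, 5, 15, 15]
# overlap on a 5x5 region (area 25); the union is 100 + 100 - 25 = 175,
# so IoU = 25 / 175 ≈ 0.143.
print(round(bbox_iou([0, 0, 10, 10], [5, 5, 15, 15]), 3))   # 0.143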
@F.udf(returnType=MapType(IntegerType(), IntegerType()))
def tracker_match(trackers, detections, bbox_col="bbox", threshold=0.3):
"""
Match Bounding Boxes across successive image frames.
"""
from scipy.optimize import linear_sum_assignment
similarity = bbox_iou
if not trackers or not detections:
return {}
if len(trackers) == len(detections) == 1:
if (
similarity(trackers[0][bbox_col], detections[0][bbox_col])
>= threshold
):
return {0: 0}
sim_mat = np.array(
[
[
similarity(tracked[bbox_col], detection[bbox_col])
for tracked in trackers
]
for detection in detections
],
dtype=np.float32,
)
    # linear_sum_assignment returns (row_ind, col_ind); stack them into (row, col) pairs
    matched_idx = np.transpose(np.asarray(linear_sum_assignment(-sim_mat)))
    matches = []
    for m in matched_idx:
        if sim_mat[m[0], m[1]] >= threshold:
            matches.append(m.reshape(1, 2))
    if len(matches) == 0:
        return {}
    matches = np.concatenate(matches, axis=0)
    # rows index the previous boxes (2nd argument), cols index the current boxes (1st argument)
    idx_map = {int(row): int(col) for row, col in matches}
    return idx_map
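# The Hungarian matching above uses scipy's linear_sum_assignment, which minimizes
# cost; passing -sim_mat therefore maximizes the total IoU. Toy example
# (rows = previous boxes, columns = current boxes):
_toy = np.array([[0.9, 0.1],
                 [0.2, 0.8]], dtype=np.float32)
print(linear_sum_assignment(-_toy))   # (array([0, 1]), array([0, 1]))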
@F.udf(returnType=ArrayType(box_struct))
def OFMotionModel(frame, prev_frame, bboxes, height, width):
if not prev_frame:
prev_frame = frame
gray = cv2.cvtColor(np.frombuffer(frame, np.uint8).reshape(height, width, 3), cv2.COLOR_BGR2GRAY)
prev_gray = cv2.cvtColor(np.frombuffer(prev_frame, np.uint8).reshape(height, width, 3), cv2.COLOR_BGR2GRAY)
inst = cv2.DISOpticalFlow.create(cv2.DISOPTICAL_FLOW_PRESET_MEDIUM)
inst.setUseSpatialPropagation(False)
flow = inst.calc(prev_gray, gray, None)
h, w = flow.shape[:2]
shifted_boxes = []
for box in bboxes:
xmin, ymin, xmax, ymax = box
        # flow[..., 0] is the horizontal (x) displacement, flow[..., 1] the vertical (y)
        avg_x = np.mean(flow[int(ymin):int(ymax), int(xmin):int(xmax), 0])
        avg_y = np.mean(flow[int(ymin):int(ymax), int(xmin):int(xmax), 1])
shifted_boxes.append(
{"xmin": int(max(0, xmin + avg_x)), "ymin": int(max(0, ymin + avg_y)), "xmax": int(min(w, xmax + avg_x)),
"ymax": int(min(h, ymax + avg_y))})
return shifted_boxes
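# The DIS optical flow call above produces a dense H x W x 2 field of per-pixel
# (dx, dy) displacements. Minimal standalone check with two synthetic grayscale frames:
_prev = np.zeros((64, 64), dtype=np.uint8)
_prev[20:30, 20:30] = 255
_curr = np.roll(_prev, 3, axis=1)   # shift the bright patch 3 px to the right
_flow = cv2.DISOpticalFlow_create(cv2.DISOPTICAL_FLOW_PRESET_MEDIUM).calc(_prev, _curr, None)
print(_flow.shape)                  # (64, 64, 2)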
def match_annotations(iterator, segment_id="video_uri", id_col="tracker_id"):
"""
Used by mapPartitions to iterate over the small chunks of our hierarchically-organized data.
"""
matched_annots = []
for idx, data in enumerate(iterator):
data = data[1]
        if not idx:
            # seed tracker IDs for the first frame of the partition
            old_row = {i: str(uuid.uuid4()) for i in range(len(data[1]))}
            old_row[segment_id] = data[0]
annots = []
curr_row = {segment_id: data[0]}
if old_row[segment_id] != curr_row[segment_id]:
old_row = {}
if data[2] is not None:
for ky, vl in data[2].items():
detection = data[1][vl].asDict()
                detection[id_col] = old_row.get(ky, str(uuid.uuid4()))
curr_row[vl] = detection[id_col]
annots.append(Row(**detection))
matched_annots.append(annots)
old_row = curr_row
return matched_annots
def track_detections(df, segment_id="video_uri", frames="frame", detections="faces", optical_flow=True):
id_col = "tracker_id"
frame_window = Window().orderBy(frames)
value_window = Window().orderBy("value")
annot_window = Window.partitionBy(segment_id).orderBy(segment_id, frames)
indexer = StringIndexer(inputCol=segment_id, outputCol="vidIndex")
# adjust detections w/ optical flow
if optical_flow:
df = (
df.withColumn("prev_frames", F.lag(F.col(frames)).over(annot_window))
.withColumn(detections, OFMotionModel(F.col(frames), F.col("prev_frames"), F.col(detections), F.col("metadata.height"), F.col("metadata.width")))
)
df = (
df.select(segment_id, frames, detections)
.withColumn("bbox", F.explode(detections))
.withColumn(id_col, F.lit(""))
.withColumn("trackables", F.struct([F.col("bbox"), F.col(id_col)]))
.groupBy(segment_id, frames, detections)
.agg(F.collect_list("trackables").alias("trackables"))
.withColumn(
"old_trackables", F.lag(F.col("trackables")).over(annot_window)
)
.withColumn(
"matched",
tracker_match(F.col("trackables"), F.col("old_trackables")),
)
.withColumn("frame_index", F.row_number().over(frame_window))
)
df = (
indexer.fit(df)
.transform(df)
.withColumn("vidIndex", F.col("vidIndex").cast(StringType()))
)
unique_ids = df.select("vidIndex").distinct().count()
matched = (
df.select("vidIndex", segment_id, "trackables", "matched")
.rdd.map(lambda x: (x[0], x[1:]))
.partitionBy(unique_ids, lambda x: int(x[0]))
.mapPartitions(match_annotations)
)
    matched_annotations = (
        sqlContext.createDataFrame(matched, annot_schema)
        .withColumn("value_index", F.row_number().over(value_window))
    )
return (
df.join(matched_annotations, F.col("value_index") == F.col("frame_index"))
.withColumnRenamed("value", "trackers_matched")
.withColumn("tracked", F.explode(F.col("trackers_matched")))
.select(
segment_id,
frames,
detections,
F.col("tracked.{}".format("bbox")).alias("bbox"),
F.col("tracked.{}".format(id_col)).alias(id_col),
)
.withColumn(id_col, F.sha2(F.concat(F.col(segment_id), F.col(id_col)), 256))
.withColumn("tracked_detections", F.struct([F.col("bbox"), F.col(id_col)]))
.groupBy(segment_id, frames, detections)
.agg(F.collect_list("tracked_detections").alias("tracked_detections"))
.orderBy(segment_id, frames, detections)
)
# Define the DetectionTracker Transformer: wraps the same logic as track_detections in a Spark ML Transformer
from pyspark import keyword_only
from pyspark.ml.pipeline import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
class DetectionTracker(Transformer, HasInputCol, HasOutputCol):
"""Detect and track."""
@keyword_only
def __init__(self, inputCol=None, outputCol=None, framesCol=None, detectionsCol=None, optical_flow=None):
"""Initialize."""
super(DetectionTracker, self).__init__()
self.framesCol = Param(self, "framesCol", "Column containing frames.")
self.detectionsCol = Param(self, "detectionsCol", "Column containing detections.")
self.optical_flow = Param(self, "optical_flow", "Use optical flow for tracker correction. Default is False")
self._setDefault(framesCol="frame", detectionsCol="faces", optical_flow=False)
kwargs = self._input_kwargs
self.setParams(**kwargs)
@keyword_only
def setParams(self, inputCol=None, outputCol=None, framesCol=None, detectionsCol=None, optical_flow=None):
"""Get params."""
kwargs = self._input_kwargs
return self._set(**kwargs)
def setFramesCol(self, value):
"""Set framesCol."""
return self._set(framesCol=value)
def getFramesCol(self):
"""Get framesCol."""
return self.getOrDefault(self.framesCol)
def setDetectionsCol(self, value):
"""Set detectionsCol."""
return self._set(detectionsCol=value)
def getDetectionsCol(self):
"""Get detectionsCol."""
return self.getOrDefault(self.detectionsCol)
def setOpticalflow(self, value):
"""Set optical_flow."""
return self._set(optical_flow=value)
def getOpticalflow(self):
"""Get optical_flow."""
return self.getOrDefault(self.optical_flow)
def _transform(self, dataframe):
"""Do transformation."""
input_col = self.getInputCol()
output_col = self.getOutputCol()
frames_col = self.getFramesCol()
detections_col = self.getDetectionsCol()
optical_flow = self.getOpticalflow()
id_col = "tracker_id"
frame_window = Window().orderBy(frames_col)
value_window = Window().orderBy("value")
annot_window = Window.partitionBy(input_col).orderBy(input_col, frames_col)
indexer = StringIndexer(inputCol=input_col, outputCol="vidIndex")
# adjust detections w/ optical flow
if optical_flow:
            dataframe = (
                dataframe.withColumn("prev_frames", F.lag(F.col(frames_col)).over(annot_window))
                .withColumn(detections_col,
                            OFMotionModel(F.col(frames_col), F.col("prev_frames"), F.col(detections_col),
                                          F.col("metadata.height"), F.col("metadata.width")))
            )
dataframe = (
dataframe.select(input_col, frames_col, detections_col)
.withColumn("bbox", F.explode(detections_col))
.withColumn(id_col, F.lit(""))
.withColumn("trackables", F.struct([F.col("bbox"), F.col(id_col)]))
.groupBy(input_col, frames_col, detections_col)
.agg(F.collect_list("trackables").alias("trackables"))
.withColumn(
"old_trackables", F.lag(F.col("trackables")).over(annot_window)
)
.withColumn(
"matched",
tracker_match(F.col("trackables"), F.col("old_trackables")),
)
.withColumn("frame_index", F.row_number().over(frame_window))
)
dataframe = (
indexer.fit(dataframe)
.transform(dataframe)
.withColumn("vidIndex", F.col("vidIndex").cast(StringType()))
)
unique_ids = dataframe.select("vidIndex").distinct().count()
matched = (
dataframe.select("vidIndex", input_col, "trackables", "matched")
.rdd.map(lambda x: (x[0], x[1:]))
.partitionBy(unique_ids, lambda x: int(x[0]))
.mapPartitions(match_annotations)
)
        matched_annotations = (
            sqlContext.createDataFrame(matched, annot_schema)
            .withColumn("value_index", F.row_number().over(value_window))
        )
return (
dataframe.join(matched_annotations, F.col("value_index") == F.col("frame_index"))
.withColumnRenamed("value", "trackers_matched")
.withColumn("tracked", F.explode(F.col("trackers_matched")))
.select(
input_col,
frames_col,
detections_col,
F.col("tracked.{}".format("bbox")).alias("bbox"),
F.col("tracked.{}".format(id_col)).alias(id_col),
)
.withColumn(id_col, F.sha2(F.concat(F.col(input_col), F.col(id_col)), 256))
.withColumn(output_col, F.struct([F.col("bbox"), F.col(id_col)]))
.groupBy(input_col, frames_col, detections_col)
.agg(F.collect_list(output_col).alias(output_col))
.orderBy(input_col, frames_col, detections_col)
)
# Run the tracking, either through the Transformer or the plain function
detectTracker = DetectionTracker(inputCol="video_uri", outputCol="tracked_detections")
print(type(detectTracker))
tracked = detectTracker.transform(df)   # Transformer API: returns a lazy DataFrame
final = track_detections(df)            # plain-function API, same logic
print("Final dataframe")
final.select("tracked_detections").show(100, truncate=False)
The code above covers the complete pipeline: Spark configuration, video metadata extraction, frame extraction, face detection, face tracking, and the DetectionTracker Transformer.
This article has provided a complete example of processing video files in parallel with PySpark, covering video metadata extraction, frame extraction, face detection, and face tracking. Spark's distributed computing capabilities let us process large volumes of video efficiently and run a variety of video-analysis tasks. In practice the code can be adapted to specific needs, for example by swapping in a different face detection algorithm or tuning the optical flow parameters.
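One common follow-up step is to persist the tracked detections for downstream analysis. The sketch below is one way to do this, assuming an output path of your choosing ("output/tracked_faces" is a placeholder); the raw frame bytes are replaced with a compact fingerprint before writing.

# Persist the tracked detections (path is a placeholder)
(final
 .withColumn("frame_sha1", F.sha1(F.col("frame")))   # fingerprint instead of raw frame bytes
 .drop("frame")
 .write.mode("overwrite")
 .parquet("output/tracked_faces"))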