Add target-side video preprocessing pipeline

Preprocesses a folder of video files into UUID-named clips suitable as
target inputs for roop-unleashed-style face-swap. Counterpart to the
faceset (source-side) tooling.

work/video_target_pipeline.py — orchestration with subcommands
  scan / scenes / stage / merge / track / score / cut / report. Quality
  gates default to side-profile-tolerant values (yaw<=75°, pitch<=45°,
  face_short>=80px, det>=0.5), since the face-set tooling can handle
  side profiles. Cross-track segment merge fuses tracks adjacent in
  time within the same scene, up to a 2s gap.
  Output organized into <output_dir>/<source_stem>/<uuid>.mp4 +
  <uuid>.json sidecar with full provenance.
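  The gate and merge rules amount to roughly the sketch below
  (illustrative only: thresholds mirror the defaults above, but the
  record fields and helper names are assumed, not taken from the
  actual module):

    MAX_YAW_DEG, MAX_PITCH_DEG = 75.0, 45.0   # pose limits, degrees
    MIN_FACE_SHORT_PX, MIN_DET = 80, 0.5
    MAX_MERGE_GAP_S = 2.0                     # cross-track fuse window

    def face_passes_gate(face: dict) -> bool:
        """True when a detected face clears the default quality gates."""
        pitch, yaw, _roll = face["pose"]      # degrees, worker ordering
        return (abs(yaw) <= MAX_YAW_DEG
                and abs(pitch) <= MAX_PITCH_DEG
                and face["face_short"] >= MIN_FACE_SHORT_PX
                and face["det_score"] >= MIN_DET)

    def can_merge(prev_track: dict, next_track: dict) -> bool:
        """Fuse two tracks only when they sit in the same scene and the
        gap between them is at most MAX_MERGE_GAP_S seconds."""
        return (prev_track["scene_id"] == next_track["scene_id"]
                and 0.0 <= next_track["start_s"] - prev_track["end_s"] <= MAX_MERGE_GAP_S)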

work/video_face_worker.py — Windows DML face detect+embed worker. Writes
  results.jsonl append-only, one JSON record per line: a critical perf
  fix, since re-serializing the monolithic 245MB results.json on every
  flush was the dominant cost in the first attempt and dropped
  throughput to 0.5 fps. Append-only got it to 13+ fps, ~7.5 fps
  cumulative across the first 6.18h batch. Also seeks once per video
  and then steps forward with sequential cap.grab() between samples, to
  dodge cv2's per-sample seek pathology on long H.264. A legacy
  results.json is auto-migrated to .jsonl on first load.
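  Since each results.jsonl line is a self-contained JSON record, the
  WSL-side consumer can stream it back without loading the whole file;
  a minimal reader sketch (record fields match the worker below, the
  helper itself is assumed, not part of this commit):

    import json
    from pathlib import Path

    def iter_results(jsonl_path: Path):
        """Yield one result record per non-empty line, skipping corrupt
        lines (e.g. a torn final line from an interrupted flush)."""
        with open(jsonl_path) as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    yield json.loads(line)
                except json.JSONDecodeError:
                    continue

    # e.g. count sampled frames where at least one face survived the gates
    n_hits = sum(1 for r in iter_results(Path("results.jsonl")) if r.get("faces"))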

work/run_video_pipeline.sh — generic chain driver, parameterized via
  WORK / INPUT_DIR / OUTPUT_DIR / FILTER_FROM / SKIP_PATTERN / MAX_DUR /
  IDENTITY env vars.

work/status_video_pipeline.sh — generic status helper.

First production batch (ct_src_00050..00062, 13 files, 6.18h input):
600 emitted segments, 239.5min accepted content (64.6% of input), 254
segments built from >=2 tracks (cross-track merge), 1h43m wall clock.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 21:38:50 +02:00
parent 49a43c7685
commit 998fa79f81
6 changed files with 1480 additions and 0 deletions

work/video_face_worker.py (new file, 274 lines)

@@ -0,0 +1,274 @@
"""Windows / DirectML video frame face worker.
Reads a queue.json from /opt/face-sets/work/video_target_pipeline.py:stage
(WSL side), each entry: {video_path, win_video_path, frame_idx, time_s,
queue_id}. Decodes frame N from the video, runs insightface FaceAnalysis,
emits per-face records (bbox, det_score, pose, embedding, face_short).
CLI:
py -3.12 video_face_worker.py <queue.json> <out_results.json> [--limit N]
Resumable: queue_ids already present in the results store (the sister
.jsonl file, or a legacy out_results.json on first run) are skipped.
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
from pathlib import Path
import numpy as np
import cv2
from insightface.app import FaceAnalysis

MODEL_ROOT = r"C:\face_embed_venv\models"
MIN_DET = 0.5
MIN_FACE_PIX = 40
FLUSH_EVERY = 100


def jsonl_path_for(out_path: Path) -> Path:
"""Sister JSONL file: one result-record per line, append-only."""
    return out_path.with_suffix(".jsonl")


def load_existing(out_path: Path):
"""Load existing results from .jsonl (preferred) or legacy .json (one-time conversion).
Returns (records_list, processed_set)."""
jsonl = jsonl_path_for(out_path)
if jsonl.exists():
records = []
processed = set()
with open(jsonl) as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
r = json.loads(line)
records.append(r)
if r.get("queue_id"):
processed.add(r["queue_id"])
except json.JSONDecodeError:
print(f"[warn] {jsonl}:{line_num} corrupt; skipping", file=sys.stderr)
return records, processed
# legacy JSON support: load once, convert to JSONL
if out_path.exists():
try:
d = json.loads(out_path.read_text())
records = d.get("results", [])
processed = set(d.get("processed", []))
print(f"[migrate] converting {len(records)} legacy JSON records to JSONL", file=sys.stderr)
with open(jsonl, "w") as f:
for r in records:
f.write(json.dumps(r) + "\n")
return records, processed
except Exception as e:
print(f"[warn] could not parse {out_path}: {e}; starting fresh", file=sys.stderr)
    return [], set()


def append_records(out_path: Path, new_records: list):
"""Append-only write to the sister .jsonl file. No re-serialization of prior records."""
if not new_records:
return
jsonl = jsonl_path_for(out_path)
with open(jsonl, "a") as f:
for r in new_records:
            f.write(json.dumps(r) + "\n")


def write_compat_summary(out_path: Path, total_records: int, processed: set):
"""Write a tiny JSON pointer file at the legacy out_path so older consumers
still see *something*, but the canonical store is the .jsonl. Cheap."""
summary = {
"_format": "jsonl-pointer",
"_jsonl": str(jsonl_path_for(out_path).name),
"results_count": total_records,
"processed_count": len(processed),
}
tmp = out_path.with_suffix(".tmp.json")
tmp.write_text(json.dumps(summary, indent=2))
    os.replace(tmp, out_path)


def main():
ap = argparse.ArgumentParser()
ap.add_argument("queue", type=Path)
ap.add_argument("out", type=Path)
ap.add_argument("--limit", type=int, default=None)
args = ap.parse_args()
queue = json.loads(args.queue.read_text())
print(f"[queue] {len(queue)} entries from {args.queue}", flush=True)
args.out.parent.mkdir(parents=True, exist_ok=True)
results, processed = load_existing(args.out)
if processed:
print(f"[resume] {len(processed)} already scored", flush=True)
pending = [e for e in queue if e["queue_id"] not in processed]
if args.limit is not None:
pending = pending[: args.limit]
print(f"[pending] {len(pending)} entries", flush=True)
if not pending:
print("[done] nothing to do")
return
print("[load] FaceAnalysis with DmlExecutionProvider", flush=True)
app = FaceAnalysis(
name="buffalo_l",
root=MODEL_ROOT,
providers=["DmlExecutionProvider", "CPUExecutionProvider"],
)
app.prepare(ctx_id=0, det_size=(640, 640))
# group queue by video so we can keep one VideoCapture open and seek
from collections import defaultdict
by_video = defaultdict(list)
for e in pending:
by_video[e["win_video_path"]].append(e)
n_done = 0
n_load_err = 0
last_flush = time.time()
t_start = time.time()
new_buffer: list = []
def flush():
# append-only: only NEW records since last flush get written. O(new_records),
# not O(total_records). Was 11s/flush at 9k records; now <50ms.
if new_buffer:
append_records(args.out, new_buffer)
new_buffer.clear()
write_compat_summary(args.out, len(results), processed)
for vidpath, entries in by_video.items():
# entries are already sorted by frame_idx. Hybrid decode strategy:
# 1. Seek ONCE to the first pending target (cheap keyframe-seek).
# 2. Sequential cap.grab() between subsequent targets (decode without
# BGR conversion until we reach a target, then cap.retrieve()).
# This avoids per-sample seek cost (the original pathology that
# caused 1.4 fps deep in long videos) AND avoids grab-walking from
# frame 0 on resume (the over-correction that gave 0.08 fps).
entries.sort(key=lambda e: e["frame_idx"])
cap = cv2.VideoCapture(vidpath)
if not cap.isOpened():
print(f"[err] cannot open {vidpath}", flush=True)
for e in entries:
rec = {
"queue_id": e["queue_id"], "video_path": e["video_path"],
"frame_idx": e["frame_idx"], "time_s": e["time_s"],
"faces": [], "error": "cap_open",
}
results.append(rec); new_buffer.append(rec)
processed.add(e["queue_id"])
n_done += 1
n_load_err += 1
continue
first_target = entries[0]["frame_idx"]
if first_target > 0:
cap.set(cv2.CAP_PROP_POS_FRAMES, first_target)
cur_frame_idx = first_target - 1
else:
cur_frame_idx = -1
for e in entries:
target = e["frame_idx"]
if target < cur_frame_idx + 1:
# backward jump (only triggers for unsorted entries — defensive)
cap.set(cv2.CAP_PROP_POS_FRAMES, target)
cur_frame_idx = target - 1
# advance up to (but not including) target via grab()-only
ran_out = False
while cur_frame_idx + 1 < target:
ok = cap.grab()
if not ok:
ran_out = True
break
cur_frame_idx += 1
if not ran_out:
ok = cap.grab()
if not ok:
ran_out = True
else:
cur_frame_idx = target
if ran_out:
rec = {
"queue_id": e["queue_id"], "video_path": e["video_path"],
"frame_idx": e["frame_idx"], "time_s": e["time_s"],
"faces": [], "error": "frame_read",
}
results.append(rec); new_buffer.append(rec)
processed.add(e["queue_id"])
n_done += 1
n_load_err += 1
continue
ok, bgr = cap.retrieve()
if not ok or bgr is None:
rec = {
"queue_id": e["queue_id"], "video_path": e["video_path"],
"frame_idx": e["frame_idx"], "time_s": e["time_s"],
"faces": [], "error": "frame_read",
}
results.append(rec); new_buffer.append(rec)
processed.add(e["queue_id"])
n_done += 1
n_load_err += 1
continue
faces = app.get(bgr)
kept_faces = []
H, W = bgr.shape[:2]
for f in faces:
if float(f.det_score) < MIN_DET:
continue
x1, y1, x2, y2 = [int(round(v)) for v in f.bbox]
x1 = max(x1, 0); y1 = max(y1, 0)
x2 = min(x2, W); y2 = min(y2, H)
w, h = x2 - x1, y2 - y1
short = min(w, h)
if short < MIN_FACE_PIX:
continue
rec = {
"bbox": [x1, y1, x2, y2],
"det_score": float(f.det_score),
"face_short": int(short),
}
if hasattr(f, "pose") and f.pose is not None:
rec["pose"] = [float(x) for x in f.pose] # pitch, yaw, roll
if hasattr(f, "normed_embedding") and f.normed_embedding is not None:
rec["embedding"] = f.normed_embedding.astype(np.float32).tolist()
kept_faces.append(rec)
rec = {
"queue_id": e["queue_id"], "video_path": e["video_path"],
"frame_idx": e["frame_idx"], "time_s": e["time_s"],
"frame_w": W, "frame_h": H,
"faces": kept_faces,
}
results.append(rec); new_buffer.append(rec)
processed.add(e["queue_id"])
n_done += 1
if (n_done % FLUSH_EVERY == 0) or (time.time() - last_flush) > 30.0:
flush()
last_flush = time.time()
el = time.time() - t_start
rate = n_done / max(0.1, el)
eta = (len(pending) - n_done) / max(0.1, rate) / 60.0
print(f"[scan] {n_done}/{len(pending)} rate={rate:.2f} fps eta={eta:.1f}min "
f"errs={n_load_err}", flush=True)
cap.release()
flush()
el = time.time() - t_start
print(f"[done] {n_done} scored, {n_load_err} errors, {el:.1f}s "
f"({n_done/max(0.1,el):.2f} fps) -> {args.out}", flush=True)
if __name__ == "__main__":
main()