Preprocesses a folder of video files into UUID-named clips suitable as target inputs for roop-unleashed-style face-swap. Counterpart to the faceset (source-side) tooling. work/video_target_pipeline.py — orchestration with subcommands scan / scenes / stage / merge / track / score / cut / report. Quality gates default to face-sets-can-handle-side-profile values (yaw<=75°, pitch<=45°, face_short>=80px, det>=0.5). Cross-track segment merge fuses adjacent-in-time tracks within the same scene up to 2s gap. Output organized into <output_dir>/<source_stem>/<uuid>.mp4 + <uuid>.json sidecar with full provenance. work/video_face_worker.py — Windows DML face detect+embed worker. Uses JSONL append-only for results.jsonl: a critical perf fix (re- serializing the monolithic 245MB results.json on every flush was the dominant cost in the first attempt, dropping throughput to 0.5 fps). Append-only got it to 13+ fps, ~7.5 fps cumulative across the first 6.18h batch. Also uses seek-once-per-video + sequential cap.grab() between samples to dodge cv2 per-sample seek pathology on long H.264. Legacy results.json is auto-migrated to .jsonl on first load. work/run_video_pipeline.sh — generic chain driver, parameterized via WORK / INPUT_DIR / OUTPUT_DIR / FILTER_FROM / SKIP_PATTERN / MAX_DUR / IDENTITY env vars. work/status_video_pipeline.sh — generic status helper. First production batch (ct_src_00050..00062, 13 files, 6.18h input): 600 emitted segments, 239.5min accepted content (64.6% of input), 254 segments built from >=2 tracks (cross-track merge), 1h43m wall clock. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
275 lines
10 KiB
Python
275 lines
10 KiB
Python
"""Windows / DirectML video frame face worker.
|
|
|
|
Reads a queue.json from /opt/face-sets/work/video_target_pipeline.py:stage
|
|
(WSL side), each entry: {video_path, win_video_path, frame_idx, time_s,
|
|
queue_id}. Decodes frame N from the video, runs insightface FaceAnalysis,
|
|
emits per-face records (bbox, det_score, pose, embedding, face_short).
|
|
|
|
CLI:
|
|
py -3.12 video_face_worker.py <queue.json> <out_results.json> [--limit N]
|
|
|
|
Resumable: existing entries in out_results.json with the same queue_id are
|
|
skipped.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
import cv2
|
|
from insightface.app import FaceAnalysis
|
|
|
|
MODEL_ROOT = r"C:\face_embed_venv\models"
|
|
MIN_DET = 0.5
|
|
MIN_FACE_PIX = 40
|
|
FLUSH_EVERY = 100
|
|
|
|
|
|
def jsonl_path_for(out_path: Path) -> Path:
    """Return the append-only sister ``.jsonl`` path for ``out_path``.

    One result-record per line; this file is the canonical results store.
    """
    sister = out_path.with_suffix(".jsonl")
    return sister
|
|
|
|
|
|
def load_existing(out_path: Path):
    """Load previously-scored results for resume.

    Prefers the sister ``.jsonl`` store (one record per line); otherwise
    falls back to a legacy monolithic ``.json`` and migrates it to JSONL
    on this first load.  Returns ``(records, processed)`` where
    ``processed`` is the set of queue_ids already scored.
    """
    jsonl = out_path.with_suffix(".jsonl")  # inlined jsonl_path_for()

    if jsonl.exists():
        records: list = []
        processed: set = set()
        with open(jsonl) as fh:
            for line_num, raw in enumerate(fh, 1):
                raw = raw.strip()
                if not raw:
                    continue  # blank line — harmless
                try:
                    rec = json.loads(raw)
                except json.JSONDecodeError:
                    # likely a torn tail from a crash mid-append; drop it
                    print(f"[warn] {jsonl}:{line_num} corrupt; skipping", file=sys.stderr)
                    continue
                records.append(rec)
                if rec.get("queue_id"):
                    processed.add(rec["queue_id"])
        return records, processed

    # One-time migration: legacy monolithic JSON -> JSONL.
    if out_path.exists():
        try:
            legacy = json.loads(out_path.read_text())
            records = legacy.get("results", [])
            processed = set(legacy.get("processed", []))
            print(f"[migrate] converting {len(records)} legacy JSON records to JSONL", file=sys.stderr)
            with open(jsonl, "w") as fh:
                fh.writelines(json.dumps(rec) + "\n" for rec in records)
            return records, processed
        except Exception as e:
            print(f"[warn] could not parse {out_path}: {e}; starting fresh", file=sys.stderr)

    return [], set()
|
|
|
|
|
|
def append_records(out_path: Path, new_records: list):
    """Append ``new_records`` to the sister ``.jsonl`` store.

    Append-only by design: cost is O(len(new_records)) — records already
    on disk are never re-serialized.  No-op for an empty batch.
    """
    if not new_records:
        return
    lines = [json.dumps(rec) + "\n" for rec in new_records]
    with open(out_path.with_suffix(".jsonl"), "a") as fh:  # inlined jsonl_path_for()
        fh.writelines(lines)
|
|
|
|
|
|
def write_compat_summary(out_path: Path, total_records: int, processed: set):
    """Atomically refresh a tiny pointer JSON at the legacy ``out_path``.

    Older consumers still find a parsable file there, but it only names
    the canonical ``.jsonl`` store and carries two counters — cheap to
    rewrite on every flush.
    """
    pointer = {
        "_format": "jsonl-pointer",
        "_jsonl": out_path.with_suffix(".jsonl").name,  # inlined jsonl_path_for()
        "results_count": total_records,
        "processed_count": len(processed),
    }
    staging = out_path.with_suffix(".tmp.json")
    staging.write_text(json.dumps(pointer, indent=2))
    os.replace(staging, out_path)  # atomic swap on the same filesystem
|
|
|
|
|
|
def main() -> None:
    """Worker entry point.

    Flow: parse args -> load prior results (resume) -> init insightface on
    DirectML -> decode pending frames grouped per video -> detect, filter,
    and embed faces -> append-only flush every FLUSH_EVERY records or 30 s.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("queue", type=Path)   # queue.json from the WSL-side stage step
    ap.add_argument("out", type=Path)     # legacy results path; canonical store is its .jsonl sister
    ap.add_argument("--limit", type=int, default=None)  # optional cap on entries this run
    args = ap.parse_args()

    queue = json.loads(args.queue.read_text())
    print(f"[queue] {len(queue)} entries from {args.queue}", flush=True)
    args.out.parent.mkdir(parents=True, exist_ok=True)

    # Resume: any queue_id already present in the store is skipped below.
    results, processed = load_existing(args.out)
    if processed:
        print(f"[resume] {len(processed)} already scored", flush=True)

    pending = [e for e in queue if e["queue_id"] not in processed]
    if args.limit is not None:
        pending = pending[: args.limit]
    print(f"[pending] {len(pending)} entries", flush=True)
    if not pending:
        print("[done] nothing to do")
        return

    # Model load is slow, so only do it once we know there is work.
    print("[load] FaceAnalysis with DmlExecutionProvider", flush=True)
    app = FaceAnalysis(
        name="buffalo_l",
        root=MODEL_ROOT,
        providers=["DmlExecutionProvider", "CPUExecutionProvider"],  # DML first, CPU fallback
    )
    app.prepare(ctx_id=0, det_size=(640, 640))

    # group queue by video so we can keep one VideoCapture open and seek
    from collections import defaultdict
    by_video = defaultdict(list)
    for e in pending:
        by_video[e["win_video_path"]].append(e)

    n_done = 0        # entries finished this run (error records included)
    n_load_err = 0    # entries that failed at open/decode time
    last_flush = time.time()
    t_start = time.time()
    new_buffer: list = []  # records produced since the last flush

    def flush() -> None:
        # append-only: only NEW records since last flush get written. O(new_records),
        # not O(total_records). Was 11s/flush at 9k records; now <50ms.
        if new_buffer:
            append_records(args.out, new_buffer)
            new_buffer.clear()
        write_compat_summary(args.out, len(results), processed)

    for vidpath, entries in by_video.items():
        # entries are already sorted by frame_idx. Hybrid decode strategy:
        # 1. Seek ONCE to the first pending target (cheap keyframe-seek).
        # 2. Sequential cap.grab() between subsequent targets (decode without
        #    BGR conversion until we reach a target, then cap.retrieve()).
        # This avoids per-sample seek cost (the original pathology that
        # caused 1.4 fps deep in long videos) AND avoids grab-walking from
        # frame 0 on resume (the over-correction that gave 0.08 fps).
        entries.sort(key=lambda e: e["frame_idx"])
        cap = cv2.VideoCapture(vidpath)
        if not cap.isOpened():
            print(f"[err] cannot open {vidpath}", flush=True)
            # Emit empty-faces error records so resume never retries a
            # permanently unreadable video.
            for e in entries:
                rec = {
                    "queue_id": e["queue_id"], "video_path": e["video_path"],
                    "frame_idx": e["frame_idx"], "time_s": e["time_s"],
                    "faces": [], "error": "cap_open",
                }
                results.append(rec); new_buffer.append(rec)
                processed.add(e["queue_id"])
                n_done += 1
                n_load_err += 1
            continue
        # cur_frame_idx tracks the index of the last frame grab()bed, so the
        # "next grab returns frame cur_frame_idx + 1" invariant holds below.
        first_target = entries[0]["frame_idx"]
        if first_target > 0:
            # NOTE(review): assumes the backend positions exactly on
            # first_target after this seek — confirm per codec/backend.
            cap.set(cv2.CAP_PROP_POS_FRAMES, first_target)
            cur_frame_idx = first_target - 1
        else:
            cur_frame_idx = -1
        for e in entries:
            target = e["frame_idx"]
            if target < cur_frame_idx + 1:
                # backward jump (only triggers for unsorted entries — defensive)
                cap.set(cv2.CAP_PROP_POS_FRAMES, target)
                cur_frame_idx = target - 1
            # advance up to (but not including) target via grab()-only
            ran_out = False
            while cur_frame_idx + 1 < target:
                ok = cap.grab()
                if not ok:
                    ran_out = True
                    break
                cur_frame_idx += 1
            if not ran_out:
                # grab() the target frame itself; retrieve() below decodes it
                ok = cap.grab()
                if not ok:
                    ran_out = True
                else:
                    cur_frame_idx = target
            if ran_out:
                # Stream ended before the target frame; record the failure so
                # resume skips this entry instead of retrying forever.
                rec = {
                    "queue_id": e["queue_id"], "video_path": e["video_path"],
                    "frame_idx": e["frame_idx"], "time_s": e["time_s"],
                    "faces": [], "error": "frame_read",
                }
                results.append(rec); new_buffer.append(rec)
                processed.add(e["queue_id"])
                n_done += 1
                n_load_err += 1
                continue
            ok, bgr = cap.retrieve()
            if not ok or bgr is None:
                # grab() succeeded but decode/convert failed — same handling.
                rec = {
                    "queue_id": e["queue_id"], "video_path": e["video_path"],
                    "frame_idx": e["frame_idx"], "time_s": e["time_s"],
                    "faces": [], "error": "frame_read",
                }
                results.append(rec); new_buffer.append(rec)
                processed.add(e["queue_id"])
                n_done += 1
                n_load_err += 1
                continue

            # Detect + embed, then filter to faces worth keeping.
            faces = app.get(bgr)
            kept_faces = []
            H, W = bgr.shape[:2]
            for f in faces:
                if float(f.det_score) < MIN_DET:
                    continue
                # Clamp bbox to the frame, then gate on the short side.
                x1, y1, x2, y2 = [int(round(v)) for v in f.bbox]
                x1 = max(x1, 0); y1 = max(y1, 0)
                x2 = min(x2, W); y2 = min(y2, H)
                w, h = x2 - x1, y2 - y1
                short = min(w, h)
                if short < MIN_FACE_PIX:
                    continue
                rec = {
                    "bbox": [x1, y1, x2, y2],
                    "det_score": float(f.det_score),
                    "face_short": int(short),
                }
                # pose/embedding are optional attributes on insightface Face
                # objects; only emit them when present.
                if hasattr(f, "pose") and f.pose is not None:
                    rec["pose"] = [float(x) for x in f.pose]  # pitch, yaw, roll
                if hasattr(f, "normed_embedding") and f.normed_embedding is not None:
                    rec["embedding"] = f.normed_embedding.astype(np.float32).tolist()
                kept_faces.append(rec)

            # Per-frame result record (kept even when kept_faces is empty).
            rec = {
                "queue_id": e["queue_id"], "video_path": e["video_path"],
                "frame_idx": e["frame_idx"], "time_s": e["time_s"],
                "frame_w": W, "frame_h": H,
                "faces": kept_faces,
            }
            results.append(rec); new_buffer.append(rec)
            processed.add(e["queue_id"])
            n_done += 1

            # Periodic flush + progress line: every FLUSH_EVERY records or 30 s.
            if (n_done % FLUSH_EVERY == 0) or (time.time() - last_flush) > 30.0:
                flush()
                last_flush = time.time()
                el = time.time() - t_start
                rate = n_done / max(0.1, el)
                eta = (len(pending) - n_done) / max(0.1, rate) / 60.0
                print(f"[scan] {n_done}/{len(pending)} rate={rate:.2f} fps eta={eta:.1f}min "
                      f"errs={n_load_err}", flush=True)
        cap.release()

    # Final flush so the tail of new_buffer always lands on disk.
    flush()
    el = time.time() - t_start
    print(f"[done] {n_done} scored, {n_load_err} errors, {el:.1f}s "
          f"({n_done/max(0.1,el):.2f} fps) -> {args.out}", flush=True)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|