diff --git a/README.md b/README.md index 2edcb2e..d77b6f5 100644 --- a/README.md +++ b/README.md @@ -343,6 +343,7 @@ clean it up over time: | `work/consolidate_facesets.py` | Merge duplicate identities (centroid cosine sim ≥ 0.55 with confident ≥ 0.65, **complete-linkage** to defeat single-link chaining). Pulls embeddings from cache, no GPU. See `docs/analysis/identity-consolidation-and-age-extend.md`. | | `work/age_extend_001.py` | Slot newly-added PNGs into existing era buckets of `faceset_001` (anchor cosine distance ≤ 0.40 AND `|year_delta|` ≤ 5). Same anchor-fragment rule as `age_split_001.py`. | | `work/dedup_optimize.py` (+ Windows `work/multiface_worker.py`) | (a) cross-family SHA256 byte-dedup, (b) within-faceset near-dup at cosine sim ≥ 0.95, (c) multi-face audit (re-detect via insightface, drop PNGs with face_count ≠ 1). Multi-face is the load-bearing roop invariant. See `docs/analysis/dedup-and-roop-optimization.md`. | +| `work/video_target_pipeline.py` (+ Windows `work/video_face_worker.py` + `work/run_video_pipeline.sh` chain) | Target-side preprocessing: scan a folder of videos → PySceneDetect shot-cuts → 2 fps frame sampling → DML face detection + embedding → IoU+embedding tracking → quality-gated segments (yaw≤75°, face≥80px, det≥0.5, ≥70% pass-rate, 1–120s duration, 2s cross-track merge gap) → ffmpeg stream-copy into UUID-named clips with sidecar JSON. Output organized into per-source subfolders. See `docs/analysis/video-target-preprocessing.md`. | All four operate idempotently and reversibly: dropped PNGs go to `/faces/_dropped/`, quarantined whole facesets go to @@ -382,6 +383,10 @@ Highly recommended at swap time: enable **Select post-processing = GFPGAN** with ├─ consolidate_facesets.py (duplicate-identity merger; complete-linkage) ├─ dedup_optimize.py (byte + near-dup + multi-face audit driver) ├─ multiface_worker.py (Windows DML multi-face audit worker) + ├─ video_target_pipeline.py (video → swappable segment cuts orchestration) + ├─ video_face_worker.py (Windows DML per-frame face worker; JSONL append-only) + ├─ run_video_pipeline.sh (generic chain driver: scenes → stage → worker → cut) + ├─ status_video_pipeline.sh (status helper for any video_pipeline log) ├─ synthetic_*_manifest.json (per-run synthetic refine manifests) ├─ immich/ │ ├─ users.json (label -> userId map; gitignored) diff --git a/docs/analysis/video-target-preprocessing.md b/docs/analysis/video-target-preprocessing.md new file mode 100644 index 0000000..ac914f5 --- /dev/null +++ b/docs/analysis/video-target-preprocessing.md @@ -0,0 +1,129 @@ +# Video target preprocessing for roop-unleashed + +_Initial design + first batch run: 2026-04-27. Driver scripts: `work/video_target_pipeline.py`, `work/video_face_worker.py`, `work/run_video_pipeline.sh`._ + +Companion to the face-set side of the project: instead of building per-identity .fsz bundles for the *source* of a swap, this pipeline preprocesses the *target* (videos to swap into). Given a folder of video files, it identifies "swappable" segments — continuous shots where a face is detectable, sufficiently visible, and roughly within inswapper_128's working envelope — and cuts them into UUID-named clips ready to feed into roop-unleashed. + +## 1. Why build it + +I checked the obvious open-source projects for an existing implementation: + +- **FaceFusion** ([github.com/facefusion/facefusion](https://github.com/facefusion/facefusion)) — CLI has `run`, `headless-run`, `batch-run`, `job-*`, `force-download`, `benchmark`. 
No scene-detection or clip-extraction subcommand. Its own guides recommend "split your video manually first."
+- **roop-unleashed** at `/opt/roop-unleashed/roop/util_ffmpeg.py` — has `cut_video(start_frame, end_frame)` for a manual GUI trim, no detection-driven segmentation.
+- **Deep-Live-Cam** ([github.com/hacksider/Deep-Live-Cam](https://github.com/hacksider/Deep-Live-Cam)) — real-time / single-shot, no batch preprocessing.
+- **DeepFaceLab** — `extract_video.bat` dumps every frame between user-supplied trim points; no quality gating.
+
+Closest prior art for the cut-detection pattern is the two-stage hybrid in [SportSBD MMSys'26](https://dl.acm.org/doi/10.1145/3793853.3799803) (cheap detector for cuts, accurate net for verification), but the actual implementation has to be ours.
+
+## 2. Pipeline architecture
+
+```
+WSL /opt/face-sets/work/                 Windows C:\face_embed_venv\
+─────────────────────────────────────    ─────────────────────────────
+run_video_pipeline.sh (chain driver)
+  │
+  ├─ scan    (ffprobe metadata)
+  ├─ scenes  (PySceneDetect AdaptiveDetector, CPU)
+  ├─ stage   (sampled frame queue.json @ 2 fps)
+  │                                       │
+  │                                       ▼
+  │                                video_face_worker.py
+  │                                insightface FaceAnalysis
+  │                                on DmlExecutionProvider
+  │                                output: results.jsonl
+  ├─ merge   (ingest results.jsonl)
+  ├─ track   (IoU + embedding stitching, ~30 LOC)
+  ├─ score   (track-level quality gate + cross-track merge)
+  ├─ cut     (ffmpeg -c copy → per-source subfolders)
+  └─ report  (HTML preview)
+
+  Output: <OUTPUT_DIR>/<source_stem>/<uuid>.mp4
+          <source_stem>/<uuid>.json (sidecar)
+```
+
+`run_video_pipeline.sh` is parameterized via env vars (`WORK`, `INPUT_DIR`, `OUTPUT_DIR`, `FILTER_FROM`, `SKIP_PATTERN`, `MAX_DUR`, `IDENTITY`) so you can pin a particular batch without editing the script.
+
+## 3. Quality signals (matched to inswapper_128's working envelope)
+
+inswapper_128 is trained near-frontal at 128×128. The score gate uses defaults that admit side profiles (since rich face-sets can absorb non-frontal swap targets):
+
+| signal | threshold | rationale |
+|--------|----------:|-----------|
+| `\|yaw\|` | ≤ 75° | covers full 3/4 + side profile |
+| `\|pitch\|` | ≤ 45° | covers extreme up/down looks |
+| `face_short` | ≥ 80 px | inswapper resamples to 128; ≥80 still produces clean output |
+| `det_score` | ≥ 0.5 | matches buffalo_l's MIN_DET; lower = unreliable detection |
+| track-gate | ≥ 70 % frames pass | binary track filter rather than per-frame |
+| duration | 1 s ≤ dur ≤ 120 s | below 1 s = unusable slivers; above 120 s probably contains a missed micro-cut |
+
+Plus two segment-merging knobs:
+- `--bridge-gap` (default 3 s) — within a single track, brief pose-failure gaps shorter than this get bridged so single bad frames don't fragment a good run
+- `--merge-gap` (default 2 s) — across tracks within the same scene, segments closer than this get fused (cross-track merge fires when face detection briefly fails between adjacent good runs)
+
+The defaults can be tightened (e.g. `--max-yaw 25` for portrait-only) or loosened (e.g. `--max-yaw 90 --merge-gap 5`) without re-running detection — `score` reads the existing `tracks.json`.
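+
+The gate itself is four comparisons per sampled face. A condensed sketch of the predicate the table describes (the production version lives in `_track_passes` inside `work/video_target_pipeline.py`; `frame_passes` is an illustrative name, not part of the pipeline):
+
+```python
+def frame_passes(face: dict, cfg: dict) -> bool:
+    """One sampled face passes only if all four signals clear their gates."""
+    # insightface pose order is [pitch, yaw, roll]; missing pose counts as frontal
+    pitch, yaw, _roll = (abs(v) for v in face.get("pose", [0.0, 0.0, 0.0]))
+    return (yaw <= cfg["yaw_max"]                              # ≤ 75° default
+            and pitch <= cfg["pitch_max"]                      # ≤ 45° default
+            and face.get("face_short", 0) >= cfg["face_min"]   # ≥ 80 px default
+            and face.get("det_score", 0.0) >= cfg["det_min"])  # ≥ 0.5 default
+```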
+
+## 4. Performance + the JSONL append-only fix
+
+This is where the engineering interest is. The first production run on 13 videos / 6.18 h of input went through three failure modes before settling at production speed:
+
+| attempt | issue | rate observed |
+|---|---|---:|
+| 1. Original `cap.set(POS_FRAMES, N)` per sample | OpenCV seeks to nearest keyframe + decodes forward at every sample. Cost grows with depth into the video; on a 60-min H.264 it falls off a cliff. | 1.4 fps → degrading |
+| 2. Sequential `cap.grab()` from frame 0 | On resume, grab-walking from frame 0 to a deep target is unbounded. | 0.08 fps |
+| 3. Hybrid: seek-once-per-video + sequential within | Better in principle. But hit a different bug: `flush()` was re-serializing the entire `results.json` (245 MB at this point) every 100 frames or 30 sec. Save dominated wall-clock. | 0.5 fps |
+| 4. **JSONL append-only** | One result per line. Each flush is O(new records), not O(total records). | **13.77 fps** smoke / 7.57 fps cumulative across the full batch |
+
+Lesson: when the output is large + grows monotonically + needs frequent checkpointing, *do not* re-serialize the whole structure on each flush. Append-only line-delimited JSON is the right tool. The legacy `results.json` is auto-converted to `.jsonl` on first load (one-time migration), so resumes survive the format switch.
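+
+To make the contrast concrete, a minimal sketch of the two flush shapes (function names are illustrative, not the worker's API; the real append path is `append_records` in `work/video_face_worker.py`):
+
+```python
+import json
+
+def flush_rewrite(path, all_records):           # attempt 3's flush: O(total)
+    with open(path, "w") as f:                  # rewrites every record ever
+        json.dump({"results": all_records}, f)  # produced (245 MB each time)
+
+def flush_append(path, new_records):            # attempt 4's flush: O(new)
+    with open(path, "a") as f:                  # appends only records created
+        for r in new_records:                   # since the previous flush,
+            f.write(json.dumps(r) + "\n")       # one JSON object per line
+    new_records.clear()
+```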
+
+## 5. Hardware decode/encode on AMD Vega + WSL
+
+Skipped. Per [Microsoft's WSL D3D12 video acceleration post](https://devblogs.microsoft.com/commandline/d3d12-gpu-video-acceleration-in-the-windows-subsystem-for-linux-now-available/), VAAPI-via-Mesa-D3D12 exists but is fragile on older AMD. AMF on Windows would mean a Windows-side ffmpeg leg, doubling boundary crossings. CPU software decode of 1280×720 H.264 in WSL ffmpeg is faster than realtime, and the bottleneck is buffalo_l detection on DML, not decode.
+
+For cutting we use `-c copy` stream-copy — no re-encode, hardware codecs are moot.
+
+## 6. First batch run results (ct_src_00050..00062)
+
+| | |
+|---|---:|
+| input videos | 13 |
+| input duration | 6.18 h |
+| sampled frames | 44,635 (@ 2 fps) |
+| accepted tracks | 1,193 / 2,564 (47 %) |
+| **emitted segments** | **600** |
+| segments built from ≥2 tracks (cross-track merge fired) | 254 |
+| accepted content total | 239.5 min (64.6 % of input) |
+| segment duration min/median/mean/max | 1 / 12 / 24 / 119 s |
+| output size | 3.63 GB |
+
+Phase timings:
+- scenes: 25 min (cached on later runs)
+- stage: instant
+- worker: 78 min @ ~7.5 fps cumulative
+- merge: 73 s
+- track: 77 s
+- score: 21 s
+- cut (600 ffmpeg stream-copies): 19 min
+- report (600 thumbs + HTML): 3 min
+- **total wall-clock: 1h43m**
+
+## 7. Re-running
+
+```bash
+# choose a per-batch workdir + log
+WORK=/opt/face-sets/work/video_preprocess_<batch> \
+  FILTER_FROM=ct_src_00050.mp4 \
+  bash work/run_video_pipeline.sh > work/logs/video_run_<batch>.log 2>&1 &
+
+# check status anytime
+bash work/status_video_pipeline.sh work/logs/video_run_<batch>.log
+```
+
+Skip patterns can exclude already-processed inputs:
+
+```bash
+SKIP_PATTERN='^ct_src_(0001[015]|005[0-9]|006[0-9])\.mp4$' \
+  WORK=/opt/face-sets/work/video_preprocess_rest \
+  bash work/run_video_pipeline.sh > work/logs/video_run_rest.log 2>&1 &
+```
+
+`scenes` outputs are cached in the batch's `WORK/scenes/` dir, so re-running the chain after a score-stage tweak doesn't redo detection. The worker is also resumable per `queue_id` — if killed mid-flight, just relaunch.
diff --git a/work/run_video_pipeline.sh b/work/run_video_pipeline.sh
new file mode 100755
index 0000000..ec4cc86
--- /dev/null
+++ b/work/run_video_pipeline.sh
@@ -0,0 +1,123 @@
+#!/bin/bash
+# Generic chain driver for the video target preprocessing pipeline.
+#
+# Usage:
+#   WORK=/path/to/workdir SKIP_PATTERN='ct_src_(0001[015]|005[0-9]|006[0-9])\.mp4' \
+#     bash run_video_pipeline.sh > /opt/face-sets/work/logs/<name>.log 2>&1
+#
+# Required env vars:
+#   WORK          per-batch workdir (will hold scenes/, queue.json, results.jsonl, plan.json, review/)
+#
+# Optional env vars:
+#   INPUT_DIR     default /mnt/x/src/vd
+#   OUTPUT_DIR    default /mnt/x/src/vd/ct
+#   FILTER_FROM   basename floor; only files with name >= this go in (e.g. ct_src_00050.mp4)
+#   SKIP_PATTERN  regex of basenames to exclude (Python `re` syntax). Applied AFTER FILTER_FROM.
+#   MAX_DUR       score --max-dur (default 120)
+#   IDENTITY      "yes" to enable identity tagging; default "no"
+
+set -e
+
+: ${WORK:?WORK env var must point at a workdir}
+: ${INPUT_DIR:=/mnt/x/src/vd}
+: ${OUTPUT_DIR:=/mnt/x/src/vd/ct}
+: ${MAX_DUR:=120}
+: ${IDENTITY:=no}
+
+mkdir -p "$WORK" "$WORK/scenes"
+
+PY_WSL=/home/peter/face_sort_env/bin/python
+PY_WIN="/mnt/c/face_embed_venv/Scripts/python.exe"
+PIPELINE=/opt/face-sets/work/video_target_pipeline.py
+WORKER=/opt/face-sets/work/video_face_worker.py
+INVENTORY_FULL=/opt/face-sets/work/video_preprocess/inventory_full.json
+
+ts() { date +"%Y-%m-%d %H:%M:%S"; }
+log() { echo "[$(ts)] [$PHASE] $*"; }
+
+PHASE="setup"
+log "STARTED — host=$(hostname) pid=$$ work=$WORK"
+log "config: input=$INPUT_DIR output=$OUTPUT_DIR filter_from=${FILTER_FROM:-} skip_pattern=${SKIP_PATTERN:-} max_dur=$MAX_DUR identity=$IDENTITY"
+
+PHASE="inventory"
+log "building subset inventory"
+T0=$(date +%s)
+# rebuild full inventory if missing
+if [ ! -f "$INVENTORY_FULL" ]; then
+  log "(no full inventory cached — running fresh scan)"
+  $PY_WSL $PIPELINE scan --input "$INPUT_DIR" --output-dir "$OUTPUT_DIR" --out "$INVENTORY_FULL"
+fi
+$PY_WSL <<EOF
+import json, re
+from pathlib import Path
+inv = json.load(open('$INVENTORY_FULL'))
+filter_from = '${FILTER_FROM:-}'
+skip_pat = r'${SKIP_PATTERN:-}'
+subset = inv['videos']
+if filter_from:
+    subset = [v for v in subset if Path(v['path']).name >= filter_from]
+if skip_pat:
+    pat = re.compile(skip_pat)
+    subset = [v for v in subset if not pat.search(Path(v['path']).name)]
+subset.sort(key=lambda v: v['path'])
+inv['videos'] = subset
+json.dump(inv, open('$WORK/inventory.json','w'), indent=2)
+total_dur = sum(v.get('duration_s', 0) for v in inv['videos'] if 'error' not in v)
+print(f' {len(inv["videos"])} videos, total {total_dur/3600:.2f}h input')
+EOF
+log "done in $(($(date +%s)-T0))s"
+
+PHASE="scenes"
+log "PySceneDetect AdaptiveDetector across all videos (cached entries skipped)"
+T0=$(date +%s)
+$PY_WSL $PIPELINE scenes --inventory "$WORK/inventory.json" --out-dir "$WORK/scenes"
+log "done in $(($(date +%s)-T0))s"
+
+PHASE="stage"
+log "building frame queue @ 2 fps within scenes"
+T0=$(date +%s)
+$PY_WSL $PIPELINE stage --inventory "$WORK/inventory.json" --scenes-dir "$WORK/scenes" --out "$WORK/queue.json"
+log "done in $(($(date +%s)-T0))s"
+
+PHASE="worker"
+log "Windows DML face detect+embed (resumable; the slow one)"
+T0=$(date +%s)
+$PY_WIN $WORKER "$WORK/queue.json" "$WORK/results.json"
+log "done in $(($(date +%s)-T0))s"
+
+PHASE="merge"
+log "ingesting worker output (jsonl)"
+T0=$(date +%s)
+$PY_WSL $PIPELINE merge --results "$WORK/results.json" --out "$WORK/frames.json"
+log "done in $(($(date +%s)-T0))s"
+
+PHASE="track"
+log "stitching detections into tracks"
+T0=$(date +%s)
+$PY_WSL $PIPELINE track --frames "$WORK/frames.json" --scenes-dir "$WORK/scenes" \
+    --inventory "$WORK/inventory.json" --out "$WORK/tracks.json"
+log "done in $(($(date +%s)-T0))s"
+
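+# score (and the cut/report steps after it) works entirely off the cached
+# tracks.json above; gate flags like --max-yaw or --merge-gap can be retried
+# without repeating the DML worker pass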
"$WORK/inventory.json" \ + --out "$WORK/plan.json" --max-dur "$MAX_DUR" $ID_FLAG +log "done in $(($(date +%s)-T0))s" + +PHASE="cut" +log "ffmpeg stream-copy into per-source subfolders (no --clean)" +T0=$(date +%s) +$PY_WSL $PIPELINE cut --plan "$WORK/plan.json" --output-dir "$OUTPUT_DIR" +log "done in $(($(date +%s)-T0))s" + +PHASE="report" +log "rendering HTML" +T0=$(date +%s) +$PY_WSL $PIPELINE report --plan "$WORK/plan.json" --output-dir "$OUTPUT_DIR" --out "$WORK/review" +log "done in $(($(date +%s)-T0))s" + +PHASE="done" +log "PIPELINE COMPLETE — review at file://$WORK/review/index.html" diff --git a/work/status_video_pipeline.sh b/work/status_video_pipeline.sh new file mode 100755 index 0000000..b75e6ad --- /dev/null +++ b/work/status_video_pipeline.sh @@ -0,0 +1,32 @@ +#!/bin/bash +# Generic status helper for run_video_pipeline.sh. +# Usage: bash status_video_pipeline.sh +# Defaults to /opt/face-sets/work/logs/video_run.log if no arg. + +LOG="${1:-/opt/face-sets/work/logs/video_run.log}" + +if [ ! -f "$LOG" ]; then + echo "no log at $LOG yet" + exit 0 +fi + +echo "=== last 8 log lines ===" +tail -8 "$LOG" +echo + +# worker progress +last=$(grep -E "^\[scan\] [0-9]+/[0-9]+" "$LOG" | tail -1) +if [ -n "$last" ]; then + echo "=== DML worker progress ===" + echo " $last" +fi + +# total elapsed +start_epoch=$(head -1 "$LOG" | sed 's/.*\[\(.*\)\].*\[setup\].*/\1/' | xargs -I{} date -d "{}" +%s 2>/dev/null) +now_epoch=$(date +%s) +if [ -n "$start_epoch" ] && [ "$start_epoch" != "" ] 2>/dev/null; then + elapsed=$((now_epoch - start_epoch)) + h=$((elapsed / 3600)) + m=$(( (elapsed % 3600) / 60 )) + echo " elapsed: ${h}h${m}m" +fi diff --git a/work/video_face_worker.py b/work/video_face_worker.py new file mode 100644 index 0000000..58a012b --- /dev/null +++ b/work/video_face_worker.py @@ -0,0 +1,274 @@ +"""Windows / DirectML video frame face worker. + +Reads a queue.json from /opt/face-sets/work/video_target_pipeline.py:stage +(WSL side), each entry: {video_path, win_video_path, frame_idx, time_s, +queue_id}. Decodes frame N from the video, runs insightface FaceAnalysis, +emits per-face records (bbox, det_score, pose, embedding, face_short). + +CLI: + py -3.12 video_face_worker.py [--limit N] + +Resumable: existing entries in out_results.json with the same queue_id are +skipped. +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +import time +from pathlib import Path + +import numpy as np +import cv2 +from insightface.app import FaceAnalysis + +MODEL_ROOT = r"C:\face_embed_venv\models" +MIN_DET = 0.5 +MIN_FACE_PIX = 40 +FLUSH_EVERY = 100 + + +def jsonl_path_for(out_path: Path) -> Path: + """Sister JSONL file: one result-record per line, append-only.""" + return out_path.with_suffix(".jsonl") + + +def load_existing(out_path: Path): + """Load existing results from .jsonl (preferred) or legacy .json (one-time conversion). 
+ Returns (records_list, processed_set).""" + jsonl = jsonl_path_for(out_path) + if jsonl.exists(): + records = [] + processed = set() + with open(jsonl) as f: + for line_num, line in enumerate(f, 1): + line = line.strip() + if not line: + continue + try: + r = json.loads(line) + records.append(r) + if r.get("queue_id"): + processed.add(r["queue_id"]) + except json.JSONDecodeError: + print(f"[warn] {jsonl}:{line_num} corrupt; skipping", file=sys.stderr) + return records, processed + # legacy JSON support: load once, convert to JSONL + if out_path.exists(): + try: + d = json.loads(out_path.read_text()) + records = d.get("results", []) + processed = set(d.get("processed", [])) + print(f"[migrate] converting {len(records)} legacy JSON records to JSONL", file=sys.stderr) + with open(jsonl, "w") as f: + for r in records: + f.write(json.dumps(r) + "\n") + return records, processed + except Exception as e: + print(f"[warn] could not parse {out_path}: {e}; starting fresh", file=sys.stderr) + return [], set() + + +def append_records(out_path: Path, new_records: list): + """Append-only write to the sister .jsonl file. No re-serialization of prior records.""" + if not new_records: + return + jsonl = jsonl_path_for(out_path) + with open(jsonl, "a") as f: + for r in new_records: + f.write(json.dumps(r) + "\n") + + +def write_compat_summary(out_path: Path, total_records: int, processed: set): + """Write a tiny JSON pointer file at the legacy out_path so older consumers + still see *something*, but the canonical store is the .jsonl. Cheap.""" + summary = { + "_format": "jsonl-pointer", + "_jsonl": str(jsonl_path_for(out_path).name), + "results_count": total_records, + "processed_count": len(processed), + } + tmp = out_path.with_suffix(".tmp.json") + tmp.write_text(json.dumps(summary, indent=2)) + os.replace(tmp, out_path) + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("queue", type=Path) + ap.add_argument("out", type=Path) + ap.add_argument("--limit", type=int, default=None) + args = ap.parse_args() + + queue = json.loads(args.queue.read_text()) + print(f"[queue] {len(queue)} entries from {args.queue}", flush=True) + args.out.parent.mkdir(parents=True, exist_ok=True) + + results, processed = load_existing(args.out) + if processed: + print(f"[resume] {len(processed)} already scored", flush=True) + + pending = [e for e in queue if e["queue_id"] not in processed] + if args.limit is not None: + pending = pending[: args.limit] + print(f"[pending] {len(pending)} entries", flush=True) + if not pending: + print("[done] nothing to do") + return + + print("[load] FaceAnalysis with DmlExecutionProvider", flush=True) + app = FaceAnalysis( + name="buffalo_l", + root=MODEL_ROOT, + providers=["DmlExecutionProvider", "CPUExecutionProvider"], + ) + app.prepare(ctx_id=0, det_size=(640, 640)) + + # group queue by video so we can keep one VideoCapture open and seek + from collections import defaultdict + by_video = defaultdict(list) + for e in pending: + by_video[e["win_video_path"]].append(e) + + n_done = 0 + n_load_err = 0 + last_flush = time.time() + t_start = time.time() + new_buffer: list = [] + + def flush(): + # append-only: only NEW records since last flush get written. O(new_records), + # not O(total_records). Was 11s/flush at 9k records; now <50ms. + if new_buffer: + append_records(args.out, new_buffer) + new_buffer.clear() + write_compat_summary(args.out, len(results), processed) + + for vidpath, entries in by_video.items(): + # entries are already sorted by frame_idx. 
Hybrid decode strategy: + # 1. Seek ONCE to the first pending target (cheap keyframe-seek). + # 2. Sequential cap.grab() between subsequent targets (decode without + # BGR conversion until we reach a target, then cap.retrieve()). + # This avoids per-sample seek cost (the original pathology that + # caused 1.4 fps deep in long videos) AND avoids grab-walking from + # frame 0 on resume (the over-correction that gave 0.08 fps). + entries.sort(key=lambda e: e["frame_idx"]) + cap = cv2.VideoCapture(vidpath) + if not cap.isOpened(): + print(f"[err] cannot open {vidpath}", flush=True) + for e in entries: + rec = { + "queue_id": e["queue_id"], "video_path": e["video_path"], + "frame_idx": e["frame_idx"], "time_s": e["time_s"], + "faces": [], "error": "cap_open", + } + results.append(rec); new_buffer.append(rec) + processed.add(e["queue_id"]) + n_done += 1 + n_load_err += 1 + continue + first_target = entries[0]["frame_idx"] + if first_target > 0: + cap.set(cv2.CAP_PROP_POS_FRAMES, first_target) + cur_frame_idx = first_target - 1 + else: + cur_frame_idx = -1 + for e in entries: + target = e["frame_idx"] + if target < cur_frame_idx + 1: + # backward jump (only triggers for unsorted entries — defensive) + cap.set(cv2.CAP_PROP_POS_FRAMES, target) + cur_frame_idx = target - 1 + # advance up to (but not including) target via grab()-only + ran_out = False + while cur_frame_idx + 1 < target: + ok = cap.grab() + if not ok: + ran_out = True + break + cur_frame_idx += 1 + if not ran_out: + ok = cap.grab() + if not ok: + ran_out = True + else: + cur_frame_idx = target + if ran_out: + rec = { + "queue_id": e["queue_id"], "video_path": e["video_path"], + "frame_idx": e["frame_idx"], "time_s": e["time_s"], + "faces": [], "error": "frame_read", + } + results.append(rec); new_buffer.append(rec) + processed.add(e["queue_id"]) + n_done += 1 + n_load_err += 1 + continue + ok, bgr = cap.retrieve() + if not ok or bgr is None: + rec = { + "queue_id": e["queue_id"], "video_path": e["video_path"], + "frame_idx": e["frame_idx"], "time_s": e["time_s"], + "faces": [], "error": "frame_read", + } + results.append(rec); new_buffer.append(rec) + processed.add(e["queue_id"]) + n_done += 1 + n_load_err += 1 + continue + + faces = app.get(bgr) + kept_faces = [] + H, W = bgr.shape[:2] + for f in faces: + if float(f.det_score) < MIN_DET: + continue + x1, y1, x2, y2 = [int(round(v)) for v in f.bbox] + x1 = max(x1, 0); y1 = max(y1, 0) + x2 = min(x2, W); y2 = min(y2, H) + w, h = x2 - x1, y2 - y1 + short = min(w, h) + if short < MIN_FACE_PIX: + continue + rec = { + "bbox": [x1, y1, x2, y2], + "det_score": float(f.det_score), + "face_short": int(short), + } + if hasattr(f, "pose") and f.pose is not None: + rec["pose"] = [float(x) for x in f.pose] # pitch, yaw, roll + if hasattr(f, "normed_embedding") and f.normed_embedding is not None: + rec["embedding"] = f.normed_embedding.astype(np.float32).tolist() + kept_faces.append(rec) + + rec = { + "queue_id": e["queue_id"], "video_path": e["video_path"], + "frame_idx": e["frame_idx"], "time_s": e["time_s"], + "frame_w": W, "frame_h": H, + "faces": kept_faces, + } + results.append(rec); new_buffer.append(rec) + processed.add(e["queue_id"]) + n_done += 1 + + if (n_done % FLUSH_EVERY == 0) or (time.time() - last_flush) > 30.0: + flush() + last_flush = time.time() + el = time.time() - t_start + rate = n_done / max(0.1, el) + eta = (len(pending) - n_done) / max(0.1, rate) / 60.0 + print(f"[scan] {n_done}/{len(pending)} rate={rate:.2f} fps eta={eta:.1f}min " + f"errs={n_load_err}", flush=True) + 
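+        # all queued frames for this video are done; release the capture
+        # before the next file so only one decoder handle is open at a time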
cap.release() + + flush() + el = time.time() - t_start + print(f"[done] {n_done} scored, {n_load_err} errors, {el:.1f}s " + f"({n_done/max(0.1,el):.2f} fps) -> {args.out}", flush=True) + + +if __name__ == "__main__": + main() diff --git a/work/video_target_pipeline.py b/work/video_target_pipeline.py new file mode 100644 index 0000000..31cf864 --- /dev/null +++ b/work/video_target_pipeline.py @@ -0,0 +1,917 @@ +"""Video target preprocessing pipeline for roop-unleashed. + +Discovers video files in an input folder, runs scene-cut detection, samples +frames within each scene, runs face detection + embedding via Windows DML +worker, stitches per-frame detections into face tracks, applies quality +gates, cuts approved segments out with ffmpeg stream-copy, and writes a +report. Output clips have generic UUID names + a sidecar JSON with full +provenance. + +Subcommands: + scan list input videos, run ffprobe, write per-video index + scenes PySceneDetect AdaptiveDetector per video; write scenes_.json + stage write frame queue.json (sampled @ 2 fps within scenes) + merge ingest worker results.json into per-video frame_results + track IoU+embedding stitching of per-frame detections into tracks + score track-level quality gating + segment plan + cut ffmpeg -c copy each accepted segment to /.mp4 + report HTML preview with thumbnails + identity tags +""" + +from __future__ import annotations + +import argparse +import json +import math +import re +import shutil +import subprocess +import sys +import time +import uuid +from collections import defaultdict +from pathlib import Path + +import numpy as np + +DEFAULT_INPUT = Path("/mnt/x/src/vd") +DEFAULT_OUTPUT = Path("/mnt/x/src/vd/ct") +WORK_DIR = Path("/opt/face-sets/work/video_preprocess") + +# defaults — first set was strict-portrait; second set loosened for side-profile + segment merging +SAMPLE_FPS = 2.0 +QUALITY_YAW_MAX = 75.0 # was 25; allow full 3/4 + profile (face-sets handle it) +QUALITY_PITCH_MAX = 45.0 # was 30 +QUALITY_FACE_MIN = 80 # was 96 +QUALITY_BLUR_MIN = 50.0 +QUALITY_DET_MIN = 0.5 # was 0.6 +TRACK_GATE_FRAC = 0.7 # >=70% of frames in track must pass per-frame gates +SEGMENT_MIN_S = 1.0 +SEGMENT_MAX_S = 30.0 # was 10 +SEGMENT_BRIDGE_S = 3.0 # was 1.0 — within-track pose-failure bridging +SEGMENT_MERGE_GAP_S = 2.0 # NEW — across-track merge if same scene + within this gap +TRACK_IOU_MIN = 0.3 +TRACK_EMB_MIN = 0.5 + +CACHES = [ + Path("/opt/face-sets/work/cache/nl_full.npz"), + Path("/opt/face-sets/work/cache/immich_peter.npz"), + Path("/opt/face-sets/work/cache/immich_nic.npz"), +] +FACESETS_ROOT = Path("/mnt/e/temp_things/fcswp/nl_sorted/facesets_swap_ready") +IDENTITY_TAG_THRESHOLD = 0.6 # cosine sim to faceset centroid + + +def wsl_to_win(p: str) -> str: + s = str(p) + if s.startswith("/mnt/"): + return f"{s[5].upper()}:\\{s[7:].replace('/', chr(92))}" + return s + + +# ----------------------------- ffprobe / scan ----------------------------- + +def ffprobe(video: Path) -> dict: + cmd = [ + "ffprobe", "-v", "error", "-print_format", "json", + "-show_format", "-show_streams", str(video), + ] + r = subprocess.run(cmd, capture_output=True, text=True, timeout=60) + if r.returncode != 0: + return {"error": r.stderr.strip()} + return json.loads(r.stdout) + + +def parse_video_meta(probe: dict) -> dict: + if "error" in probe: + return {"error": probe["error"]} + fmt = probe.get("format", {}) + duration = float(fmt.get("duration", 0)) + vstream = next((s for s in probe.get("streams", []) if s.get("codec_type") == "video"), None) + if vstream 
is None: + return {"error": "no video stream"} + fps_str = vstream.get("avg_frame_rate", "0/1") + try: + num, den = (int(x) for x in fps_str.split("/")) + fps = num / den if den else 0.0 + except Exception: + fps = 0.0 + nb_frames = int(vstream.get("nb_frames", 0)) or int(round(duration * fps)) + return { + "duration_s": duration, + "fps": fps, + "frames": nb_frames, + "width": int(vstream.get("width", 0)), + "height": int(vstream.get("height", 0)), + "codec": vstream.get("codec_name"), + } + + +def cmd_scan(args): + in_dir = Path(args.input) + out = Path(args.out) + out.parent.mkdir(parents=True, exist_ok=True) + extensions = {".mp4", ".mov", ".mkv", ".m4v", ".avi", ".webm"} + out_root = Path(args.output_dir).resolve() + videos = [] + for p in sorted(in_dir.iterdir() if not args.recursive else in_dir.rglob("*")): + if not p.is_file(): + continue + if out_root in p.parents or p.resolve() == out_root: + continue # never include the output dir + if p.suffix.lower() not in extensions: + continue + videos.append(p) + print(f"[scan] {len(videos)} candidate videos", file=sys.stderr) + inventory = [] + for p in videos: + meta = parse_video_meta(ffprobe(p)) + meta["path"] = str(p) + meta["win_path"] = wsl_to_win(str(p)) + meta["size"] = p.stat().st_size + inventory.append(meta) + if "error" not in meta: + print(f" {p.name}: {meta['duration_s']:.1f}s @ {meta['fps']:.1f}fps " + f"{meta['width']}x{meta['height']} {meta['codec']}", file=sys.stderr) + else: + print(f" {p.name}: ERROR {meta['error']}", file=sys.stderr) + out.write_text(json.dumps({"input": str(in_dir), "videos": inventory}, indent=2)) + print(f"[scan] inventory -> {out}", file=sys.stderr) + + +# ----------------------------- scenes ----------------------------- + +def cmd_scenes(args): + from scenedetect import open_video, SceneManager + from scenedetect.detectors import AdaptiveDetector + inv = json.loads(Path(args.inventory).read_text()) + out_dir = Path(args.out_dir) + out_dir.mkdir(parents=True, exist_ok=True) + only = set(args.only.split(",")) if args.only else None + for v in inv["videos"]: + if "error" in v: + continue + path = Path(v["path"]) + if only and path.name not in only: + continue + out_file = out_dir / (path.stem + ".scenes.json") + if out_file.exists() and not args.force: + continue + print(f"[scenes] {path.name} ...", file=sys.stderr, flush=True) + t0 = time.time() + try: + video = open_video(str(path)) + sm = SceneManager() + sm.add_detector(AdaptiveDetector(min_scene_len=int(round(v.get("fps", 30) or 30) * 0.5))) + sm.detect_scenes(video, show_progress=False) + scenes = sm.get_scene_list() + entries = [] + for s, e in scenes: + entries.append({ + "start_frame": s.frame_num, "end_frame": e.frame_num, + "start_s": s.get_seconds(), "end_s": e.get_seconds(), + "duration_s": e.get_seconds() - s.get_seconds(), + }) + # if no cuts found, treat the whole video as one scene + if not entries: + entries = [{ + "start_frame": 0, "end_frame": v["frames"], + "start_s": 0.0, "end_s": v["duration_s"], + "duration_s": v["duration_s"], + }] + out_file.write_text(json.dumps({"video": str(path), "scenes": entries}, indent=2)) + print(f" {len(entries)} scenes in {time.time()-t0:.1f}s -> {out_file.name}", + file=sys.stderr) + except Exception as e: + print(f" ERROR: {e}", file=sys.stderr) + + +# ----------------------------- stage ----------------------------- + +def cmd_stage(args): + inv = json.loads(Path(args.inventory).read_text()) + scenes_dir = Path(args.scenes_dir) + queue = [] + qid = 0 + sample_every = 1.0 / args.sample_fps + 
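+    # walk each scene and emit one queue entry per sample period; frame_idx
+    # is derived as round(time_s * fps) so the worker can address exact frames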
for v in inv["videos"]: + if "error" in v: + continue + p = Path(v["path"]) + sf = scenes_dir / (p.stem + ".scenes.json") + if not sf.exists(): + print(f"[warn] no scenes file for {p.name}; skipping", file=sys.stderr) + continue + scenes = json.loads(sf.read_text()).get("scenes", []) + fps = v.get("fps", 30) or 30 + for sc in scenes: + t = sc["start_s"] + while t < sc["end_s"] - 0.01: + fidx = int(round(t * fps)) + if fidx >= v["frames"]: + break + queue.append({ + "queue_id": f"q{qid:08d}", + "video_path": str(p), + "win_video_path": v["win_path"], + "frame_idx": fidx, + "time_s": t, + }) + qid += 1 + t += sample_every + out = Path(args.out) + out.parent.mkdir(parents=True, exist_ok=True) + out.write_text(json.dumps(queue, indent=2)) + print(f"[stage] {len(queue)} sampled frames @ {args.sample_fps} fps -> {out}", + file=sys.stderr) + print(f"[stage] win path for worker: {wsl_to_win(str(out))}", file=sys.stderr) + + +# ----------------------------- merge + track ----------------------------- + +def cmd_merge(args): + """Read worker output and group by video_path. Supports either JSONL (one record + per line, the new format) or legacy JSON (results.json with `results` list).""" + src_path = Path(args.results) + records = [] + # try JSONL first (sister .jsonl file or .results passed directly) + jsonl_candidate = src_path.with_suffix(".jsonl") + if jsonl_candidate.exists(): + with open(jsonl_candidate) as f: + for line in f: + line = line.strip() + if line: + records.append(json.loads(line)) + elif src_path.suffix == ".jsonl": + with open(src_path) as f: + for line in f: + line = line.strip() + if line: + records.append(json.loads(line)) + else: + # legacy: monolithic JSON + src = json.loads(src_path.read_text()) + records = src.get("results", []) + by_video: dict[str, list] = {} + for r in records: + by_video.setdefault(r["video_path"], []).append(r) + for v in by_video: + by_video[v].sort(key=lambda x: x["frame_idx"]) + out = Path(args.out) + out.parent.mkdir(parents=True, exist_ok=True) + out.write_text(json.dumps({"by_video": by_video}, indent=2)) + print(f"[merge] {sum(len(v) for v in by_video.values())} frames across {len(by_video)} videos " + f"-> {out}", file=sys.stderr) + + +def _iou(a, b): + ax1, ay1, ax2, ay2 = a + bx1, by1, bx2, by2 = b + ix1 = max(ax1, bx1); iy1 = max(ay1, by1) + ix2 = min(ax2, bx2); iy2 = min(ay2, by2) + iw = max(ix2 - ix1, 0); ih = max(iy2 - iy1, 0) + inter = iw * ih + ua = (ax2 - ax1) * (ay2 - ay1) + (bx2 - bx1) * (by2 - by1) - inter + return inter / ua if ua > 0 else 0.0 + + +def cmd_track(args): + """Stitch per-frame face detections into tracks within each scene of each video. + Track = list of (frame_idx, face_idx) where adjacent samples have IoU>=0.3 OR + cosine(emb)>=0.5. New face → new track. 
No cross-scene merging."""
+    fr = json.loads(Path(args.frames).read_text())
+    scenes_dir = Path(args.scenes_dir)
+    inv = json.loads(Path(args.inventory).read_text())
+    inv_by_path = {v["path"]: v for v in inv["videos"]}
+
+    all_video_tracks: dict[str, list] = {}
+    for video_path, frames in fr["by_video"].items():
+        v = inv_by_path.get(video_path, {})
+        sf = scenes_dir / (Path(video_path).stem + ".scenes.json")
+        scenes = json.loads(sf.read_text()).get("scenes", []) if sf.exists() else []
+        # group frames by scene
+        scene_for_frame = {}
+        for si, sc in enumerate(scenes):
+            for f in frames:
+                if f["frame_idx"] >= sc["start_frame"] and f["frame_idx"] < sc["end_frame"]:
+                    scene_for_frame.setdefault(si, []).append(f)
+        video_tracks = []
+        for si, scene_frames in scene_for_frame.items():
+            scene_frames.sort(key=lambda x: x["frame_idx"])
+            # tracks = list of dict{ "members": [(frame_idx, face_idx, face_dict, time_s)], "last_bbox", "last_emb" }
+            tracks = []
+            for f in scene_frames:
+                claimed = set()
+                for face_idx, face in enumerate(f.get("faces", [])):
+                    bbox = face["bbox"]
+                    emb = np.array(face.get("embedding", []), dtype=np.float32) if face.get("embedding") else None
+                    best_track = None
+                    best_score = 0.0
+                    for ti, tr in enumerate(tracks):
+                        if ti in claimed:
+                            continue
+                        # staleness in TIME (sample period independent of source fps)
+                        last_time = tr["members"][-1][3]
+                        if f["time_s"] - last_time > 1.5:  # stale if >1.5s gap (3 sample periods @ 2fps)
+                            continue
+                        # a candidate only scores via a criterion that clears its
+                        # own threshold: IoU >= TRACK_IOU_MIN or cosine >= TRACK_EMB_MIN
+                        # (a bare max(iou, emb) against min(thresholds) would let a
+                        # weak 0.3 embedding sim masquerade as a positional match)
+                        score = 0.0
+                        iou = _iou(tr["last_bbox"], bbox)
+                        if iou >= TRACK_IOU_MIN:
+                            score = iou
+                        if emb is not None and tr.get("last_emb") is not None:
+                            emb_sim = float(np.dot(tr["last_emb"], emb))
+                            if emb_sim >= TRACK_EMB_MIN:
+                                score = max(score, emb_sim)
+                        if score > best_score:
+                            best_score = score
+                            best_track = ti
+                    if best_track is not None:
+                        tr = tracks[best_track]
+                        tr["members"].append((f["frame_idx"], face_idx, face, f["time_s"]))
+                        tr["last_bbox"] = bbox
+                        if emb is not None:
+                            tr["last_emb"] = emb
+                        claimed.add(best_track)
+                    else:
+                        tracks.append({
+                            "members": [(f["frame_idx"], face_idx, face, f["time_s"])],
+                            "last_bbox": bbox,
+                            "last_emb": emb,
+                        })
+            for tr in tracks:
+                if len(tr["members"]) < 2:
+                    continue
+                video_tracks.append({
+                    "scene_idx": si,
+                    "members": [
+                        {"frame_idx": m[0], "face_idx": m[1], "time_s": m[3], "face": m[2]}
+                        for m in tr["members"]
+                    ],
+                })
+        all_video_tracks[video_path] = video_tracks
+        print(f"[track] {Path(video_path).name}: {sum(len(s) for s in scene_for_frame.values())} frames "
+              f"-> {len(video_tracks)} tracks across {len(scene_for_frame)} scenes",
+              file=sys.stderr)
+
+    out = Path(args.out)
+    out.parent.mkdir(parents=True, exist_ok=True)
+    out.write_text(json.dumps({"by_video": all_video_tracks}, indent=2))
+    print(f"[track] -> {out}", file=sys.stderr)
+
+
+# ----------------------------- score (quality gates) -----------------------------
+
+def _track_passes(track, cfg):
+    """Per-frame quality gating; return list of bool (does each member pass)
+    + aggregate stats.
cfg: dict with yaw_max, pitch_max, face_min, det_min.""" + passes = [] + yaws, pitches, sizes, dets = [], [], [], [] + for m in track["members"]: + f = m["face"] + yaw = abs(f.get("pose", [0, 0, 0])[1]) if f.get("pose") else 0 + pitch = abs(f.get("pose", [0, 0, 0])[0]) if f.get("pose") else 0 + size = f.get("face_short", 0) + det = f.get("det_score", 0) + ok = (yaw <= cfg["yaw_max"] and pitch <= cfg["pitch_max"] + and size >= cfg["face_min"] and det >= cfg["det_min"]) + passes.append(ok) + yaws.append(yaw); pitches.append(pitch); sizes.append(size); dets.append(det) + return passes, { + "n": len(passes), "n_pass": sum(passes), "frac_pass": sum(passes) / max(1, len(passes)), + "yaw_med": float(np.median(yaws)) if yaws else None, + "pitch_med": float(np.median(pitches)) if pitches else None, + "size_med": float(np.median(sizes)) if sizes else None, + "det_med": float(np.median(dets)) if dets else None, + } + + +def _build_segments(track, cfg): + """Return list of (start_s, end_s) accepted sub-segments of this track: + contiguous runs of passing frames meeting min/max duration. Pose-failure + spans <= cfg['bridge_s'] long get bridged across (handles momentary head + turns / detection misses).""" + passes, stats = _track_passes(track, cfg) + members = track["members"] + if not members: + return [], stats + # bridge gaps of failing frames (any width) up to cfg["bridge_s"] seconds + bridged = list(passes) + n = len(bridged) + i = 0 + while i < n: + if bridged[i]: + i += 1 + continue + # find run of consecutive False starting at i + j = i + while j < n and not bridged[j]: + j += 1 + # bridge if surrounded by True on both sides AND time gap <= bridge_s + if i > 0 and j < n and bridged[i - 1] and bridged[j]: + t_left = members[i - 1]["time_s"] + t_right = members[j]["time_s"] + if t_right - t_left <= cfg["bridge_s"]: + for k in range(i, j): + bridged[k] = True + i = j + # find runs of True + runs = [] + i = 0 + while i < n: + if not bridged[i]: + i += 1; continue + j = i + while j + 1 < n and bridged[j + 1]: + j += 1 + s = members[i]["time_s"] + # end is the time of the last passing sample plus one sample-period + e = members[j]["time_s"] + 1.0 / max(SAMPLE_FPS, 1e-3) + runs.append((s, e)) + i = j + 1 + return runs, stats + + +def _merge_close_segments(segs_with_meta, merge_gap_s: float): + """Merge segments within the same scene that are within merge_gap_s of each other. + segs_with_meta: list of dicts with start_s, end_s, scene_idx, track_idx, stats. + Returns list of merged dicts (one per merged group). 
Identity-tag and stats + aggregation happen later.""" + by_scene: dict[int, list] = {} + for s in segs_with_meta: + by_scene.setdefault(s["scene_idx"], []).append(s) + merged_all = [] + for scene_idx, segs in by_scene.items(): + segs.sort(key=lambda x: x["start_s"]) + cur = None + for s in segs: + if cur is None: + cur = {**s, "track_idxs": [s["track_idx"]], "member_count": s["stats"]["n"], + "pass_count": s["stats"]["n_pass"]} + continue + gap = s["start_s"] - cur["end_s"] + if gap <= merge_gap_s: + # merge + cur["end_s"] = max(cur["end_s"], s["end_s"]) + cur["track_idxs"].append(s["track_idx"]) + cur["member_count"] += s["stats"]["n"] + cur["pass_count"] += s["stats"]["n_pass"] + # take the better-quality stats for display + if s["stats"]["n_pass"] > cur["stats"]["n_pass"]: + cur["stats"] = s["stats"] + else: + merged_all.append(cur) + cur = {**s, "track_idxs": [s["track_idx"]], "member_count": s["stats"]["n"], + "pass_count": s["stats"]["n_pass"]} + if cur is not None: + merged_all.append(cur) + return merged_all + + +def _split_long_segments(segs_with_meta, min_s: float, max_s: float): + """Apply min/max duration: drop too-short, split too-long evenly.""" + out = [] + for s in segs_with_meta: + dur = s["end_s"] - s["start_s"] + if dur < min_s: + continue + if dur <= max_s: + out.append(s) + continue + n = int(math.ceil(dur / max_s)) + chunk = dur / n + base_start = s["start_s"] + for k in range(n): + piece = dict(s) + piece["start_s"] = base_start + k * chunk + piece["end_s"] = base_start + (k + 1) * chunk + out.append(piece) + return out + + +# identity tagging via cached arcface centroids +def load_caches_index(): + rec_index = {} + alias_map = {} + for c in CACHES: + if not c.exists(): + continue + d = np.load(c, allow_pickle=True) + emb = d["embeddings"] + meta = json.loads(str(d["meta"])) + face_records = [m for m in meta if not m.get("noface")] + if "path_aliases" in d.files: + paliases = json.loads(str(d["path_aliases"])) + for canon, alist in paliases.items(): + alias_map.setdefault(canon, canon) + for a in alist: + alias_map[a] = canon + for i, rec in enumerate(face_records): + v = emb[i].astype(np.float32) + n = float(np.linalg.norm(v)) + if n > 0: + v = v / n + rec_index[(rec["path"], tuple(int(x) for x in rec["bbox"]))] = v + alias_map.setdefault(rec["path"], rec["path"]) + return rec_index, alias_map + + +def load_faceset_centroids(): + """Return dict faceset_name -> normalized centroid embedding.""" + rec_index, alias_map = load_caches_index() + centroids = {} + for fs_dir in sorted(FACESETS_ROOT.iterdir()): + if not fs_dir.is_dir() or fs_dir.name.startswith("_"): + continue + # exclude era splits to avoid double-tagging within a family + if re.match(r"^faceset_\d+_(?:\d{4}-\d{2,4}|\d{4}|undated)", fs_dir.name): + continue + mp = fs_dir / "manifest.json" + if not mp.exists(): + continue + m = json.loads(mp.read_text()) + vecs = [] + for f in m.get("faces", []): + src = f.get("source"); bbox = f.get("bbox") + if not src or not bbox: + continue + canon = alias_map.get(src, src) + v = rec_index.get((canon, tuple(int(x) for x in bbox))) + if v is None and canon != src: + v = rec_index.get((src, tuple(int(x) for x in bbox))) + if v is not None: + vecs.append(v) + if len(vecs) < 3: + continue + c = np.stack(vecs).mean(axis=0) + n = float(np.linalg.norm(c)) + if n > 0: + c = c / n + centroids[fs_dir.name] = c + return centroids + + +def _track_centroid(track): + embs = [m["face"].get("embedding") for m in track["members"] if m["face"].get("embedding")] + if not embs: + return 
None + arr = np.array(embs, dtype=np.float32) + c = arr.mean(axis=0) + n = float(np.linalg.norm(c)) + return c / n if n > 0 else c + + +def cmd_score(args): + tr = json.loads(Path(args.tracks).read_text()) + inv = json.loads(Path(args.inventory).read_text()) + inv_by_path = {v["path"]: v for v in inv["videos"]} + + cfg = { + "yaw_max": args.max_yaw, "pitch_max": args.max_pitch, + "face_min": args.min_face, "det_min": args.min_det, + "bridge_s": args.bridge_gap, + } + + centroids = {} + if not args.no_identity: + print("[score] loading faceset centroids ...", file=sys.stderr) + t0 = time.time() + centroids = load_faceset_centroids() + print(f"[score] {len(centroids)} active faceset centroids loaded in {time.time()-t0:.1f}s", + file=sys.stderr) + + n_total_tracks = 0 + n_accepted_tracks = 0 + # collect per-track candidate segments first; merging happens per-video below + per_video_candidates: dict[str, list] = {} + track_centroids_by_video: dict[str, dict] = {} + for video_path, tracks in tr["by_video"].items(): + per_video_candidates.setdefault(video_path, []) + track_centroids_by_video.setdefault(video_path, {}) + for ti, track in enumerate(tracks): + n_total_tracks += 1 + runs, stats = _build_segments(track, cfg) + if stats["frac_pass"] < args.track_gate_frac: + continue + if not runs: + continue + n_accepted_tracks += 1 + track_centroids_by_video[video_path][ti] = _track_centroid(track) + for (s, e) in runs: + per_video_candidates[video_path].append({ + "video_path": video_path, + "track_idx": ti, + "scene_idx": track["scene_idx"], + "start_s": s, + "end_s": e, + "stats": stats, + }) + + plan = [] + for video_path, segs in per_video_candidates.items(): + if not segs: + continue + # merge across tracks within the same scene if gap <= merge_gap_s + merged = _merge_close_segments(segs, args.merge_gap) + # apply min/max duration (split long, drop short) + merged = _split_long_segments(merged, args.min_dur, args.max_dur) + for s in merged: + tag = None + tag_sim = None + # identity from union of contributing tracks' centroids + if centroids: + track_centroid_list = [ + track_centroids_by_video[video_path].get(ti) + for ti in s.get("track_idxs", [s.get("track_idx")]) + ] + track_centroid_list = [c for c in track_centroid_list if c is not None] + if track_centroid_list: + union = np.stack(track_centroid_list).mean(axis=0) + nm = float(np.linalg.norm(union)) + if nm > 0: + union = union / nm + sims = {name: float(np.dot(c, union)) for name, c in centroids.items()} + best = max(sims, key=sims.get) + if sims[best] >= IDENTITY_TAG_THRESHOLD: + tag = best; tag_sim = round(sims[best], 4) + plan.append({ + "video_path": video_path, + "track_idxs": s.get("track_idxs", [s.get("track_idx")]), + "scene_idx": s["scene_idx"], + "start_s": round(s["start_s"], 3), + "end_s": round(s["end_s"], 3), + "duration_s": round(s["end_s"] - s["start_s"], 3), + "member_count": s.get("member_count", s["stats"]["n"]), + "pass_count": s.get("pass_count", s["stats"]["n_pass"]), + "stats": s["stats"], + "identity_tag": tag, + "identity_sim": tag_sim, + "uuid": uuid.uuid4().hex[:12], + }) + + plan.sort(key=lambda p: (p["video_path"], p["start_s"])) + out = Path(args.out) + out.parent.mkdir(parents=True, exist_ok=True) + out.write_text(json.dumps({ + "thresholds": { + "yaw_max": args.max_yaw, "pitch_max": args.max_pitch, + "face_min": args.min_face, "blur_min": QUALITY_BLUR_MIN, + "det_min": args.min_det, "track_gate_frac": args.track_gate_frac, + "bridge_s": args.bridge_gap, "merge_gap_s": args.merge_gap, + "min_dur_s": 
args.min_dur, "max_dur_s": args.max_dur, + "identity_tag_threshold": IDENTITY_TAG_THRESHOLD, + }, + "totals": { + "tracks_total": n_total_tracks, "tracks_accepted": n_accepted_tracks, + "segments": len(plan), + }, + "plan": plan, + }, indent=2)) + print(f"[score] {n_accepted_tracks}/{n_total_tracks} tracks accepted -> {len(plan)} segments " + f"-> {out}", file=sys.stderr) + + +# ----------------------------- cut ----------------------------- + +def cmd_cut(args): + plan = json.loads(Path(args.plan).read_text()) + out_dir = Path(args.output_dir) + out_dir.mkdir(parents=True, exist_ok=True) + + if args.clean: + # remove only existing UUID-named clips + sidecars (12-char hex), keeping any other files + import re as _re + uuid_pat = _re.compile(r"^[0-9a-f]{12}\.(mp4|json)$") + n_removed = 0 + for child in out_dir.iterdir(): + if child.is_file() and uuid_pat.match(child.name): + child.unlink() + n_removed += 1 + elif child.is_dir() and _re.match(r"^[A-Za-z0-9_.-]+$", child.name): + # subfolder of prior runs — clear UUID files inside, then remove if empty + for inner in child.iterdir(): + if inner.is_file() and uuid_pat.match(inner.name): + inner.unlink() + n_removed += 1 + try: + child.rmdir() + except OSError: + pass + if n_removed: + print(f"[clean] removed {n_removed} prior UUID clips/sidecars", file=sys.stderr) + + n_done = 0 + n_err = 0 + sidecars = [] + for seg in plan["plan"]: + sub = Path(seg["video_path"]).stem + seg_dir = out_dir / sub + seg_dir.mkdir(parents=True, exist_ok=True) + out_video = seg_dir / f"{seg['uuid']}.mp4" + if out_video.exists() and not args.force: + continue + s = seg["start_s"]; d = seg["duration_s"] + cmd = [ + "ffmpeg", "-y", "-loglevel", "error", + "-ss", f"{s}", + "-i", seg["video_path"], + "-t", f"{d}", + "-c", "copy", + "-avoid_negative_ts", "make_zero", + str(out_video), + ] + r = subprocess.run(cmd, capture_output=True, text=True, timeout=120) + if r.returncode != 0 or not out_video.exists() or out_video.stat().st_size < 1024: + print(f"[cut-err] {seg['uuid']} {seg['video_path']}@{s}+{d}: {r.stderr.strip()[:200]}", + file=sys.stderr) + n_err += 1 + if out_video.exists() and out_video.stat().st_size < 1024: + out_video.unlink() + continue + # sidecar (alongside the clip in the source-named subfolder) + sidecar = seg_dir / f"{seg['uuid']}.json" + sidecar.write_text(json.dumps({ + "uuid": seg["uuid"], + "source_video": seg["video_path"], + "source_basename": Path(seg["video_path"]).name, + "start_s": s, "end_s": seg["end_s"], "duration_s": d, + "scene_idx": seg["scene_idx"], + "track_idxs": seg.get("track_idxs", [seg.get("track_idx")]), + "member_count": seg.get("member_count"), + "pass_count": seg.get("pass_count"), + "stats": seg["stats"], + "identity_tag": seg["identity_tag"], + "identity_sim": seg["identity_sim"], + "thresholds": plan["thresholds"], + }, indent=2)) + sidecars.append(sidecar) + n_done += 1 + print(f"[cut] {n_done} clips written, {n_err} errors -> {out_dir}", file=sys.stderr) + + +# ----------------------------- report ----------------------------- + +def cmd_report(args): + plan = json.loads(Path(args.plan).read_text()) + out_dir = Path(args.out) + out_dir.mkdir(parents=True, exist_ok=True) + thumbs_dir = out_dir / "thumbs" + thumbs_dir.mkdir(exist_ok=True) + output_dir = Path(args.output_dir) + + # group by video + by_video: dict[str, list] = {} + for seg in plan["plan"]: + by_video.setdefault(seg["video_path"], []).append(seg) + + # generate thumbs from each segment's first frame via ffmpeg + print(f"[report] generating thumbs for 
{len(plan['plan'])} segments", file=sys.stderr)
+    for seg in plan["plan"]:
+        thumb = thumbs_dir / f"{seg['uuid']}.jpg"
+        if thumb.exists():
+            continue
+        s = seg["start_s"] + 0.1
+        cmd = [
+            "ffmpeg", "-y", "-loglevel", "error",
+            "-ss", f"{s}",
+            "-i", seg["video_path"],
+            "-frames:v", "1",
+            "-vf", "scale=240:-1",
+            str(thumb),
+        ]
+        subprocess.run(cmd, capture_output=True, timeout=30)
+
+    # render: summary header, per-video anchor nav, then one thumbnail card
+    # per segment grouped under its source video
+    rows = []
+    rows.append("<h1>Video target preprocessing — review</h1>")
+    t = plan["totals"]
+    th = plan["thresholds"]
+    rows.append(f"<p>Tracks accepted: {t['tracks_accepted']}/{t['tracks_total']}; "
+                f"segments emitted: {t['segments']}.<br>"
+                f"Thresholds: pose ≤{th['yaw_max']}°yaw / {th['pitch_max']}°pitch, "
+                f"face_short ≥{th['face_min']}px, det ≥{th['det_min']}, "
+                f"track-gate ≥{int(100*th['track_gate_frac'])}%, "
+                f"duration {th['min_dur_s']}–{th['max_dur_s']}s. "
+                f"Output dir: <code>{output_dir}</code></p>")
+    nav = " · ".join(f"<a href='#v{i}'>{Path(v).name}</a>"
+                     for i, v in enumerate(by_video.keys()))
+    rows.append(f"<p>{nav}</p>")
+    for vi, (video_path, segs) in enumerate(by_video.items()):
+        rows.append(f"<hr id='v{vi}'>")
+        rows.append(f"<h2>{Path(video_path).name} ({len(segs)} segments)</h2>")
+        rows.append("<div class='grid'>")
+        for seg in sorted(segs, key=lambda x: x["start_s"]):
+            stats = seg["stats"]
+            tag = seg["identity_tag"] or ""
+            tag_sim = seg["identity_sim"]
+            tag_html = (f"<b>{tag}</b> ({tag_sim:.2f})" if tag else "<i>untagged</i>")
+            sub_name = Path(seg['video_path']).stem
+            rows.append(
+                f"<div class='card'>"
+                f"<img src='thumbs/{seg['uuid']}.jpg' loading='lazy'>"
+                f"<div class='meta'>"
+                f"<code>{sub_name}/{seg['uuid']}.mp4</code><br>"
+                f"{seg['start_s']:.1f}s → {seg['end_s']:.1f}s ({seg['duration_s']:.1f}s)<br>"
+                f"yaw={stats['yaw_med']:.0f}° size={stats['size_med']:.0f}px det={stats['det_med']:.2f}<br>"
+                f"pass {stats['n_pass']}/{stats['n']}<br>"
+                f"{tag_html}"
+                f"</div></div>"
+            )
+        rows.append("</div>")
+    html = f"""<!doctype html>
+<html><head><meta charset="utf-8"><title>Video targets review</title>
+<style>
+body {{ font-family: sans-serif; margin: 1em; }}
+.grid {{ display: flex; flex-wrap: wrap; gap: 8px; }}
+.card {{ width: 248px; border: 1px solid #ccc; padding: 4px; font-size: 12px; }}
+.card img {{ width: 240px; display: block; }}
+</style></head><body>
+{''.join(rows)}
+</body></html>"""
+    out_html = out_dir / "index.html"
+    out_html.write_text(html)
+    print(f"[report] -> {out_html}", file=sys.stderr)
+
+
+# ----------------------------- main -----------------------------
+
+def main():
+    ap = argparse.ArgumentParser()
+    sub = ap.add_subparsers(dest="cmd", required=True)
+
+    s = sub.add_parser("scan")
+    s.add_argument("--input", default=str(DEFAULT_INPUT))
+    s.add_argument("--output-dir", default=str(DEFAULT_OUTPUT))
+    s.add_argument("--recursive", action="store_true")
+    s.add_argument("--out", required=True)
+    s.set_defaults(func=cmd_scan)
+
+    sc = sub.add_parser("scenes")
+    sc.add_argument("--inventory", required=True)
+    sc.add_argument("--out-dir", required=True)
+    sc.add_argument("--only", default=None, help="comma-separated basenames to limit run")
+    sc.add_argument("--force", action="store_true")
+    sc.set_defaults(func=cmd_scenes)
+
+    st = sub.add_parser("stage")
+    st.add_argument("--inventory", required=True)
+    st.add_argument("--scenes-dir", required=True)
+    st.add_argument("--sample-fps", type=float, default=SAMPLE_FPS)
+    st.add_argument("--out", required=True)
+    st.set_defaults(func=cmd_stage)
+
+    m = sub.add_parser("merge")
+    m.add_argument("--results", required=True)
+    m.add_argument("--out", required=True)
+    m.set_defaults(func=cmd_merge)
+
+    tr = sub.add_parser("track")
+    tr.add_argument("--frames", required=True)
+    tr.add_argument("--scenes-dir", required=True)
+    tr.add_argument("--inventory", required=True)
+    tr.add_argument("--sample-fps", type=float, default=SAMPLE_FPS)
+    tr.add_argument("--out", required=True)
+    tr.set_defaults(func=cmd_track)
+
+    sc2 = sub.add_parser("score")
+    sc2.add_argument("--tracks", required=True)
+    sc2.add_argument("--inventory", required=True)
+    sc2.add_argument("--out", required=True)
+    sc2.add_argument("--no-identity", action="store_true")
+    sc2.add_argument("--max-yaw", type=float, default=QUALITY_YAW_MAX)
+    sc2.add_argument("--max-pitch", type=float, default=QUALITY_PITCH_MAX)
+    sc2.add_argument("--min-face", type=int, default=QUALITY_FACE_MIN)
+    sc2.add_argument("--min-det", type=float, default=QUALITY_DET_MIN)
+    sc2.add_argument("--track-gate-frac", type=float, default=TRACK_GATE_FRAC)
+    sc2.add_argument("--bridge-gap", type=float, default=SEGMENT_BRIDGE_S,
+                     help="bridge within-track failure gaps up to this many seconds")
+    sc2.add_argument("--merge-gap", type=float, default=SEGMENT_MERGE_GAP_S,
+                     help="merge across-track segments in same scene if within this gap")
+    sc2.add_argument("--min-dur", type=float, default=SEGMENT_MIN_S)
+    sc2.add_argument("--max-dur", type=float, default=SEGMENT_MAX_S)
+    sc2.set_defaults(func=cmd_score)
+
+    cu = sub.add_parser("cut")
+    cu.add_argument("--plan", required=True)
+    cu.add_argument("--output-dir", default=str(DEFAULT_OUTPUT))
+    cu.add_argument("--force", action="store_true")
+    cu.add_argument("--clean", action="store_true",
+                    help="remove prior UUID-named clips before cutting (preserves non-UUID files)")
+    cu.set_defaults(func=cmd_cut)
+
+    rp = sub.add_parser("report")
+    rp.add_argument("--plan", required=True)
+    rp.add_argument("--output-dir", default=str(DEFAULT_OUTPUT))
+    rp.add_argument("--out", required=True)
+    rp.set_defaults(func=cmd_report)
+
+    args = ap.parse_args()
+    args.func(args)
+
+
+if __name__ == "__main__":
+    main()