"""Windows / DirectML video frame face worker.

Reads a queue.json produced by /opt/face-sets/work/video_target_pipeline.py:stage
(WSL side); each entry is {video_path, win_video_path, frame_idx, time_s, queue_id}.
For each entry it decodes frame N from the video, runs insightface FaceAnalysis, and
emits per-face records (bbox, det_score, pose, embedding, face_short).

CLI:
    py -3.12 video_face_worker.py QUEUE_JSON OUT_JSON [--limit N]

Results are appended to OUT_JSON's sister .jsonl file (the canonical store); OUT_JSON
itself is rewritten as a small pointer/summary file. Resumable: queue entries whose
queue_id already appears in the .jsonl are skipped.
"""
from __future__ import annotations

import argparse
import json
import os
import sys
import time
from collections import defaultdict
from pathlib import Path

import numpy as np

MODEL_ROOT = r"C:\face_embed_venv\models"
MIN_DET = 0.5  # faces with det_score below this are dropped
MIN_FACE_PIX = 40  # faces whose clamped bbox short side (px) is below this are dropped
FLUSH_EVERY = 100  # flush after every N finished entries ...
FLUSH_SECONDS = 30.0  # ... or when this many seconds have passed since the last flush


def jsonl_path_for(out_path: Path) -> Path:
    """Sister JSONL file: one result-record per line, append-only."""
    return out_path.with_suffix(".jsonl")


def load_existing(out_path: Path) -> tuple[list[dict], set]:
    """Load previously written results so a run can resume.

    Reads the sister .jsonl file when it exists. Otherwise, if a legacy
    ``{"results": [...], "processed": [...]}`` JSON file exists at *out_path*, it is
    converted to .jsonl once.

    Returns (records_list, processed_set). Unparseable or non-object .jsonl lines
    (e.g. a partial line left by a crash mid-write) are skipped with a warning on
    stderr. An unreadable legacy file yields ([], set()) with a warning.
    """
    jsonl = jsonl_path_for(out_path)
    if jsonl.exists():
        records: list[dict] = []
        processed: set = set()
        with open(jsonl, encoding="utf-8") as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    r = json.loads(line)
                except json.JSONDecodeError:
                    print(f"[warn] {jsonl}:{line_num} corrupt; skipping", file=sys.stderr)
                    continue
                if not isinstance(r, dict):
                    print(f"[warn] {jsonl}:{line_num} not a JSON object; skipping",
                          file=sys.stderr)
                    continue
                records.append(r)
                if r.get("queue_id"):
                    processed.add(r["queue_id"])
        return records, processed

    # legacy JSON support: load once, convert to JSONL
    if out_path.exists():
        try:
            d = json.loads(out_path.read_text(encoding="utf-8"))
            records = d.get("results", [])
            processed = set(d.get("processed", []))
        except (OSError, ValueError, AttributeError, TypeError) as e:
            print(f"[warn] could not parse {out_path}: {e}; starting fresh", file=sys.stderr)
            return [], set()
        print(f"[migrate] converting {len(records)} legacy JSON records to JSONL",
              file=sys.stderr)
        # Write-then-rename: a crash mid-migration must not leave a truncated .jsonl,
        # because an existing .jsonl takes precedence over the legacy file next run.
        tmp = jsonl.with_suffix(".jsonl.tmp")
        with open(tmp, "w", encoding="utf-8") as f:
            f.writelines(json.dumps(r) + "\n" for r in records)
        os.replace(tmp, jsonl)
        return records, processed
    return [], set()


def _ensure_trailing_newline(path: Path) -> None:
    """Terminate a dangling partial last line so the next append starts a new line.

    Without this, a record appended after a crash mid-write would be glued onto the
    partial line and become unparseable too. A missing file is left alone.
    """
    try:
        with open(path, "rb+") as f:
            if f.seek(0, os.SEEK_END) == 0:
                return  # empty file: nothing to terminate
            f.seek(-1, os.SEEK_END)
            if f.read(1) != b"\n":
                f.seek(0, os.SEEK_END)
                f.write(b"\n")
    except FileNotFoundError:
        pass


def append_records(out_path: Path, new_records: list) -> None:
    """Append-only write to the sister .jsonl file. No re-serialization of prior records."""
    if not new_records:
        return
    jsonl = jsonl_path_for(out_path)
    _ensure_trailing_newline(jsonl)
    with open(jsonl, "a", encoding="utf-8") as f:
        f.writelines(json.dumps(r) + "\n" for r in new_records)


def write_compat_summary(out_path: Path, total_records: int, processed: set) -> None:
    """Atomically write a tiny JSON pointer file at the legacy *out_path*.

    Older consumers still see *something* there, but the canonical store is the
    .jsonl. Cheap: no result records are serialized.
    """
    summary = {
        "_format": "jsonl-pointer",
        "_jsonl": str(jsonl_path_for(out_path).name),
        "results_count": total_records,
        "processed_count": len(processed),
    }
    tmp = out_path.with_suffix(".tmp.json")
    tmp.write_text(json.dumps(summary, indent=2), encoding="utf-8")
    os.replace(tmp, out_path)


def _base_record(entry: dict) -> dict:
    """Return the queue-entry fields that every result record starts with."""
    return {
        "queue_id": entry["queue_id"],
        "video_path": entry["video_path"],
        "frame_idx": entry["frame_idx"],
        "time_s": entry["time_s"],
    }


def _error_record(entry: dict, error: str) -> dict:
    """Build the result record for an entry whose frame could not be scored."""
    return {**_base_record(entry), "faces": [], "error": error}


def _face_records(faces, width: int, height: int) -> list[dict]:
    """Filter detections and convert them to JSON-serializable per-face dicts.

    Drops faces with det_score < MIN_DET, clamps each bbox to the width x height
    frame, and drops faces whose clamped short side is < MIN_FACE_PIX. "pose" and
    "embedding" are included only when the detection carries a non-None value.
    """
    kept = []
    for face in faces:
        score = float(face.det_score)
        if score < MIN_DET:
            continue
        x1, y1, x2, y2 = (int(round(v)) for v in face.bbox)
        x1, y1 = max(x1, 0), max(y1, 0)
        x2, y2 = min(x2, width), min(y2, height)
        short = min(x2 - x1, y2 - y1)
        if short < MIN_FACE_PIX:
            continue
        face_rec = {
            "bbox": [x1, y1, x2, y2],
            "det_score": score,
            "face_short": int(short),
        }
        pose = getattr(face, "pose", None)
        if pose is not None:
            face_rec["pose"] = [float(x) for x in pose]  # pitch, yaw, roll
        embedding = getattr(face, "normed_embedding", None)
        if embedding is not None:
            face_rec["embedding"] = embedding.astype(np.float32).tolist()
        kept.append(face_rec)
    return kept


def main():
    """CLI entry point: score every pending queue entry, appending results to OUT's .jsonl."""
    ap = argparse.ArgumentParser()
    ap.add_argument("queue", type=Path)
    ap.add_argument("out", type=Path)
    ap.add_argument("--limit", type=int, default=None)
    args = ap.parse_args()

    # JSON text is UTF-8 by spec; don't depend on the Windows locale code page.
    queue = json.loads(args.queue.read_text(encoding="utf-8"))
    print(f"[queue] {len(queue)} entries from {args.queue}", flush=True)

    args.out.parent.mkdir(parents=True, exist_ok=True)
    existing, processed = load_existing(args.out)
    # Only the count of prior records is used (for the compat summary); don't keep
    # every prior record, embeddings included, alive for the whole run.
    n_results = len(existing)
    del existing
    if processed:
        print(f"[resume] {len(processed)} already scored", flush=True)

    pending = [e for e in queue if e["queue_id"] not in processed]
    if args.limit is not None:
        pending = pending[: args.limit]
    print(f"[pending] {len(pending)} entries", flush=True)
    if not pending:
        print("[done] nothing to do")
        return

    # Imported here rather than at module level so the argument-parsing and
    # nothing-to-do paths don't load the vision stack, and the helpers above can
    # be imported without cv2/insightface installed.
    import cv2
    from insightface.app import FaceAnalysis

    print("[load] FaceAnalysis with DmlExecutionProvider", flush=True)
    app = FaceAnalysis(
        name="buffalo_l",
        root=MODEL_ROOT,
        providers=["DmlExecutionProvider", "CPUExecutionProvider"],
    )
    app.prepare(ctx_id=0, det_size=(640, 640))

    # group queue by video so we can keep one VideoCapture open and seek
    by_video = defaultdict(list)
    for e in pending:
        by_video[e["win_video_path"]].append(e)

    n_done = 0
    n_load_err = 0
    t_start = last_flush = time.time()
    new_buffer: list = []

    def flush():
        # append-only: only records produced since the last flush are written,
        # so the cost is O(new records), not O(total records).
        if new_buffer:
            append_records(args.out, new_buffer)
            new_buffer.clear()
        write_compat_summary(args.out, n_results, processed)

    def emit(rec: dict) -> None:
        """Record one finished entry (success or error); flush + report periodically."""
        nonlocal n_done, n_load_err, n_results, last_flush
        new_buffer.append(rec)
        processed.add(rec["queue_id"])
        n_results += 1
        n_done += 1
        if "error" in rec:
            n_load_err += 1
        if n_done % FLUSH_EVERY == 0 or (time.time() - last_flush) > FLUSH_SECONDS:
            flush()
            last_flush = time.time()
            el = time.time() - t_start
            rate = n_done / max(0.1, el)
            eta = (len(pending) - n_done) / max(0.1, rate) / 60.0
            print(f"[scan] {n_done}/{len(pending)} rate={rate:.2f} fps eta={eta:.1f}min "
                  f"errs={n_load_err}", flush=True)

    try:
        for vidpath, entries in by_video.items():
            # Hybrid decode strategy over frame_idx-sorted entries:
            #   1. Seek ONCE to the first pending target (cheap keyframe-seek).
            #   2. Sequential cap.grab() between subsequent targets (decode without
            #      BGR conversion until we reach a target, then cap.retrieve()).
            # This avoids per-sample seek cost (the original pathology that caused
            # 1.4 fps deep in long videos) AND avoids grab-walking from frame 0 on
            # resume (the over-correction that gave 0.08 fps).
            entries.sort(key=lambda e: e["frame_idx"])
            cap = cv2.VideoCapture(vidpath)
            try:
                if not cap.isOpened():
                    print(f"[err] cannot open {vidpath}", flush=True)
                    for e in entries:
                        emit(_error_record(e, "cap_open"))
                    continue

                first_target = entries[0]["frame_idx"]
                if first_target > 0:
                    cap.set(cv2.CAP_PROP_POS_FRAMES, first_target)
                    cur_frame_idx = first_target - 1
                else:
                    cur_frame_idx = -1  # index of the most recently grabbed frame

                for e in entries:
                    target = e["frame_idx"]
                    if target <= cur_frame_idx:
                        # Target already passed. After sorting this happens for a
                        # repeated frame_idx (or unsorted input, defensively): re-seek.
                        cap.set(cv2.CAP_PROP_POS_FRAMES, target)
                        cur_frame_idx = target - 1
                    # grab() up to and including the target frame
                    while cur_frame_idx < target and cap.grab():
                        cur_frame_idx += 1
                    if cur_frame_idx < target:
                        # grab() failed before reaching the target (stream ran out)
                        emit(_error_record(e, "frame_read"))
                        continue
                    ok, bgr = cap.retrieve()
                    if not ok or bgr is None:
                        emit(_error_record(e, "frame_read"))
                        continue
                    height, width = bgr.shape[:2]
                    kept_faces = _face_records(app.get(bgr), width, height)
                    emit({
                        **_base_record(e),
                        "frame_w": width,
                        "frame_h": height,
                        "faces": kept_faces,
                    })
            finally:
                cap.release()
    finally:
        # Persist whatever is buffered even on an exception or Ctrl-C, so a resumed
        # run doesn't redo up to FLUSH_EVERY entries.
        flush()

    el = time.time() - t_start
    print(f"[done] {n_done} scored, {n_load_err} errors, {el:.1f}s "
          f"({n_done/max(0.1,el):.2f} fps) -> {args.out}", flush=True)


if __name__ == "__main__":
    main()