"""Windows / DirectML embed worker. Reads a queue.json staged by /opt/face-sets/work/immich_stage.py (WSL side), runs InsightFace's FaceAnalysis on each image with the DmlExecutionProvider backed by the AMD Vega, and writes a cache file in the schema produced by sort_faces.py:cmd_embed (so it can be merged into nl_full.npz). CLI: py -3.12 embed_worker.py [--limit N] The queue.json entry shape (each item) is: { "asset_id": "...", "sha256": "...", "wsl_path": "/mnt/x/src/immich//", # canonical path stored "win_path": "X:\\src\\immich\\\\", # what we read from "size_bytes": int, "width": int, "height": int, ... } Per face record matches cmd_embed's schema: path, face_idx, det_score, bbox, face_short, face_area, blur, noface=False, hash plus landmark_2d_106, landmark_3d_68, pose (FaceAnalysis returns these for free and the existing cache already carries them after `enrich`). """ from __future__ import annotations import argparse import json import os import sys import time from pathlib import Path import numpy as np from PIL import Image, ImageOps from insightface.app import FaceAnalysis MODEL_ROOT = r"C:\face_embed_venv\models" MIN_DET_SCORE = 0.5 MIN_FACE_PIX = 40 FLUSH_EVERY = 50 def load_rgb_bgr(path: Path): try: with Image.open(path) as im: im = ImageOps.exif_transpose(im) im = im.convert("RGB") rgb = np.array(im) bgr = rgb[:, :, ::-1].copy() return rgb, bgr except Exception as e: print(f"[warn] failed to load {path}: {e}", file=sys.stderr) return None, None def laplacian_variance(gray: np.ndarray) -> float: g = gray.astype(np.float32) lap = ( -4.0 * g[1:-1, 1:-1] + g[:-2, 1:-1] + g[2:, 1:-1] + g[1:-1, :-2] + g[1:-1, 2:] ) return float(lap.var()) def save_cache(out_path: Path, emb_chunks: list, meta: list, processed: set, src_root: str): emb = np.concatenate(emb_chunks) if emb_chunks else np.zeros((0, 512), dtype=np.float32) tmp = out_path.with_suffix(".tmp.npz") np.savez( str(tmp), embeddings=emb, meta=json.dumps(meta), src_root=str(src_root), processed_paths=json.dumps(sorted(processed)), path_aliases=json.dumps({}), schema="v2", ) os.replace(tmp, out_path) def load_cache_if_exists(out_path: Path): """Resume helper. Returns (emb_chunks, meta, processed_set).""" if not out_path.exists(): return [], [], set() data = np.load(out_path, allow_pickle=True) emb = data["embeddings"] meta = json.loads(str(data["meta"])) processed = set(json.loads(str(data["processed_paths"]))) return [emb] if len(emb) else [], list(meta), processed def main(): p = argparse.ArgumentParser() p.add_argument("queue", type=Path) p.add_argument("out", type=Path) p.add_argument("--limit", type=int, default=None) args = p.parse_args() queue = json.loads(args.queue.read_text()) print(f"queue: {len(queue)} entries from {args.queue}") args.out.parent.mkdir(parents=True, exist_ok=True) emb_chunks, meta, processed = load_cache_if_exists(args.out) n_existing_records = len(meta) n_existing_emb = sum(e.shape[0] for e in emb_chunks) if n_existing_records: print(f"resume: {n_existing_records} existing meta records " f"({n_existing_emb} embeddings, {len(processed)} processed paths)") print("initializing FaceAnalysis with DmlExecutionProvider") app = FaceAnalysis( name="buffalo_l", root=MODEL_ROOT, providers=["DmlExecutionProvider", "CPUExecutionProvider"], ) app.prepare(ctx_id=0, det_size=(640, 640)) src_root = "/mnt/x/src/immich" n_done = 0 n_face_records_added = 0 n_noface_added = 0 n_skipped = 0 n_load_err = 0 t0 = time.perf_counter() last_flush = time.perf_counter() new_emb_chunks: list[np.ndarray] = [] new_meta: list[dict] = [] def flush(): nonlocal new_emb_chunks, new_meta, last_flush if not new_emb_chunks and not new_meta: return if new_emb_chunks: emb_chunks.append(np.concatenate(new_emb_chunks)) new_emb_chunks = [] for r in new_meta: meta.append(r) new_meta = [] save_cache(args.out, emb_chunks, meta, processed, src_root) last_flush = time.perf_counter() for i, entry in enumerate(queue): if args.limit is not None and n_done >= args.limit: break wsl_path = entry["wsl_path"] win_path = entry["win_path"] sha = entry["sha256"] if wsl_path in processed: n_skipped += 1 continue rgb, bgr = load_rgb_bgr(Path(win_path)) if bgr is None: new_meta.append({ "path": wsl_path, "face_idx": -1, "noface": True, "hash": sha, "error": "load", }) processed.add(wsl_path) n_load_err += 1 n_done += 1 continue faces = app.get(bgr) kept_any = False for j, f in enumerate(faces): if float(f.det_score) < MIN_DET_SCORE: continue x1, y1, x2, y2 = [int(round(v)) for v in f.bbox] x1 = max(x1, 0); y1 = max(y1, 0) x2 = min(x2, rgb.shape[1]); y2 = min(y2, rgb.shape[0]) w, h = x2 - x1, y2 - y1 short = min(w, h) if short < MIN_FACE_PIX: continue crop = rgb[y1:y2, x1:x2] if crop.size == 0: continue gray = crop.mean(axis=2) blur = laplacian_variance(gray) if min(gray.shape) > 3 else 0.0 emb = f.normed_embedding.astype(np.float32) new_emb_chunks.append(emb[None, :]) rec = { "path": wsl_path, "face_idx": j, "det_score": float(f.det_score), "bbox": [x1, y1, x2, y2], "face_short": int(short), "face_area": int(w * h), "blur": blur, "noface": False, "hash": sha, } # Enrichment-equivalent fields (FaceAnalysis returns these for free) if hasattr(f, "landmark_2d_106") and f.landmark_2d_106 is not None: rec["landmark_2d_106"] = f.landmark_2d_106.astype(np.float32).tolist() if hasattr(f, "landmark_3d_68") and f.landmark_3d_68 is not None: rec["landmark_3d_68"] = f.landmark_3d_68.astype(np.float32).tolist() if hasattr(f, "pose") and f.pose is not None: rec["pose"] = [float(x) for x in f.pose] new_meta.append(rec) kept_any = True n_face_records_added += 1 if not kept_any: new_meta.append({ "path": wsl_path, "face_idx": -1, "noface": True, "hash": sha, }) n_noface_added += 1 processed.add(wsl_path) n_done += 1 if (n_done % FLUSH_EVERY == 0) or (time.perf_counter() - last_flush) > 30.0: flush() elapsed = time.perf_counter() - t0 rate = n_done / max(0.1, elapsed) print( f"[embed] done={n_done:5d}/{len(queue)} faces+={n_face_records_added:5d} " f"noface+={n_noface_added:4d} skipped={n_skipped:4d} " f"load_err={n_load_err:3d} rate={rate:.1f} img/s " f"({elapsed:.1f}s elapsed)" ) flush() elapsed = time.perf_counter() - t0 print() print("=== embed done ===") print(f" done: {n_done}") print(f" new face records: {n_face_records_added}") print(f" new noface records: {n_noface_added}") print(f" skipped (already done): {n_skipped}") print(f" load errors: {n_load_err}") print(f" elapsed: {elapsed:.1f}s ({n_done/max(0.1,elapsed):.1f} img/s)") print(f" cache: {args.out}") if __name__ == "__main__": main()