#!/usr/bin/env python3
"""Discover new identities in /mnt/x/src/osrc and emit them as facesets.

Workflow (mirrors the shape of build_folders.py, but identities are
discovered by clustering rather than asserted by folder):

1. Load cache; restrict to face records whose canonical or alias path lies
   under /mnt/x/src/osrc/.
2. Build centroids of the existing 19 canonical identities in
   facesets_swap_ready/faceset_001..019. Drop any osrc face whose
   nearest-existing-identity cos-dist <= EXISTING_MATCH_THRESHOLD; those are
   already covered by `extend` and shouldn't seed new facesets.
3. Cluster the remaining osrc faces among themselves at INITIAL_THRESHOLD
   (matches `extend`'s new_cluster_threshold default).
4. Per cluster, apply refine-equivalent gates: face_short >= MIN_SHORT,
   blur >= MIN_BLUR, det_score >= MIN_DET_SCORE; for clusters >= 4, drop
   faces with cos-dist > OUTLIER_THRESHOLD from the cluster centroid.
5. Keep clusters whose surviving unique source-path count is >= MIN_FACES.
6. Number kept clusters faceset_020, 021, ... (past the highest existing in
   facesets_swap_ready, which is 019). Order by descending size.
7. Synthesize a refine_manifest.json and call cmd_export_swap on it,
   emitting into a temp dir. Move new dirs into facesets_swap_ready/.
8. Append new entries to the top-level facesets_swap_ready/manifest.json
   (preserving existing facesets / thin_eras).
"""
from __future__ import annotations

import json
import shutil
import sys
from pathlib import Path

import numpy as np

REPO = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(REPO))

from sort_faces import (  # noqa: E402
    _cluster_embeddings,
    cmd_export_swap,
    load_cache,
)

# ---- config -------------------------------------------------------------- #

CACHE = REPO / "work" / "cache" / "nl_full.npz"
SWAP_READY = Path("/mnt/e/temp_things/fcswp/nl_sorted/facesets_swap_ready")
OUT_TMP = Path("/mnt/e/temp_things/fcswp/nl_sorted/facesets_swap_ready_osrc_new")
SYNTH_MANIFEST = REPO / "work" / "synthetic_osrc_manifest.json"
OSRC_DIR = Path("/mnt/x/src/osrc")
START_NNN = 20  # facesets_swap_ready max is 019; pick up here.

# Existing-identity exclusion: drop osrc faces whose nearest existing
# identity centroid is within this cosine distance. 0.45 matches the
# build_folders.py OSRC_THRESHOLD: at this cutoff the face is already
# routed to an existing identity by extend / build_folders.py.
EXISTING_MATCH_THRESHOLD = 0.45

# Cluster the unmatched.
INITIAL_THRESHOLD = 0.55

# Refine-equivalent gates (per the user's request: drop min_faces to 6).
MIN_FACES = 6
MIN_SHORT = 90
MIN_BLUR = 40.0
MIN_DET_SCORE = 0.6
OUTLIER_THRESHOLD = 0.55  # only applied if cluster >= 4
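
# Distance convention used throughout: embeddings are treated as
# L2-normalized (see _normalize below), so cosine distance is simply
# 1 - dot product. A minimal sketch with made-up 2-d vectors (real cache
# embeddings are 512-d; values here are illustrative only):
#
#     a = _normalize(np.array([1.0, 0.0]))
#     b = _normalize(np.array([1.0, 1.0]))
#     1.0 - float(a @ b)  # ~0.293 -> would count as "covered" at the 0.45 cutoff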
# export-swap params (defaults from sort_faces.py).
TOP_N = 30
EXPORT_OUTLIER_THRESHOLD = 0.45
PAD_RATIO = 0.5
OUT_SIZE = 512
EXPORT_MIN_FACE_SHORT = 100

# ---- helpers ------------------------------------------------------------- #


def _normalize(v: np.ndarray) -> np.ndarray:
    n = np.linalg.norm(v)
    return v / n if n > 0 else v


def _under(folder: Path, p: str) -> bool:
    fs = str(folder).rstrip("/") + "/"
    return p == str(folder) or p.startswith(fs)


def _record_in_folder(rec: dict, folder: Path, path_aliases: dict[str, list[str]]) -> bool:
    if _under(folder, rec["path"]):
        return True
    for alias in path_aliases.get(rec["path"], []):
        if _under(folder, alias):
            return True
    return False


def _existing_identity_centroids(
    emb: np.ndarray, face_records: list[dict]
) -> tuple[np.ndarray, list[str]]:
    """Build a (n_identities, 512) matrix of L2-normalized centroids and a
    parallel name list, drawn from the canonical faceset_001..019 manifests
    in facesets_swap_ready/."""
    bbox_idx: dict[tuple[str, tuple], int] = {
        (m["path"], tuple(m.get("bbox") or ())): i for i, m in enumerate(face_records)
    }
    centroids: list[np.ndarray] = []
    names: list[str] = []
    for n in range(1, 20):
        d = SWAP_READY / f"faceset_{n:03d}"
        man_path = d / "manifest.json"
        if not man_path.exists():
            continue
        man = json.loads(man_path.read_text())
        keys = [(f["source"], tuple(f.get("bbox") or ())) for f in man.get("faces", [])]
        idxs = [bbox_idx[k] for k in keys if k in bbox_idx]
        if not idxs:
            continue
        centroids.append(_normalize(emb[idxs].mean(axis=0)))
        names.append(d.name)
    return np.stack(centroids), names


# ---- phase 1: identify new osrc clusters --------------------------------- #


def discover_new_clusters() -> tuple[dict, list[dict]]:
    emb, meta, _src_root, _proc, path_aliases = load_cache(CACHE)
    face_records = [m for m in meta if not m.get("noface")]
    if len(face_records) != len(emb):
        raise SystemExit(f"meta/embedding mismatch: {len(face_records)} vs {len(emb)}")
    print(f"Cache: {len(face_records)} face records.")

    # Step 1: filter to osrc.
    osrc_idx = [
        i for i, m in enumerate(face_records)
        if _record_in_folder(m, OSRC_DIR, path_aliases)
    ]
    print(f"osrc face records: {len(osrc_idx)}")

    # Step 2: drop those already matching an existing identity.
    cents, cent_names = _existing_identity_centroids(emb, face_records)
    osrc_emb = emb[osrc_idx]
    sims = osrc_emb @ cents.T
    nearest_d = 1.0 - sims.max(axis=1)
    nearest_id = sims.argmax(axis=1)
    covered_mask = nearest_d <= EXISTING_MATCH_THRESHOLD
    n_covered = int(covered_mask.sum())
    print(
        f"Already covered by existing 19 identities at cos-dist <= "
        f"{EXISTING_MATCH_THRESHOLD}: {n_covered}/{len(osrc_idx)}"
    )
    # Per-identity coverage breakdown (for logging only).
    for j, name in enumerate(cent_names):
        c = int(((nearest_id == j) & covered_mask).sum())
        if c:
            print(f"  -> {name}: {c}")

    new_idx = [osrc_idx[k] for k in range(len(osrc_idx)) if not covered_mask[k]]
    print(f"\nUnmatched osrc faces to cluster: {len(new_idx)}")

    # Step 3: cluster the unmatched among themselves.
    new_emb = emb[new_idx]
    if len(new_idx) <= 1:
        labels = np.zeros(len(new_idx), dtype=int)
    else:
        labels = _cluster_embeddings(new_emb, INITIAL_THRESHOLD)
    n_clusters = len({int(lab) for lab in labels})
    print(
        f"Initial clusters at threshold {INITIAL_THRESHOLD}: {n_clusters} "
        f"(top sizes: {sorted([int((labels == lab).sum()) for lab in set(labels)], reverse=True)[:10]})"
    )
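
    # Shape sketch of the clustering output (hypothetical sizes, not real
    # counts): with 5000 unmatched faces, new_emb is (5000, 512) and labels
    # is a (5000,) int array aligned row-for-row with new_emb. Faces sharing
    # a label form one candidate identity, e.g. labels = [0, 0, 1, 0, 2, ...]
    # puts rows 0, 1, and 3 into the same candidate faceset.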
    # Step 4 + 5: per-cluster refine gates + min_faces.
    clusters: dict[int, list[int]] = {}
    for k, lab in enumerate(labels):
        clusters.setdefault(int(lab), []).append(new_idx[k])

    kept_clusters: list[dict] = []
    drop_quality_total = 0
    drop_outlier_total = 0
    for cid, idxs in clusters.items():
        # Per-face quality gate.
        good: list[int] = []
        for i in idxs:
            r = face_records[i]
            if r.get("face_short", 0) < MIN_SHORT:
                drop_quality_total += 1
                continue
            if r.get("blur", 0.0) < MIN_BLUR:
                drop_quality_total += 1
                continue
            if r.get("det_score", 0.0) < MIN_DET_SCORE:
                drop_quality_total += 1
                continue
            good.append(i)
        if not good:
            continue

        # Outlier rejection (only if cluster >= 4).
        if len(good) >= 4:
            cent = _normalize(emb[good].mean(axis=0))
            d = 1.0 - emb[good] @ cent
            tight = [good[k] for k, dist in enumerate(d) if dist <= OUTLIER_THRESHOLD]
            drop_outlier_total += len(good) - len(tight)
            good = tight
        if not good:
            continue

        unique_paths = sorted({face_records[i]["path"] for i in good})
        if len(unique_paths) < MIN_FACES:
            continue
        kept_clusters.append({
            "indices": good,
            "unique_paths": unique_paths,
            "size_face": len(good),
            "size_paths": len(unique_paths),
        })

    kept_clusters.sort(key=lambda c: -c["size_paths"])
    print(
        f"\nAfter quality gate ({drop_quality_total} dropped) + outlier "
        f"rejection ({drop_outlier_total} dropped) + min_faces={MIN_FACES}: "
        f"{len(kept_clusters)} clusters kept"
    )
    for rank, c in enumerate(kept_clusters, start=START_NNN):
        print(
            f"  faceset_{rank:03d}: faces={c['size_face']:3d} "
            f"unique_paths={c['size_paths']:3d}"
        )

    # Build synthetic refine_manifest.json compatible with cmd_export_swap.
    facesets = [
        {
            "name": f"faceset_{rank:03d}",
            "image_count": c["size_paths"],
            "face_count": c["size_face"],
            "images": c["unique_paths"],
        }
        for rank, c in enumerate(kept_clusters, start=START_NNN)
    ]
    manifest = {
        "params": {
            "existing_match_threshold": EXISTING_MATCH_THRESHOLD,
            "initial_threshold": INITIAL_THRESHOLD,
            "outlier_threshold": OUTLIER_THRESHOLD,
            "min_faces": MIN_FACES,
            "min_short": MIN_SHORT,
            "min_blur": MIN_BLUR,
            "min_det_score": MIN_DET_SCORE,
            "source_root": str(OSRC_DIR),
        },
        "facesets": facesets,
    }
    SYNTH_MANIFEST.write_text(json.dumps(manifest, indent=2))
    print(f"\nSynthetic manifest -> {SYNTH_MANIFEST}")
    return manifest, kept_clusters


# ---- phase 2: export + relocate + merge top-level manifest -------------- #


def export_and_relocate(manifest: dict) -> None:
    if OUT_TMP.exists():
        shutil.rmtree(OUT_TMP)
    OUT_TMP.mkdir(parents=True)

    print(f"\nRunning cmd_export_swap -> {OUT_TMP}")
    cmd_export_swap(
        cache_path=CACHE,
        refine_manifest_path=SYNTH_MANIFEST,
        raw_manifest_path=None,
        out_dir=OUT_TMP,
        top_n=TOP_N,
        outlier_threshold=EXPORT_OUTLIER_THRESHOLD,
        pad_ratio=PAD_RATIO,
        out_size=OUT_SIZE,
        include_candidates=False,
        candidate_match_threshold=0.55,
        candidate_min_score=0.40,
        min_face_short=EXPORT_MIN_FACE_SHORT,
    )

    new_top = json.loads((OUT_TMP / "manifest.json").read_text())
    new_entries = new_top.get("facesets", [])

    moved = 0
    for fs_meta in new_entries:
        name = fs_meta["name"]
        src_dir = OUT_TMP / name
        if not src_dir.exists():
            print(f"[{name}] export dir missing; skipping")
            continue
        dst_dir = SWAP_READY / name
        if dst_dir.exists():
            print(f"[{name}] {dst_dir} already exists; refusing to overwrite")
            continue
        # Add a marker file so the source provenance is obvious.
        (src_dir / "osrc.txt").write_text(
            f"{name}\n\nSource: osrc cluster (auto-discovered, {OSRC_DIR}).\n"
        )
        shutil.move(str(src_dir), str(dst_dir))
        moved += 1
        print(f"[{name}] -> {dst_dir}")
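
    # For reference, the top-level manifest merged into below is assumed to
    # look roughly like (entries hypothetical):
    #   {"facesets": [{"name": "faceset_001", ...}, ...],
    #    "thin_eras": [...]}
    # Only "facesets" is appended to; every other key rides along untouched.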
    # Merge top-level manifest, preserving facesets / thin_eras / etc.
    final_manifest_path = SWAP_READY / "manifest.json"
    if final_manifest_path.exists():
        existing = json.loads(final_manifest_path.read_text())
    else:
        existing = {"facesets": []}
    existing.setdefault("facesets", [])
    existing_names = {fs["name"] for fs in existing["facesets"]}
    appended = 0
    for entry in new_entries:
        if entry["name"] in existing_names:
            print(f"[manifest] {entry['name']} already present; not duplicating")
            continue
        existing["facesets"].append(entry)
        appended += 1
    final_manifest_path.write_text(json.dumps(existing, indent=2))
    print(f"\nMerged manifest: appended {appended} entries -> {final_manifest_path}")
    print(f"Moved {moved} faceset directories into {SWAP_READY}")

    # Clean up temp dir if empty.
    if OUT_TMP.exists():
        leftover = list(OUT_TMP.iterdir())
        if not leftover:
            OUT_TMP.rmdir()


# ---- main ---------------------------------------------------------------- #


def main() -> None:
    dry = "--dry-run" in sys.argv
    manifest, kept = discover_new_clusters()
    if dry:
        print("\n--dry-run: stopping after cluster discovery (no exports written).")
        return
    if not manifest.get("facesets"):
        print("No new facesets to build; nothing to do.")
        return
    export_and_relocate(manifest)
    print("\nDone.")


if __name__ == "__main__":
    main()
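
# Usage sketch (the filename and scripts/ location are assumptions; REPO
# above implies this file lives one directory below the repo root):
#
#   python scripts/discover_osrc_facesets.py --dry-run   # report clusters, write nothing
#   python scripts/discover_osrc_facesets.py             # export facesets + merge manifest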