"""Corpus-wide dedup + roop-unleashed optimization.

Two passes:

1. Cross-family byte-identical PNG dedup (same SHA256 in two different
   identity families) — keep the higher-tier family copy. Era splits of the
   same parent identity (faceset_NNN_*) are intentional duplications and are
   NOT deduped within their family.
2. Within-faceset near-duplicate dedup using cached arcface embeddings
   (cosine sim >= 0.95). Keep highest quality.composite, drop the rest.

Plus a Windows-DML multi-face audit (separate phase via clip_worker-style
split):

3. Re-detect each PNG with insightface; flag any with 0 or >1 detected
   faces. The roop loader appends every detected face per PNG, so multi-face
   crops pollute identity averaging.

All flagged PNGs are MOVED to <faceset>/faces/_dropped/ (reversible).
Affected .fsz files are re-zipped, manifests updated.

CLI:
    analyze          --out work/dedup_audit/dedup_plan.json
    apply            --plan ... [--dry-run]
    stage_multiface  --out work/dedup_audit/multiface_queue.json
    merge_multiface  --results <results.json> --out work/dedup_audit/multiface_plan.json
    apply_multiface  --plan ... [--dry-run]
    report           --dedup ... --multiface ... --out work/dedup_audit
"""
from __future__ import annotations

import argparse
import hashlib
import html
import json
import re
import shutil
import sys
import time
import zipfile
from concurrent.futures import ThreadPoolExecutor
from itertools import combinations
from pathlib import Path

import numpy as np

ROOT = Path("/mnt/e/temp_things/fcswp/nl_sorted/facesets_swap_ready")
WIN_ROOT = r"E:\temp_things\fcswp\nl_sorted\facesets_swap_ready"
CACHES = [
    Path("/opt/face-sets/work/cache/nl_full.npz"),
    Path("/opt/face-sets/work/cache/immich_peter.npz"),
    Path("/opt/face-sets/work/cache/immich_nic.npz"),
]
NEAR_DUP_THRESHOLD = 0.95
HASH_PARALLEL = 16


# ----------------------------- helpers -----------------------------

def faceset_tier(name: str) -> int:
    """Return the priority tier of a faceset directory name.

    The tier is derived from the numeric part of ``faceset_NNN[_suffix]``.
    A lower tier number wins when the byte-dedup pass picks a keeper.
    Names that do not match the pattern (or number 0) get tier 99.
    """
    m = re.match(r"^faceset_0*(\d+)(?:_.+)?$", name)
    if not m:
        return 99
    n = int(m.group(1))
    if 13 <= n <= 19:
        return 0
    if 1 <= n <= 12:
        return 1
    if 20 <= n <= 25:
        return 2
    if 26 <= n <= 264:
        return 3
    if n >= 265:
        return 4
    return 99


def faceset_family(name: str) -> str:
    """faceset_001_2010-13 → faceset_001; faceset_001 → faceset_001.

    Names that do not look like ``faceset_<digits>[_suffix]`` are returned
    unchanged, so each such directory is its own family.
    """
    m = re.match(r"^(faceset_\d+)(?:_.+)?$", name)
    return m.group(1) if m else name


def wsl_to_win(p: str) -> str:
    """Translate a WSL drive mount path (``/mnt/e/x/y``) to ``E:\\x\\y``.

    Only ``/mnt/<single letter>`` optionally followed by ``/...`` is treated
    as a drive mount; anything else (including ``/mnt/wsl/...``) is returned
    unchanged instead of being sliced into a bogus drive path.
    """
    s = str(p)
    m = re.fullmatch(r"/mnt/([A-Za-z])(/.*)?", s, flags=re.DOTALL)
    if not m:
        return s
    rest = (m.group(2) or "")[1:]  # strip the separator after the drive letter
    return f"{m.group(1).upper()}:\\{rest.replace('/', chr(92))}"


def iter_active_facesets() -> list[Path]:
    """Return the sorted faceset directories under ROOT.

    Directories whose name starts with ``_`` are skipped.
    """
    return [
        d for d in sorted(ROOT.iterdir())
        if d.is_dir() and not d.name.startswith("_")
    ]


def sha256_file(p: Path) -> str:
    """Return the hex SHA256 of file *p*, read in 1 MiB chunks."""
    h = hashlib.sha256()
    with open(p, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()


def load_caches():
    """Load every existing embedding cache listed in CACHES.

    Returns:
        (rec_index, alias_map) where ``rec_index`` maps ``(path, bbox)`` to an
        L2-normalised float32 embedding and ``alias_map`` maps every known
        path (canonical or alias) to its canonical path.
    """
    rec_index: dict[tuple[str, tuple[int, ...]], np.ndarray] = {}
    alias_map: dict[str, str] = {}
    for cache_path in CACHES:
        if not cache_path.exists():
            continue
        # NOTE(security): allow_pickle=True unpickles whatever is in the
        # archive; only point CACHES at files this pipeline produced itself.
        with np.load(cache_path, allow_pickle=True) as d:
            emb = d["embeddings"]
            meta = json.loads(str(d["meta"]))
            paliases = (
                json.loads(str(d["path_aliases"]))
                if "path_aliases" in d.files else {}
            )
        for canon, alist in paliases.items():
            alias_map.setdefault(canon, canon)
            for a in alist:
                alias_map[a] = canon
        # assumes embedding rows line up 1:1 with the non-"noface" meta
        # records, in order — TODO confirm against the cache writer
        face_records = [m for m in meta if not m.get("noface")]
        for i, rec in enumerate(face_records):
            p = rec["path"]
            bbox = tuple(int(x) for x in rec["bbox"])
            v = emb[i].astype(np.float32)
            n = float(np.linalg.norm(v))
            if n > 0:
                v = v / n
            rec_index[(p, bbox)] = v
            alias_map.setdefault(p, p)
    return rec_index, alias_map


def lookup_emb(rec_index, alias_map, src: str, bbox):
    """Return the cached embedding for ``(src, bbox)`` or None.

    The canonical alias of *src* is tried first, then *src* itself. A missing
    *src* or *bbox* (manifest entry without those fields) yields None instead
    of raising, so the caller counts it as a missing embedding.
    """
    if src is None or bbox is None:
        return None
    bbox_t = tuple(int(x) for x in bbox)
    canon = alias_map.get(src, src)
    v = rec_index.get((canon, bbox_t))
    if v is None and canon != src:
        v = rec_index.get((src, bbox_t))
    return v


def _rank_key(rank):
    """Sort key for a manifest rank; a missing rank sorts last (rank 0 is valid)."""
    return 9999 if rank is None else rank


def _reason_counts(items) -> dict:
    """Count plan items per ``reason`` string, in first-seen order."""
    counts: dict = {}
    for it in items:
        counts[it["reason"]] = counts.get(it["reason"], 0) + 1
    return counts


# ----------------------------- analyze -----------------------------

def _collect_pngs(facesets) -> list[dict]:
    """Phase 1: list every manifest face whose PNG exists on disk."""
    all_pngs = []
    for fs in facesets:
        manifest_path = fs / "manifest.json"
        if not manifest_path.exists():
            continue
        manifest = json.loads(manifest_path.read_text())
        family = faceset_family(fs.name)
        tier = faceset_tier(fs.name)
        for face in manifest.get("faces", []):
            png_rel = face.get("png")
            if not png_rel:
                continue
            disk_path = fs / png_rel
            if not disk_path.exists():
                continue
            all_pngs.append({
                "faceset": fs.name,
                "family": family,
                "tier": tier,
                "file": Path(png_rel).name,
                "rank": face.get("rank"),
                "source": face.get("source"),
                "bbox": face.get("bbox"),
                # a null quality block / composite counts as 0 rather than
                # crashing the keeper sort later on
                "quality": (face.get("quality") or {}).get("composite") or 0,
                "disk_path": str(disk_path),
            })
    return all_pngs


def _hash_pngs(all_pngs: list[dict]) -> None:
    """Phase 2: store the SHA256 of every PNG under ``"sha256"`` (threaded I/O)."""
    with ThreadPoolExecutor(max_workers=HASH_PARALLEL) as ex:
        digests = ex.map(sha256_file, [Path(p["disk_path"]) for p in all_pngs])
        for rec, digest in zip(all_pngs, digests):
            rec["sha256"] = digest


def _plan_byte_dedup(all_pngs: list[dict]):
    """Phase 3: find byte-identical PNGs shared by two or more families.

    Returns ``(groups, drops)``: the report groups and the set of indices into
    *all_pngs* to drop. Copies inside the keeper's own family are never
    dropped (era duplication is intentional).
    """
    by_sha: dict[str, list[int]] = {}
    for i, p in enumerate(all_pngs):
        by_sha.setdefault(p["sha256"], []).append(i)

    groups = []
    drops: set[int] = set()
    for sha, idxs in by_sha.items():
        if len(idxs) < 2:
            continue
        if len({all_pngs[i]["family"] for i in idxs}) < 2:
            continue  # all in same family — intentional era duplication
        groups.append({"sha256": sha, "members": [
            {"faceset": all_pngs[i]["faceset"], "file": all_pngs[i]["file"],
             "tier": all_pngs[i]["tier"], "quality": all_pngs[i]["quality"],
             "rank": all_pngs[i]["rank"]}
            for i in idxs
        ]})
        # keeper rule: lowest tier number, then highest quality
        best = min(idxs, key=lambda i: (all_pngs[i]["tier"],
                                        -all_pngs[i]["quality"]))
        keep_family = all_pngs[best]["family"]
        drops.update(i for i in idxs if all_pngs[i]["family"] != keep_family)
    return groups, drops


def _similarity_components(sim: np.ndarray, threshold: float) -> list[list[int]]:
    """Return connected components (size >= 2) of the ``sim >= threshold`` graph.

    Only the strict upper triangle is inspected, so the diagonal is ignored.
    Each component is a sorted list of row indices.
    """
    n = sim.shape[0]
    adj: list[list[int]] = [[] for _ in range(n)]
    rows, cols = np.nonzero(np.triu(sim >= threshold, k=1))
    for a, b in zip(rows.tolist(), cols.tolist()):
        adj[a].append(b)
        adj[b].append(a)

    seen: set[int] = set()
    components = []
    for start in range(n):
        if start in seen or not adj[start]:
            continue
        seen.add(start)
        stack = [start]
        comp = []
        while stack:
            x = stack.pop()
            comp.append(x)
            for y in adj[x]:
                if y not in seen:
                    seen.add(y)
                    stack.append(y)
        components.append(sorted(comp))
    return components


def _plan_near_dedup(all_pngs: list[dict], rec_index, alias_map, skip: set[int]):
    """Phase 4: cluster near-duplicates inside each faceset.

    Indices in *skip* (already dropped by the byte-dedup pass) take no part:
    otherwise a byte-dropped PNG could be elected keeper of a cluster and the
    whole cluster would disappear from the faceset.

    Returns ``(groups, drops, missing)`` where *missing* counts PNGs without a
    cached embedding.
    """
    by_faceset: dict[str, list[int]] = {}
    for i, p in enumerate(all_pngs):
        if i not in skip:
            by_faceset.setdefault(p["faceset"], []).append(i)

    groups = []
    drops: set[int] = set()
    missing = 0
    for fs_name, idxs in by_faceset.items():
        if len(idxs) < 2:
            continue
        embs = []
        kept_idxs = []
        for i in idxs:
            v = lookup_emb(rec_index, alias_map,
                           all_pngs[i]["source"], all_pngs[i]["bbox"])
            if v is None:
                missing += 1
                continue
            embs.append(v)
            kept_idxs.append(i)
        if len(kept_idxs) < 2:
            continue
        mat = np.stack(embs).astype(np.float32)
        sim = mat @ mat.T  # embeddings are unit-normalised by load_caches
        for comp in _similarity_components(sim, NEAR_DUP_THRESHOLD):
            comp_idxs = [kept_idxs[c] for c in comp]
            # keeper: highest quality.composite, tie-break: lowest rank
            best = min(comp_idxs, key=lambda i: (-all_pngs[i]["quality"],
                                                 _rank_key(all_pngs[i]["rank"])))
            pair_sims = [float(sim[a, b]) for a, b in combinations(comp, 2)]
            groups.append({
                "faceset": fs_name,
                "members": [{"file": all_pngs[i]["file"],
                             "rank": all_pngs[i]["rank"],
                             "quality": all_pngs[i]["quality"]}
                            for i in comp_idxs],
                "keeper": all_pngs[best]["file"],
                "min_sim": min(pair_sims) if pair_sims else None,
                "max_sim": max(pair_sims) if pair_sims else None,
            })
            drops.update(i for i in comp_idxs if i != best)
    return groups, drops, missing


def cmd_analyze(args):
    """Scan the corpus and write the dedup plan JSON to ``args.out``.

    Nothing on disk is modified apart from the plan file itself.
    """
    rec_index, alias_map = load_caches()
    facesets = iter_active_facesets()
    print(f"[scan] {len(facesets)} active facesets", file=sys.stderr)

    # Phase 1: walk every PNG listed in a manifest
    t0 = time.time()
    all_pngs = _collect_pngs(facesets)
    print(f"[scan] {len(all_pngs)} PNGs walked in {time.time()-t0:.1f}s",
          file=sys.stderr)

    # Phase 2: SHA256 hash each PNG (parallel I/O)
    t0 = time.time()
    _hash_pngs(all_pngs)
    print(f"[hash] {len(all_pngs)} PNGs hashed in {time.time()-t0:.1f}s",
          file=sys.stderr)

    # Phase 3: cross-family byte-dedup
    cross_family_groups, byte_drops = _plan_byte_dedup(all_pngs)
    print(f"[byte] {len(cross_family_groups)} cross-family hash groups; "
          f"{len(byte_drops)} PNGs marked for byte-dedup drop", file=sys.stderr)

    # Phase 4: within-faceset near-dup (embedding sim >= threshold)
    t0 = time.time()
    near_dup_groups, near_drops, miss_emb_total = _plan_near_dedup(
        all_pngs, rec_index, alias_map, byte_drops)
    print(f"[near] {len(near_dup_groups)} near-dup groups; "
          f"{len(near_drops)} PNGs marked for near-dup drop "
          f"(miss_emb={miss_emb_total}); {time.time()-t0:.1f}s", file=sys.stderr)

    # Combined drop set; for output, group by faceset (sorted → stable plan)
    all_drops = byte_drops | near_drops
    drops_by_faceset: dict[str, list] = {}
    for i in sorted(all_drops):
        p = all_pngs[i]
        reason = []
        if i in byte_drops:
            reason.append("byte_dup")
        if i in near_drops:
            reason.append("near_dup")
        drops_by_faceset.setdefault(p["faceset"], []).append({
            "file": p["file"], "rank": p["rank"],
            "reason": "+".join(reason),
            "sha256": p["sha256"],
            "quality": p["quality"],
        })

    plan = {
        "thresholds": {"near_dup_sim": NEAR_DUP_THRESHOLD},
        "totals": {
            "active_facesets": len(facesets),
            "active_pngs": len(all_pngs),
            "byte_dup_groups": len(cross_family_groups),
            "byte_dup_drops": len(byte_drops),
            "near_dup_groups": len(near_dup_groups),
            "near_dup_drops": len(near_drops),
            "all_drops": len(all_drops),
            "facesets_affected": len(drops_by_faceset),
        },
        "byte_dup_groups": cross_family_groups,
        "near_dup_groups": near_dup_groups,
        "drops_by_faceset": drops_by_faceset,
    }
    op = Path(args.out)
    op.parent.mkdir(parents=True, exist_ok=True)
    op.write_text(json.dumps(plan, indent=2))
    print(f"[done] plan -> {op}", file=sys.stderr)


# ----------------------------- apply -----------------------------

def _zip_png_list(pngs: list[Path], zip_path: Path) -> None:
    """Write *pngs* into a deflated zip, renamed ``0000.png``, ``0001.png``, ..."""
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED,
                         compresslevel=4) as zf:
        for i, p in enumerate(pngs):
            zf.write(p, arcname=f"{i:04d}.png")


def _apply_drops_to_facesets(drops_by_faceset: dict[str, list],
                             reason_label: str,
                             master_path: Path):
    """Move flagged PNGs to <faceset>/faces/_dropped/, rebuild manifests + .fsz.

    Args:
        drops_by_faceset: faceset name -> list of ``{"file": str, ...}``.
        reason_label: prefix for the bookkeeping keys written to the manifests
            (``<label>_history``, ``<label>_dropped``, ``<label>_runs``).
        master_path: path of the corpus-level manifest.json.

    Returns:
        ``(total_moved, per_faceset_counts)``.

    Facesets whose directory or ``faces/`` folder is missing are skipped with
    a warning. If a faceset has no manifest.json its PNGs are still moved but
    nothing is rebuilt for it.
    """
    master = json.loads(master_path.read_text())
    by_name = {f["name"]: f for f in master.get("facesets", [])}
    total_moved = 0
    per_faceset_counts = {}

    for fs_name, drops in drops_by_faceset.items():
        fs_dir = ROOT / fs_name
        if not fs_dir.is_dir():
            print(f"[warn] {fs_name}: dir missing, skip", file=sys.stderr)
            continue
        faces_dir = fs_dir / "faces"
        if not faces_dir.is_dir():
            print(f"[warn] {fs_name}: faces dir missing, skip", file=sys.stderr)
            continue
        dropped_dir = faces_dir / "_dropped"
        dropped_dir.mkdir(exist_ok=True)

        drop_files = {d["file"] for d in drops}
        moved_here = 0
        for fname in sorted(drop_files):
            src = faces_dir / fname
            if not src.exists():
                continue
            shutil.move(str(src), str(dropped_dir / fname))
            moved_here += 1
        total_moved += moved_here
        per_faceset_counts[fs_name] = moved_here

        manifest_path = fs_dir / "manifest.json"
        if not manifest_path.exists():
            print(f"[warn] {fs_name}: manifest.json missing, "
                  f"moved {moved_here} PNG(s) without rebuild", file=sys.stderr)
            continue

        # rebuild manifest by filtering out dropped files
        mm = json.loads(manifest_path.read_text())
        new_faces = [f for f in mm.get("faces", [])
                     if Path(f.get("png") or "").name not in drop_files]
        mm["faces"] = new_faces
        mm["exported"] = len(new_faces)
        mm.setdefault(f"{reason_label}_history", []).append(
            {"dropped": moved_here})

        # re-zip: glob is non-recursive, so faces/_dropped/ is not included
        survivor_pngs = sorted(faces_dir.glob("*.png"))
        top_n = mm.get("top_n", 30)
        top_n_eff = min(top_n, len(survivor_pngs))
        top_fsz_name = f"{fs_name}_top{top_n_eff}.fsz"
        all_fsz_name = f"{fs_name}_all.fsz"
        fresh = set()
        if top_n_eff > 0:
            _zip_png_list(survivor_pngs[:top_n_eff], fs_dir / top_fsz_name)
            fresh.add(top_fsz_name)
            mm["fsz_top"] = top_fsz_name
            mm["top_n"] = top_n_eff
        else:
            mm["fsz_top"] = None
            mm["top_n"] = 0
        if len(survivor_pngs) > top_n_eff:
            _zip_png_list(survivor_pngs, fs_dir / all_fsz_name)
            fresh.add(all_fsz_name)
            mm["fsz_all"] = all_fsz_name
        else:
            mm["fsz_all"] = None
        # stale archives go only after the new ones were written successfully
        for old in fs_dir.glob("*.fsz"):
            if old.name not in fresh:
                old.unlink()

        manifest_path.write_text(json.dumps(mm, indent=2))

        entry = by_name.get(fs_name)
        if entry is not None:
            entry["exported"] = len(new_faces)
            entry["fsz_top"] = mm["fsz_top"]
            entry["fsz_all"] = mm["fsz_all"]
            entry["top_n"] = mm["top_n"]
            entry.setdefault(f"{reason_label}_dropped", 0)
            entry[f"{reason_label}_dropped"] += moved_here

    # rewrite master with same ordering
    master["facesets"] = [by_name.get(e["name"], e)
                          for e in master.get("facesets", [])]
    master.setdefault(f"{reason_label}_runs", []).append({
        "facesets_affected": len(per_faceset_counts),
        "pngs_moved": total_moved,
    })
    # write to a sibling temp file and swap it in, so master is never half-written
    tmp = master_path.with_suffix(".tmp.json")
    tmp.write_text(json.dumps(master, indent=2))
    tmp.replace(master_path)
    return total_moved, per_faceset_counts


def cmd_apply(args):
    """Apply a dedup plan (``args.plan``); with ``--dry-run`` only print it."""
    plan = json.loads(Path(args.plan).read_text())
    drops = plan["drops_by_faceset"]
    if args.dry_run:
        for fs, items in sorted(drops.items()):
            reasons = _reason_counts(items)
            print(f"  {fs}: {len(items)} dropped ({reasons})")
        print(f"=== total: {sum(len(v) for v in drops.values())} PNGs "
              f"across {len(drops)} facesets ===")
        return
    master_path = ROOT / "manifest.json"
    total, _ = _apply_drops_to_facesets(drops, "dedup", master_path)
    print(f"[done] {total} PNGs moved to faces/_dropped/ "
          f"across {len(drops)} facesets", file=sys.stderr)


# ----------------------------- multiface staging + apply -----------------------------

def cmd_stage_multiface(args):
    """Build queue.json of all currently-active PNGs in the corpus for the
    Windows DML multi-face audit worker."""
    queue = []
    for fs in iter_active_facesets():
        faces_dir = fs / "faces"
        if not faces_dir.is_dir():
            continue
        for p in sorted(faces_dir.glob("*.png")):
            queue.append({
                "wsl_path": str(p),
                "win_path": wsl_to_win(str(p)),
                "faceset": fs.name,
                "file": p.name,
            })
    op = Path(args.out)
    op.parent.mkdir(parents=True, exist_ok=True)
    op.write_text(json.dumps(queue, indent=2))
    print(f"[stage] {len(queue)} PNGs -> {op}", file=sys.stderr)


def cmd_merge_multiface(args):
    """Convert worker results.json into a drops_by_faceset plan.

    Every result whose ``face_count`` is not exactly 1 is flagged; a result
    without ``face_count`` is treated as -1 and therefore flagged too.
    """
    src = json.loads(Path(args.results).read_text())
    results = src.get("results", [])
    drops_by_faceset: dict[str, list] = {}
    bad_count = 0
    for r in results:
        n_faces = r.get("face_count", -1)
        if n_faces == 1:
            continue
        bad_count += 1
        drops_by_faceset.setdefault(r["faceset"], []).append({
            "file": r["file"],
            "reason": f"multiface_{n_faces}",
            "face_count": n_faces,
        })
    plan = {
        "totals": {"bad_pngs": bad_count,
                   "facesets_affected": len(drops_by_faceset),
                   "scored": len(results)},
        "drops_by_faceset": drops_by_faceset,
    }
    op = Path(args.out)
    op.parent.mkdir(parents=True, exist_ok=True)
    op.write_text(json.dumps(plan, indent=2))
    print(f"[merge] {bad_count} bad PNGs across {len(drops_by_faceset)} "
          f"facesets -> {op}", file=sys.stderr)


def cmd_apply_multiface(args):
    """Apply a multi-face plan (``args.plan``); with ``--dry-run`` only print it."""
    plan = json.loads(Path(args.plan).read_text())
    drops = plan["drops_by_faceset"]
    if args.dry_run:
        for fs, items in sorted(drops.items()):
            print(f"  {fs}: {len(items)} bad PNG(s)")
        print(f"=== total: {sum(len(v) for v in drops.values())} ===")
        return
    master_path = ROOT / "manifest.json"
    total, _ = _apply_drops_to_facesets(drops, "multiface", master_path)
    print(f"[done] {total} PNGs moved to faces/_dropped/ "
          f"across {len(drops)} facesets", file=sys.stderr)


# ----------------------------- report -----------------------------

def cmd_report(args):
    """Render ``index.html`` in ``args.out`` from the dedup / multiface plans.

    Either plan is optional; a section is emitted only for the plans given.
    NOTE(review): the HTML tags of this function were lost in the reviewed
    copy of the file; the markup below is a reconstruction around the
    surviving text — confirm against the original styling if it matters.
    """
    out_dir = Path(args.out)
    out_dir.mkdir(parents=True, exist_ok=True)
    sections = []

    if args.dedup:
        d = json.loads(Path(args.dedup).read_text())
        t = d["totals"]
        sections.append("<h2>Dedup</h2>")
        sections.append(
            f"<ul>"
            f"<li>Active facesets: {t['active_facesets']}, active PNGs: {t['active_pngs']}</li>"
            f"<li>Cross-family byte-dup groups: {t['byte_dup_groups']} → {t['byte_dup_drops']} PNGs dropped</li>"
            f"<li>Within-faceset near-dup groups (sim≥{d['thresholds']['near_dup_sim']}): {t['near_dup_groups']} → {t['near_dup_drops']} PNGs dropped</li>"
            f"<li>Total dedup drops: {t['all_drops']} across {t['facesets_affected']} facesets</li>"
            f"</ul>"
        )
        # top-N affected facesets
        rows = sorted(d["drops_by_faceset"].items(),
                      key=lambda x: -len(x[1]))[:25]
        sections.append(
            "<h3>Top 25 most-affected facesets</h3>"
            "<table><tr><th>faceset</th><th>dropped</th><th>reasons</th></tr>"
        )
        for fs, items in rows:
            r = _reason_counts(items)
            # names and reasons come from plan files → escape before embedding
            sections.append(
                f"<tr><td>{html.escape(str(fs))}</td><td>{len(items)}</td>"
                f"<td>{html.escape(str(r))}</td></tr>"
            )
        sections.append("</table>")

    if args.multiface:
        m = json.loads(Path(args.multiface).read_text())
        t = m["totals"]
        sections.append("<h2>Multi-face audit</h2>")
        sections.append(
            f"<ul>"
            f"<li>PNGs scored: {t['scored']}</li>"
            f"<li>Bad PNGs (0 or &gt;1 face): {t['bad_pngs']} across {t['facesets_affected']} facesets</li>"
            f"</ul>"
        )

    page = f"""<!doctype html>
<html><head><meta charset="utf-8">
<title>Dedup + multi-face audit</title>
</head><body>
<h1>facesets_swap_ready dedup + roop optimization audit</h1>
{''.join(sections)}
</body></html>
"""
    out_html = out_dir / "index.html"
    # the page contains non-ASCII characters (→, ≥); match the declared charset
    out_html.write_text(page, encoding="utf-8")
    print(f"[done] {out_html}", file=sys.stderr)


# ----------------------------- main -----------------------------

def main():
    """Parse the command line and dispatch to the selected sub-command."""
    ap = argparse.ArgumentParser()
    sub = ap.add_subparsers(dest="cmd", required=True)

    a = sub.add_parser("analyze")
    a.add_argument("--out", required=True)
    a.set_defaults(func=cmd_analyze)

    p = sub.add_parser("apply")
    p.add_argument("--plan", required=True)
    p.add_argument("--dry-run", action="store_true")
    p.set_defaults(func=cmd_apply)

    sm = sub.add_parser("stage_multiface")
    sm.add_argument("--out", required=True)
    sm.set_defaults(func=cmd_stage_multiface)

    mm = sub.add_parser("merge_multiface")
    mm.add_argument("--results", required=True)
    mm.add_argument("--out", required=True)
    mm.set_defaults(func=cmd_merge_multiface)

    am = sub.add_parser("apply_multiface")
    am.add_argument("--plan", required=True)
    am.add_argument("--dry-run", action="store_true")
    am.set_defaults(func=cmd_apply_multiface)

    r = sub.add_parser("report")
    r.add_argument("--dedup", default=None)
    r.add_argument("--multiface", default=None)
    r.add_argument("--out", required=True)
    r.set_defaults(func=cmd_report)

    args = ap.parse_args()
    args.func(args)


if __name__ == "__main__":
    main()