From 484278e70ef055f66277e7fda8ac2f75c64d1b12 Mon Sep 17 00:00:00 2001
From: Peter
Date: Thu, 23 Apr 2026 19:21:50 +0200
Subject: [PATCH] Rewrite pipeline: resumable embed, byte-dedup, extend, dedup report

- embed: sha256-based dedup at listing (embed each unique hash once, carry
  other paths as aliases via a top-level path_aliases dict); resumable from
  any existing cache; atomic incremental flush every 50 files; explicit
  skip-ext handling with a per-extension skip summary; schema bumped with
  processed_paths + path_aliases.
- extend: new subcommand that merges new embeddings into an existing raw +
  facesets output without renumbering. Nearest person-centroid match above
  threshold; unmatched faces are re-clustered into new person_NNN /
  _singletons. Optional --refine-out also extends facesets by centroid +
  quality gate.
- dedup: new subcommand producing byte-identical + visual near-duplicate
  groups as a JSON report.
- cluster/refine: fan every placement across canonical + aliases so each
  on-disk location gets represented.
- safe_dst_name now always flattens the absolute path so filenames stay
  stable across runs when src_root shifts (fixes the duplicate-copy bug that
  surfaced during the lzbkp_red extend).

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 README.md     |  18 +-
 sort_faces.py | 824 ++++++++++++++++++++++++++++++++++++++++++--------
 2 files changed, 713 insertions(+), 129 deletions(-)

diff --git a/README.md b/README.md
index 9a0a652..400e2f0 100644
--- a/README.md
+++ b/README.md
@@ -4,27 +4,35 @@ Sort photos by similar face using InsightFace embeddings + agglomerative cluster
 
 ## Pipeline
 
-`sort_faces.py` is a single-file CLI with three subcommands:
+`sort_faces.py` is a single-file CLI with five subcommands:
 
 | step    | what it does |
 |---------|------------------------------------------------------------------------------|
 | embed   | Recursively scan a source tree, detect + embed every face, write `.npz` cache |
 | cluster | Raw agglomerative clustering of the cache into `person_NNN/` / `_singletons/` / `_noface/` |
 | refine  | Initial cluster → centroid merge → quality gate → outlier rejection → size filter → `faceset_NNN/` |
+| dedup   | Post-hoc near-duplicate report: byte-identical groups + visual near-dupes (same face + same size within a tight cosine threshold) |
+| extend  | Merge new embeddings into an existing raw/facesets output without renumbering `person_NNN` folders |
+
+`embed` is resumable and incremental: it loads any existing cache at the target path and only hashes/embeds files it hasn't processed before. A periodic flush (default: every 50 new files) writes the cache atomically, so a mid-run crash loses at most a few dozen embeddings.
+
+Byte-identical duplicates are detected via sha256 during the listing phase. The canonical file is embedded once; other paths with the same hash are recorded in the cache's top-level `path_aliases` dict. Every alias is materialized by `cluster`/`refine`, so each on-disk location ends up represented in the output.
 
 Cache and outputs are kept out of the repo via `.gitignore`; defaults live under `work/`.
 
 ## Typical run
 
 ```bash
-# 1. Embed (CPU; InsightFace buffalo_l). Caches faces + metadata.
-python sort_faces.py embed "/mnt/x/src/nl/Neuer Ordner (2)/New Folder" work/cache/nl_all.npz
+# 1. Embed (CPU; InsightFace buffalo_l). Caches faces + metadata. Resumable.
+python sort_faces.py embed /mnt/x/src/nl work/cache/nl_full.npz
 
 # 2. Raw clusters (every multi-face cluster -> a person_NNN/ folder).
-python sort_faces.py cluster work/cache/nl_all.npz /mnt/e/temp_things/fcswp/nl_sorted/raw
+python sort_faces.py cluster work/cache/nl_full.npz /mnt/e/temp_things/fcswp/nl_sorted/raw_full
 
 # 3. Refined facesets (filters for faceset-ready quality).
-python sort_faces.py refine work/cache/nl_all.npz /mnt/e/temp_things/fcswp/nl_sorted/facesets
+python sort_faces.py refine work/cache/nl_full.npz /mnt/e/temp_things/fcswp/nl_sorted/facesets_full
+
+# 4. (Optional) report on byte-identical + visual near-duplicates.
+python sort_faces.py dedup work/cache/nl_full.npz
+
+# 5. (Optional) merge a newer embed cache into the existing outputs without renumbering.
+python sort_faces.py extend work/cache/nl_full.npz /mnt/e/temp_things/fcswp/nl_sorted/raw_full --refine-out /mnt/e/temp_things/fcswp/nl_sorted/facesets_full
 ```
 
 ## Refine defaults
diff --git a/sort_faces.py b/sort_faces.py
index ca6a862..a9177fb 100644
--- a/sort_faces.py
+++ b/sort_faces.py
@@ -4,11 +4,30 @@ Subcommands:
   embed            recursively scan, detect+embed faces
   cluster [opts]   raw agglomerative clustering -> person_NNN/
   refine [opts]    merge + outlier + quality pass -> faceset-ready folders
+  dedup            post-hoc visual near-duplicate analysis
+  extend [opts]    merge new embeddings into existing raw/facesets dirs without renumbering
+
+Dedup model (for embed):
+  At listing time every eligible file is sha256-hashed and grouped. Each
+  hash-group is embedded exactly once; other paths with the same hash are
+  recorded under the canonical path in the cache's top-level path_aliases
+  dict. cluster/refine materialize every alias so each on-disk location
+  ends up represented in the output.
+
+Cache format (v2):
+  embeddings       (N, 512) float32
+  meta             JSON list of dicts, one per face record; fields: path, hash,
+                     face_idx, det_score, bbox, face_short, face_area, blur, noface
+  path_aliases     JSON dict: canonical path -> list of byte-identical alias paths
+  src_root         absolute source root of the latest embed/resume run
+  processed_paths  JSON list of every path whose fate is decided
+                     (embedded, noface, aliased, or load-error)
+  schema           "v2"
 """
 from __future__ import annotations
 
 import argparse
+import hashlib
 import json
+import os
 import shutil
 import sys
 import time
@@ -18,17 +37,43 @@ import numpy as np
 from PIL import Image, ImageOps
 from tqdm import tqdm
 
-IMG_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff", ".webp", ".heic"}
+IMG_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff", ".webp", ".heic", ".gif"}
+SKIP_EXTS = {
+    ".psd", ".avi", ".mov", ".mp4", ".mkv", ".m4v", ".wmv", ".webm",
+    ".mpg", ".mpeg", ".flv", ".3gp", ".m2ts", ".mts",
+    ".zip", ".rar", ".7z", ".tar", ".gz",
+    ".ini", ".db", ".txt", ".log", ".xmp", ".thm",
+}
 MIN_DET_SCORE = 0.5
 MIN_FACE_PIX = 40
+HASH_CHUNK = 1 << 20
+FLUSH_DEFAULT = 50
 
 
-def list_images(src: Path) -> list[Path]:
-    out: list[Path] = []
+def sha256_of(path: Path, chunk: int = HASH_CHUNK) -> str:
+    h = hashlib.sha256()
+    with open(path, "rb") as f:
+        while True:
+            b = f.read(chunk)
+            if not b:
+                break
+            h.update(b)
+    return h.hexdigest()
+
+
+def list_eligible(src: Path) -> tuple[list[Path], dict[str, int]]:
+    """Recursive scan; returns (kept_paths, skipped_counts_by_ext)."""
+    kept: list[Path] = []
+    skipped: dict[str, int] = {}
     for p in src.rglob("*"):
-        if p.is_file() and p.suffix.lower() in IMG_EXTS:
-            out.append(p)
-    return sorted(out)
+        if not p.is_file():
+            continue
+        ext = p.suffix.lower()
+        if ext in IMG_EXTS:
+            kept.append(p)
+        else:
+            # SKIP_EXTS documents the non-image extensions we expect to see;
+            # anything else is skipped too, and every skip is counted here.
+            skipped[ext or "<none>"] = skipped.get(ext or "<none>", 0) + 1
+    return sorted(kept), skipped
 
 
 def load_rgb_bgr(path: Path):
@@ -45,9 +90,6 @@ def load_rgb_bgr(path: Path):
 
 
 def laplacian_variance(gray: np.ndarray) -> float:
-    """Simple blur metric without OpenCV Laplacian call (uses numpy)."""
-    k = np.array([[0, 1, 0], [1, -4, 1], [0, 1, 0]], dtype=np.float32)
-    # same-size convolution via numpy slicing
     g = gray.astype(np.float32)
     lap = (
         -4.0 * g[1:-1, 1:-1]
@@ -57,88 +99,230 @@ def laplacian_variance(gray: np.ndarray) -> float:
     return float(lap.var())
 
 
-def make_rel(path: Path, root: Path) -> str:
-    try:
-        return str(path.relative_to(root))
-    except ValueError:
-        return path.name
-
-
-def safe_dst_name(path: Path, root: Path) -> str:
-    """Collision-safe filename built from source-relative path."""
-    rel = make_rel(path, root)
-    # Flatten: replace separators with double underscore
-    flat = rel.replace("/", "__").replace("\\", "__").replace(" ", "_")
+def safe_dst_name(path: Path, root: Path | None = None) -> str:
+    # Always flatten the absolute path. Root-relative names would change when
+    # the cache src_root moves between runs (e.g. during extend), producing
+    # duplicate copies in the same folder under different filenames. `root` is
+    # accepted but ignored so existing call sites keep working.
+    flat = str(path).lstrip("/").replace("/", "__").replace("\\", "__").replace(" ", "_")
     return flat
 
 
-def cmd_embed(src_dir: Path, cache_path: Path) -> None:
-    from insightface.app import FaceAnalysis
+# ---------- cache I/O ---------- #
 
-    app = FaceAnalysis(name="buffalo_l", providers=["CPUExecutionProvider"])
-    app.prepare(ctx_id=-1, det_size=(640, 640))
-
-    images = list_images(src_dir)
-    print(f"Found {len(images)} images under {src_dir}")
-
-    embeddings: list[np.ndarray] = []
-    meta: list[dict] = []
-
-    t0 = time.time()
-    for img_path in tqdm(images, desc="embedding"):
-        rgb, bgr = load_rgb_bgr(img_path)
-        if bgr is None:
-            meta.append({"path": str(img_path), "face_idx": -1, "noface": True, "error": "load"})
-            continue
-        faces = app.get(bgr)
-        kept = 0
-        for i, f in enumerate(faces):
-            if float(f.det_score) < MIN_DET_SCORE:
-                continue
-            x1, y1, x2, y2 = [int(round(v)) for v in f.bbox]
-            x1, y1 = max(x1, 0), max(y1, 0)
-            x2, y2 = min(x2, rgb.shape[1]), min(y2, rgb.shape[0])
-            w, h = x2 - x1, y2 - y1
-            short = min(w, h)
-            if short < MIN_FACE_PIX:
-                continue
-            # Blur metric on the face crop (grayscale)
-            crop = rgb[y1:y2, x1:x2]
-            if crop.size == 0:
-                continue
-            gray = crop.mean(axis=2)
-            blur = laplacian_variance(gray) if min(gray.shape) > 3 else 0.0
-
-            emb = f.normed_embedding.astype(np.float32)
-            embeddings.append(emb)
-            meta.append({
-                "path": str(img_path),
-                "face_idx": i,
-                "det_score": float(f.det_score),
-                "bbox": [x1, y1, x2, y2],
-                "face_short": int(short),
-                "face_area": int(w * h),
-                "blur": blur,
-                "noface": False,
-            })
-            kept += 1
-        if kept == 0:
-            meta.append({"path": str(img_path), "face_idx": -1, "noface": True})
-
-    dt = time.time() - t0
-    print(f"Detected {len(embeddings)} faces across {len(images)} images in {dt:.1f}s")
-
-    emb_arr = np.stack(embeddings) if embeddings else np.zeros((0, 512), dtype=np.float32)
-    np.savez(cache_path, embeddings=emb_arr, meta=json.dumps(meta), src_root=str(src_dir))
-    print(f"Cache written to {cache_path}")
-
-
-def load_cache(cache_path: Path):
+def load_cache(cache_path: Path) -> tuple[np.ndarray, list[dict], Path | None, set[str], dict[str, list[str]]]:
     data = np.load(cache_path, allow_pickle=True)
     emb = data["embeddings"]
     meta = json.loads(str(data["meta"]))
     src_root = Path(str(data["src_root"])) if "src_root" in data.files else None
-    return emb, meta, src_root
+    if "processed_paths" in data.files:
+        processed = set(json.loads(str(data["processed_paths"])))
+    else:
+        processed = {m["path"] for m in meta}
+    path_aliases: dict[str, list[str]] = {}
+    if "path_aliases" in data.files:
+        path_aliases = json.loads(str(data["path_aliases"]))
+    else:
+        # v2a legacy: aliases lived inside meta records; migrate up.
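+        # A canonical path can appear in several meta records (one per face),
+        # so aliases are unioned across records rather than overwritten.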
+        for m in meta:
+            al = m.get("aliases")
+            if al:
+                path_aliases.setdefault(m["path"], [])
+                for a in al:
+                    if a not in path_aliases[m["path"]]:
+                        path_aliases[m["path"]].append(a)
+    for m in meta:
+        m.pop("aliases", None)
+        m.setdefault("hash", None)
+    return emb, meta, src_root, processed, path_aliases
+
+
+def save_cache(
+    cache_path: Path,
+    emb: np.ndarray,
+    meta: list[dict],
+    src_root: Path,
+    processed: set[str],
+    path_aliases: dict[str, list[str]],
+) -> None:
+    # np.savez auto-appends ".npz" unless the name already ends with it, so
+    # the tmp file must end in ".npz" to avoid a double-suffix and a broken rename.
+    tmp = cache_path.with_suffix(".tmp.npz")
+    np.savez(
+        str(tmp),
+        embeddings=emb if len(emb) else np.zeros((0, 512), dtype=np.float32),
+        meta=json.dumps(meta),
+        src_root=str(src_root),
+        processed_paths=json.dumps(sorted(processed)),
+        path_aliases=json.dumps(path_aliases),
+        schema="v2",
+    )
+    os.replace(tmp, cache_path)
+
+
+# ---------- embed ---------- #
+
+def cmd_embed(src_dir: Path, cache_path: Path, resume: bool, flush_every: int) -> None:
+    from insightface.app import FaceAnalysis
+
+    t0 = time.time()
+    images, skipped = list_eligible(src_dir)
+    print(f"Found {len(images)} candidate images under {src_dir}")
+    if skipped:
+        top = sorted(skipped.items(), key=lambda kv: -kv[1])
+        print("Skipped non-image files: " + ", ".join(f"{ext}={n}" for ext, n in top[:10]))
+
+    emb_list: list[np.ndarray] = []
+    meta: list[dict] = []
+    processed: set[str] = set()
+    path_aliases: dict[str, list[str]] = {}
+    hash_to_canon: dict[str, str] = {}  # hash -> canonical path (covers both face and noface)
+
+    if resume and cache_path.exists():
+        print(f"Resume: loading existing cache {cache_path}")
+        old_emb, old_meta, _old_root, old_processed, old_aliases = load_cache(cache_path)
+        emb_list = [old_emb] if len(old_emb) else []
+        meta = list(old_meta)
+        processed = set(old_processed)
+        path_aliases = {k: list(v) for k, v in old_aliases.items()}
+        need_hash = [m for m in meta if not m.get("hash")]
+        if need_hash:
+            print(f"Backfilling hashes for {len(need_hash)} existing records")
+            for m in tqdm(need_hash, desc="rehash"):
+                p = Path(m["path"])
+                if p.exists():
+                    try:
+                        m["hash"] = sha256_of(p)
+                    except Exception as e:
+                        print(f"[warn] rehash failed {p}: {e}", file=sys.stderr)
+        for m in meta:
+            h = m.get("hash")
+            if h and h not in hash_to_canon:
+                hash_to_canon[h] = m["path"]
+
+    print("Hashing candidates...")
+    to_embed: list[tuple[Path, str]] = []
+    alias_added = 0
+    alias_scanned = 0
+
+    for p in tqdm(images, desc="hashing"):
+        ps = str(p)
+        if ps in processed:
+            continue
+        try:
+            h = sha256_of(p)
+        except Exception as e:
+            print(f"[warn] hash failed {p}: {e}", file=sys.stderr)
+            processed.add(ps)
+            continue
+        if h in hash_to_canon:
+            canon = hash_to_canon[h]
+            alias_scanned += 1
+            if ps != canon:
+                lst = path_aliases.setdefault(canon, [])
+                if ps not in lst:
+                    lst.append(ps)
+                    alias_added += 1
+            processed.add(ps)
+        else:
+            to_embed.append((p, h))
+            hash_to_canon[h] = ps
+
+    if alias_scanned:
+        print(f"Byte-dedup: {alias_added} paths aliased to existing canonicals ({alias_scanned} dupes scanned)")
+
+    print(f"To embed: {len(to_embed)} unique files")
+    if not to_embed:
+        save_cache(cache_path, np.concatenate(emb_list) if emb_list else np.zeros((0, 512), np.float32), meta, src_dir, processed, path_aliases)
+        print(f"Cache written to {cache_path} (no new embeddings)")
+        return
+
+    app = FaceAnalysis(name="buffalo_l", providers=["CPUExecutionProvider"])
+    app.prepare(ctx_id=-1, det_size=(640, 640))
+
+    new_emb_chunks: list[np.ndarray] = []
+    new_records: list[dict] = []
+    since_flush = 0
+    emb_total_before = sum(e.shape[0] for e in emb_list)
+    t_embed = time.time()
+
+    def flush():
+        nonlocal emb_list, new_emb_chunks, new_records, since_flush
+        if not new_emb_chunks and not new_records:
+            return
+        if new_emb_chunks:
+            emb_list.append(np.concatenate(new_emb_chunks))
+            new_emb_chunks = []
+        for r in new_records:
+            meta.append(r)
+        new_records = []
+        save_cache(cache_path, np.concatenate(emb_list) if emb_list else np.zeros((0, 512), np.float32), meta, src_dir, processed, path_aliases)
+        since_flush = 0
+
+    try:
+        for p, h in tqdm(to_embed, desc="embedding"):
+            ps = str(p)
+            rgb, bgr = load_rgb_bgr(p)
+            if bgr is None:
+                new_records.append({"path": ps, "face_idx": -1, "noface": True, "hash": h, "error": "load"})
+                processed.add(ps)
+                since_flush += 1
+                if since_flush >= flush_every:
+                    flush()
+                continue
+            faces = app.get(bgr)
+            kept_any = False
+            for i, f in enumerate(faces):
+                if float(f.det_score) < MIN_DET_SCORE:
+                    continue
+                x1, y1, x2, y2 = [int(round(v)) for v in f.bbox]
+                x1, y1 = max(x1, 0), max(y1, 0)
+                x2, y2 = min(x2, rgb.shape[1]), min(y2, rgb.shape[0])
+                w, hh = x2 - x1, y2 - y1
+                short = min(w, hh)
+                if short < MIN_FACE_PIX:
+                    continue
+                crop = rgb[y1:y2, x1:x2]
+                if crop.size == 0:
+                    continue
+                gray = crop.mean(axis=2)
+                blur = laplacian_variance(gray) if min(gray.shape) > 3 else 0.0
+
+                emb = f.normed_embedding.astype(np.float32)
+                new_emb_chunks.append(emb[None, :])
+                new_records.append({
+                    "path": ps,
+                    "face_idx": i,
+                    "det_score": float(f.det_score),
+                    "bbox": [x1, y1, x2, y2],
+                    "face_short": int(short),
+                    "face_area": int(w * hh),
+                    "blur": blur,
+                    "noface": False,
+                    "hash": h,
+                })
+                kept_any = True
+            if not kept_any:
+                new_records.append({"path": ps, "face_idx": -1, "noface": True, "hash": h})
+            processed.add(ps)
+            since_flush += 1
+            if since_flush >= flush_every:
+                flush()
+    finally:
+        flush()
+
+    emb_total_after = sum(e.shape[0] for e in emb_list)
+    dt = time.time() - t_embed
+    print(f"Embedded {emb_total_after - emb_total_before} new faces across {len(to_embed)} files in {dt:.1f}s")
+    noface_count = sum(1 for m in meta if m.get("noface"))
+    alias_total = sum(len(v) for v in path_aliases.values())
+    print(f"Cache totals: {emb_total_after} faces, {noface_count} noface, {alias_total} alias paths")
+    print(f"Wrote {cache_path} (wall {time.time()-t0:.1f}s)")
+
+
+# ---------- cluster / refine helpers ---------- #
+
+def _fan_paths(rec: dict, path_aliases: dict[str, list[str]]) -> list[str]:
+    return [rec["path"]] + list(path_aliases.get(rec["path"], []))
 
 
 def _transfer(src: Path, dst: Path, mode: str) -> None:
@@ -164,8 +348,10 @@ def _cluster_embeddings(emb: np.ndarray, threshold: float) -> np.ndarray:
     return clusterer.fit_predict(emb)
 
 
+# ---------- cluster ---------- #
+
 def cmd_cluster(cache_path: Path, out_dir: Path, threshold: float, mode: str, dry_run: bool) -> None:
-    emb, meta, src_root = load_cache(cache_path)
+    emb, meta, src_root, _, path_aliases = load_cache(cache_path)
     if src_root is None:
         src_root = Path("/")
     face_records = [m for m in meta if not m.get("noface")]
@@ -177,7 +363,7 @@ def cmd_cluster(cache_path: Path, out_dir: Path, threshold: float, mode: str, dr
         print("No faces detected; nothing to cluster.")
         return
 
-    print(f"Clustering {len(emb)} face embeddings (threshold={threshold} cosine distance)")
+    print(f"Clustering {len(emb)} face embeddings (cosine-distance threshold={threshold})")
     labels = _cluster_embeddings(emb, threshold)
 
     clusters: dict[int, list[dict]] = {}
@@ -210,30 +396,40 @@ def cmd_cluster(cache_path: Path, out_dir: Path, threshold: float, mode: str, dr
         cluster_dir[cid] = out_dir / f"person_{rank:03d}"
         cluster_dir[cid].mkdir(parents=True, exist_ok=True)
 
-    per_cluster_imgs: dict[int, set[str]] = {cid: set() for cid, _ in ordered}
+    per_cluster_recs: dict[int, list[dict]] = {cid: [] for cid, _ in ordered}
     for cid, recs in ordered:
+        seen = set()
         for r in recs:
-            per_cluster_imgs[cid].add(r["path"])
+            if r["path"] in seen:
+                continue
+            seen.add(r["path"])
+            per_cluster_recs[cid].append(r)
 
-    total = sum(len(v) for v in per_cluster_imgs.values())
-    unique = len({p for s in per_cluster_imgs.values() for p in s})
-    print(f"Placing {total} file instances across {unique} unique images (mode={mode}) -> {out_dir}")
+    total_paths = sum(len(_fan_paths(r, path_aliases)) for v in per_cluster_recs.values() for r in v)
+    print(f"Placing {total_paths} file instances (incl. aliases, mode={mode}) -> {out_dir}")
 
-    for cid, paths in tqdm(per_cluster_imgs.items(), desc="transferring"):
+    for cid, recs in tqdm(per_cluster_recs.items(), desc="transferring"):
         dst_dir = cluster_dir[cid]
-        for p in sorted(paths):
-            src = Path(p)
-            dst = dst_dir / safe_dst_name(src, src_root)
-            _transfer(src, dst, mode)
+        for r in recs:
+            for p in _fan_paths(r, path_aliases):
+                src = Path(p)
+                if not src.exists():
+                    continue
+                _transfer(src, dst_dir / safe_dst_name(src, src_root), mode)
 
     if noface_records:
         noface_dir = out_dir / "_noface"
         noface_dir.mkdir(exist_ok=True)
+        seen_noface: set[str] = set()
         for r in noface_records:
-            src = Path(r["path"])
-            if not src.exists():
-                continue
-            _transfer(src, noface_dir / safe_dst_name(src, src_root), mode)
+            for p in _fan_paths(r, path_aliases):
+                if p in seen_noface:
+                    continue
+                seen_noface.add(p)
+                src = Path(p)
+                if not src.exists():
+                    continue
+                _transfer(src, noface_dir / safe_dst_name(src, src_root), mode)
         print(f"{len(noface_records)} no-face images -> {noface_dir}")
 
     manifest = []
@@ -242,17 +438,21 @@ def cmd_cluster(cache_path: Path, out_dir: Path, threshold: float, mode: str, dr
         manifest.append({
             "image": Path(r["path"]).name,
             "source": r["path"],
+            "aliases": path_aliases.get(r["path"], []),
            "cluster": cid,
             "folder": cluster_dir[cid].name,
             "bbox": r.get("bbox"),
             "det_score": r.get("det_score"),
             "face_short": r.get("face_short"),
             "blur": r.get("blur"),
+            "hash": r.get("hash"),
         })
     (out_dir / "manifest.json").write_text(json.dumps(manifest, indent=2))
     print(f"Manifest -> {out_dir / 'manifest.json'}")
 
 
+# ---------- refine ---------- #
+
 def _cluster_centroids(emb: np.ndarray, labels: np.ndarray) -> tuple[np.ndarray, list[int]]:
     ids = sorted(set(int(l) for l in labels))
     cents = []
@@ -279,7 +479,7 @@ def cmd_refine(
     mode: str,
     dry_run: bool,
 ) -> None:
-    emb, meta, src_root = load_cache(cache_path)
+    emb, meta, src_root, _, path_aliases = load_cache(cache_path)
     if src_root is None:
         src_root = Path("/")
     face_records = [m for m in meta if not m.get("noface")]
@@ -289,29 +489,23 @@ def cmd_refine(
 
     print(f"Stage 1: initial clustering (threshold={initial_threshold})")
     labels = _cluster_embeddings(emb, initial_threshold)
 
-    # Stage 2: merge similar clusters by centroid
     cents, cent_ids = _cluster_centroids(emb, labels)
     print(f"Stage 2: centroid merge on {len(cent_ids)} clusters (merge_threshold={merge_threshold})")
     cent_labels = _cluster_embeddings(cents, merge_threshold) if len(cents) > 1 else np.zeros(1, dtype=int)
 
-    # remap original labels via centroid merge
     label_map = {cid: int(ml) for cid, ml in zip(cent_ids, cent_labels)}
     merged = np.array([label_map[int(l)] for l in labels])
 
-    # Build merged clusters
-    clusters: dict[int, list[tuple[int, dict]]] = {}  # cluster -> list of (global_idx, rec)
+    clusters: dict[int, list[tuple[int, dict]]] = {}
     for idx, (rec, lbl) in enumerate(zip(face_records, merged)):
         clusters.setdefault(int(lbl), []).append((idx, dict(rec)))
-
     print(f"After merge: {len(clusters)} clusters")
 
-    # Stage 3: outlier rejection + quality filter per cluster
     kept_by_cluster: dict[int, list[tuple[int, dict]]] = {}
     dropped_quality = 0
     dropped_outlier = 0
     for cid, items in clusters.items():
         idxs = [i for i, _ in items]
         cvecs = emb[idxs]
-        # centroid from the in-cluster faces
         c = cvecs.mean(axis=0)
         n = np.linalg.norm(c)
         if n > 0:
@@ -319,7 +513,6 @@ def cmd_refine(
 
         kept: list[tuple[int, dict]] = []
         for (idx, rec), v in zip(items, cvecs):
-            # Quality gate
             if rec.get("face_short", 0) < min_short:
                 dropped_quality += 1
                 continue
@@ -329,7 +522,6 @@ def cmd_refine(
             if rec.get("det_score", 0.0) < min_det_score:
                 dropped_quality += 1
                 continue
-            # Outlier: only apply if the merged cluster has >=4 surviving-ish members
             if len(items) >= 4:
                 cos_dist = 1.0 - float(v @ c)
                 if cos_dist > outlier_threshold:
@@ -341,7 +533,6 @@ def cmd_refine(
 
     print(f"Dropped {dropped_quality} faces by quality gate, {dropped_outlier} as outliers")
 
-    # Stage 4: enforce minimum cluster size (by unique images, not faces)
     final: list[tuple[int, list[tuple[int, dict]]]] = []
     for cid, items in kept_by_cluster.items():
         unique_imgs = {rec["path"] for _, rec in items}
@@ -350,29 +541,29 @@ def cmd_refine(
     final.sort(key=lambda kv: -len(kv[1]))
 
     print(f"Facesets meeting min_faces={min_faces}: {len(final)}")
-    for rank, (cid, items) in enumerate(final, 1):
+    for rank, (_cid, items) in enumerate(final, 1):
         unique_imgs = {rec["path"] for _, rec in items}
-        print(f"  faceset_{rank:03d}: faces={len(items):3d} imgs={len(unique_imgs):3d}")
+        total_aliases = sum(len(path_aliases.get(p, [])) for p in unique_imgs)
+        print(f"  faceset_{rank:03d}: faces={len(items):3d} imgs={len(unique_imgs):3d} aliases={total_aliases}")
 
     if dry_run:
         return
 
     out_dir.mkdir(parents=True, exist_ok=True)
-    for rank, (cid, items) in enumerate(final, 1):
+    for rank, (_cid, items) in enumerate(final, 1):
         dst_dir = out_dir / f"faceset_{rank:03d}"
         dst_dir.mkdir(exist_ok=True)
         seen_paths: set[str] = set()
         for _, rec in items:
-            p = rec["path"]
-            if p in seen_paths:
-                continue
-            seen_paths.add(p)
-            src = Path(p)
-            if not src.exists():
-                continue
-            _transfer(src, dst_dir / safe_dst_name(src, src_root), mode)
+            for p in _fan_paths(rec, path_aliases):
+                if p in seen_paths:
+                    continue
+                seen_paths.add(p)
+                src = Path(p)
+                if not src.exists():
+                    continue
+                _transfer(src, dst_dir / safe_dst_name(src, src_root), mode)
 
-    # Write refinement manifest
     manifest = {
         "params": {
             "initial_threshold": initial_threshold,
@@ -388,15 +579,372 @@ def cmd_refine(
                 "name": f"faceset_{rank:03d}",
                 "face_count": len(items),
                 "image_count": len({rec["path"] for _, rec in items}),
+                "alias_count": sum(len(path_aliases.get(p, [])) for p in {rec["path"] for _, rec in items}),
                 "images": sorted({rec["path"] for _, rec in items}),
             }
-            for rank, (_, items) in enumerate(final, 1)
+            for rank, (_cid, items) in enumerate(final, 1)
         ],
     }
     (out_dir / "refine_manifest.json").write_text(json.dumps(manifest, indent=2))
     print(f"Refine manifest -> {out_dir / 'refine_manifest.json'}")
 
 
+# ---------- dedup (post-hoc visual) ---------- #
+
+def cmd_dedup(cache_path: Path, cos_threshold: float, out_path: Path | None) -> None:
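+    """Report byte-identical duplicate groups and cross-file visual near-dupes as JSON."""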
+    emb, meta, _src_root, _proc, path_aliases = load_cache(cache_path)
+    face_records = [m for m in meta if not m.get("noface")]
+    if len(face_records) != len(emb):
+        raise SystemExit(f"meta/embedding mismatch: {len(face_records)} vs {len(emb)}")
+
+    byte_groups: dict[str, list[str]] = {}
+    # De-duplicate: one group per canonical path that has aliases. Iterate all
+    # of meta (not just face records) so noface images are reported too.
+    seen_canon: set[str] = set()
+    for m in meta:
+        h = m.get("hash")
+        p = m["path"]
+        if not h or p in seen_canon:
+            continue
+        seen_canon.add(p)
+        aliases = path_aliases.get(p, [])
+        if aliases:
+            byte_groups[h] = [p] + list(aliases)
+
+    n = len(emb)
+    parent = list(range(n))
+
+    def find(x):
+        while parent[x] != x:
+            parent[x] = parent[parent[x]]
+            x = parent[x]
+        return x
+
+    def union(a, b):
+        ra, rb = find(a), find(b)
+        if ra != rb:
+            parent[rb] = ra
+
+    print(f"Scanning {n} face embeddings for visual near-duplicates (cos<={cos_threshold})...")
+    block = 512
+    min_sim = 1.0 - cos_threshold
+    for i in range(0, n, block):
+        a = emb[i:i+block]
+        sims = a @ emb.T
+        for row_i, row in enumerate(sims):
+            global_i = i + row_i
+            hits = np.where(row >= min_sim)[0]
+            for j in hits:
+                if j <= global_i:
+                    continue
+                ra = face_records[global_i]
+                rb = face_records[int(j)]
+                if ra["path"] == rb["path"]:
+                    continue
+                sa, sb = ra.get("face_short", 0), rb.get("face_short", 0)
+                if sa and sb and max(sa, sb) / max(min(sa, sb), 1) > 1.15:
+                    continue
+                union(global_i, int(j))
+
+    visual_groups: dict[int, list[int]] = {}
+    for i in range(n):
+        r = find(i)
+        visual_groups.setdefault(r, []).append(i)
+    vg_out = []
+    for idxs in visual_groups.values():
+        paths = sorted({face_records[i]["path"] for i in idxs})
+        if len(paths) > 1:
+            vg_out.append(paths)
+
+    out_path = out_path or cache_path.with_suffix(".duplicates.json")
+    out_path.write_text(json.dumps({
+        "byte_groups": [sorted(v) for v in byte_groups.values()],
+        "visual_groups": sorted(vg_out, key=len, reverse=True),
+        "visual_cos_threshold": cos_threshold,
+    }, indent=2))
+    print(f"Byte-identical groups: {len(byte_groups)}")
+    print(f"Visual near-dupe groups (cross-file, size-match): {len(vg_out)}")
+    print(f"Report -> {out_path}")
+
+
+# ---------- extend (incremental, preserves existing folder numbering) ---------- #
+
+def _normalize(v: np.ndarray) -> np.ndarray:
+    n = np.linalg.norm(v)
+    return v / n if n > 0 else v
+
+
+def cmd_extend(
+    cache_path: Path,
+    raw_out: Path,
+    refine_out: Path | None,
+    match_threshold: float,
+    new_cluster_threshold: float,
+    mode: str,
+    refine_min_short: int,
+    refine_min_blur: float,
+    refine_min_det_score: float,
+    refine_centroid_threshold: float,
+) -> None:
+    emb, meta, src_root, _processed, path_aliases = load_cache(cache_path)
+    if src_root is None:
+        src_root = Path("/")
+
+    raw_manifest_path = raw_out / "manifest.json"
+    if not raw_manifest_path.exists():
+        raise SystemExit(f"raw manifest not found: {raw_manifest_path}. Run 'cluster' first.")
+    old_raw = json.loads(raw_manifest_path.read_text())
+
+    # (path, bbox_tuple) -> folder name
+    entry_to_folder: dict[tuple[str, tuple | None], str] = {}
+    for e in old_raw:
+        key = (e["source"], tuple(e["bbox"]) if e.get("bbox") else None)
+        entry_to_folder[key] = e["folder"]
+
+    face_records = [m for m in meta if not m.get("noface")]
+    noface_records = [m for m in meta if m.get("noface")]
+    if len(face_records) != len(emb):
+        raise SystemExit(f"meta/embedding mismatch: {len(face_records)} vs {len(emb)}")
+
+    placed_idx_to_folder: dict[int, str] = {}
+    unplaced_idx: list[int] = []
+    for i, m in enumerate(face_records):
+        key = (m["path"], tuple(m["bbox"]) if m.get("bbox") else None)
+        if key in entry_to_folder:
+            placed_idx_to_folder[i] = entry_to_folder[key]
+        else:
+            unplaced_idx.append(i)
+
+    print(f"Cache: {len(emb)} face embeddings, {len(placed_idx_to_folder)} already placed, {len(unplaced_idx)} unplaced")
+
+    # Per-person centroids from already-placed embeddings (skip _singletons/_noface).
+    from collections import defaultdict
+    folder_vecs: dict[str, list[np.ndarray]] = defaultdict(list)
+    for i, folder in placed_idx_to_folder.items():
+        folder_vecs[folder].append(emb[i])
+    person_folders = sorted(
+        [f for f in folder_vecs if f.startswith("person_")],
+        key=lambda s: int(s.split("_")[1]),
+    )
+    if not person_folders:
+        raise SystemExit("no person_NNN folders found in existing manifest")
+    person_cents = np.stack([_normalize(np.stack(folder_vecs[f]).mean(axis=0)) for f in person_folders])
+    max_num = max(int(f.split("_")[1]) for f in person_folders)
+
+    # Phase 1: nearest-centroid assignment.
+    assignments: dict[int, str] = {}
+    unmatched_idx: list[int] = []
+    thr_sim = 1.0 - match_threshold
+    for face_i in unplaced_idx:
+        v = emb[face_i]
+        sims = person_cents @ v
+        best = int(np.argmax(sims))
+        if sims[best] >= thr_sim:
+            assignments[face_i] = person_folders[best]
+        else:
+            unmatched_idx.append(face_i)
+
+    print(f"Phase 1 (nearest person): {len(assignments)} matched, {len(unmatched_idx)} unmatched")
+
+    # Phase 2: cluster the unmatched among themselves into new person_NNN or _singletons.
+    new_num = max_num
+    new_singletons = 0
+    if unmatched_idx:
+        u_vecs = np.stack([emb[i] for i in unmatched_idx])
+        labels = _cluster_embeddings(u_vecs, new_cluster_threshold) if len(u_vecs) > 1 else np.zeros(1, dtype=int)
+        groups: dict[int, list[int]] = {}
+        for face_i, lbl in zip(unmatched_idx, labels):
+            groups.setdefault(int(lbl), []).append(face_i)
+        ordered = sorted(groups.items(), key=lambda kv: -len(kv[1]))
+        for _gid, indices in ordered:
+            if len(indices) == 1:
+                assignments[indices[0]] = "_singletons"
+            else:
+                new_num += 1
+                folder = f"person_{new_num:03d}"
+                for i in indices:
+                    assignments[i] = folder
+        new_singletons = sum(1 for f in assignments.values() if f == "_singletons")
+    print(f"Phase 2 (new clusters): {new_num - max_num} new person_NNN, {new_singletons} new singletons")
+
+    # Materialize: for each newly-assigned face, copy canonical + aliases to its folder.
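+    # The dst.exists() guard below keeps materialization idempotent across re-runs.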
+    raw_out.mkdir(parents=True, exist_ok=True)
+    copied_new = 0
+    for face_i, folder in assignments.items():
+        dst_dir = raw_out / folder
+        dst_dir.mkdir(parents=True, exist_ok=True)
+        m = face_records[face_i]
+        for p in _fan_paths(m, path_aliases):
+            src = Path(p)
+            if not src.exists():
+                continue
+            dst = dst_dir / safe_dst_name(src, src_root)
+            if not dst.exists():
+                _transfer(src, dst, mode)
+                copied_new += 1
+
+    # Also fan newly-added aliases of already-placed canonicals into their existing folders.
+    copied_aliases = 0
+    for face_i, folder in placed_idx_to_folder.items():
+        dst_dir = raw_out / folder
+        m = face_records[face_i]
+        for p in _fan_paths(m, path_aliases):
+            src = Path(p)
+            if not src.exists():
+                continue
+            dst = dst_dir / safe_dst_name(src, src_root)
+            if not dst.exists():
+                _transfer(src, dst, mode)
+                copied_aliases += 1
+
+    # Noface: idempotent drop into _noface/ for every noface record + aliases.
+    noface_dir = raw_out / "_noface"
+    noface_dir.mkdir(exist_ok=True)
+    copied_noface = 0
+    seen_noface: set[str] = set()
+    for m in noface_records:
+        for p in _fan_paths(m, path_aliases):
+            if p in seen_noface:
+                continue
+            seen_noface.add(p)
+            src = Path(p)
+            if not src.exists():
+                continue
+            dst = noface_dir / safe_dst_name(src, src_root)
+            if not dst.exists():
+                _transfer(src, dst, mode)
+                copied_noface += 1
+
+    print(f"Copied: {copied_new} new-face files, {copied_aliases} new aliases of existing placements, {copied_noface} noface")
+
+    # Rewrite raw manifest to include everything.
+    all_placements = dict(placed_idx_to_folder)
+    all_placements.update(assignments)
+    new_manifest = []
+    for i, folder in all_placements.items():
+        m = face_records[i]
+        new_manifest.append({
+            "image": Path(m["path"]).name,
+            "source": m["path"],
+            "aliases": path_aliases.get(m["path"], []),
+            "folder": folder,
+            "bbox": m.get("bbox"),
+            "det_score": m.get("det_score"),
+            "face_short": m.get("face_short"),
+            "blur": m.get("blur"),
+            "hash": m.get("hash"),
+        })
+    raw_manifest_path.write_text(json.dumps(new_manifest, indent=2))
+    print(f"Updated manifest -> {raw_manifest_path}")
+
+    if refine_out is None:
+        return
+
+    # ---------- extend facesets ---------- #
+    refine_manifest_path = refine_out / "refine_manifest.json"
+    if not refine_manifest_path.exists():
+        raise SystemExit(f"refine manifest not found: {refine_manifest_path}. Run 'refine' first.")
+    old_refine = json.loads(refine_manifest_path.read_text())
+
+    # Build faceset centroids from cache embeddings whose paths appear in the faceset's image list.
+    # Multiple face records per image may exist, so include all face_records whose path is in the set.
+    face_set_paths: dict[str, set[str]] = {f["name"]: set(f["images"]) for f in old_refine.get("facesets", [])}
+    faceset_names = sorted(face_set_paths.keys(), key=lambda s: int(s.split("_")[1]))
+    if not faceset_names:
+        print("No facesets to extend.")
+        return
+
+    faceset_vecs: dict[str, list[np.ndarray]] = {name: [] for name in faceset_names}
+    path_to_faceset: dict[str, str] = {}
+    for name, paths in face_set_paths.items():
+        for p in paths:
+            path_to_faceset[p] = name
+
+    # Identify which face records in the cache belong to which faceset (path-match).
+    # Collect embeddings for centroid calculation.
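+    # Facesets whose images no longer appear in the cache get no centroid and
+    # are dropped from matching below.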
+    already_in_faceset: set[int] = set()
+    for i, m in enumerate(face_records):
+        name = path_to_faceset.get(m["path"])
+        if name:
+            faceset_vecs[name].append(emb[i])
+            already_in_faceset.add(i)
+    for name in list(faceset_vecs.keys()):
+        vecs = faceset_vecs[name]
+        if not vecs:
+            faceset_vecs[name] = None  # type: ignore
+            continue
+        faceset_vecs[name] = _normalize(np.stack(vecs).mean(axis=0))  # type: ignore
+
+    live_names = [n for n, v in faceset_vecs.items() if v is not None]
+    faceset_cents = np.stack([faceset_vecs[n] for n in live_names])  # type: ignore
+
+    thr_fs_sim = 1.0 - refine_centroid_threshold
+    fs_assigned: dict[int, str] = {}
+    for face_i in unplaced_idx:
+        if face_i in already_in_faceset:
+            continue
+        m = face_records[face_i]
+        # Quality gate identical to refine defaults.
+        if m.get("face_short", 0) < refine_min_short:
+            continue
+        if m.get("blur", 0.0) < refine_min_blur:
+            continue
+        if m.get("det_score", 0.0) < refine_min_det_score:
+            continue
+        v = emb[face_i]
+        sims = faceset_cents @ v
+        best = int(np.argmax(sims))
+        if sims[best] >= thr_fs_sim:
+            fs_assigned[face_i] = live_names[best]
+
+    print(f"Faceset extend: {len(fs_assigned)} new faces qualify and match existing facesets")
+
+    fs_copied = 0
+    for face_i, name in fs_assigned.items():
+        m = face_records[face_i]
+        dst_dir = refine_out / name
+        dst_dir.mkdir(parents=True, exist_ok=True)
+        for p in _fan_paths(m, path_aliases):
+            src = Path(p)
+            if not src.exists():
+                continue
+            dst = dst_dir / safe_dst_name(src, src_root)
+            if not dst.exists():
+                _transfer(src, dst, mode)
+                fs_copied += 1
+
+    # Also fan new aliases of already-in-faceset canonicals.
+    fs_alias_copied = 0
+    for i in already_in_faceset:
+        m = face_records[i]
+        name = path_to_faceset[m["path"]]
+        dst_dir = refine_out / name
+        for p in _fan_paths(m, path_aliases):
+            src = Path(p)
+            if not src.exists():
+                continue
+            dst = dst_dir / safe_dst_name(src, src_root)
+            if not dst.exists():
+                _transfer(src, dst, mode)
+                fs_alias_copied += 1
+
+    print(f"Faceset copied: {fs_copied} new faces, {fs_alias_copied} new aliases")
+
+    # Update refine manifest with extended image lists.
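+    # Faceset numbering and refine params are preserved; only the image lists
+    # and derived counts grow.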
+    new_fs_images: dict[str, set[str]] = {name: set(face_set_paths[name]) for name in faceset_names}
+    for face_i, name in fs_assigned.items():
+        new_fs_images[name].add(face_records[face_i]["path"])
+    for f in old_refine.get("facesets", []):
+        name = f["name"]
+        f["images"] = sorted(new_fs_images.get(name, set()))
+        f["image_count"] = len(f["images"])
+        f["alias_count"] = sum(len(path_aliases.get(p, [])) for p in f["images"])
+    old_refine["extended"] = True
+    refine_manifest_path.write_text(json.dumps(old_refine, indent=2))
+    print(f"Updated refine manifest -> {refine_manifest_path}")
+
+
+# ---------- main ---------- #
+
 def main() -> None:
     p = argparse.ArgumentParser()
     sub = p.add_subparsers(dest="cmd", required=True)
@@ -404,6 +952,8 @@ def main() -> None:
     pe = sub.add_parser("embed")
     pe.add_argument("src_dir", type=Path)
     pe.add_argument("cache", type=Path)
+    pe.add_argument("--no-resume", action="store_true", help="ignore any existing cache at path")
+    pe.add_argument("--flush-every", type=int, default=FLUSH_DEFAULT)
 
     pc = sub.add_parser("cluster")
     pc.add_argument("cache", type=Path)
@@ -425,9 +975,26 @@ def main() -> None:
     pr.add_argument("--mode", choices=["copy", "move", "symlink"], default="copy")
     pr.add_argument("--dry-run", action="store_true")
 
+    pd = sub.add_parser("dedup")
+    pd.add_argument("cache", type=Path)
+    pd.add_argument("--cos", type=float, default=0.03, help="cosine-distance threshold for visual dupes")
+    pd.add_argument("--out", type=Path, default=None)
+
+    px = sub.add_parser("extend", help="Add new embeddings to existing raw/refine dirs without renumbering")
+    px.add_argument("cache", type=Path)
+    px.add_argument("raw_out", type=Path, help="existing raw cluster dir (must contain manifest.json)")
+    px.add_argument("--refine-out", type=Path, default=None, help="optional existing facesets dir")
+    px.add_argument("--threshold", type=float, default=0.55, help="cosine-dist cutoff for matching a new face to an existing person centroid")
+    px.add_argument("--new-cluster-threshold", type=float, default=0.55, help="threshold for clustering the unmatched new faces among themselves")
+    px.add_argument("--mode", choices=["copy", "move", "symlink"], default="copy")
+    px.add_argument("--refine-min-short", type=int, default=90)
+    px.add_argument("--refine-min-blur", type=float, default=40.0)
+    px.add_argument("--refine-min-det-score", type=float, default=0.6)
+    px.add_argument("--refine-centroid-threshold", type=float, default=0.55)
+
     args = p.parse_args()
     if args.cmd == "embed":
-        cmd_embed(args.src_dir, args.cache)
+        cmd_embed(args.src_dir, args.cache, resume=not args.no_resume, flush_every=args.flush_every)
     elif args.cmd == "cluster":
         cmd_cluster(args.cache, args.out_dir, args.threshold, args.mode, args.dry_run)
     elif args.cmd == "refine":
@@ -437,6 +1004,15 @@ def main() -> None:
             args.min_faces, args.min_short, args.min_blur, args.min_det_score,
             args.mode, args.dry_run,
         )
+    elif args.cmd == "dedup":
+        cmd_dedup(args.cache, args.cos, args.out)
+    elif args.cmd == "extend":
+        cmd_extend(
+            args.cache, args.raw_out, args.refine_out,
+            args.threshold, args.new_cluster_threshold, args.mode,
+            args.refine_min_short, args.refine_min_blur, args.refine_min_det_score,
+            args.refine_centroid_threshold,
+        )
 
 
 if __name__ == "__main__":