Files
face-sets/sort_faces.py
Peter 484278e70e Rewrite pipeline: resumable embed, byte-dedup, extend, dedup report
- embed: sha256-based dedup at listing (embed each unique hash once, carry
  other paths as aliases via a top-level path_aliases dict); resumable from
  any existing cache; atomic incremental flush every 50 files; explicit
  skip-ext filtering; schema bumped with processed_paths + path_aliases.
- extend: new subcommand that merges new embeddings into an existing raw +
  facesets output without renumbering. Nearest person-centroid match above
  threshold, unmatched faces re-clustered into new person_NNN / _singletons.
  Optional --refine-out also extends facesets by centroid + quality gate.
- dedup: new subcommand producing byte-identical + visual near-duplicate
  groups as a JSON report.
- cluster/refine: fan every placement across canonical + aliases so each
  on-disk location gets represented.
- safe_dst_name now always flattens the absolute path so filenames stay
  stable across runs when src_root shifts (fixes duplicate-copy bug that
  surfaced during the lzbkp_red extend).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 19:21:50 +02:00

1020 lines
38 KiB
Python

"""Sort photos by similar faces using InsightFace embeddings + agglomerative clustering.
Subcommands:
embed <src_dir> <cache.npz> recursively scan, detect+embed faces
cluster <cache.npz> <out_dir> [opts] raw agglomerative clustering -> person_NNN/
refine <cache.npz> <out_dir> [opts] merge + outlier + quality pass -> faceset-ready folders
dedup <cache.npz> post-hoc visual near-duplicate analysis
Dedup model (for embed):
At listing time every eligible file is sha256-hashed and grouped. Each hash-group
is embedded exactly once; other paths with the same hash are carried as `aliases`
on the canonical meta record. cluster/refine materialize every alias so each
on-disk location ends up represented in the output.
Cache format (v2):
embeddings (N, 512) float32
meta JSON list of dicts, one per face record; fields:
path, aliases[], hash, face_idx, det_score, bbox,
face_short, face_area, blur, noface
src_root absolute source root of the latest embed/resume run
processed_paths JSON list of every path whose fate is decided
(embedded, noface, aliased, or load-error)
schema "v2"
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import shutil
import sys
import time
from pathlib import Path
import numpy as np
from PIL import Image, ImageOps
from tqdm import tqdm
IMG_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff", ".webp", ".heic", ".gif"}
SKIP_EXTS = {
".psd", ".avi", ".mov", ".mp4", ".mkv", ".m4v", ".wmv", ".webm",
".mpg", ".mpeg", ".flv", ".3gp", ".m2ts", ".mts",
".zip", ".rar", ".7z", ".tar", ".gz",
".ini", ".db", ".txt", ".log", ".xmp", ".thm",
}
MIN_DET_SCORE = 0.5
MIN_FACE_PIX = 40
HASH_CHUNK = 1 << 20
FLUSH_DEFAULT = 50
def sha256_of(path: Path, chunk: int = HASH_CHUNK) -> str:
h = hashlib.sha256()
with open(path, "rb") as f:
while True:
b = f.read(chunk)
if not b:
break
h.update(b)
return h.hexdigest()
def list_eligible(src: Path) -> tuple[list[Path], dict[str, int]]:
"""Recursive scan; returns (kept_paths, skipped_counts_by_ext)."""
kept: list[Path] = []
skipped: dict[str, int] = {}
for p in src.rglob("*"):
if not p.is_file():
continue
ext = p.suffix.lower()
if ext in IMG_EXTS:
kept.append(p)
else:
skipped[ext or "<noext>"] = skipped.get(ext or "<noext>", 0) + 1
return sorted(kept), skipped
def load_rgb_bgr(path: Path):
try:
with Image.open(path) as im:
im = ImageOps.exif_transpose(im)
im = im.convert("RGB")
rgb = np.array(im)
bgr = rgb[:, :, ::-1].copy()
return rgb, bgr
except Exception as e:
print(f"[warn] failed to load {path}: {e}", file=sys.stderr)
return None, None
def laplacian_variance(gray: np.ndarray) -> float:
g = gray.astype(np.float32)
lap = (
-4.0 * g[1:-1, 1:-1]
+ g[:-2, 1:-1] + g[2:, 1:-1]
+ g[1:-1, :-2] + g[1:-1, 2:]
)
return float(lap.var())
def safe_dst_name(path: Path, root: Path | None = None) -> str:
# Always flatten the absolute path. Root-relative names would change when
# the cache src_root moves between runs (e.g. during extend), producing
# duplicate copies in the same folder under different filenames.
flat = str(path).lstrip("/").replace("/", "__").replace("\\", "__").replace(" ", "_")
return flat
# ---------- cache I/O ---------- #
def load_cache(cache_path: Path) -> tuple[np.ndarray, list[dict], Path | None, set[str], dict[str, list[str]]]:
data = np.load(cache_path, allow_pickle=True)
emb = data["embeddings"]
meta = json.loads(str(data["meta"]))
src_root = Path(str(data["src_root"])) if "src_root" in data.files else None
if "processed_paths" in data.files:
processed = set(json.loads(str(data["processed_paths"])))
else:
processed = {m["path"] for m in meta}
path_aliases: dict[str, list[str]] = {}
if "path_aliases" in data.files:
path_aliases = json.loads(str(data["path_aliases"]))
else:
# v2a legacy: aliases lived inside meta records; migrate up.
for m in meta:
al = m.get("aliases")
if al:
path_aliases.setdefault(m["path"], [])
for a in al:
if a not in path_aliases[m["path"]]:
path_aliases[m["path"]].append(a)
for m in meta:
m.pop("aliases", None)
m.setdefault("hash", None)
return emb, meta, src_root, processed, path_aliases
def save_cache(
cache_path: Path,
emb: np.ndarray,
meta: list[dict],
src_root: Path,
processed: set[str],
path_aliases: dict[str, list[str]],
) -> None:
# np.savez auto-appends ".npz" unless the name already ends with it, so
# the tmp file must end in ".npz" to avoid a double-suffix and a broken rename.
tmp = cache_path.with_suffix(".tmp.npz")
np.savez(
str(tmp),
embeddings=emb if len(emb) else np.zeros((0, 512), dtype=np.float32),
meta=json.dumps(meta),
src_root=str(src_root),
processed_paths=json.dumps(sorted(processed)),
path_aliases=json.dumps(path_aliases),
schema="v2",
)
os.replace(tmp, cache_path)
# ---------- embed ---------- #
def cmd_embed(src_dir: Path, cache_path: Path, resume: bool, flush_every: int) -> None:
from insightface.app import FaceAnalysis
t0 = time.time()
images, skipped = list_eligible(src_dir)
print(f"Found {len(images)} candidate images under {src_dir}")
if skipped:
top = sorted(skipped.items(), key=lambda kv: -kv[1])
print("Skipped non-image files: " + ", ".join(f"{ext}={n}" for ext, n in top[:10]))
emb_list: list[np.ndarray] = []
meta: list[dict] = []
processed: set[str] = set()
path_aliases: dict[str, list[str]] = {}
hash_to_canon: dict[str, str] = {} # hash -> canonical path (covers both face and noface)
if resume and cache_path.exists():
print(f"Resume: loading existing cache {cache_path}")
old_emb, old_meta, _old_root, old_processed, old_aliases = load_cache(cache_path)
emb_list = [old_emb] if len(old_emb) else []
meta = list(old_meta)
processed = set(old_processed)
path_aliases = {k: list(v) for k, v in old_aliases.items()}
need_hash = [m for m in meta if not m.get("hash")]
if need_hash:
print(f"Backfilling hashes for {len(need_hash)} existing records")
for m in tqdm(need_hash, desc="rehash"):
p = Path(m["path"])
if p.exists():
try:
m["hash"] = sha256_of(p)
except Exception as e:
print(f"[warn] rehash failed {p}: {e}", file=sys.stderr)
for m in meta:
h = m.get("hash")
if h and h not in hash_to_canon:
hash_to_canon[h] = m["path"]
print("Hashing candidates...")
to_embed: list[tuple[Path, str]] = []
alias_added = 0
alias_scanned = 0
for p in tqdm(images, desc="hashing"):
ps = str(p)
if ps in processed:
continue
try:
h = sha256_of(p)
except Exception as e:
print(f"[warn] hash failed {p}: {e}", file=sys.stderr)
processed.add(ps)
continue
if h in hash_to_canon:
canon = hash_to_canon[h]
alias_scanned += 1
if ps != canon:
lst = path_aliases.setdefault(canon, [])
if ps not in lst:
lst.append(ps)
alias_added += 1
processed.add(ps)
else:
to_embed.append((p, h))
hash_to_canon[h] = ps
if alias_scanned:
print(f"Byte-dedup: {alias_added} paths aliased to existing canonicals ({alias_scanned} dupes scanned)")
print(f"To embed: {len(to_embed)} unique files")
if not to_embed:
save_cache(cache_path, np.concatenate(emb_list) if emb_list else np.zeros((0, 512), np.float32), meta, src_dir, processed, path_aliases)
print(f"Cache written to {cache_path} (no new embeddings)")
return
app = FaceAnalysis(name="buffalo_l", providers=["CPUExecutionProvider"])
app.prepare(ctx_id=-1, det_size=(640, 640))
new_emb_chunks: list[np.ndarray] = []
new_records: list[dict] = []
since_flush = 0
emb_total_before = sum(e.shape[0] for e in emb_list)
t_embed = time.time()
def flush():
nonlocal emb_list, new_emb_chunks, new_records, since_flush
if not new_emb_chunks and not new_records:
return
if new_emb_chunks:
emb_list.append(np.concatenate(new_emb_chunks))
new_emb_chunks = []
for r in new_records:
meta.append(r)
new_records = []
save_cache(cache_path, np.concatenate(emb_list) if emb_list else np.zeros((0, 512), np.float32), meta, src_dir, processed, path_aliases)
since_flush = 0
try:
for p, h in tqdm(to_embed, desc="embedding"):
ps = str(p)
rgb, bgr = load_rgb_bgr(p)
if bgr is None:
new_records.append({"path": ps, "face_idx": -1, "noface": True, "hash": h, "error": "load"})
processed.add(ps)
since_flush += 1
if since_flush >= flush_every:
flush()
continue
faces = app.get(bgr)
kept_any = False
for i, f in enumerate(faces):
if float(f.det_score) < MIN_DET_SCORE:
continue
x1, y1, x2, y2 = [int(round(v)) for v in f.bbox]
x1, y1 = max(x1, 0), max(y1, 0)
x2, y2 = min(x2, rgb.shape[1]), min(y2, rgb.shape[0])
w, hh = x2 - x1, y2 - y1
short = min(w, hh)
if short < MIN_FACE_PIX:
continue
crop = rgb[y1:y2, x1:x2]
if crop.size == 0:
continue
gray = crop.mean(axis=2)
blur = laplacian_variance(gray) if min(gray.shape) > 3 else 0.0
emb = f.normed_embedding.astype(np.float32)
new_emb_chunks.append(emb[None, :])
new_records.append({
"path": ps,
"face_idx": i,
"det_score": float(f.det_score),
"bbox": [x1, y1, x2, y2],
"face_short": int(short),
"face_area": int(w * hh),
"blur": blur,
"noface": False,
"hash": h,
})
kept_any = True
if not kept_any:
new_records.append({"path": ps, "face_idx": -1, "noface": True, "hash": h})
processed.add(ps)
since_flush += 1
if since_flush >= flush_every:
flush()
finally:
flush()
emb_total_after = sum(e.shape[0] for e in emb_list)
dt = time.time() - t_embed
print(f"Embedded {emb_total_after - emb_total_before} new faces across {len(to_embed)} files in {dt:.1f}s")
noface_count = sum(1 for m in meta if m.get("noface"))
alias_total = sum(len(v) for v in path_aliases.values())
print(f"Cache totals: {emb_total_after} faces, {noface_count} noface, {alias_total} alias paths")
print(f"Wrote {cache_path} (wall {time.time()-t0:.1f}s)")
# ---------- cluster / refine helpers ---------- #
def _fan_paths(rec: dict, path_aliases: dict[str, list[str]]) -> list[str]:
return [rec["path"]] + list(path_aliases.get(rec["path"], []))
def _transfer(src: Path, dst: Path, mode: str) -> None:
if dst.exists():
return
if mode == "copy":
shutil.copy2(src, dst)
elif mode == "move":
shutil.move(str(src), str(dst))
elif mode == "symlink":
dst.symlink_to(src)
def _cluster_embeddings(emb: np.ndarray, threshold: float) -> np.ndarray:
from sklearn.cluster import AgglomerativeClustering
clusterer = AgglomerativeClustering(
n_clusters=None,
distance_threshold=threshold,
metric="cosine",
linkage="average",
)
return clusterer.fit_predict(emb)
# ---------- cluster ---------- #
def cmd_cluster(cache_path: Path, out_dir: Path, threshold: float, mode: str, dry_run: bool) -> None:
emb, meta, src_root, _, path_aliases = load_cache(cache_path)
if src_root is None:
src_root = Path("/")
face_records = [m for m in meta if not m.get("noface")]
noface_records = [m for m in meta if m.get("noface")]
if len(face_records) != len(emb):
raise SystemExit(f"meta/embedding mismatch: {len(face_records)} vs {len(emb)}")
if len(emb) == 0:
print("No faces detected; nothing to cluster.")
return
print(f"Clustering {len(emb)} face embeddings (threshold={threshold})")
labels = _cluster_embeddings(emb, threshold)
clusters: dict[int, list[dict]] = {}
for rec, lbl in zip(face_records, labels):
rec = dict(rec)
rec["cluster"] = int(lbl)
clusters.setdefault(int(lbl), []).append(rec)
ordered = sorted(clusters.items(), key=lambda kv: (-len(kv[1]), kv[0]))
sizes = [len(v) for _, v in ordered]
singletons = sum(1 for s in sizes if s == 1)
print(f"Clusters: {len(ordered)} | top sizes: {sizes[:15]}")
print(f"Multi-face clusters: {len(sizes) - singletons} singletons: {singletons}")
print(f"No-face images: {len(noface_records)}")
if dry_run:
for cid, recs in ordered[:20]:
imgs = {r["path"] for r in recs}
print(f" cluster {cid:3d} faces={len(recs):3d} imgs={len(imgs)}")
return
out_dir.mkdir(parents=True, exist_ok=True)
rank = 0
cluster_dir: dict[int, Path] = {}
for cid, recs in ordered:
if len(recs) == 1:
cluster_dir[cid] = out_dir / "_singletons"
else:
rank += 1
cluster_dir[cid] = out_dir / f"person_{rank:03d}"
cluster_dir[cid].mkdir(parents=True, exist_ok=True)
per_cluster_recs: dict[int, list[dict]] = {cid: [] for cid, _ in ordered}
for cid, recs in ordered:
seen = set()
for r in recs:
if r["path"] in seen:
continue
seen.add(r["path"])
per_cluster_recs[cid].append(r)
total_paths = sum(len(_fan_paths(r, path_aliases)) for v in per_cluster_recs.values() for r in v)
print(f"Placing {total_paths} file instances (incl. aliases, mode={mode}) -> {out_dir}")
for cid, recs in tqdm(per_cluster_recs.items(), desc="transferring"):
dst_dir = cluster_dir[cid]
for r in recs:
for p in _fan_paths(r, path_aliases):
src = Path(p)
if not src.exists():
continue
_transfer(src, dst_dir / safe_dst_name(src, src_root), mode)
if noface_records:
noface_dir = out_dir / "_noface"
noface_dir.mkdir(exist_ok=True)
seen_noface: set[str] = set()
for r in noface_records:
for p in _fan_paths(r, path_aliases):
if p in seen_noface:
continue
seen_noface.add(p)
src = Path(p)
if not src.exists():
continue
_transfer(src, noface_dir / safe_dst_name(src, src_root), mode)
print(f"{len(noface_records)} no-face images -> {noface_dir}")
manifest = []
for cid, recs in ordered:
for r in recs:
manifest.append({
"image": Path(r["path"]).name,
"source": r["path"],
"aliases": path_aliases.get(r["path"], []),
"cluster": cid,
"folder": cluster_dir[cid].name,
"bbox": r.get("bbox"),
"det_score": r.get("det_score"),
"face_short": r.get("face_short"),
"blur": r.get("blur"),
"hash": r.get("hash"),
})
(out_dir / "manifest.json").write_text(json.dumps(manifest, indent=2))
print(f"Manifest -> {out_dir / 'manifest.json'}")
# ---------- refine ---------- #
def _cluster_centroids(emb: np.ndarray, labels: np.ndarray) -> tuple[np.ndarray, list[int]]:
ids = sorted(set(int(l) for l in labels))
cents = []
for cid in ids:
mask = labels == cid
v = emb[mask].mean(axis=0)
n = np.linalg.norm(v)
if n > 0:
v = v / n
cents.append(v)
return np.stack(cents), ids
def cmd_refine(
cache_path: Path,
out_dir: Path,
initial_threshold: float,
merge_threshold: float,
outlier_threshold: float,
min_faces: int,
min_short: int,
min_blur: float,
min_det_score: float,
mode: str,
dry_run: bool,
) -> None:
emb, meta, src_root, _, path_aliases = load_cache(cache_path)
if src_root is None:
src_root = Path("/")
face_records = [m for m in meta if not m.get("noface")]
if len(face_records) != len(emb):
raise SystemExit(f"meta/embedding mismatch: {len(face_records)} vs {len(emb)}")
print(f"Stage 1: initial clustering (threshold={initial_threshold})")
labels = _cluster_embeddings(emb, initial_threshold)
cents, cent_ids = _cluster_centroids(emb, labels)
print(f"Stage 2: centroid merge on {len(cent_ids)} clusters (merge_threshold={merge_threshold})")
cent_labels = _cluster_embeddings(cents, merge_threshold) if len(cents) > 1 else np.zeros(1, dtype=int)
label_map = {cid: int(ml) for cid, ml in zip(cent_ids, cent_labels)}
merged = np.array([label_map[int(l)] for l in labels])
clusters: dict[int, list[tuple[int, dict]]] = {}
for idx, (rec, lbl) in enumerate(zip(face_records, merged)):
clusters.setdefault(int(lbl), []).append((idx, dict(rec)))
print(f"After merge: {len(clusters)} clusters")
kept_by_cluster: dict[int, list[tuple[int, dict]]] = {}
dropped_quality = 0
dropped_outlier = 0
for cid, items in clusters.items():
idxs = [i for i, _ in items]
cvecs = emb[idxs]
c = cvecs.mean(axis=0)
n = np.linalg.norm(c)
if n > 0:
c = c / n
kept: list[tuple[int, dict]] = []
for (idx, rec), v in zip(items, cvecs):
if rec.get("face_short", 0) < min_short:
dropped_quality += 1
continue
if rec.get("blur", 0.0) < min_blur:
dropped_quality += 1
continue
if rec.get("det_score", 0.0) < min_det_score:
dropped_quality += 1
continue
if len(items) >= 4:
cos_dist = 1.0 - float(v @ c)
if cos_dist > outlier_threshold:
dropped_outlier += 1
continue
kept.append((idx, rec))
if kept:
kept_by_cluster[cid] = kept
print(f"Dropped {dropped_quality} faces by quality gate, {dropped_outlier} as outliers")
final: list[tuple[int, list[tuple[int, dict]]]] = []
for cid, items in kept_by_cluster.items():
unique_imgs = {rec["path"] for _, rec in items}
if len(unique_imgs) >= min_faces:
final.append((cid, items))
final.sort(key=lambda kv: -len(kv[1]))
print(f"Facesets meeting min_faces={min_faces}: {len(final)}")
for rank, (_cid, items) in enumerate(final, 1):
unique_imgs = {rec["path"] for _, rec in items}
total_aliases = sum(len(path_aliases.get(p, [])) for p in unique_imgs)
print(f" faceset_{rank:03d}: faces={len(items):3d} imgs={len(unique_imgs):3d} aliases={total_aliases}")
if dry_run:
return
out_dir.mkdir(parents=True, exist_ok=True)
for rank, (_cid, items) in enumerate(final, 1):
dst_dir = out_dir / f"faceset_{rank:03d}"
dst_dir.mkdir(exist_ok=True)
seen_paths: set[str] = set()
for _, rec in items:
for p in _fan_paths(rec, path_aliases):
if p in seen_paths:
continue
seen_paths.add(p)
src = Path(p)
if not src.exists():
continue
_transfer(src, dst_dir / safe_dst_name(src, src_root), mode)
manifest = {
"params": {
"initial_threshold": initial_threshold,
"merge_threshold": merge_threshold,
"outlier_threshold": outlier_threshold,
"min_faces": min_faces,
"min_short": min_short,
"min_blur": min_blur,
"min_det_score": min_det_score,
},
"facesets": [
{
"name": f"faceset_{rank:03d}",
"face_count": len(items),
"image_count": len({rec["path"] for _, rec in items}),
"alias_count": sum(len(path_aliases.get(p, [])) for p in {rec["path"] for _, rec in items}),
"images": sorted({rec["path"] for _, rec in items}),
}
for rank, (_cid, items) in enumerate(final, 1)
],
}
(out_dir / "refine_manifest.json").write_text(json.dumps(manifest, indent=2))
print(f"Refine manifest -> {out_dir / 'refine_manifest.json'}")
# ---------- dedup (post-hoc visual) ---------- #
def cmd_dedup(cache_path: Path, cos_threshold: float, out_path: Path | None) -> None:
emb, meta, _src_root, _proc, path_aliases = load_cache(cache_path)
face_records = [m for m in meta if not m.get("noface")]
if len(face_records) != len(emb):
raise SystemExit(f"meta/embedding mismatch: {len(face_records)} vs {len(emb)}")
byte_groups: dict[str, list[str]] = {}
# De-duplicate: one group per canonical path that has aliases.
seen_canon: set[str] = set()
for m in face_records:
h = m.get("hash")
p = m["path"]
if not h or p in seen_canon:
continue
seen_canon.add(p)
aliases = path_aliases.get(p, [])
if aliases:
byte_groups[h] = [p] + list(aliases)
n = len(emb)
parent = list(range(n))
def find(x):
while parent[x] != x:
parent[x] = parent[parent[x]]
x = parent[x]
return x
def union(a, b):
ra, rb = find(a), find(b)
if ra != rb:
parent[rb] = ra
print(f"Scanning {n} face embeddings for visual near-duplicates (cos<={cos_threshold})...")
block = 512
thr = 1.0 - cos_threshold
for i in range(0, n, block):
a = emb[i:i+block]
sims = a @ emb.T
for row_i, row in enumerate(sims):
global_i = i + row_i
hits = np.where(row >= thr)[0]
for j in hits:
if j <= global_i:
continue
ra = face_records[global_i]
rb = face_records[int(j)]
if ra["path"] == rb["path"]:
continue
sa, sb = ra.get("face_short", 0), rb.get("face_short", 0)
if sa and sb and max(sa, sb) / max(min(sa, sb), 1) > 1.15:
continue
union(global_i, int(j))
visual_groups: dict[int, list[int]] = {}
for i in range(n):
r = find(i)
visual_groups.setdefault(r, []).append(i)
vg_out = []
for idxs in visual_groups.values():
paths = sorted({face_records[i]["path"] for i in idxs})
if len(paths) > 1:
vg_out.append(paths)
out_path = out_path or cache_path.with_suffix(".duplicates.json")
out_path.write_text(json.dumps({
"byte_groups": [sorted(v) for v in byte_groups.values()],
"visual_groups": sorted(vg_out, key=len, reverse=True),
"visual_cos_threshold": cos_threshold,
}, indent=2))
print(f"Byte-identical groups: {len(byte_groups)}")
print(f"Visual near-dupe groups (cross-file, size-match): {len(vg_out)}")
print(f"Report -> {out_path}")
# ---------- extend (incremental, preserves existing folder numbering) ---------- #
def _normalize(v: np.ndarray) -> np.ndarray:
n = np.linalg.norm(v)
return v / n if n > 0 else v
def cmd_extend(
cache_path: Path,
raw_out: Path,
refine_out: Path | None,
match_threshold: float,
new_cluster_threshold: float,
mode: str,
refine_min_short: int,
refine_min_blur: float,
refine_min_det_score: float,
refine_centroid_threshold: float,
) -> None:
emb, meta, src_root, _processed, path_aliases = load_cache(cache_path)
if src_root is None:
src_root = Path("/")
raw_manifest_path = raw_out / "manifest.json"
if not raw_manifest_path.exists():
raise SystemExit(f"raw manifest not found: {raw_manifest_path}. Run 'cluster' first.")
old_raw = json.loads(raw_manifest_path.read_text())
# (path, bbox_tuple) -> folder name
entry_to_folder: dict[tuple[str, tuple | None], str] = {}
for e in old_raw:
key = (e["source"], tuple(e["bbox"]) if e.get("bbox") else None)
entry_to_folder[key] = e["folder"]
face_records = [m for m in meta if not m.get("noface")]
noface_records = [m for m in meta if m.get("noface")]
if len(face_records) != len(emb):
raise SystemExit(f"meta/embedding mismatch: {len(face_records)} vs {len(emb)}")
placed_idx_to_folder: dict[int, str] = {}
unplaced_idx: list[int] = []
for i, m in enumerate(face_records):
key = (m["path"], tuple(m["bbox"]) if m.get("bbox") else None)
if key in entry_to_folder:
placed_idx_to_folder[i] = entry_to_folder[key]
else:
unplaced_idx.append(i)
print(f"Cache: {len(emb)} face embeddings, {len(placed_idx_to_folder)} already placed, {len(unplaced_idx)} unplaced")
# Per-person centroids from already-placed embeddings (skip _singletons/_noface).
from collections import defaultdict
folder_vecs: dict[str, list[np.ndarray]] = defaultdict(list)
for i, folder in placed_idx_to_folder.items():
folder_vecs[folder].append(emb[i])
person_folders = sorted(
[f for f in folder_vecs if f.startswith("person_")],
key=lambda s: int(s.split("_")[1]),
)
if not person_folders:
raise SystemExit("no person_NNN folders found in existing manifest")
person_cents = np.stack([_normalize(np.stack(folder_vecs[f]).mean(axis=0)) for f in person_folders])
max_num = max(int(f.split("_")[1]) for f in person_folders)
# Phase 1: nearest-centroid assignment.
assignments: dict[int, str] = {}
unmatched_idx: list[int] = []
thr_sim = 1.0 - match_threshold
for face_i in unplaced_idx:
v = emb[face_i]
sims = person_cents @ v
best = int(np.argmax(sims))
if sims[best] >= thr_sim:
assignments[face_i] = person_folders[best]
else:
unmatched_idx.append(face_i)
print(f"Phase 1 (nearest person): {len(assignments)} matched, {len(unmatched_idx)} unmatched")
# Phase 2: cluster the unmatched among themselves into new person_XXX or _singletons.
new_num = max_num
if unmatched_idx:
u_vecs = np.stack([emb[i] for i in unmatched_idx])
labels = _cluster_embeddings(u_vecs, new_cluster_threshold) if len(u_vecs) > 1 else np.zeros(1, dtype=int)
groups: dict[int, list[int]] = {}
for face_i, lbl in zip(unmatched_idx, labels):
groups.setdefault(int(lbl), []).append(face_i)
ordered = sorted(groups.items(), key=lambda kv: -len(kv[1]))
for _gid, indices in ordered:
if len(indices) == 1:
assignments[indices[0]] = "_singletons"
else:
new_num += 1
folder = f"person_{new_num:03d}"
for i in indices:
assignments[i] = folder
new_persons = new_num - max_num
new_singletons = sum(1 for f in assignments.values() if f == "_singletons")
print(f"Phase 2 (new clusters): {new_persons} new person_NNN, {new_singletons} new singletons")
# Materialize: for each newly-assigned face, copy canonical + aliases to its folder.
raw_out.mkdir(parents=True, exist_ok=True)
copied_new = 0
for face_i, folder in assignments.items():
dst_dir = raw_out / folder
dst_dir.mkdir(parents=True, exist_ok=True)
m = face_records[face_i]
for p in _fan_paths(m, path_aliases):
src = Path(p)
if not src.exists():
continue
dst = dst_dir / safe_dst_name(src, src_root)
if not dst.exists():
_transfer(src, dst, mode)
copied_new += 1
# Also fan newly-added aliases of already-placed canonicals into their existing folders.
copied_aliases = 0
for face_i, folder in placed_idx_to_folder.items():
dst_dir = raw_out / folder
m = face_records[face_i]
for p in _fan_paths(m, path_aliases):
src = Path(p)
if not src.exists():
continue
dst = dst_dir / safe_dst_name(src, src_root)
if not dst.exists():
_transfer(src, dst, mode)
copied_aliases += 1
# Noface: idempotent drop into _noface/ for every noface record + aliases.
noface_dir = raw_out / "_noface"
noface_dir.mkdir(exist_ok=True)
copied_noface = 0
seen_noface: set[str] = set()
for m in noface_records:
for p in _fan_paths(m, path_aliases):
if p in seen_noface:
continue
seen_noface.add(p)
src = Path(p)
if not src.exists():
continue
dst = noface_dir / safe_dst_name(src, src_root)
if not dst.exists():
_transfer(src, dst, mode)
copied_noface += 1
print(f"Copied: {copied_new} new-face files, {copied_aliases} new aliases of existing placements, {copied_noface} noface")
# Rewrite raw manifest to include everything.
all_placements = dict(placed_idx_to_folder)
all_placements.update(assignments)
new_manifest = []
for i, folder in all_placements.items():
m = face_records[i]
new_manifest.append({
"image": Path(m["path"]).name,
"source": m["path"],
"aliases": path_aliases.get(m["path"], []),
"folder": folder,
"bbox": m.get("bbox"),
"det_score": m.get("det_score"),
"face_short": m.get("face_short"),
"blur": m.get("blur"),
"hash": m.get("hash"),
})
raw_manifest_path.write_text(json.dumps(new_manifest, indent=2))
print(f"Updated manifest -> {raw_manifest_path}")
if refine_out is None:
return
# ---------- extend facesets ---------- #
refine_manifest_path = refine_out / "refine_manifest.json"
if not refine_manifest_path.exists():
raise SystemExit(f"refine manifest not found: {refine_manifest_path}. Run 'refine' first.")
old_refine = json.loads(refine_manifest_path.read_text())
# Build faceset centroids from cache embeddings whose paths appear in the faceset's image list.
# Multiple face records per image may exist, so include all face_records whose path is in the set.
face_set_paths: dict[str, set[str]] = {f["name"]: set(f["images"]) for f in old_refine.get("facesets", [])}
faceset_names = sorted(face_set_paths.keys(), key=lambda s: int(s.split("_")[1]))
if not faceset_names:
print("No facesets to extend.")
return
faceset_vecs: dict[str, list[np.ndarray]] = {name: [] for name in faceset_names}
path_to_faceset: dict[str, str] = {}
for name, paths in face_set_paths.items():
for p in paths:
path_to_faceset[p] = name
# Identify which face records in the cache belong to which faceset (path-match).
# Collect embeddings for centroid calculation.
already_in_faceset: set[int] = set()
for i, m in enumerate(face_records):
name = path_to_faceset.get(m["path"])
if name:
faceset_vecs[name].append(emb[i])
already_in_faceset.add(i)
for name in list(faceset_vecs.keys()):
vecs = faceset_vecs[name]
if not vecs:
faceset_vecs[name] = None # type: ignore
continue
faceset_vecs[name] = _normalize(np.stack(vecs).mean(axis=0)) # type: ignore
live_names = [n for n, v in faceset_vecs.items() if v is not None]
faceset_cents = np.stack([faceset_vecs[n] for n in live_names]) # type: ignore
thr_fs_sim = 1.0 - refine_centroid_threshold
fs_assigned: dict[int, str] = {}
for face_i in unplaced_idx:
if face_i in already_in_faceset:
continue
m = face_records[face_i]
# Quality gate identical to refine defaults.
if m.get("face_short", 0) < refine_min_short:
continue
if m.get("blur", 0.0) < refine_min_blur:
continue
if m.get("det_score", 0.0) < refine_min_det_score:
continue
v = emb[face_i]
sims = faceset_cents @ v
best = int(np.argmax(sims))
if sims[best] >= thr_fs_sim:
fs_assigned[face_i] = live_names[best]
print(f"Faceset extend: {len(fs_assigned)} new faces qualify and match existing facesets")
fs_copied = 0
for face_i, name in fs_assigned.items():
m = face_records[face_i]
dst_dir = refine_out / name
dst_dir.mkdir(parents=True, exist_ok=True)
for p in _fan_paths(m, path_aliases):
src = Path(p)
if not src.exists():
continue
dst = dst_dir / safe_dst_name(src, src_root)
if not dst.exists():
_transfer(src, dst, mode)
fs_copied += 1
# Also fan new aliases of already-in-faceset canonicals.
fs_alias_copied = 0
for i in already_in_faceset:
m = face_records[i]
name = path_to_faceset[m["path"]]
dst_dir = refine_out / name
for p in _fan_paths(m, path_aliases):
src = Path(p)
if not src.exists():
continue
dst = dst_dir / safe_dst_name(src, src_root)
if not dst.exists():
_transfer(src, dst, mode)
fs_alias_copied += 1
print(f"Faceset copied: {fs_copied} new faces, {fs_alias_copied} new aliases")
# Update refine manifest with extended image lists.
new_fs_images: dict[str, set[str]] = {name: set(face_set_paths[name]) for name in faceset_names}
for face_i, name in fs_assigned.items():
new_fs_images[name].add(face_records[face_i]["path"])
for f in old_refine.get("facesets", []):
name = f["name"]
f["images"] = sorted(new_fs_images.get(name, set()))
f["image_count"] = len(f["images"])
f["alias_count"] = sum(len(path_aliases.get(p, [])) for p in f["images"])
old_refine["extended"] = True
refine_manifest_path.write_text(json.dumps(old_refine, indent=2))
print(f"Updated refine manifest -> {refine_manifest_path}")
# ---------- main ---------- #
def main() -> None:
p = argparse.ArgumentParser()
sub = p.add_subparsers(dest="cmd", required=True)
pe = sub.add_parser("embed")
pe.add_argument("src_dir", type=Path)
pe.add_argument("cache", type=Path)
pe.add_argument("--no-resume", action="store_true", help="ignore any existing cache at <cache> path")
pe.add_argument("--flush-every", type=int, default=FLUSH_DEFAULT)
pc = sub.add_parser("cluster")
pc.add_argument("cache", type=Path)
pc.add_argument("out_dir", type=Path)
pc.add_argument("--threshold", type=float, default=0.55)
pc.add_argument("--mode", choices=["copy", "move", "symlink"], default="copy")
pc.add_argument("--dry-run", action="store_true")
pr = sub.add_parser("refine")
pr.add_argument("cache", type=Path)
pr.add_argument("out_dir", type=Path)
pr.add_argument("--initial-threshold", type=float, default=0.55)
pr.add_argument("--merge-threshold", type=float, default=0.40)
pr.add_argument("--outlier-threshold", type=float, default=0.55)
pr.add_argument("--min-faces", type=int, default=15)
pr.add_argument("--min-short", type=int, default=90)
pr.add_argument("--min-blur", type=float, default=40.0)
pr.add_argument("--min-det-score", type=float, default=0.6)
pr.add_argument("--mode", choices=["copy", "move", "symlink"], default="copy")
pr.add_argument("--dry-run", action="store_true")
pd = sub.add_parser("dedup")
pd.add_argument("cache", type=Path)
pd.add_argument("--cos", type=float, default=0.03, help="cosine-distance threshold for visual dupes")
pd.add_argument("--out", type=Path, default=None)
px = sub.add_parser("extend", help="Add new embeddings to existing raw/refine dirs without renumbering")
px.add_argument("cache", type=Path)
px.add_argument("raw_out", type=Path, help="existing raw cluster dir (must contain manifest.json)")
px.add_argument("--refine-out", type=Path, default=None, help="optional existing facesets dir")
px.add_argument("--threshold", type=float, default=0.55, help="cosine-dist cutoff for matching new face to an existing person centroid")
px.add_argument("--new-cluster-threshold", type=float, default=0.55, help="threshold for clustering the unmatched new faces among themselves")
px.add_argument("--mode", choices=["copy", "move", "symlink"], default="copy")
px.add_argument("--refine-min-short", type=int, default=90)
px.add_argument("--refine-min-blur", type=float, default=40.0)
px.add_argument("--refine-min-det-score", type=float, default=0.6)
px.add_argument("--refine-centroid-threshold", type=float, default=0.55)
args = p.parse_args()
if args.cmd == "embed":
cmd_embed(args.src_dir, args.cache, resume=not args.no_resume, flush_every=args.flush_every)
elif args.cmd == "cluster":
cmd_cluster(args.cache, args.out_dir, args.threshold, args.mode, args.dry_run)
elif args.cmd == "refine":
cmd_refine(
args.cache, args.out_dir,
args.initial_threshold, args.merge_threshold, args.outlier_threshold,
args.min_faces, args.min_short, args.min_blur, args.min_det_score,
args.mode, args.dry_run,
)
elif args.cmd == "dedup":
cmd_dedup(args.cache, args.cos, args.out)
elif args.cmd == "extend":
cmd_extend(
args.cache, args.raw_out, args.refine_out,
args.threshold, args.new_cluster_threshold, args.mode,
args.refine_min_short, args.refine_min_blur, args.refine_min_det_score,
args.refine_centroid_threshold,
)
if __name__ == "__main__":
main()