Files
face-sets/sort_faces.py
Peter d53ab9fbfc Add enrich + export-swap pipeline for downstream face-swap ready output
- enrich: re-detects each cached face with buffalo_l (detection +
  landmark_2d_106 + landmark_3d_68, recognition module skipped for speed)
  and persists landmarks + pose into the cache so per-face frontality and
  landmark-symmetry quality signals become available.
- compute_quality: composite score combining det_score, face short-edge,
  blur, frontality (from pose pitch/yaw), and 2D-landmark symmetry with
  tunable weights. Default weighting 0.30/0.20/0.20/0.15/0.15.
- export-swap: builds facesets_swap_ready/ from an existing refine
  manifest. Per identity: tighter outlier gate (default 0.45), visual-
  near-dupe collapse (keep best representative per group), multi-face-
  per-source-image collapse (keep best bbox), rank by composite score,
  single-face-per-PNG crops at 512x512 with 0.5 bbox padding, ready-to-
  drop .fsz bundles (top-N + full), per-faceset manifest.json, NAME.txt
  placeholder for the operator. The multi-face-per-PNG collapse is the
  critical fix: roop-unleashed's .fsz loader appends every detected face
  in each PNG to the FaceSet, so any multi-face crop would contaminate
  the averaged embedding.
- Optional --candidates rescues raw_full singletons: matches against the
  final per-faceset centroids and routes to _candidates/to_<faceset>/
  for manual review; orphaned singletons that still cluster among
  themselves land in _candidates/new_<NNN>/.
- docs/analysis/: evaluation document captures the evidence, downstream
  requirements (FaceSet averaging, inswapper_128), opportunity matrix
  (R1-R14), and the recommended target state this export implements.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 22:37:32 +02:00

1561 lines
60 KiB
Python

"""Sort photos by similar faces using InsightFace embeddings + agglomerative clustering.
Subcommands:
embed <src_dir> <cache.npz> recursively scan, detect+embed faces
cluster <cache.npz> <out_dir> [opts] raw agglomerative clustering -> person_NNN/
refine <cache.npz> <out_dir> [opts] merge + outlier + quality pass -> faceset-ready folders
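dedup <cache.npz> post-hoc visual near-duplicate analysis
extend (see cmd_extend) place faces missing from an existing cluster/refine output
enrich (see cmd_enrich) re-detect cached faces to persist landmarks + pose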
Dedup model (for embed):
At listing time every eligible file is sha256-hashed and grouped. Each hash-group
is embedded exactly once; other paths with the same hash are recorded as aliases
of the canonical path in the cache-level `path_aliases` map. cluster/refine
materialize every alias so each on-disk location ends up represented in the output.
Cache format (v2):
embeddings (N, 512) float32
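meta JSON list of dicts, one per detected face (or one noface record per image); fields:
path, hash, face_idx, det_score, bbox,
face_short, face_area, blur, noface, error (load failures only);
enrich adds landmark_2d_106, landmark_3d_68, pose
path_aliases JSON dict: canonical path -> list of byte-identical alias paths
(legacy caches kept these as `aliases[]` on the meta record; load_cache migrates them)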
src_root absolute source root of the latest embed/resume run
processed_paths JSON list of every path whose fate is decided
(embedded, noface, aliased, or load-error)
schema "v2"
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import shutil
import sys
import time
from pathlib import Path
import numpy as np
from PIL import Image, ImageOps
from tqdm import tqdm
# File extensions (lower-case, leading dot) that list_eligible() keeps as
# candidate images; every other extension is only counted as skipped.
IMG_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff", ".webp", ".heic", ".gif"}
# Extensions of known non-image files.
# NOTE(review): not referenced by any code visible in this part of the file —
# confirm whether it is still used.
SKIP_EXTS = {
    ".psd", ".avi", ".mov", ".mp4", ".mkv", ".m4v", ".wmv", ".webm",
    ".mpg", ".mpeg", ".flv", ".3gp", ".m2ts", ".mts",
    ".zip", ".rar", ".7z", ".tar", ".gz",
    ".ini", ".db", ".txt", ".log", ".xmp", ".thm",
}
# Detections with a detector score below this are dropped during embed.
MIN_DET_SCORE = 0.5
# Detections whose clipped bbox short edge (in pixels) is below this are dropped during embed.
MIN_FACE_PIX = 40
# Default read size for sha256_of(): 1 MiB per chunk.
HASH_CHUNK = 1 << 20
# NOTE(review): presumably the default for the flush_every option of embed/enrich;
# its use is not visible in this part of the file.
FLUSH_DEFAULT = 50
def sha256_of(path: Path, chunk: int = HASH_CHUNK) -> str:
    """Return the hex SHA-256 digest of the file at *path*.

    The file is read in pieces of *chunk* bytes so large files are never held
    in memory as a whole.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        # iter() with a sentinel stops at the first empty read, i.e. at EOF.
        for piece in iter(lambda: stream.read(chunk), b""):
            digest.update(piece)
    return digest.hexdigest()
def list_eligible(src: Path) -> tuple[list[Path], dict[str, int]]:
    """Walk *src* recursively and split regular files into images and the rest.

    Returns:
        (kept_paths, skipped_counts_by_ext): the sorted paths whose lower-cased
        suffix is in IMG_EXTS, and a count of every other file keyed by its
        lower-cased suffix ("<noext>" when the file has none).
    """
    images: list[Path] = []
    skipped_by_ext: dict[str, int] = {}
    for entry in src.rglob("*"):
        if entry.is_file():
            suffix = entry.suffix.lower()
            if suffix in IMG_EXTS:
                images.append(entry)
                continue
            bucket = suffix or "<noext>"
            skipped_by_ext[bucket] = skipped_by_ext.get(bucket, 0) + 1
    images.sort()
    return images, skipped_by_ext
def load_rgb_bgr(path: Path):
    """Decode the image at *path* into an (rgb, bgr) pair of numpy arrays.

    The EXIF orientation is applied before conversion to RGB; ``bgr`` is a
    channel-reversed copy of ``rgb``. Any failure is reported on stderr and
    yields ``(None, None)`` instead of raising.
    """
    try:
        with Image.open(path) as handle:
            upright = ImageOps.exif_transpose(handle)
            rgb = np.array(upright.convert("RGB"))
        # .copy() makes the reversed view contiguous in memory.
        bgr = rgb[:, :, ::-1].copy()
        return rgb, bgr
    except Exception as e:
        print(f"[warn] failed to load {path}: {e}", file=sys.stderr)
        return None, None
def laplacian_variance(gray: np.ndarray) -> float:
    """Return the variance of the 4-neighbour Laplacian of *gray* (a sharpness proxy).

    The Laplacian is evaluated on interior pixels only (a one-pixel border is
    dropped). Inputs too small to contain a single interior pixel (fewer than
    3 rows or 3 columns) return 0.0 instead of NaN.
    """
    g = gray.astype(np.float32)
    # A 3x3 stencil needs at least one interior pixel; without this guard the
    # slices below are empty and .var() yields NaN plus a RuntimeWarning.
    if g.ndim < 2 or g.shape[0] < 3 or g.shape[1] < 3:
        return 0.0
    lap = (
        -4.0 * g[1:-1, 1:-1]
        + g[:-2, 1:-1] + g[2:, 1:-1]
        + g[1:-1, :-2] + g[1:-1, 2:]
    )
    return float(lap.var())
def safe_dst_name(path: Path, root: Path | None = None) -> str:
    """Flatten *path* into a single file name usable inside one output folder.

    Leading "/" characters are stripped, each path separator ("/" or "\\")
    becomes "__" and each space becomes "_". *root* is accepted but ignored.
    """
    # The name is deliberately derived from the full path rather than a
    # root-relative one: root-relative names would change whenever the cache
    # src_root moves between runs (e.g. during extend), leaving duplicate
    # copies of one file in the same folder under different names.
    substitutions = str.maketrans({"/": "__", "\\": "__", " ": "_"})
    return str(path).lstrip("/").translate(substitutions)
# ---------- cache I/O ---------- #
def load_cache(cache_path: Path) -> tuple[np.ndarray, list[dict], Path | None, set[str], dict[str, list[str]]]:
    """Read a cache file and return its contents in normalized form.

    Returns:
        (embeddings, meta, src_root, processed, path_aliases) where
        - embeddings is the stored "embeddings" array,
        - meta is the decoded list of record dicts, each guaranteed to carry a
          "hash" key (None when unknown) and no "aliases" key,
        - src_root is the stored source root, or None if the cache has none,
        - processed is the set of already-decided paths (falls back to the
          paths found in meta when the cache has no "processed_paths"),
        - path_aliases maps a canonical path to its alias paths.
    """
    # NOTE(security): allow_pickle=True lets a crafted cache file execute code
    # on load. Everything save_cache() writes is a plain numeric or string
    # array, so this is probably only needed for older caches — confirm before
    # tightening, and only load caches from trusted locations.
    # The context manager closes the underlying archive handle as soon as the
    # arrays have been read, instead of leaving that to garbage collection.
    with np.load(cache_path, allow_pickle=True) as data:
        emb = data["embeddings"]
        meta = json.loads(str(data["meta"]))
        src_root = Path(str(data["src_root"])) if "src_root" in data.files else None
        processed = (
            set(json.loads(str(data["processed_paths"])))
            if "processed_paths" in data.files
            else None
        )
        stored_aliases = (
            json.loads(str(data["path_aliases"]))
            if "path_aliases" in data.files
            else None
        )

    if processed is None:
        processed = {m["path"] for m in meta}

    path_aliases: dict[str, list[str]] = {}
    if stored_aliases is not None:
        path_aliases = stored_aliases
    else:
        # v2a legacy: aliases lived inside meta records; migrate up.
        for m in meta:
            al = m.get("aliases")
            if al:
                bucket = path_aliases.setdefault(m["path"], [])
                for a in al:
                    if a not in bucket:
                        bucket.append(a)

    # Normalize every record regardless of which format was loaded.
    for m in meta:
        m.pop("aliases", None)
        m.setdefault("hash", None)
    return emb, meta, src_root, processed, path_aliases
def save_cache(
    cache_path: Path,
    emb: np.ndarray,
    meta: list[dict],
    src_root: Path,
    processed: set[str],
    path_aliases: dict[str, list[str]],
) -> None:
    """Write the cache to *cache_path* via a temporary file and a rename.

    meta, processed (sorted) and path_aliases are stored as JSON strings; an
    empty *emb* is stored as a (0, 512) float32 array.
    """
    # np.savez appends ".npz" to any name that does not already end with it,
    # so the staging file itself must end in ".npz"; otherwise the file on
    # disk would carry a doubled suffix and the rename below would fail.
    staging = cache_path.with_suffix(".tmp.npz")
    payload = {
        "embeddings": emb if len(emb) else np.zeros((0, 512), dtype=np.float32),
        "meta": json.dumps(meta),
        "src_root": str(src_root),
        "processed_paths": json.dumps(sorted(processed)),
        "path_aliases": json.dumps(path_aliases),
        "schema": "v2",
    }
    np.savez(str(staging), **payload)
    # Swap the finished file into place so readers never see a partial write.
    os.replace(staging, cache_path)
# ---------- embed ---------- #
def cmd_embed(src_dir: Path, cache_path: Path, resume: bool, flush_every: int) -> None:
    """Scan src_dir for images, byte-dedup them by sha256, and embed every face.

    Args:
        src_dir: Directory scanned recursively via list_eligible().
        cache_path: Cache file to write; also read first when resuming.
        resume: If true and cache_path exists, start from its contents and
            skip every path it already lists as processed.
        flush_every: Number of processed files between cache checkpoints.

    Files whose hash matches an already-known file are recorded as aliases of
    the canonical path instead of being embedded again. Unreadable images and
    images without a usable face are stored as ``noface`` records.
    """
    # Function-scope import, so insightface is only loaded when this command runs.
    from insightface.app import FaceAnalysis
    t0 = time.time()
    images, skipped = list_eligible(src_dir)
    print(f"Found {len(images)} candidate images under {src_dir}")
    if skipped:
        # Report only the ten most frequent skipped extensions.
        top = sorted(skipped.items(), key=lambda kv: -kv[1])
        print("Skipped non-image files: " + ", ".join(f"{ext}={n}" for ext, n in top[:10]))
    emb_list: list[np.ndarray] = []
    meta: list[dict] = []
    processed: set[str] = set()
    path_aliases: dict[str, list[str]] = {}
    hash_to_canon: dict[str, str] = {}  # hash -> canonical path (covers both face and noface)
    if resume and cache_path.exists():
        print(f"Resume: loading existing cache {cache_path}")
        old_emb, old_meta, _old_root, old_processed, old_aliases = load_cache(cache_path)
        emb_list = [old_emb] if len(old_emb) else []
        meta = list(old_meta)
        processed = set(old_processed)
        path_aliases = {k: list(v) for k, v in old_aliases.items()}
        # Records loaded without a hash get one computed now, provided the
        # file still exists on disk.
        need_hash = [m for m in meta if not m.get("hash")]
        if need_hash:
            print(f"Backfilling hashes for {len(need_hash)} existing records")
            for m in tqdm(need_hash, desc="rehash"):
                p = Path(m["path"])
                if p.exists():
                    try:
                        m["hash"] = sha256_of(p)
                    except Exception as e:
                        print(f"[warn] rehash failed {p}: {e}", file=sys.stderr)
        # The first record seen for a hash defines that hash's canonical path.
        for m in meta:
            h = m.get("hash")
            if h and h not in hash_to_canon:
                hash_to_canon[h] = m["path"]
    print("Hashing candidates...")
    to_embed: list[tuple[Path, str]] = []
    alias_added = 0
    alias_scanned = 0
    for p in tqdm(images, desc="hashing"):
        ps = str(p)
        if ps in processed:
            continue
        try:
            h = sha256_of(p)
        except Exception as e:
            print(f"[warn] hash failed {p}: {e}", file=sys.stderr)
            # Marked as processed, so an unreadable file is not retried on resume.
            processed.add(ps)
            continue
        if h in hash_to_canon:
            # Byte-identical to a known file: record it as an alias of the
            # canonical path instead of embedding it again.
            canon = hash_to_canon[h]
            alias_scanned += 1
            if ps != canon:
                lst = path_aliases.setdefault(canon, [])
                if ps not in lst:
                    lst.append(ps)
                    alias_added += 1
            processed.add(ps)
        else:
            # First file with this hash becomes the canonical path for it.
            to_embed.append((p, h))
            hash_to_canon[h] = ps
    if alias_scanned:
        print(f"Byte-dedup: {alias_added} paths aliased to existing canonicals ({alias_scanned} dupes scanned)")
    print(f"To embed: {len(to_embed)} unique files")
    if not to_embed:
        # Still persist: aliases and processed paths may have changed above.
        save_cache(cache_path, np.concatenate(emb_list) if emb_list else np.zeros((0, 512), np.float32), meta, src_dir, processed, path_aliases)
        print(f"Cache written to {cache_path} (no new embeddings)")
        return
    app = FaceAnalysis(name="buffalo_l", providers=["CPUExecutionProvider"])
    app.prepare(ctx_id=-1, det_size=(640, 640))
    new_emb_chunks: list[np.ndarray] = []
    new_records: list[dict] = []
    since_flush = 0
    emb_total_before = sum(e.shape[0] for e in emb_list)
    t_embed = time.time()

    def flush():
        """Fold pending embeddings/records into emb_list/meta and checkpoint the cache."""
        nonlocal emb_list, new_emb_chunks, new_records, since_flush
        if not new_emb_chunks and not new_records:
            return
        if new_emb_chunks:
            emb_list.append(np.concatenate(new_emb_chunks))
            new_emb_chunks = []
        # Records are appended in detection order, which keeps the non-noface
        # records of meta aligned with the rows of the embedding matrix.
        for r in new_records:
            meta.append(r)
        new_records = []
        save_cache(cache_path, np.concatenate(emb_list) if emb_list else np.zeros((0, 512), np.float32), meta, src_dir, processed, path_aliases)
        since_flush = 0

    try:
        for p, h in tqdm(to_embed, desc="embedding"):
            ps = str(p)
            rgb, bgr = load_rgb_bgr(p)
            if bgr is None:
                # Undecodable image: keep a noface record tagged with the error.
                new_records.append({"path": ps, "face_idx": -1, "noface": True, "hash": h, "error": "load"})
                processed.add(ps)
                since_flush += 1
                if since_flush >= flush_every:
                    flush()
                continue
            faces = app.get(bgr)
            kept_any = False
            for i, f in enumerate(faces):
                if float(f.det_score) < MIN_DET_SCORE:
                    continue
                # Clip the detector's bbox to the image bounds before measuring it.
                x1, y1, x2, y2 = [int(round(v)) for v in f.bbox]
                x1, y1 = max(x1, 0), max(y1, 0)
                x2, y2 = min(x2, rgb.shape[1]), min(y2, rgb.shape[0])
                w, hh = x2 - x1, y2 - y1
                short = min(w, hh)
                if short < MIN_FACE_PIX:
                    continue
                crop = rgb[y1:y2, x1:x2]
                if crop.size == 0:
                    continue
                # Blur score: Laplacian variance of the channel-averaged crop.
                gray = crop.mean(axis=2)
                blur = laplacian_variance(gray) if min(gray.shape) > 3 else 0.0
                emb = f.normed_embedding.astype(np.float32)
                new_emb_chunks.append(emb[None, :])
                new_records.append({
                    "path": ps,
                    "face_idx": i,
                    "det_score": float(f.det_score),
                    "bbox": [x1, y1, x2, y2],
                    "face_short": int(short),
                    "face_area": int(w * hh),
                    "blur": blur,
                    "noface": False,
                    "hash": h,
                })
                kept_any = True
            if not kept_any:
                # No detection survived the gates: remember the file as noface.
                new_records.append({"path": ps, "face_idx": -1, "noface": True, "hash": h})
            processed.add(ps)
            since_flush += 1
            if since_flush >= flush_every:
                flush()
    finally:
        # Persist whatever was collected, even if the loop was interrupted.
        flush()
    emb_total_after = sum(e.shape[0] for e in emb_list)
    dt = time.time() - t_embed
    print(f"Embedded {emb_total_after - emb_total_before} new faces across {len(to_embed)} files in {dt:.1f}s")
    noface_count = sum(1 for m in meta if m.get("noface"))
    alias_total = sum(len(v) for v in path_aliases.values())
    print(f"Cache totals: {emb_total_after} faces, {noface_count} noface, {alias_total} alias paths")
    print(f"Wrote {cache_path} (wall {time.time()-t0:.1f}s)")
# ---------- cluster / refine helpers ---------- #
def _fan_paths(rec: dict, path_aliases: dict[str, list[str]]) -> list[str]:
return [rec["path"]] + list(path_aliases.get(rec["path"], []))
def _transfer(src: Path, dst: Path, mode: str) -> None:
if dst.exists():
return
if mode == "copy":
shutil.copy2(src, dst)
elif mode == "move":
shutil.move(str(src), str(dst))
elif mode == "symlink":
dst.symlink_to(src)
def _cluster_embeddings(emb: np.ndarray, threshold: float) -> np.ndarray:
from sklearn.cluster import AgglomerativeClustering
clusterer = AgglomerativeClustering(
n_clusters=None,
distance_threshold=threshold,
metric="cosine",
linkage="average",
)
return clusterer.fit_predict(emb)
# ---------- cluster ---------- #
def cmd_cluster(cache_path: Path, out_dir: Path, threshold: float, mode: str, dry_run: bool) -> None:
    """Cluster all cached face embeddings and materialize one folder per cluster.

    Clusters with more than one face become ``person_NNN`` (numbered by size,
    largest first), one-face clusters share ``_singletons`` and images without
    a face go to ``_noface``. Each canonical path is placed together with its
    aliases, and ``manifest.json`` (one entry per face) is written to out_dir.

    Args:
        cache_path: Cache file to read.
        out_dir: Output root; created if missing.
        threshold: Distance threshold passed to _cluster_embeddings().
        mode: Transfer mode handed to _transfer().
        dry_run: If true, only print cluster statistics and write nothing.

    Raises:
        SystemExit: if the number of face records and embeddings differ.
    """
    emb, meta, src_root, _, path_aliases = load_cache(cache_path)
    if src_root is None:
        src_root = Path("/")
    face_records = [m for m in meta if not m.get("noface")]
    noface_records = [m for m in meta if m.get("noface")]
    if len(face_records) != len(emb):
        raise SystemExit(f"meta/embedding mismatch: {len(face_records)} vs {len(emb)}")
    if len(emb) == 0:
        print("No faces detected; nothing to cluster.")
        return

    print(f"Clustering {len(emb)} face embeddings (threshold={threshold})")
    # Agglomerative clustering needs at least two samples; a lone face is its
    # own cluster. Same guard as the other clustering call sites in this file.
    labels = _cluster_embeddings(emb, threshold) if len(emb) > 1 else np.zeros(1, dtype=int)

    clusters: dict[int, list[dict]] = {}
    for rec, lbl in zip(face_records, labels):
        rec = dict(rec)  # copy so the cached record is not mutated
        rec["cluster"] = int(lbl)
        clusters.setdefault(int(lbl), []).append(rec)

    # Largest clusters first; ties broken by cluster id for a stable numbering.
    ordered = sorted(clusters.items(), key=lambda kv: (-len(kv[1]), kv[0]))
    sizes = [len(v) for _, v in ordered]
    singletons = sum(1 for s in sizes if s == 1)
    print(f"Clusters: {len(ordered)} | top sizes: {sizes[:15]}")
    print(f"Multi-face clusters: {len(sizes) - singletons} singletons: {singletons}")
    print(f"No-face images: {len(noface_records)}")

    if dry_run:
        for cid, recs in ordered[:20]:
            imgs = {r["path"] for r in recs}
            print(f" cluster {cid:3d} faces={len(recs):3d} imgs={len(imgs)}")
        return

    out_dir.mkdir(parents=True, exist_ok=True)
    rank = 0
    cluster_dir: dict[int, Path] = {}
    for cid, recs in ordered:
        if len(recs) == 1:
            cluster_dir[cid] = out_dir / "_singletons"
        else:
            rank += 1
            cluster_dir[cid] = out_dir / f"person_{rank:03d}"
        cluster_dir[cid].mkdir(parents=True, exist_ok=True)

    # One record per source image and cluster: several faces of the same
    # person in one image must not transfer the file more than once.
    per_cluster_recs: dict[int, list[dict]] = {cid: [] for cid, _ in ordered}
    for cid, recs in ordered:
        seen = set()
        for r in recs:
            if r["path"] in seen:
                continue
            seen.add(r["path"])
            per_cluster_recs[cid].append(r)

    total_paths = sum(len(_fan_paths(r, path_aliases)) for v in per_cluster_recs.values() for r in v)
    print(f"Placing {total_paths} file instances (incl. aliases, mode={mode}) -> {out_dir}")
    for cid, recs in tqdm(per_cluster_recs.items(), desc="transferring"):
        dst_dir = cluster_dir[cid]
        for r in recs:
            for p in _fan_paths(r, path_aliases):
                src = Path(p)
                if not src.exists():
                    continue
                _transfer(src, dst_dir / safe_dst_name(src, src_root), mode)

    if noface_records:
        noface_dir = out_dir / "_noface"
        noface_dir.mkdir(exist_ok=True)
        seen_noface: set[str] = set()
        for r in noface_records:
            for p in _fan_paths(r, path_aliases):
                if p in seen_noface:
                    continue
                seen_noface.add(p)
                src = Path(p)
                if not src.exists():
                    continue
                _transfer(src, noface_dir / safe_dst_name(src, src_root), mode)
        print(f"{len(noface_records)} no-face images -> {noface_dir}")

    # The manifest keeps one entry per face (not per image).
    manifest = []
    for cid, recs in ordered:
        for r in recs:
            manifest.append({
                "image": Path(r["path"]).name,
                "source": r["path"],
                "aliases": path_aliases.get(r["path"], []),
                "cluster": cid,
                "folder": cluster_dir[cid].name,
                "bbox": r.get("bbox"),
                "det_score": r.get("det_score"),
                "face_short": r.get("face_short"),
                "blur": r.get("blur"),
                "hash": r.get("hash"),
            })
    (out_dir / "manifest.json").write_text(json.dumps(manifest, indent=2))
    print(f"Manifest -> {out_dir / 'manifest.json'}")
# ---------- refine ---------- #
def _cluster_centroids(emb: np.ndarray, labels: np.ndarray) -> tuple[np.ndarray, list[int]]:
ids = sorted(set(int(l) for l in labels))
cents = []
for cid in ids:
mask = labels == cid
v = emb[mask].mean(axis=0)
n = np.linalg.norm(v)
if n > 0:
v = v / n
cents.append(v)
return np.stack(cents), ids
def cmd_refine(
    cache_path: Path,
    out_dir: Path,
    initial_threshold: float,
    merge_threshold: float,
    outlier_threshold: float,
    min_faces: int,
    min_short: int,
    min_blur: float,
    min_det_score: float,
    mode: str,
    dry_run: bool,
) -> None:
    """Build faceset folders: cluster, merge clusters, then gate faces.

    Stages:
        1. Cluster all face embeddings at initial_threshold.
        2. Cluster the stage-1 centroids at merge_threshold and merge clusters
           whose centroids fall together.
        3. Per merged cluster, drop faces below min_short / min_blur /
           min_det_score and, for clusters of four or more faces, faces whose
           cosine distance to the cluster centroid exceeds outlier_threshold.
        4. Keep clusters that still span at least min_faces distinct images.

    Unless dry_run is set, each kept cluster is materialized as
    ``faceset_NNN`` (largest first) and ``refine_manifest.json`` is written.

    Raises:
        SystemExit: if the number of face records and embeddings differ.
    """
    emb, meta, src_root, _, path_aliases = load_cache(cache_path)
    if src_root is None:
        src_root = Path("/")
    face_records = [m for m in meta if not m.get("noface")]
    if len(face_records) != len(emb):
        raise SystemExit(f"meta/embedding mismatch: {len(face_records)} vs {len(emb)}")
    if len(emb) == 0:
        # Mirrors cmd_cluster: without faces there are no centroids to stack.
        print("No faces detected; nothing to refine.")
        return

    print(f"Stage 1: initial clustering (threshold={initial_threshold})")
    # Agglomerative clustering needs at least two samples; same guard as stage 2.
    labels = _cluster_embeddings(emb, initial_threshold) if len(emb) > 1 else np.zeros(1, dtype=int)
    cents, cent_ids = _cluster_centroids(emb, labels)

    print(f"Stage 2: centroid merge on {len(cent_ids)} clusters (merge_threshold={merge_threshold})")
    cent_labels = _cluster_embeddings(cents, merge_threshold) if len(cents) > 1 else np.zeros(1, dtype=int)
    label_map = {cid: int(ml) for cid, ml in zip(cent_ids, cent_labels)}
    merged = np.array([label_map[int(l)] for l in labels])

    # merged cluster id -> [(row index into emb, copy of the face record)]
    clusters: dict[int, list[tuple[int, dict]]] = {}
    for idx, (rec, lbl) in enumerate(zip(face_records, merged)):
        clusters.setdefault(int(lbl), []).append((idx, dict(rec)))
    print(f"After merge: {len(clusters)} clusters")

    kept_by_cluster: dict[int, list[tuple[int, dict]]] = {}
    dropped_quality = 0
    dropped_outlier = 0
    for cid, items in clusters.items():
        # The centroid is computed from all members, before any gating.
        cvecs = emb[[i for i, _ in items]]
        c = cvecs.mean(axis=0)
        n = np.linalg.norm(c)
        if n > 0:
            c = c / n
        kept: list[tuple[int, dict]] = []
        for (idx, rec), v in zip(items, cvecs):
            if rec.get("face_short", 0) < min_short:
                dropped_quality += 1
                continue
            if rec.get("blur", 0.0) < min_blur:
                dropped_quality += 1
                continue
            if rec.get("det_score", 0.0) < min_det_score:
                dropped_quality += 1
                continue
            # Outlier rejection only applies to clusters of four or more faces.
            if len(items) >= 4:
                cos_dist = 1.0 - float(v @ c)
                if cos_dist > outlier_threshold:
                    dropped_outlier += 1
                    continue
            kept.append((idx, rec))
        if kept:
            kept_by_cluster[cid] = kept
    print(f"Dropped {dropped_quality} faces by quality gate, {dropped_outlier} as outliers")

    # min_faces is checked against distinct source images, not face records.
    final: list[tuple[int, list[tuple[int, dict]]]] = []
    for cid, items in kept_by_cluster.items():
        unique_imgs = {rec["path"] for _, rec in items}
        if len(unique_imgs) >= min_faces:
            final.append((cid, items))
    final.sort(key=lambda kv: -len(kv[1]))
    print(f"Facesets meeting min_faces={min_faces}: {len(final)}")
    for rank, (_cid, items) in enumerate(final, 1):
        unique_imgs = {rec["path"] for _, rec in items}
        total_aliases = sum(len(path_aliases.get(p, [])) for p in unique_imgs)
        print(f" faceset_{rank:03d}: faces={len(items):3d} imgs={len(unique_imgs):3d} aliases={total_aliases}")

    if dry_run:
        return

    out_dir.mkdir(parents=True, exist_ok=True)
    for rank, (_cid, items) in enumerate(final, 1):
        dst_dir = out_dir / f"faceset_{rank:03d}"
        dst_dir.mkdir(exist_ok=True)
        seen_paths: set[str] = set()
        for _, rec in items:
            for p in _fan_paths(rec, path_aliases):
                if p in seen_paths:
                    continue
                seen_paths.add(p)
                src = Path(p)
                if not src.exists():
                    continue
                _transfer(src, dst_dir / safe_dst_name(src, src_root), mode)

    faceset_entries = []
    for rank, (_cid, items) in enumerate(final, 1):
        images = sorted({rec["path"] for _, rec in items})
        faceset_entries.append({
            "name": f"faceset_{rank:03d}",
            "face_count": len(items),
            "image_count": len(images),
            "alias_count": sum(len(path_aliases.get(p, [])) for p in images),
            "images": images,
        })
    manifest = {
        "params": {
            "initial_threshold": initial_threshold,
            "merge_threshold": merge_threshold,
            "outlier_threshold": outlier_threshold,
            "min_faces": min_faces,
            "min_short": min_short,
            "min_blur": min_blur,
            "min_det_score": min_det_score,
        },
        "facesets": faceset_entries,
    }
    (out_dir / "refine_manifest.json").write_text(json.dumps(manifest, indent=2))
    print(f"Refine manifest -> {out_dir / 'refine_manifest.json'}")
# ---------- dedup (post-hoc visual) ---------- #
def cmd_dedup(cache_path: Path, cos_threshold: float, out_path: Path | None) -> None:
    """Report byte-identical and visually near-duplicate images found in the cache.

    Byte groups come from the alias map (one group per canonical face image
    that has aliases). Visual groups are connected components over pairs of
    faces from different files whose embedding similarity is at least
    ``1 - cos_threshold`` and whose face short edges differ by at most 15%.
    The report is written as JSON to out_path, or next to the cache with a
    ``.duplicates.json`` suffix when out_path is None.

    Raises:
        SystemExit: if the number of face records and embeddings differ.
    """
    emb, meta, _src_root, _proc, path_aliases = load_cache(cache_path)
    face_records = [m for m in meta if not m.get("noface")]
    if len(face_records) != len(emb):
        raise SystemExit(f"meta/embedding mismatch: {len(face_records)} vs {len(emb)}")

    # De-duplicate: one group per canonical path that has aliases.
    byte_groups: dict[str, list[str]] = {}
    visited: set[str] = set()
    for rec in face_records:
        digest, canon = rec.get("hash"), rec["path"]
        if digest and canon not in visited:
            visited.add(canon)
            dupes = path_aliases.get(canon, [])
            if dupes:
                byte_groups[digest] = [canon, *dupes]

    # Union-find over face indices, with path halving in the lookup.
    total = len(emb)
    root_of = list(range(total))

    def find(node):
        while root_of[node] != node:
            root_of[node] = root_of[root_of[node]]
            node = root_of[node]
        return node

    def union(left, right):
        root_left, root_right = find(left), find(right)
        if root_left != root_right:
            root_of[root_right] = root_left

    print(f"Scanning {total} face embeddings for visual near-duplicates (cos<={cos_threshold})...")
    block = 512
    min_sim = 1.0 - cos_threshold
    # Similarities are computed block-by-block so the full N x N matrix is
    # never materialized at once.
    for start in range(0, total, block):
        sims = emb[start:start + block] @ emb.T
        for offset, row in enumerate(sims):
            first = start + offset
            rec_a = face_records[first]
            # Only look at partners with a larger index: each pair is visited once.
            partners = np.flatnonzero(row[first + 1:] >= min_sim) + (first + 1)
            for j in partners:
                second = int(j)
                rec_b = face_records[second]
                if rec_a["path"] == rec_b["path"]:
                    continue
                sa, sb = rec_a.get("face_short", 0), rec_b.get("face_short", 0)
                # Faces of clearly different pixel size are not treated as dupes.
                if sa and sb and max(sa, sb) / max(min(sa, sb), 1) > 1.15:
                    continue
                union(first, second)

    members: dict[int, list[int]] = {}
    for i in range(total):
        members.setdefault(find(i), []).append(i)
    vg_out = []
    for idxs in members.values():
        paths = sorted({face_records[i]["path"] for i in idxs})
        if len(paths) > 1:
            vg_out.append(paths)

    out_path = out_path or cache_path.with_suffix(".duplicates.json")
    report = {
        "byte_groups": [sorted(v) for v in byte_groups.values()],
        "visual_groups": sorted(vg_out, key=len, reverse=True),
        "visual_cos_threshold": cos_threshold,
    }
    out_path.write_text(json.dumps(report, indent=2))
    print(f"Byte-identical groups: {len(byte_groups)}")
    print(f"Visual near-dupe groups (cross-file, size-match): {len(vg_out)}")
    print(f"Report -> {out_path}")
# ---------- extend (incremental, preserves existing folder numbering) ---------- #
def _normalize(v: np.ndarray) -> np.ndarray:
n = np.linalg.norm(v)
return v / n if n > 0 else v
def cmd_extend(
    cache_path: Path,
    raw_out: Path,
    refine_out: Path | None,
    match_threshold: float,
    new_cluster_threshold: float,
    mode: str,
    refine_min_short: int,
    refine_min_blur: float,
    refine_min_det_score: float,
    refine_centroid_threshold: float,
) -> None:
    """Place cached faces that are missing from an existing cluster output.

    Existing ``person_NNN`` folder numbers are preserved:
        1. Faces already listed in ``raw_out/manifest.json`` keep their folder.
        2. Every other face goes to the nearest person centroid if its cosine
           distance is within match_threshold.
        3. The remainder is clustered at new_cluster_threshold; groups become
           new ``person_NNN`` folders numbered after the current maximum, and
           lone faces go to ``_singletons``.
    New aliases of already-placed files and all noface images are fanned out
    as well, and the raw manifest is rewritten to cover every face.

    If refine_out is given, faces not in the raw manifest that pass the
    refine_min_* gates and lie within refine_centroid_threshold of an existing
    faceset centroid are added to that faceset, and ``refine_manifest.json``
    is updated (images, image_count, alias_count, face_count).

    Raises:
        SystemExit: if a required manifest is missing, the cache's face records
            and embeddings differ in number, or the raw manifest has no
            person_NNN folder.
    """
    from collections import Counter, defaultdict

    emb, meta, src_root, _processed, path_aliases = load_cache(cache_path)
    if src_root is None:
        src_root = Path("/")

    def place(rec: dict, dst_dir: Path, seen: set[str] | None = None) -> int:
        """Transfer rec's canonical path and aliases into dst_dir.

        Returns the number of files newly placed. When *seen* is given, paths
        already in it are skipped and every visited path is added to it.
        """
        placed = 0
        for p in _fan_paths(rec, path_aliases):
            if seen is not None:
                if p in seen:
                    continue
                seen.add(p)
            src = Path(p)
            if not src.exists():
                continue
            dst = dst_dir / safe_dst_name(src, src_root)
            if not dst.exists():
                _transfer(src, dst, mode)
                placed += 1
        return placed

    raw_manifest_path = raw_out / "manifest.json"
    if not raw_manifest_path.exists():
        raise SystemExit(f"raw manifest not found: {raw_manifest_path}. Run 'cluster' first.")
    old_raw = json.loads(raw_manifest_path.read_text())
    # (path, bbox_tuple) -> folder name
    entry_to_folder: dict[tuple[str, tuple | None], str] = {}
    for e in old_raw:
        key = (e["source"], tuple(e["bbox"]) if e.get("bbox") else None)
        entry_to_folder[key] = e["folder"]

    face_records = [m for m in meta if not m.get("noface")]
    noface_records = [m for m in meta if m.get("noface")]
    if len(face_records) != len(emb):
        raise SystemExit(f"meta/embedding mismatch: {len(face_records)} vs {len(emb)}")

    placed_idx_to_folder: dict[int, str] = {}
    unplaced_idx: list[int] = []
    for i, m in enumerate(face_records):
        key = (m["path"], tuple(m["bbox"]) if m.get("bbox") else None)
        if key in entry_to_folder:
            placed_idx_to_folder[i] = entry_to_folder[key]
        else:
            unplaced_idx.append(i)
    print(f"Cache: {len(emb)} face embeddings, {len(placed_idx_to_folder)} already placed, {len(unplaced_idx)} unplaced")

    # Per-person centroids from already-placed embeddings (skip _singletons/_noface).
    folder_vecs: dict[str, list[np.ndarray]] = defaultdict(list)
    for i, folder in placed_idx_to_folder.items():
        folder_vecs[folder].append(emb[i])
    person_folders = sorted(
        [f for f in folder_vecs if f.startswith("person_")],
        key=lambda s: int(s.split("_")[1]),
    )
    if not person_folders:
        raise SystemExit("no person_NNN folders found in existing manifest")
    person_cents = np.stack([_normalize(np.stack(folder_vecs[f]).mean(axis=0)) for f in person_folders])
    max_num = max(int(f.split("_")[1]) for f in person_folders)

    # Phase 1: nearest-centroid assignment.
    assignments: dict[int, str] = {}
    unmatched_idx: list[int] = []
    thr_sim = 1.0 - match_threshold
    for face_i in unplaced_idx:
        sims = person_cents @ emb[face_i]
        best = int(np.argmax(sims))
        if sims[best] >= thr_sim:
            assignments[face_i] = person_folders[best]
        else:
            unmatched_idx.append(face_i)
    print(f"Phase 1 (nearest person): {len(assignments)} matched, {len(unmatched_idx)} unmatched")

    # Phase 2: cluster the unmatched among themselves into new person_XXX or _singletons.
    new_num = max_num
    if unmatched_idx:
        u_vecs = np.stack([emb[i] for i in unmatched_idx])
        labels = _cluster_embeddings(u_vecs, new_cluster_threshold) if len(u_vecs) > 1 else np.zeros(1, dtype=int)
        groups: dict[int, list[int]] = {}
        for face_i, lbl in zip(unmatched_idx, labels):
            groups.setdefault(int(lbl), []).append(face_i)
        # Largest new groups receive the lowest new folder numbers.
        for _gid, indices in sorted(groups.items(), key=lambda kv: -len(kv[1])):
            if len(indices) == 1:
                assignments[indices[0]] = "_singletons"
            else:
                new_num += 1
                folder = f"person_{new_num:03d}"
                for i in indices:
                    assignments[i] = folder
    new_persons = new_num - max_num
    new_singletons = sum(1 for f in assignments.values() if f == "_singletons")
    print(f"Phase 2 (new clusters): {new_persons} new person_NNN, {new_singletons} new singletons")

    # Materialize: for each newly-assigned face, copy canonical + aliases to its folder.
    raw_out.mkdir(parents=True, exist_ok=True)
    copied_new = 0
    for face_i, folder in assignments.items():
        dst_dir = raw_out / folder
        dst_dir.mkdir(parents=True, exist_ok=True)
        copied_new += place(face_records[face_i], dst_dir)

    # Also fan newly-added aliases of already-placed canonicals into their existing folders.
    copied_aliases = 0
    for face_i, folder in placed_idx_to_folder.items():
        copied_aliases += place(face_records[face_i], raw_out / folder)

    # Noface: idempotent drop into _noface/ for every noface record + aliases.
    noface_dir = raw_out / "_noface"
    noface_dir.mkdir(exist_ok=True)
    seen_noface: set[str] = set()
    copied_noface = 0
    for m in noface_records:
        copied_noface += place(m, noface_dir, seen_noface)
    print(f"Copied: {copied_new} new-face files, {copied_aliases} new aliases of existing placements, {copied_noface} noface")

    # Rewrite raw manifest to include everything.
    all_placements = dict(placed_idx_to_folder)
    all_placements.update(assignments)
    new_manifest = []
    for i, folder in all_placements.items():
        m = face_records[i]
        new_manifest.append({
            "image": Path(m["path"]).name,
            "source": m["path"],
            "aliases": path_aliases.get(m["path"], []),
            "folder": folder,
            "bbox": m.get("bbox"),
            "det_score": m.get("det_score"),
            "face_short": m.get("face_short"),
            "blur": m.get("blur"),
            "hash": m.get("hash"),
        })
    raw_manifest_path.write_text(json.dumps(new_manifest, indent=2))
    print(f"Updated manifest -> {raw_manifest_path}")

    if refine_out is None:
        return

    # ---------- extend facesets ---------- #
    refine_manifest_path = refine_out / "refine_manifest.json"
    if not refine_manifest_path.exists():
        raise SystemExit(f"refine manifest not found: {refine_manifest_path}. Run 'refine' first.")
    old_refine = json.loads(refine_manifest_path.read_text())

    # Build faceset centroids from cache embeddings whose paths appear in the faceset's image list.
    # Multiple face records per image may exist, so include all face_records whose path is in the set.
    face_set_paths: dict[str, set[str]] = {f["name"]: set(f["images"]) for f in old_refine.get("facesets", [])}
    faceset_names = sorted(face_set_paths.keys(), key=lambda s: int(s.split("_")[1]))
    if not faceset_names:
        print("No facesets to extend.")
        return
    path_to_faceset: dict[str, str] = {}
    for name, paths in face_set_paths.items():
        for p in paths:
            path_to_faceset[p] = name

    # Identify which face records in the cache belong to which faceset (path-match)
    # and collect their embeddings for the centroid calculation.
    faceset_vecs: dict[str, list[np.ndarray]] = {name: [] for name in faceset_names}
    already_in_faceset: set[int] = set()
    for i, m in enumerate(face_records):
        name = path_to_faceset.get(m["path"])
        if name:
            faceset_vecs[name].append(emb[i])
            already_in_faceset.add(i)

    # Only facesets with at least one cached embedding get a centroid.
    faceset_cent: dict[str, np.ndarray] = {
        name: _normalize(np.stack(vecs).mean(axis=0))
        for name, vecs in faceset_vecs.items()
        if vecs
    }
    live_names = list(faceset_cent)
    if not live_names:
        # np.stack() cannot stack an empty list; with no centroid there is
        # nothing to match against, so leave the refine output untouched.
        print("No faceset images are present in the cache; nothing to extend.")
        return
    faceset_cents = np.stack([faceset_cent[n] for n in live_names])

    thr_fs_sim = 1.0 - refine_centroid_threshold
    fs_assigned: dict[int, str] = {}
    for face_i in unplaced_idx:
        if face_i in already_in_faceset:
            continue
        m = face_records[face_i]
        # Same three quality gates as refine, using the refine_min_* arguments.
        if m.get("face_short", 0) < refine_min_short:
            continue
        if m.get("blur", 0.0) < refine_min_blur:
            continue
        if m.get("det_score", 0.0) < refine_min_det_score:
            continue
        sims = faceset_cents @ emb[face_i]
        best = int(np.argmax(sims))
        if sims[best] >= thr_fs_sim:
            fs_assigned[face_i] = live_names[best]
    print(f"Faceset extend: {len(fs_assigned)} new faces qualify and match existing facesets")

    fs_copied = 0
    for face_i, name in fs_assigned.items():
        dst_dir = refine_out / name
        dst_dir.mkdir(parents=True, exist_ok=True)
        fs_copied += place(face_records[face_i], dst_dir)

    # Also fan new aliases of already-in-faceset canonicals.
    fs_alias_copied = 0
    for i in already_in_faceset:
        m = face_records[i]
        fs_alias_copied += place(m, refine_out / path_to_faceset[m["path"]])
    print(f"Faceset copied: {fs_copied} new faces, {fs_alias_copied} new aliases")

    # Update refine manifest with extended image lists.
    new_fs_images: dict[str, set[str]] = {name: set(face_set_paths[name]) for name in faceset_names}
    for face_i, name in fs_assigned.items():
        new_fs_images[name].add(face_records[face_i]["path"])
    added_faces = Counter(fs_assigned.values())
    for f in old_refine.get("facesets", []):
        name = f["name"]
        f["images"] = sorted(new_fs_images.get(name, set()))
        f["image_count"] = len(f["images"])
        f["alias_count"] = sum(len(path_aliases.get(p, [])) for p in f["images"])
        # Keep face_count in step with the faces just added; it would
        # otherwise stay at its pre-extend value while image_count grows.
        if "face_count" in f:
            f["face_count"] += added_faces[name]
    old_refine["extended"] = True
    refine_manifest_path.write_text(json.dumps(old_refine, indent=2))
    print(f"Updated refine manifest -> {refine_manifest_path}")
# ---------- enrich (landmarks + pose per face record) ---------- #
def _pick_face_for_bbox(faces: list, stored_bbox: list[int]):
"""Given freshly-detected faces and a stored bbox, return the detected face whose
bbox has the highest IoU with stored_bbox (or None if no overlap)."""
if not faces:
return None
sx1, sy1, sx2, sy2 = stored_bbox
sa = max(1, (sx2 - sx1) * (sy2 - sy1))
best = None
best_iou = 0.0
for f in faces:
x1, y1, x2, y2 = [int(round(v)) for v in f.bbox]
ix1, iy1 = max(sx1, x1), max(sy1, y1)
ix2, iy2 = min(sx2, x2), min(sy2, y2)
if ix2 <= ix1 or iy2 <= iy1:
continue
inter = (ix2 - ix1) * (iy2 - iy1)
fa = max(1, (x2 - x1) * (y2 - y1))
union = sa + fa - inter
iou = inter / union
if iou > best_iou:
best_iou = iou
best = f
return best if best_iou >= 0.3 else None
def cmd_enrich(cache_path: Path, force: bool, flush_every: int) -> None:
    """Re-detect every face record's source image to persist landmarks + pose.

    Skips the recognition module (we already have embeddings) so detection + the two
    landmark models are the only ones loaded.

    Args:
        cache_path: Cache file that is read and rewritten in place.
        force: If true, re-enrich every face record; otherwise only records
            that have no "pose" value yet.
        flush_every: Checkpoint interval for writing the cache while running.
    """
    emb, meta, src_root, processed, path_aliases = load_cache(cache_path)
    if src_root is None:
        src_root = Path("/")
    # Indices into meta (not into emb) of the face records that need work.
    to_do: list[int] = []
    for i, m in enumerate(meta):
        if m.get("noface"):
            continue
        if force or not m.get("pose"):
            to_do.append(i)
    if not to_do:
        print("Enrich: nothing to do; every face record already has pose.")
        return
    # Group indices by source path so each image is decoded exactly once.
    path_to_indices: dict[str, list[int]] = {}
    for i in to_do:
        path_to_indices.setdefault(meta[i]["path"], []).append(i)
    print(f"Enrich: {len(to_do)} face records to enrich across {len(path_to_indices)} unique files")
    from insightface.app import FaceAnalysis
    app = FaceAnalysis(
        name="buffalo_l",
        providers=["CPUExecutionProvider"],
        allowed_modules=["detection", "landmark_2d_106", "landmark_3d_68"],
    )
    app.prepare(ctx_id=-1, det_size=(640, 640))
    since_flush = 0
    missing = 0
    ok = 0
    try:
        for path, idxs in tqdm(path_to_indices.items(), desc="enriching"):
            rgb, bgr = load_rgb_bgr(Path(path))
            if bgr is None:
                # Image could not be decoded: all of its records stay unmatched.
                missing += len(idxs)
                continue
            faces = app.get(bgr)
            for i in idxs:
                # Pair the cached record with the fresh detection by bbox overlap.
                match = _pick_face_for_bbox(faces, meta[i].get("bbox"))
                if match is None:
                    missing += 1
                    continue
                if match.landmark_2d_106 is not None:
                    meta[i]["landmark_2d_106"] = match.landmark_2d_106.astype(np.float32).tolist()
                if match.landmark_3d_68 is not None:
                    meta[i]["landmark_3d_68"] = match.landmark_3d_68.astype(np.float32).tolist()
                if match.pose is not None:
                    meta[i]["pose"] = match.pose.astype(np.float32).tolist()  # [pitch, yaw, roll]
                ok += 1
            # Periodic checkpoint so an interrupted run keeps its progress.
            since_flush += 1
            if since_flush >= flush_every:
                save_cache(cache_path, emb, meta, src_root, processed, path_aliases)
                since_flush = 0
    finally:
        # Always persist what was gathered, even if the loop was interrupted.
        save_cache(cache_path, emb, meta, src_root, processed, path_aliases)
    print(f"Enrich done: {ok} records enriched, {missing} could not be matched")
# ---------- quality scoring ---------- #
# Relative weight of each sub-score in the composite built by compute_quality().
# The five weights add up to 1.0.
# NOTE(review): the commit message that introduced this table quotes
# 0.30/0.20/0.20/0.15/0.15 (det/size/blur/frontality/symmetry) - confirm which
# weighting is the intended default.
QUALITY_WEIGHTS = dict(
    det=0.20,
    size=0.15,
    sharp=0.15,
    frontal=0.30,
    symmetry=0.20,
)
def _norm01(x: float, lo: float, hi: float) -> float:
    """Map ``x`` linearly from ``[lo, hi]`` onto ``[0, 1]``, clamping at both ends.

    An empty or inverted range (``hi <= lo``) yields 0.0.
    """
    if lo >= hi:
        return 0.0
    fraction = (x - lo) / (hi - lo)
    capped = min(1.0, fraction)
    return max(0.0, capped)
def _landmark_symmetry(lm: list[list[float]] | None, bbox: list[int] | None) -> float:
"""Score [0,1] based on how symmetric the 2D 106 landmarks are about the bbox vertical center.
A head-on, un-occluded face has high symmetry; a strong profile or half-occluded face has low.
Returns 0.5 if landmarks unavailable (neutral)."""
if not lm or not bbox:
return 0.5
try:
arr = np.asarray(lm, dtype=np.float32)
cx = 0.5 * (bbox[0] + bbox[2])
width = max(1.0, bbox[2] - bbox[0])
# Mirror each landmark around cx and measure closest-landmark distance (normalized by bbox width).
mirrored = arr.copy()
mirrored[:, 0] = 2 * cx - mirrored[:, 0]
# For each mirrored point, find nearest real landmark.
d = np.linalg.norm(mirrored[:, None, :] - arr[None, :, :], axis=2).min(axis=1)
mean_err = d.mean() / width
# Empirically mean_err is ~0.02 for frontal, ~0.15 for strong profile.
score = 1.0 - _norm01(mean_err, 0.02, 0.15)
return float(score)
except Exception:
return 0.5
def _frontality(pose: list[float] | None) -> float:
if not pose or len(pose) < 2:
return 0.5
pitch, yaw = abs(pose[0]), abs(pose[1])
# yaw is the dominant signal for arcface-style embedding degradation.
yaw_score = 1.0 - _norm01(yaw, 10.0, 45.0)
pitch_score = 1.0 - _norm01(pitch, 10.0, 35.0)
return 0.7 * yaw_score + 0.3 * pitch_score
def compute_quality(rec: dict) -> dict:
    """Score one face record.

    Returns a dict holding the weighted ``composite`` (weights taken from
    ``QUALITY_WEIGHTS``) followed by the individual sub-scores ``det``, ``size``,
    ``sharp``, ``frontal`` and ``symmetry``, all as plain floats.
    """
    sub_scores = {
        "det": _norm01(float(rec.get("det_score", 0.0)), 0.50, 0.95),
        "size": _norm01(float(rec.get("face_short", 0)), 90.0, 300.0),
        "sharp": _norm01(float(rec.get("blur", 0.0)), 40.0, 250.0),
        "frontal": _frontality(rec.get("pose")),
        "symmetry": _landmark_symmetry(rec.get("landmark_2d_106"), rec.get("bbox")),
    }
    # Accumulate left to right in a fixed order so the float result is reproducible.
    composite = 0.0
    for signal, value in sub_scores.items():
        composite += QUALITY_WEIGHTS[signal] * value
    result = {"composite": float(composite)}
    for signal, value in sub_scores.items():
        result[signal] = float(value)
    return result
# ---------- export-swap ---------- #
def _crop_face_square(rgb: np.ndarray, bbox: list[int], pad_ratio: float, out_size: int) -> np.ndarray:
    """Cut a padded, square ``out_size`` x ``out_size`` crop around ``bbox``.

    The bbox is grown by ``pad_ratio`` of its width/height on each side and
    clamped to the image. A non-square result is centred on a black square
    canvas, and the square is then resized to ``out_size``.

    Args:
        rgb: source image, indexed as ``[row, column, channel]``.
        bbox: ``[x1, y1, x2, y2]`` of the face in pixel coordinates.
        pad_ratio: fraction of the bbox width/height added on each side.
        out_size: edge length of the returned square.

    Returns:
        The crop, or an all-black ``(out_size, out_size, 3)`` uint8 image when
        the padded bbox does not intersect the image.
    """
    h, w = rgb.shape[:2]
    x1, y1, x2, y2 = [int(v) for v in bbox]
    px = int((x2 - x1) * pad_ratio)
    py = int((y2 - y1) * pad_ratio)
    # Clamp every edge into the image. Without the lower clamp on the far edges
    # a negative end index would be read by NumPy as "count from the far side"
    # and silently return the wrong region instead of an empty one.
    ex1 = min(w, max(0, x1 - px))
    ey1 = min(h, max(0, y1 - py))
    ex2 = min(w, max(0, x2 + px))
    ey2 = min(h, max(0, y2 + py))
    crop = rgb[ey1:ey2, ex1:ex2]
    ch, cw = crop.shape[:2]
    if ch == 0 or cw == 0:
        return np.zeros((out_size, out_size, 3), dtype=np.uint8)
    if ch != cw:
        # Centre the rectangular crop on a black square canvas.
        sz = max(ch, cw)
        padded = np.zeros((sz, sz, 3), dtype=crop.dtype)
        y_off = (sz - ch) // 2
        x_off = (sz - cw) // 2
        padded[y_off:y_off + ch, x_off:x_off + cw] = crop
        crop = padded
    if crop.shape[0] != out_size:
        import cv2  # imported lazily: only the resize path needs OpenCV
        # INTER_AREA is the right filter for shrinking but degrades to a
        # nearest-neighbour-like result when enlarging, so small faces use cubic.
        interp = cv2.INTER_AREA if crop.shape[0] > out_size else cv2.INTER_CUBIC
        crop = cv2.resize(crop, (out_size, out_size), interpolation=interp)
    return crop
def _zip_png_list(pngs: list[Path], zip_path: Path) -> None:
    """Bundle the given PNGs into a .fsz (zip) archive.

    Members are stored in list order under sequential names
    ``0000.png``, ``0001.png``, ... regardless of their original file names.
    """
    import zipfile

    archive_options = {"compression": zipfile.ZIP_DEFLATED, "compresslevel": 4}
    with zipfile.ZipFile(zip_path, "w", **archive_options) as archive:
        position = 0
        for png_path in pngs:
            archive.write(png_path, arcname=f"{position:04d}.png")
            position += 1
def cmd_export_swap(
    cache_path: Path,
    refine_manifest_path: Path,
    raw_manifest_path: Path | None,
    out_dir: Path,
    top_n: int,
    outlier_threshold: float,
    pad_ratio: float,
    out_size: int,
    include_candidates: bool,
    candidate_match_threshold: float,
    candidate_min_score: float,
    min_face_short: int,
) -> None:
    """Export ranked single-face crops and .fsz bundles for every refined faceset.

    For each faceset in the refine manifest this filters its face records
    (distance to the faceset centroid, minimum face size), keeps the best record
    per visual-duplicate group and per source image, writes one square PNG crop
    per surviving record under ``<out_dir>/<name>/faces/``, bundles them into
    ``.fsz`` archives and writes a per-faceset ``manifest.json`` plus a
    ``NAME.txt`` placeholder. Optionally routes raw ``_singletons`` records into
    ``<out_dir>/_candidates/``. Finally writes ``<out_dir>/manifest.json``.

    Args:
        cache_path: embedding cache read via ``load_cache``.
        refine_manifest_path: JSON file with a ``facesets`` list (``name``, ``images``).
        raw_manifest_path: JSON list of raw cluster entries (``folder``, ``source``,
            ``bbox``); only read when ``include_candidates`` is true.
        out_dir: export root; created if missing.
        top_n: maximum number of PNGs in the ``_top<N>.fsz`` bundle.
        outlier_threshold: records further than this from the initial faceset
            centroid are dropped.
        pad_ratio: bbox padding passed to ``_crop_face_square``.
        out_size: edge length of the written crops.
        include_candidates: enable the singleton rescue step (also needs
            ``raw_manifest_path``).
        candidate_match_threshold: maximum distance to a final centroid for a
            singleton to be routed to that faceset.
        candidate_min_score: minimum composite quality for a singleton to be considered.
        min_face_short: records whose ``face_short`` is below this are skipped.

    Raises:
        SystemExit: when the number of face records in the cache metadata does
            not equal the number of embeddings.
    """
    import cv2
    emb, meta, src_root, _processed, path_aliases = load_cache(cache_path)
    rm = json.loads(refine_manifest_path.read_text())

    # Optional visual-duplicate groups stored next to the cache.
    dup_path = cache_path.with_suffix(".duplicates.json")
    if not dup_path.exists():
        # NOTE(review): this fallback appears to resolve to the same path as
        # with_suffix() above - confirm which alternative location was intended.
        dup_path = cache_path.parent / (cache_path.stem + ".duplicates.json")
    visual_groups: list[list[str]] = []
    if dup_path.exists():
        visual_groups = json.loads(dup_path.read_text()).get("visual_groups", [])
    # Every path in a visual group maps to the same key (the sorted group as a tuple).
    path_to_vgroup: dict[str, tuple[str, ...]] = {}
    for g in visual_groups:
        key = tuple(sorted(g))
        for p in g:
            path_to_vgroup[p] = key

    # Indices below address face_records and emb in parallel, so the two must
    # have the same length once "noface" placeholders are removed.
    face_records = [m for m in meta if not m.get("noface")]
    if len(face_records) != len(emb):
        raise SystemExit(f"meta/embedding mismatch: {len(face_records)} vs {len(emb)}")
    # Source path -> indices of all face records detected in that image.
    path_idx: dict[str, list[int]] = {}
    for i, m in enumerate(face_records):
        path_idx.setdefault(m["path"], []).append(i)

    out_dir.mkdir(parents=True, exist_ok=True)
    faceset_summary = []
    # Filled per faceset; consumed by the singleton rescue step further down.
    final_centroids: dict[str, np.ndarray] = {}
    placed_cache_indices: set[int] = set()

    for fs in rm.get("facesets", []):
        name = fs["name"]
        paths = set(fs.get("images", []))
        indices = [i for p in paths for i in path_idx.get(p, [])]
        if not indices:
            continue
        # Initial centroid for this faceset from all its current members.
        init_vecs = emb[indices]
        init_cent = init_vecs.mean(axis=0)
        nrm = np.linalg.norm(init_cent)
        if nrm > 0:
            init_cent = init_cent / nrm
        # Tight outlier filter + quality.
        # cosd is 1 - dot(embedding, centroid); that is a cosine distance only if
        # the cached embeddings are unit-length - presumably so, TODO confirm.
        ranked: list[dict] = []
        dropped_outlier = 0
        for i in indices:
            cosd = 1.0 - float(emb[i] @ init_cent)
            if cosd > outlier_threshold:
                dropped_outlier += 1
                continue
            rec = face_records[i]
            # Too-small faces are skipped without being counted in any drop counter.
            if rec.get("face_short", 0) < min_face_short:
                continue
            q = compute_quality(rec)
            ranked.append({"cache_idx": i, "rec": rec, "cosd": cosd, "quality": q})
        # Visual-dupe collapse: keep best score per group.
        groups_best: dict[tuple[str, ...], dict] = {}
        singletons: list[dict] = []
        for r in ranked:
            g = path_to_vgroup.get(r["rec"]["path"])
            if g is None:
                # Path is not part of any visual group; always kept.
                singletons.append(r)
                continue
            prev = groups_best.get(g)
            if prev is None or r["quality"]["composite"] > prev["quality"]["composite"]:
                groups_best[g] = r
        kept = singletons + list(groups_best.values())
        kept.sort(key=lambda r: -r["quality"]["composite"])
        dropped_vdupe = len(ranked) - len(kept)
        if not kept:
            print(f"[{name}] empty after filtering; skipping")
            continue
        # Recompute centroid from the kept embeddings (used for singleton rescue).
        # This happens before the per-source-path dedup below, so it can include
        # records that are not exported.
        kept_vecs = np.stack([emb[r["cache_idx"]] for r in kept])
        final_cent = kept_vecs.mean(axis=0)
        nrm = np.linalg.norm(final_cent)
        if nrm > 0:
            final_cent = final_cent / nrm
        final_centroids[name] = final_cent
        for r in kept:
            placed_cache_indices.add(r["cache_idx"])
        # Materialize.
        fs_out = out_dir / name
        faces_dir = fs_out / "faces"
        faces_dir.mkdir(parents=True, exist_ok=True)
        # Deduplicate by source path: within the same faceset, a multi-face photo could
        # have produced 2 records with different bboxes; we want the one with the best quality
        # to win, and only crop that face.
        seen_path = {}
        unique_kept: list[dict] = []
        for r in kept:
            p = r["rec"]["path"]
            if p not in seen_path or r["quality"]["composite"] > seen_path[p]["quality"]["composite"]:
                seen_path[p] = r
        unique_kept = sorted(seen_path.values(), key=lambda r: -r["quality"]["composite"])
        written_pngs: list[Path] = []
        manifest_faces: list[dict] = []
        for rank, r in enumerate(unique_kept, start=1):
            rec = r["rec"]
            src = Path(rec["path"])
            rgb = None
            if src.exists():
                rgb, _ = load_rgb_bgr(src)
            if rgb is None:
                # Source missing or unreadable: skip it; its rank number stays unused.
                continue
            crop = _crop_face_square(rgb, rec["bbox"], pad_ratio, out_size)
            png = faces_dir / f"{rank:04d}.png"
            # The crop is RGB; convert because cv2.imwrite expects BGR channel order.
            cv2.imwrite(str(png), cv2.cvtColor(crop, cv2.COLOR_RGB2BGR))
            written_pngs.append(png)
            manifest_faces.append({
                "rank": rank,
                "png": f"faces/{rank:04d}.png",
                "source": rec["path"],
                "aliases": path_aliases.get(rec["path"], []),
                "bbox": rec["bbox"],
                "face_short": rec.get("face_short"),
                "det_score": rec.get("det_score"),
                "blur": rec.get("blur"),
                "pose": rec.get("pose"),
                "cosd_centroid": float(r["cosd"]),
                "quality": r["quality"],
            })
        if not written_pngs:
            continue
        # Emit .fsz bundles.
        # The "_all" bundle is only written when it would differ from the top-N one.
        top_n_eff = min(top_n, len(written_pngs))
        _zip_png_list(written_pngs[:top_n_eff], fs_out / f"{name}_top{top_n_eff}.fsz")
        if len(written_pngs) > top_n_eff:
            _zip_png_list(written_pngs, fs_out / f"{name}_all.fsz")
        # Per-faceset manifest.
        manifest = {
            "name": name,
            "input_face_records": len(indices),
            "dropped_outlier": dropped_outlier,
            "dropped_visual_dupes": dropped_vdupe,
            "dropped_multi_face_same_source": len(kept) - len(unique_kept),
            "exported": len(written_pngs),
            "top_n": top_n_eff,
            "fsz_top": f"{name}_top{top_n_eff}.fsz",
            "fsz_all": f"{name}_all.fsz" if len(written_pngs) > top_n_eff else None,
            "quality_weights": QUALITY_WEIGHTS,
            "faces": manifest_faces,
        }
        (fs_out / "manifest.json").write_text(json.dumps(manifest, indent=2))
        # Convenience name placeholder.
        # Never overwritten, so text the operator entered survives a re-export.
        name_file = fs_out / "NAME.txt"
        if not name_file.exists():
            name_file.write_text(
                "# Optional: write the identity's name on the first line.\n"
                "# This file is for operator reference only - roop-unleashed ignores it.\n\n"
            )
        faceset_summary.append(manifest)
        print(
            f"[{name}] in={len(indices)} outlier_drop={dropped_outlier} vdupe_drop={dropped_vdupe} "
            f"multiface_drop={len(kept) - len(unique_kept)} exported={len(written_pngs)} "
            f"(top{top_n_eff}.fsz)"
        )

    # Singleton rescue -> _candidates/
    # Skipped silently when no raw manifest path was supplied.
    if include_candidates and raw_manifest_path is not None:
        raw = json.loads(raw_manifest_path.read_text())
        # Index singletons: face records in _singletons by (path, bbox) => cache index
        bbox_key_to_cache = {
            (m["path"], tuple(m["bbox"]) if m.get("bbox") else None): i
            for i, m in enumerate(face_records)
        }
        singleton_cache_indices: list[int] = []
        for e in raw:
            if e.get("folder") != "_singletons":
                continue
            key = (e["source"], tuple(e["bbox"]) if e.get("bbox") else None)
            ci = bbox_key_to_cache.get(key)
            # Ignore records that already ended up in a faceset's kept list above.
            if ci is not None and ci not in placed_cache_indices:
                singleton_cache_indices.append(ci)
        if not final_centroids:
            print("No final centroids; skipping candidates.")
        elif not singleton_cache_indices:
            print("No singletons to rescue.")
        else:
            cand_root = out_dir / "_candidates"
            cand_root.mkdir(parents=True, exist_ok=True)
            cent_names = list(final_centroids.keys())
            cent_mat = np.stack([final_centroids[n] for n in cent_names])
            to_faceset: dict[str, list[int]] = {}
            unmatched: list[int] = []
            rescued_report: list[dict] = []
            for ci in singleton_cache_indices:
                rec = face_records[ci]
                # Singletons failing the size or quality gate are dropped entirely:
                # they are neither assigned nor added to `unmatched`.
                if rec.get("face_short", 0) < min_face_short:
                    continue
                q = compute_quality(rec)
                if q["composite"] < candidate_min_score:
                    continue
                # Route to the closest final centroid if it is near enough.
                sims = cent_mat @ emb[ci]
                best = int(np.argmax(sims))
                dist = 1.0 - float(sims[best])
                if dist <= candidate_match_threshold:
                    to_faceset.setdefault(cent_names[best], []).append(ci)
                    rescued_report.append({
                        "cache_idx": ci, "source": rec["path"], "assigned": cent_names[best],
                        "cosd": dist, "quality": q,
                    })
                else:
                    unmatched.append(ci)
            # Cluster unmatched among themselves into new_NNN buckets.
            if len(unmatched) > 1:
                u_vecs = np.stack([emb[i] for i in unmatched])
                labels = _cluster_embeddings(u_vecs, 0.55)
                groups: dict[int, list[int]] = {}
                for ci, lbl in zip(unmatched, labels):
                    groups.setdefault(int(lbl), []).append(ci)
                # Largest group first, so new_001 is the biggest new cluster.
                groups_sorted = sorted(groups.items(), key=lambda kv: -len(kv[1]))
                new_buckets = {}
                rank = 0
                for _gid, members in groups_sorted:
                    if len(members) == 1:
                        continue  # still a singleton, skip
                    rank += 1
                    new_buckets[f"new_{rank:03d}"] = members
                to_new = new_buckets
            else:
                to_new = {}
            # Materialize candidates
            def materialize(bucket_name: str, ci_list: list[int]):
                # Write quality-ranked crops for one bucket under
                # _candidates/<bucket_name>/faces/, plus a manifest.json if
                # at least one crop was written.
                bd = cand_root / bucket_name
                fd = bd / "faces"
                fd.mkdir(parents=True, exist_ok=True)
                written = []
                entries = []
                ranked_cis = sorted(ci_list, key=lambda i: -compute_quality(face_records[i])["composite"])
                for rk, ci in enumerate(ranked_cis, 1):
                    rec = face_records[ci]
                    src = Path(rec["path"])
                    if not src.exists():
                        continue
                    rgb, _ = load_rgb_bgr(src)
                    if rgb is None:
                        continue
                    crop = _crop_face_square(rgb, rec["bbox"], pad_ratio, out_size)
                    png = fd / f"{rk:04d}.png"
                    cv2.imwrite(str(png), cv2.cvtColor(crop, cv2.COLOR_RGB2BGR))
                    written.append(png)
                    entries.append({
                        "rank": rk,
                        "png": f"faces/{rk:04d}.png",
                        "source": rec["path"],
                        "bbox": rec["bbox"],
                        "quality": compute_quality(rec),
                    })
                if written:
                    (bd / "manifest.json").write_text(json.dumps({
                        "bucket": bucket_name,
                        "faces": entries,
                    }, indent=2))
            for fs_name, cis in to_faceset.items():
                materialize(f"to_{fs_name}", cis)
            for bname, cis in to_new.items():
                materialize(bname, cis)
            (cand_root / "rescue_report.json").write_text(json.dumps({
                "rescued_to_existing": len(rescued_report),
                "new_clusters": len(to_new),
                "unmatched_singletons_kept_as_singleton": len(unmatched) - sum(len(v) for v in to_new.values()),
                "assignments": rescued_report,
            }, indent=2))
            print(f"Candidates: rescued={len(rescued_report)} to existing facesets; new_clusters={len(to_new)}")

    # Top-level manifest
    # Per-faceset summaries are included without their (large) "faces" lists.
    (out_dir / "manifest.json").write_text(json.dumps({
        "facesets": [{k: v for k, v in m.items() if k != "faces"} for m in faceset_summary],
        "quality_weights": QUALITY_WEIGHTS,
        "outlier_threshold": outlier_threshold,
        "top_n": top_n,
        "pad_ratio": pad_ratio,
        "out_size": out_size,
    }, indent=2))
    print(f"Wrote top-level manifest -> {out_dir / 'manifest.json'}")
# ---------- main ---------- #
def main() -> None:
    """Parse the command line and dispatch to the matching ``cmd_*`` function.

    Subcommands: ``embed``, ``cluster``, ``refine``, ``dedup``, ``extend``,
    ``enrich`` and ``export-swap``. A subcommand is required.
    """
    import sys

    p = argparse.ArgumentParser()
    sub = p.add_subparsers(dest="cmd", required=True)

    pe = sub.add_parser("embed")
    pe.add_argument("src_dir", type=Path)
    pe.add_argument("cache", type=Path)
    pe.add_argument("--no-resume", action="store_true", help="ignore any existing cache at <cache> path")
    pe.add_argument("--flush-every", type=int, default=FLUSH_DEFAULT)

    pc = sub.add_parser("cluster")
    pc.add_argument("cache", type=Path)
    pc.add_argument("out_dir", type=Path)
    pc.add_argument("--threshold", type=float, default=0.55)
    pc.add_argument("--mode", choices=["copy", "move", "symlink"], default="copy")
    pc.add_argument("--dry-run", action="store_true")

    pr = sub.add_parser("refine")
    pr.add_argument("cache", type=Path)
    pr.add_argument("out_dir", type=Path)
    pr.add_argument("--initial-threshold", type=float, default=0.55)
    pr.add_argument("--merge-threshold", type=float, default=0.40)
    pr.add_argument("--outlier-threshold", type=float, default=0.55)
    pr.add_argument("--min-faces", type=int, default=15)
    pr.add_argument("--min-short", type=int, default=90)
    pr.add_argument("--min-blur", type=float, default=40.0)
    pr.add_argument("--min-det-score", type=float, default=0.6)
    pr.add_argument("--mode", choices=["copy", "move", "symlink"], default="copy")
    pr.add_argument("--dry-run", action="store_true")

    pd = sub.add_parser("dedup")
    pd.add_argument("cache", type=Path)
    pd.add_argument("--cos", type=float, default=0.03, help="cosine-distance threshold for visual dupes")
    pd.add_argument("--out", type=Path, default=None)

    px = sub.add_parser("extend", help="Add new embeddings to existing raw/refine dirs without renumbering")
    px.add_argument("cache", type=Path)
    px.add_argument("raw_out", type=Path, help="existing raw cluster dir (must contain manifest.json)")
    px.add_argument("--refine-out", type=Path, default=None, help="optional existing facesets dir")
    px.add_argument("--threshold", type=float, default=0.55, help="cosine-dist cutoff for matching new face to an existing person centroid")
    px.add_argument("--new-cluster-threshold", type=float, default=0.55, help="threshold for clustering the unmatched new faces among themselves")
    px.add_argument("--mode", choices=["copy", "move", "symlink"], default="copy")
    px.add_argument("--refine-min-short", type=int, default=90)
    px.add_argument("--refine-min-blur", type=float, default=40.0)
    px.add_argument("--refine-min-det-score", type=float, default=0.6)
    px.add_argument("--refine-centroid-threshold", type=float, default=0.55)

    pn = sub.add_parser("enrich", help="Re-detect to persist landmark_2d_106, landmark_3d_68, pose into cache")
    pn.add_argument("cache", type=Path)
    pn.add_argument("--force", action="store_true", help="re-enrich even records that already have pose")
    pn.add_argument("--flush-every", type=int, default=100)

    pxs = sub.add_parser("export-swap", help="Build facesets_swap_ready/ with ranked single-face PNGs + .fsz per identity")
    pxs.add_argument("cache", type=Path)
    pxs.add_argument("refine_manifest", type=Path, help="path to refine_manifest.json of the source facesets dir")
    pxs.add_argument("out_dir", type=Path)
    pxs.add_argument("--raw-manifest", type=Path, default=None, help="raw_full/manifest.json (required for --candidates)")
    pxs.add_argument("--top-n", type=int, default=30)
    pxs.add_argument("--outlier-threshold", type=float, default=0.45)
    pxs.add_argument("--pad-ratio", type=float, default=0.5)
    pxs.add_argument("--out-size", type=int, default=512)
    pxs.add_argument("--min-face-short", type=int, default=100)
    pxs.add_argument("--candidates", action="store_true", help="rescue singletons into _candidates/")
    pxs.add_argument("--candidate-match-threshold", type=float, default=0.55)
    pxs.add_argument("--candidate-min-score", type=float, default=0.40)

    args = p.parse_args()
    if args.cmd == "embed":
        cmd_embed(args.src_dir, args.cache, resume=not args.no_resume, flush_every=args.flush_every)
    elif args.cmd == "cluster":
        cmd_cluster(args.cache, args.out_dir, args.threshold, args.mode, args.dry_run)
    elif args.cmd == "refine":
        cmd_refine(
            args.cache, args.out_dir,
            args.initial_threshold, args.merge_threshold, args.outlier_threshold,
            args.min_faces, args.min_short, args.min_blur, args.min_det_score,
            args.mode, args.dry_run,
        )
    elif args.cmd == "dedup":
        cmd_dedup(args.cache, args.cos, args.out)
    elif args.cmd == "extend":
        cmd_extend(
            args.cache, args.raw_out, args.refine_out,
            args.threshold, args.new_cluster_threshold, args.mode,
            args.refine_min_short, args.refine_min_blur, args.refine_min_det_score,
            args.refine_centroid_threshold,
        )
    elif args.cmd == "enrich":
        cmd_enrich(args.cache, force=args.force, flush_every=args.flush_every)
    elif args.cmd == "export-swap":
        if args.candidates and args.raw_manifest is None:
            # The export itself still runs; only the rescue step needs the raw
            # manifest, and without this message it would be skipped silently.
            print(
                "warning: --candidates requires --raw-manifest; singleton rescue will be skipped",
                file=sys.stderr,
            )
        cmd_export_swap(
            args.cache, args.refine_manifest, args.raw_manifest, args.out_dir,
            args.top_n, args.outlier_threshold, args.pad_ratio, args.out_size,
            args.candidates, args.candidate_match_threshold, args.candidate_min_score,
            args.min_face_short,
        )
# Run the CLI only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()