Files
face-sets/sort_faces.py
Peter c5a4e2dfdb Add face-sort pipeline as the repo's base
Single-file CLI (embed / cluster / refine) using InsightFace buffalo_l
embeddings and agglomerative clustering, migrated in from the ad-hoc
/home/peter/face_sort/ directory so this repo is the canonical home for
faceset preparation feeding roop-unleashed and similar tools.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 11:20:00 +02:00

444 lines
16 KiB
Python

"""Sort photos by similar faces using InsightFace embeddings + agglomerative clustering.
Subcommands:
embed <src_dir> <cache.npz> recursively scan, detect+embed faces
cluster <cache.npz> <out_dir> [opts] raw agglomerative clustering -> person_NNN/
refine <cache.npz> <out_dir> [opts] merge + outlier + quality pass -> faceset-ready folders
"""
from __future__ import annotations
import argparse
import json
import shutil
import sys
import time
from pathlib import Path
import numpy as np
from PIL import Image, ImageOps
from tqdm import tqdm
IMG_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff", ".webp", ".heic"}
MIN_DET_SCORE = 0.5
MIN_FACE_PIX = 40


def list_images(src: Path) -> list[Path]:
    """Return every image file under *src* (recursive), sorted by full path."""
    return sorted(
        candidate
        for candidate in src.rglob("*")
        if candidate.is_file() and candidate.suffix.lower() in IMG_EXTS
    )
def load_rgb_bgr(path: Path):
    """Decode *path* into a pair of uint8 arrays: (rgb, bgr).

    EXIF orientation is applied before the RGB conversion; the BGR copy is
    the channel-reversed view materialized for downstream consumers. On any
    decode failure a warning goes to stderr and (None, None) is returned.
    """
    try:
        with Image.open(path) as im:
            oriented = ImageOps.exif_transpose(im)
            rgb = np.array(oriented.convert("RGB"))
    except Exception as e:
        print(f"[warn] failed to load {path}: {e}", file=sys.stderr)
        return None, None
    return rgb, rgb[:, :, ::-1].copy()
def laplacian_variance(gray: np.ndarray) -> float:
    """Variance of a 5-point Laplacian response — a cheap sharpness metric.

    Higher values indicate sharper detail. The Laplacian is computed over the
    valid interior region with numpy slicing, so no OpenCV dependency is
    needed. (The previous version also built an explicit 3x3 kernel array
    that was never used; it has been removed.)

    Args:
        gray: 2-D grayscale array, any numeric dtype.

    Returns:
        Variance of the Laplacian over interior pixels, or 0.0 when *gray*
        is smaller than 3x3 (no interior exists; avoids a nan from an
        empty-array ``.var()``).
    """
    g = gray.astype(np.float32)
    if g.shape[0] < 3 or g.shape[1] < 3:
        return 0.0
    # 5-point stencil: center weighted -4, the four orthogonal neighbors +1.
    lap = (
        -4.0 * g[1:-1, 1:-1]
        + g[:-2, 1:-1] + g[2:, 1:-1]
        + g[1:-1, :-2] + g[1:-1, 2:]
    )
    return float(lap.var())
def make_rel(path: Path, root: Path) -> str:
    """Path of *path* relative to *root*; falls back to the bare filename
    when *path* does not live under *root*."""
    try:
        rel = path.relative_to(root)
    except ValueError:
        return path.name
    return str(rel)
def safe_dst_name(path: Path, root: Path) -> str:
    """Flatten the source-relative path into a collision-safe filename.

    Directory separators become ``__`` and spaces become ``_`` so files from
    different subfolders keep distinct names in a flat output directory.
    """
    try:
        rel = str(path.relative_to(root))
    except ValueError:
        # Outside the scan root: fall back to the bare filename.
        rel = path.name
    return rel.replace("/", "__").replace("\\", "__").replace(" ", "_")
def cmd_embed(src_dir: Path, cache_path: Path) -> None:
    """Detect and embed every face under *src_dir*; write an .npz cache.

    The cache holds ``embeddings`` (one float32 row per kept face; the
    empty-cache fallback below assumes 512-D, matching buffalo_l), ``meta``
    (JSON list: one record per kept face plus one ``noface`` record per
    image with no usable face) and ``src_root`` (the scan root, used later
    for collision-safe destination names).
    """
    # Deferred import: insightface is a heavy dependency and pulls model
    # weights on first use; only needed for this subcommand.
    from insightface.app import FaceAnalysis
    app = FaceAnalysis(name="buffalo_l", providers=["CPUExecutionProvider"])
    app.prepare(ctx_id=-1, det_size=(640, 640))  # ctx_id=-1 -> CPU
    images = list_images(src_dir)
    print(f"Found {len(images)} images under {src_dir}")
    embeddings: list[np.ndarray] = []
    meta: list[dict] = []
    t0 = time.time()
    for img_path in tqdm(images, desc="embedding"):
        rgb, bgr = load_rgb_bgr(img_path)
        if bgr is None:
            # Unreadable file: record it so later stages can route/report it.
            meta.append({"path": str(img_path), "face_idx": -1, "noface": True, "error": "load"})
            continue
        faces = app.get(bgr)  # BGR array fed to InsightFace (see load_rgb_bgr)
        kept = 0
        for i, f in enumerate(faces):
            if float(f.det_score) < MIN_DET_SCORE:
                continue
            # Clamp the bbox to image bounds before measuring/cropping.
            x1, y1, x2, y2 = [int(round(v)) for v in f.bbox]
            x1, y1 = max(x1, 0), max(y1, 0)
            x2, y2 = min(x2, rgb.shape[1]), min(y2, rgb.shape[0])
            w, h = x2 - x1, y2 - y1
            short = min(w, h)
            if short < MIN_FACE_PIX:
                continue
            # Blur metric on the face crop (grayscale via channel mean)
            crop = rgb[y1:y2, x1:x2]
            if crop.size == 0:
                continue
            gray = crop.mean(axis=2)
            # Tiny crops have no interior for the Laplacian; score them 0.0.
            blur = laplacian_variance(gray) if min(gray.shape) > 3 else 0.0
            emb = f.normed_embedding.astype(np.float32)
            embeddings.append(emb)
            # Invariant: meta's non-noface records stay index-aligned with
            # `embeddings`; load_cache/cmd_cluster rely on this ordering.
            meta.append({
                "path": str(img_path),
                "face_idx": i,
                "det_score": float(f.det_score),
                "bbox": [x1, y1, x2, y2],
                "face_short": int(short),
                "face_area": int(w * h),
                "blur": blur,
                "noface": False,
            })
            kept += 1
        if kept == 0:
            # Image decoded fine but every face was rejected (or none found).
            meta.append({"path": str(img_path), "face_idx": -1, "noface": True})
    dt = time.time() - t0
    print(f"Detected {len(embeddings)} faces across {len(images)} images in {dt:.1f}s")
    # Empty fallback keeps a well-formed (0, 512) array for downstream code.
    emb_arr = np.stack(embeddings) if embeddings else np.zeros((0, 512), dtype=np.float32)
    np.savez(cache_path, embeddings=emb_arr, meta=json.dumps(meta), src_root=str(src_dir))
    print(f"Cache written to {cache_path}")
def load_cache(cache_path: Path):
    """Read an embed-stage cache.

    Returns (embeddings array, meta record list, src_root Path or None —
    older caches may not carry a src_root entry).
    """
    archive = np.load(cache_path, allow_pickle=True)
    records = json.loads(str(archive["meta"]))
    root = Path(str(archive["src_root"])) if "src_root" in archive.files else None
    return archive["embeddings"], records, root
def _transfer(src: Path, dst: Path, mode: str) -> None:
if dst.exists():
return
if mode == "copy":
shutil.copy2(src, dst)
elif mode == "move":
shutil.move(str(src), str(dst))
elif mode == "symlink":
dst.symlink_to(src)
def _cluster_embeddings(emb: np.ndarray, threshold: float) -> np.ndarray:
    """Agglomerative clustering (average linkage, cosine distance).

    ``n_clusters=None`` plus ``distance_threshold`` lets the data decide the
    cluster count: merging stops once the next merge would exceed
    *threshold* cosine distance. Returns one integer label per row of *emb*.
    """
    # Deferred import keeps scikit-learn optional for the embed subcommand.
    from sklearn.cluster import AgglomerativeClustering
    clusterer = AgglomerativeClustering(
        n_clusters=None,
        distance_threshold=threshold,
        # NOTE(review): `metric=` replaced `affinity=` in scikit-learn 1.2+
        # — confirm the pinned sklearn version supports this keyword.
        metric="cosine",
        linkage="average",
    )
    return clusterer.fit_predict(emb)
def cmd_cluster(cache_path: Path, out_dir: Path, threshold: float, mode: str, dry_run: bool) -> None:
    """Cluster cached face embeddings and lay images out as person_NNN/ folders.

    Reads the cache written by ``cmd_embed``, clusters at *threshold* (cosine
    distance) and transfers each image into every cluster folder it has a
    face in (an image with faces from several people is placed several
    times). Singleton clusters share ``_singletons/``; images with no
    detected face go to ``_noface/``. A per-face ``manifest.json`` is
    written last.

    Args:
        cache_path: .npz cache from the embed step.
        out_dir: output root; created if missing.
        threshold: cosine-distance cut for agglomerative clustering.
        mode: "copy", "move" or "symlink" (validated by argparse upstream).
        dry_run: print cluster statistics only; touch no files.
    """
    emb, meta, src_root = load_cache(cache_path)
    if src_root is None:
        # Older caches may lack src_root; filesystem root keeps
        # safe_dst_name well-defined for absolute source paths.
        src_root = Path("/")
    # meta interleaves per-face records and per-image "noface" markers.
    face_records = [m for m in meta if not m.get("noface")]
    noface_records = [m for m in meta if m.get("noface")]
    if len(face_records) != len(emb):
        raise SystemExit(f"meta/embedding mismatch: {len(face_records)} vs {len(emb)}")
    if len(emb) == 0:
        print("No faces detected; nothing to cluster.")
        return
    print(f"Clustering {len(emb)} face embeddings (threshold={threshold} cosine distance)")
    labels = _cluster_embeddings(emb, threshold)
    # Group face records by label; copy each record before annotating it.
    clusters: dict[int, list[dict]] = {}
    for rec, lbl in zip(face_records, labels):
        rec = dict(rec)
        rec["cluster"] = int(lbl)
        clusters.setdefault(int(lbl), []).append(rec)
    # Largest clusters first; label id breaks size ties deterministically.
    ordered = sorted(clusters.items(), key=lambda kv: (-len(kv[1]), kv[0]))
    sizes = [len(v) for _, v in ordered]
    singletons = sum(1 for s in sizes if s == 1)
    print(f"Clusters: {len(ordered)} | top sizes: {sizes[:15]}")
    print(f"Multi-face clusters: {len(sizes) - singletons} singletons: {singletons}")
    print(f"No-face images: {len(noface_records)}")
    if dry_run:
        # Preview the 20 largest clusters, then stop before any file I/O.
        for cid, recs in ordered[:20]:
            imgs = {r["path"] for r in recs}
            print(f" cluster {cid:3d} faces={len(recs):3d} imgs={len(imgs)}")
        return
    out_dir.mkdir(parents=True, exist_ok=True)
    # Map each cluster id to its destination folder: person_NNN (ranked by
    # size) for real clusters, one shared _singletons bucket otherwise.
    rank = 0
    cluster_dir: dict[int, Path] = {}
    for cid, recs in ordered:
        if len(recs) == 1:
            cluster_dir[cid] = out_dir / "_singletons"
        else:
            rank += 1
            cluster_dir[cid] = out_dir / f"person_{rank:03d}"
        cluster_dir[cid].mkdir(parents=True, exist_ok=True)
    # De-duplicate within a cluster: several faces in one image -> one file.
    per_cluster_imgs: dict[int, set[str]] = {cid: set() for cid, _ in ordered}
    for cid, recs in ordered:
        for r in recs:
            per_cluster_imgs[cid].add(r["path"])
    total = sum(len(v) for v in per_cluster_imgs.values())
    unique = len({p for s in per_cluster_imgs.values() for p in s})
    print(f"Placing {total} file instances across {unique} unique images (mode={mode}) -> {out_dir}")
    for cid, paths in tqdm(per_cluster_imgs.items(), desc="transferring"):
        dst_dir = cluster_dir[cid]
        for p in sorted(paths):
            src = Path(p)
            dst = dst_dir / safe_dst_name(src, src_root)
            _transfer(src, dst, mode)
    if noface_records:
        noface_dir = out_dir / "_noface"
        noface_dir.mkdir(exist_ok=True)
        for r in noface_records:
            src = Path(r["path"])
            if not src.exists():
                # Source may already be gone (e.g. a prior "move" run).
                continue
            _transfer(src, noface_dir / safe_dst_name(src, src_root), mode)
        print(f"{len(noface_records)} no-face images -> {noface_dir}")
    # One manifest entry per face (not per placed file).
    manifest = []
    for cid, recs in ordered:
        for r in recs:
            manifest.append({
                "image": Path(r["path"]).name,
                "source": r["path"],
                "cluster": cid,
                "folder": cluster_dir[cid].name,
                "bbox": r.get("bbox"),
                "det_score": r.get("det_score"),
                "face_short": r.get("face_short"),
                "blur": r.get("blur"),
            })
    (out_dir / "manifest.json").write_text(json.dumps(manifest, indent=2))
    print(f"Manifest -> {out_dir / 'manifest.json'}")
def _cluster_centroids(emb: np.ndarray, labels: np.ndarray) -> tuple[np.ndarray, list[int]]:
ids = sorted(set(int(l) for l in labels))
cents = []
for cid in ids:
mask = labels == cid
v = emb[mask].mean(axis=0)
n = np.linalg.norm(v)
if n > 0:
v = v / n
cents.append(v)
return np.stack(cents), ids
def cmd_refine(
    cache_path: Path,
    out_dir: Path,
    initial_threshold: float,
    merge_threshold: float,
    outlier_threshold: float,
    min_faces: int,
    min_short: int,
    min_blur: float,
    min_det_score: float,
    mode: str,
    dry_run: bool,
) -> None:
    """Refine raw clusters into faceset-ready folders (merge/outlier/quality).

    Pipeline:
      1. agglomerative clustering of all embeddings at *initial_threshold*;
      2. merge clusters whose L2-normalized centroids cluster together at
         *merge_threshold*;
      3. per merged cluster, drop faces that fail the quality gate
         (*min_short*, *min_blur*, *min_det_score*) and — in clusters of
         four or more faces — faces beyond *outlier_threshold* cosine
         distance from the cluster centroid;
      4. keep clusters covering at least *min_faces* unique images and
         transfer their images to out_dir/faceset_NNN/ (largest first).

    Writes ``refine_manifest.json`` with the parameters and per-faceset
    image lists. *dry_run* stops after printing the planned facesets.
    """
    emb, meta, src_root = load_cache(cache_path)
    if src_root is None:
        # Older caches may lack src_root; fall back to filesystem root.
        src_root = Path("/")
    face_records = [m for m in meta if not m.get("noface")]
    if len(face_records) != len(emb):
        raise SystemExit(f"meta/embedding mismatch: {len(face_records)} vs {len(emb)}")
    print(f"Stage 1: initial clustering (threshold={initial_threshold})")
    labels = _cluster_embeddings(emb, initial_threshold)
    # Stage 2: merge similar clusters by clustering their centroids
    cents, cent_ids = _cluster_centroids(emb, labels)
    print(f"Stage 2: centroid merge on {len(cent_ids)} clusters (merge_threshold={merge_threshold})")
    # A single centroid cannot be clustered; give it merged label 0 directly.
    cent_labels = _cluster_embeddings(cents, merge_threshold) if len(cents) > 1 else np.zeros(1, dtype=int)
    # Remap the stage-1 labels through the centroid-merge labels.
    label_map = {cid: int(ml) for cid, ml in zip(cent_ids, cent_labels)}
    merged = np.array([label_map[int(l)] for l in labels])
    # Build merged clusters, remembering each face's row index into `emb`.
    clusters: dict[int, list[tuple[int, dict]]] = {}  # cluster -> list of (global_idx, rec)
    for idx, (rec, lbl) in enumerate(zip(face_records, merged)):
        clusters.setdefault(int(lbl), []).append((idx, dict(rec)))
    print(f"After merge: {len(clusters)} clusters")
    # Stage 3: outlier rejection + quality filter per cluster
    kept_by_cluster: dict[int, list[tuple[int, dict]]] = {}
    dropped_quality = 0
    dropped_outlier = 0
    for cid, items in clusters.items():
        idxs = [i for i, _ in items]
        cvecs = emb[idxs]
        # L2-normalized centroid from the in-cluster faces
        c = cvecs.mean(axis=0)
        n = np.linalg.norm(c)
        if n > 0:
            c = c / n
        kept: list[tuple[int, dict]] = []
        for (idx, rec), v in zip(items, cvecs):
            # Quality gate: records missing a key fail via the 0 defaults.
            if rec.get("face_short", 0) < min_short:
                dropped_quality += 1
                continue
            if rec.get("blur", 0.0) < min_blur:
                dropped_quality += 1
                continue
            if rec.get("det_score", 0.0) < min_det_score:
                dropped_quality += 1
                continue
            # Outlier test only when the cluster has >=4 members (size taken
            # before filtering), where the centroid is meaningful.
            if len(items) >= 4:
                cos_dist = 1.0 - float(v @ c)
                if cos_dist > outlier_threshold:
                    dropped_outlier += 1
                    continue
            kept.append((idx, rec))
        if kept:
            kept_by_cluster[cid] = kept
    print(f"Dropped {dropped_quality} faces by quality gate, {dropped_outlier} as outliers")
    # Stage 4: enforce minimum cluster size (by unique images, not faces)
    final: list[tuple[int, list[tuple[int, dict]]]] = []
    for cid, items in kept_by_cluster.items():
        unique_imgs = {rec["path"] for _, rec in items}
        if len(unique_imgs) >= min_faces:
            final.append((cid, items))
    final.sort(key=lambda kv: -len(kv[1]))  # largest facesets get low ranks
    print(f"Facesets meeting min_faces={min_faces}: {len(final)}")
    for rank, (cid, items) in enumerate(final, 1):
        unique_imgs = {rec["path"] for _, rec in items}
        print(f" faceset_{rank:03d}: faces={len(items):3d} imgs={len(unique_imgs):3d}")
    if dry_run:
        return
    out_dir.mkdir(parents=True, exist_ok=True)
    for rank, (cid, items) in enumerate(final, 1):
        dst_dir = out_dir / f"faceset_{rank:03d}"
        dst_dir.mkdir(exist_ok=True)
        # Transfer each image once even if several of its faces were kept.
        seen_paths: set[str] = set()
        for _, rec in items:
            p = rec["path"]
            if p in seen_paths:
                continue
            seen_paths.add(p)
            src = Path(p)
            if not src.exists():
                # Source may be gone (e.g. moved by an earlier run).
                continue
            _transfer(src, dst_dir / safe_dst_name(src, src_root), mode)
    # Write refinement manifest (run parameters + per-faceset image lists)
    manifest = {
        "params": {
            "initial_threshold": initial_threshold,
            "merge_threshold": merge_threshold,
            "outlier_threshold": outlier_threshold,
            "min_faces": min_faces,
            "min_short": min_short,
            "min_blur": min_blur,
            "min_det_score": min_det_score,
        },
        "facesets": [
            {
                "name": f"faceset_{rank:03d}",
                "face_count": len(items),
                "image_count": len({rec["path"] for _, rec in items}),
                "images": sorted({rec["path"] for _, rec in items}),
            }
            for rank, (_, items) in enumerate(final, 1)
        ],
    }
    (out_dir / "refine_manifest.json").write_text(json.dumps(manifest, indent=2))
    print(f"Refine manifest -> {out_dir / 'refine_manifest.json'}")
def main() -> None:
    """CLI entry point: parse arguments, dispatch embed/cluster/refine."""
    parser = argparse.ArgumentParser()
    commands = parser.add_subparsers(dest="cmd", required=True)

    embed_cmd = commands.add_parser("embed")
    embed_cmd.add_argument("src_dir", type=Path)
    embed_cmd.add_argument("cache", type=Path)

    cluster_cmd = commands.add_parser("cluster")
    cluster_cmd.add_argument("cache", type=Path)
    cluster_cmd.add_argument("out_dir", type=Path)
    cluster_cmd.add_argument("--threshold", type=float, default=0.55)
    cluster_cmd.add_argument("--mode", choices=["copy", "move", "symlink"], default="copy")
    cluster_cmd.add_argument("--dry-run", action="store_true")

    refine_cmd = commands.add_parser("refine")
    refine_cmd.add_argument("cache", type=Path)
    refine_cmd.add_argument("out_dir", type=Path)
    refine_cmd.add_argument("--initial-threshold", type=float, default=0.55)
    refine_cmd.add_argument("--merge-threshold", type=float, default=0.40)
    refine_cmd.add_argument("--outlier-threshold", type=float, default=0.55)
    refine_cmd.add_argument("--min-faces", type=int, default=15)
    refine_cmd.add_argument("--min-short", type=int, default=90)
    refine_cmd.add_argument("--min-blur", type=float, default=40.0)
    refine_cmd.add_argument("--min-det-score", type=float, default=0.6)
    refine_cmd.add_argument("--mode", choices=["copy", "move", "symlink"], default="copy")
    refine_cmd.add_argument("--dry-run", action="store_true")

    args = parser.parse_args()
    if args.cmd == "embed":
        cmd_embed(args.src_dir, args.cache)
    elif args.cmd == "cluster":
        cmd_cluster(args.cache, args.out_dir, args.threshold, args.mode, args.dry_run)
    elif args.cmd == "refine":
        cmd_refine(
            args.cache,
            args.out_dir,
            args.initial_threshold,
            args.merge_threshold,
            args.outlier_threshold,
            args.min_faces,
            args.min_short,
            args.min_blur,
            args.min_det_score,
            args.mode,
            args.dry_run,
        )


if __name__ == "__main__":
    main()