#!/usr/bin/env python3
"""Probe faceset_001 for age-sortable sub-structure.

Three questions:
  1. How spread is the embedding cloud? (intra-cluster pairwise distance histogram)
  2. Does it split naturally into sub-clusters at a tight threshold?
  3. Do the sub-clusters correspond to distinct time periods (EXIF DateTimeOriginal)?
"""
from __future__ import annotations

import json
import sys
from collections import Counter
from pathlib import Path

import numpy as np
from PIL import Image, ExifTags

REPO = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(REPO))
from sort_faces import load_cache  # noqa: E402

CACHE = REPO / "work" / "cache" / "nl_full.npz"
FS001 = Path("/mnt/e/temp_things/fcswp/nl_sorted/facesets_swap_ready/faceset_001")


def exif_year(path: Path) -> int | None:
    """Return the year of the image's EXIF ``DateTimeOriginal`` tag, or None.

    This is deliberately best-effort: any failure to open the file or to
    read/parse its EXIF data yields None instead of raising, so a single bad
    image cannot abort the analysis.

    Args:
        path: Image file to inspect.

    Returns:
        The year parsed from the first four characters of the tag value, or
        None when the tag is absent, malformed, or carries a non-positive year.
    """
    try:
        with Image.open(path) as im:
            # NOTE(review): _getexif() is a private Pillow helper. If the
            # opened image object lacks it, the AttributeError is swallowed
            # by the handler below and the image counts as "no EXIF".
            exif = im._getexif()
            if not exif:
                return None
            for tag_id, val in exif.items():
                tag = ExifTags.TAGS.get(tag_id, tag_id)
                if tag == "DateTimeOriginal" and isinstance(val, str) and len(val) >= 4:
                    year = int(val[:4])
                    # A year of 0 (e.g. "0000:00:00 00:00:00") is not a real
                    # capture date; reporting it would distort the min/median.
                    return year if year > 0 else None
    except Exception:
        # Best-effort probe: unreadable or corrupt files are simply skipped.
        return None
    return None


def _sub_cluster(dists: np.ndarray, threshold: float) -> np.ndarray:
    """Average-linkage agglomerative clustering on a precomputed distance matrix.

    Returns one cluster label per row of ``dists``.
    """
    # Imported lazily so the module can be loaded without scikit-learn.
    from sklearn.cluster import AgglomerativeClustering

    model = AgglomerativeClustering(
        n_clusters=None,
        metric="precomputed",
        linkage="average",
        distance_threshold=threshold,
    )
    return model.fit_predict(dists)


def main() -> None:
    """Run the three probes on faceset_001 and print the results to stdout.

    Raises:
        SystemExit: if the cache's embeddings and face records disagree in
            length, or if fewer than two manifest faces can be matched to an
            embedding (pairwise statistics and clustering need at least two).
    """
    manifest = json.loads((FS001 / "manifest.json").read_text())
    faces = manifest["faces"]
    print(f"faceset_001 has {len(faces)} ranked faces in the swap-ready set")

    # Pull embeddings for these face records by (path, bbox).
    emb, meta, _src, _proc, _aliases = load_cache(CACHE)
    face_records = [m for m in meta if not m.get("noface")]
    if len(face_records) != len(emb):
        raise SystemExit("emb/meta mismatch")

    # (path, bbox) -> row index into emb / face_records.
    bbox_key = {
        (m["path"], tuple(m.get("bbox") or ())): i
        for i, m in enumerate(face_records)
    }

    selected = []
    missing = 0
    for f in faces:
        i = bbox_key.get((f["source"], tuple(f.get("bbox") or ())))
        if i is None:
            missing += 1
            continue
        selected.append(i)
    print(f"matched {len(selected)} embeddings (missing {missing})")

    if len(selected) < 2:
        # With fewer than two points there are no pairs: the statistics below
        # would fail on an empty array and clustering cannot run.
        raise SystemExit("need at least 2 matched embeddings to analyse")

    E = emb[selected]

    # All embeddings are L2-normalized -> cosine dist = 1 - dot.
    sims = E @ E.T
    # Rounding can push 1 - dot slightly below zero for (near-)identical
    # vectors; such values would fall outside the first histogram bin.
    dists = np.clip(1.0 - sims, 0.0, None)
    iu = np.triu_indices_from(dists, k=1)
    pw = dists[iu]

    print("\n-- intra-cluster pairwise cosine distance --")
    print(f" n_pairs = {len(pw):,}")
    print(f" mean = {pw.mean():.3f}")
    print(f" median = {np.median(pw):.3f}")
    print(f" p10/p25/p75/p90 = {np.percentile(pw, [10,25,75,90])}")
    print(f" max = {pw.max():.3f}")

    # Histogram bins around interesting thresholds.
    # NOTE(review): pairs with distance above the last edge (1.4) are not
    # counted by np.histogram; compare against the printed max if in doubt.
    edges = [0.0, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 1.0, 1.4]
    hist, _ = np.histogram(pw, bins=edges)
    print("\n histogram (cos-dist bin -> pair count):")
    for lo, hi, c in zip(edges[:-1], edges[1:], hist):
        bar = "#" * int(60 * c / max(hist.max(), 1))
        print(f" [{lo:.1f},{hi:.1f}) {c:7d} {bar}")

    # Sub-cluster at several thresholds via agglomerative on the distance matrix.
    print("\n-- sub-clustering --")
    for thr in (0.30, 0.35, 0.40, 0.45, 0.50):
        sizes = Counter(_sub_cluster(dists, thr).tolist())
        n = len(sizes)
        big = sum(1 for s in sizes.values() if s >= 10)
        top5 = sorted(sizes.values(), reverse=True)[:5]
        print(f" threshold {thr:.2f}: {n} sub-clusters, {big} with >=10 images, top-5 sizes={top5}")

    # Pick the threshold that gives 2-5 substantial sub-clusters.
    target_thr = 0.35
    labels = _sub_cluster(dists, target_thr).tolist()
    sizes = Counter(labels)
    big_labels = [lab for lab, s in sizes.most_common() if s >= 20]
    print(f"\n-- EXIF year analysis at threshold {target_thr} (sub-clusters with >=20 images) --")
    print(f" {len(big_labels)} substantial sub-clusters")

    # Build label -> list of source paths
    by_label: dict[int, list[Path]] = {}
    for ci, lab in zip(selected, labels):
        by_label.setdefault(lab, []).append(Path(face_records[ci]["path"]))

    for lab in big_labels[:6]:
        paths_in = by_label[lab]
        years = [y for y in map(exif_year, paths_in) if y is not None]
        n_paths = len(paths_in)
        n_years = len(years)
        if years:
            ys = np.array(years)
            ymin, ymax = int(ys.min()), int(ys.max())
            ymed = int(np.median(ys))
            # Five most frequent years, listed in chronological order.
            top_years = ", ".join(
                f"{y}:{c}" for y, c in sorted(Counter(years).most_common(5))
            )
        else:
            ymin = ymax = ymed = None
            top_years = ""
        print(
            f" cluster {lab}: {n_paths} faces, EXIF on {n_years}/{n_paths}, "
            f"year range {ymin}..{ymax} (median {ymed})"
        )
        print(f" top years: {top_years}")


if __name__ == "__main__":
    main()