- README: document work/build_folders.py (hand-sorted folder identities) and the new age-split workflow for splitting a long-running identity into era-specific facesets after clustering.
- Force-track work/age_split_001.py and work/check_faceset001_age.py; these are the worked example + readiness probe for faceset_001 and the template for splitting any other identity by EXIF era.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
#!/usr/bin/env python3
"""Probe faceset_001 for age-sortable sub-structure.

Three questions:

1. How spread out is the embedding cloud? (intra-cluster pairwise distance histogram)
2. Does it split naturally into sub-clusters at a tight threshold?
3. Do the sub-clusters correspond to distinct time periods (EXIF DateTimeOriginal)?
"""

from __future__ import annotations

import json
import sys
from collections import Counter
from pathlib import Path

import numpy as np
from PIL import Image, ExifTags

REPO = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(REPO))
from sort_faces import load_cache  # noqa: E402

CACHE = REPO / "work" / "cache" / "nl_full.npz"
FS001 = Path("/mnt/e/temp_things/fcswp/nl_sorted/facesets_swap_ready/faceset_001")


def exif_year(path: Path) -> int | None:
    try:
        with Image.open(path) as im:
            exif = im._getexif()
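            # _getexif() is a private PIL API, but it returns the raw
            # {tag_id: value} dict the loop below expects; Image.getexif()
            # is the public alternative.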
            if not exif:
                return None
            for tag_id, val in exif.items():
                tag = ExifTags.TAGS.get(tag_id, tag_id)
                if tag == "DateTimeOriginal" and isinstance(val, str) and len(val) >= 4:
                    return int(val[:4])
    except Exception:
        return None
    return None


def main() -> None:
    manifest = json.loads((FS001 / "manifest.json").read_text())
    faces = manifest["faces"]
    paths = [Path(f["source"]) for f in faces]
    print(f"faceset_001 has {len(paths)} ranked faces in the swap-ready set")

    # Pull embeddings for these face records by (path, bbox).
    emb, meta, _src, _proc, _aliases = load_cache(CACHE)
    face_records = [m for m in meta if not m.get("noface")]
    if len(face_records) != len(emb):
        raise SystemExit("emb/meta mismatch")
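    # (path, bbox) is the join key below: one source image can hold several
    # detected faces, so the path alone is not unique; the bbox disambiguates.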
    bbox_key = {}
    for i, m in enumerate(face_records):
        bbox_key[(m["path"], tuple(m.get("bbox") or ()))] = i

    selected = []
    missing = 0
    for f in faces:
        key = (f["source"], tuple(f.get("bbox") or ()))
        i = bbox_key.get(key)
        if i is None:
            missing += 1
            continue
        selected.append(i)
    print(f"matched {len(selected)} embeddings (missing {missing})")

    E = emb[selected]
    # All embeddings are L2-normalized -> cosine dist = 1 - dot.
    sims = E @ E.T
    dists = 1.0 - sims
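    # Note: float32 dot products of unit vectors can drift slightly past 1.0,
    # leaving tiny negative distances; if AgglomerativeClustering below ever
    # rejects them, clip first with dists = np.clip(dists, 0.0, None).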
    iu = np.triu_indices_from(dists, k=1)
    pw = dists[iu]
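    # k=1 takes the strict upper triangle: each unordered pair is counted once
    # and the zero diagonal is dropped, so len(pw) == n * (n - 1) / 2.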
    print("\n-- intra-cluster pairwise cosine distance --")
    print(f" n_pairs = {len(pw):,}")
    print(f" mean = {pw.mean():.3f}")
    print(f" median = {np.median(pw):.3f}")
    print(f" p10/p25/p75/p90 = {np.percentile(pw, [10, 25, 75, 90])}")
    print(f" max = {pw.max():.3f}")

    # Histogram bins around interesting thresholds.
    edges = [0.0, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 1.0, 1.4]
    hist, _ = np.histogram(pw, bins=edges)
    print("\n histogram (cos-dist bin -> pair count):")
    for lo, hi, c in zip(edges[:-1], edges[1:], hist):
        bar = "#" * int(60 * c / max(hist.max(), 1))
        print(f" [{lo:.1f},{hi:.1f}) {c:7d} {bar}")

    # Sub-cluster at a sweep of thresholds via agglomerative clustering on the
    # distance matrix.
    from sklearn.cluster import AgglomerativeClustering

    print("\n-- sub-clustering --")
    for thr in (0.30, 0.35, 0.40, 0.45, 0.50):
        ac = AgglomerativeClustering(
            n_clusters=None,
            metric="precomputed",
            linkage="average",
            distance_threshold=thr,
        )
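        # metric="precomputed" makes fit_predict consume the square distance
        # matrix directly; with n_clusters=None, merging stops once the
        # average-linkage distance exceeds distance_threshold, so the number
        # of sub-clusters falls out of the data.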
        labels = ac.fit_predict(dists)
        sizes = Counter(labels)
        n = len(sizes)
        big = sum(1 for s in sizes.values() if s >= 10)
        top5 = sorted(sizes.values(), reverse=True)[:5]
        print(f" threshold {thr:.2f}: {n} sub-clusters, {big} with >=10 images, top-5 sizes={top5}")

    # Pick the threshold that gives 2-5 substantial sub-clusters.
    target_thr = 0.35
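    # 0.35 is read off the sweep printed above; when reusing this probe on a
    # different identity, re-run the sweep and adjust accordingly.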
    ac = AgglomerativeClustering(
        n_clusters=None, metric="precomputed", linkage="average",
        distance_threshold=target_thr,
    )
    labels = ac.fit_predict(dists)
    sizes = Counter(labels)
    big_labels = [lab for lab, s in sizes.most_common() if s >= 20]
    print(f"\n-- EXIF year analysis at threshold {target_thr} (sub-clusters with >=20 images) --")
    print(f" {len(big_labels)} substantial sub-clusters")

    # Build label -> list of source paths
    by_label: dict[int, list[Path]] = {}
    for ci, lab in zip(selected, labels):
        rec = face_records[ci]
        by_label.setdefault(int(lab), []).append(Path(rec["path"]))

    for lab in big_labels[:6]:
        paths_in = by_label[lab]
        years = []
        for p in paths_in:
            y = exif_year(p)
            if y is not None:
                years.append(y)
        n_paths = len(paths_in)
        n_years = len(years)
        if years:
            ys = np.array(years)
            ymin, ymax = int(ys.min()), int(ys.max())
            ymed = int(np.median(ys))
            yhist = Counter(years)
            top_years = ", ".join(f"{y}:{c}" for y, c in sorted(yhist.most_common(5)))
        else:
            ymin = ymax = ymed = None
            top_years = ""
        print(
            f" cluster {lab}: {n_paths} faces, EXIF on {n_years}/{n_paths}, "
            f"year range {ymin}..{ymax} (median {ymed})"
        )
        print(f" top years: {top_years}")


if __name__ == "__main__":
    main()