"""Extend the existing 6 era buckets of faceset_001 by absorbing PNGs that post-date the original age_split run (from consolidation merges, etc.). Mirrors the anchor-fragment assignment logic in age_split_001.py: - For each unbucketed face in faceset_001's manifest, find the nearest active era anchor by cosine distance to the anchor's centroid. - Accept the assignment iff dist <= 0.40 AND |year_delta| <= 5 (where year_delta = exif_year(face) - dom_year(anchor)). - Undated PNGs are skipped (no assignment). - Anchors are NOT re-centered after absorption (preserves the same drift guarantees as the original age_split). CLI: python work/age_extend_001.py analyze --out work/age_extend/candidates.json python work/age_extend_001.py report --candidates ... --out work/age_extend python work/age_extend_001.py apply --candidates ... [--dry-run] """ from __future__ import annotations import argparse import json import shutil import sys import time from collections import Counter from pathlib import Path import numpy as np from PIL import Image, ExifTags ROOT = Path("/mnt/e/temp_things/fcswp/nl_sorted/facesets_swap_ready") PARENT = "faceset_001" ACTIVE_ERAS = [ "faceset_001_2005-10", "faceset_001_2010-13", "faceset_001_2011", "faceset_001_2014-17", "faceset_001_2018-19", "faceset_001_2018-20", ] CACHES = [ Path("/opt/face-sets/work/cache/nl_full.npz"), Path("/opt/face-sets/work/cache/immich_peter.npz"), Path("/opt/face-sets/work/cache/immich_nic.npz"), ] EXIF_CACHE = Path("/opt/face-sets/work/cache/age_split_exif.json") # anchor-fragment thresholds (mirror age_split_001.py) DIST_MAX = 0.40 YEAR_MAX = 5 # ----------------------------- caches ----------------------------- def load_caches(): rec_index: dict[tuple[str, tuple[int, int, int, int]], np.ndarray] = {} alias_map: dict[str, str] = {} for c in CACHES: if not c.exists(): print(f"[warn] cache missing: {c}", file=sys.stderr) continue d = np.load(c, allow_pickle=True) emb = d["embeddings"] meta = json.loads(str(d["meta"])) face_records = [m for m in meta if not m.get("noface")] if len(face_records) != len(emb): raise SystemExit(f"meta/emb mismatch in {c}: {len(face_records)} vs {len(emb)}") if "path_aliases" in d.files: paliases = json.loads(str(d["path_aliases"])) for canon, alist in paliases.items(): alias_map.setdefault(canon, canon) for a in alist: alias_map[a] = canon for i, rec in enumerate(face_records): p = rec["path"] bbox = tuple(int(x) for x in rec["bbox"]) v = emb[i].astype(np.float32) n = float(np.linalg.norm(v)) if n > 0: v = v / n rec_index[(p, bbox)] = v alias_map.setdefault(p, p) print(f"[cache] indexed {len(rec_index)} face records, {len(alias_map)} aliases", file=sys.stderr) return rec_index, alias_map def lookup_emb(rec_index, alias_map, src: str, bbox): bbox_t = tuple(int(x) for x in bbox) canon = alias_map.get(src, src) v = rec_index.get((canon, bbox_t)) if v is None and canon != src: v = rec_index.get((src, bbox_t)) return v # ----------------------------- exif ----------------------------- def load_exif_cache(): if not EXIF_CACHE.exists(): return {} return json.loads(EXIF_CACHE.read_text()) def save_exif_cache(cache): tmp = EXIF_CACHE.with_suffix(".tmp.json") tmp.write_text(json.dumps(cache, indent=2)) tmp.replace(EXIF_CACHE) def exif_year(path: Path) -> int | None: try: with Image.open(path) as im: ex = im._getexif() if not ex: return None for tag_id, val in ex.items(): tag = ExifTags.TAGS.get(tag_id, tag_id) if tag == "DateTimeOriginal" and isinstance(val, str) and len(val) >= 4: return int(val[:4]) except Exception: return None return None def get_year(src: str, exif_cache) -> int | None: """Return EXIF year for src, using cache. Mutates cache for new lookups.""" if src in exif_cache: return exif_cache[src] p = Path(src) y = exif_year(p) if p.exists() else None exif_cache[src] = y return y # ----------------------------- analyze ----------------------------- def cmd_analyze(args): rec_index, alias_map = load_caches() exif_cache = load_exif_cache() exif_cache_dirty = False parent_dir = ROOT / PARENT parent_manifest = json.loads((parent_dir / "manifest.json").read_text()) parent_faces = parent_manifest.get("faces", []) print(f"[parent] {PARENT}: {len(parent_faces)} face entries", file=sys.stderr) # Build "in_bucket" set + each anchor's centroid + dom_year anchors = [] in_bucket: set[tuple[str, tuple[int, int, int, int]]] = set() for era in ACTIVE_ERAS: ed = ROOT / era if not ed.is_dir(): print(f"[warn] missing era bucket: {era}", file=sys.stderr) continue em = json.loads((ed / "manifest.json").read_text()) emb_list = [] years = [] n_missing_emb = 0 for f in em.get("faces", []): src = f.get("source") bbox = f.get("bbox") if not src or not bbox: continue key = (alias_map.get(src, src), tuple(int(x) for x in bbox)) in_bucket.add(key) in_bucket.add((src, tuple(int(x) for x in bbox))) # cover both alias and raw v = lookup_emb(rec_index, alias_map, src, bbox) if v is None: n_missing_emb += 1 else: emb_list.append(v) y = get_year(src, exif_cache) if y is None: exif_cache_dirty = True else: years.append(y) if src not in exif_cache: exif_cache_dirty = True if not emb_list: print(f"[warn] {era}: no embeddings found, skipping anchor", file=sys.stderr) continue arr = np.stack(emb_list).astype(np.float32) c = arr.mean(axis=0) n = float(np.linalg.norm(c)) if n > 0: c = c / n dom_year = Counter(years).most_common(1)[0][0] if years else None anchors.append({ "name": era, "centroid": c, "n_faces": len(em.get("faces", [])), "n_emb_used": len(emb_list), "n_emb_missing": n_missing_emb, "dom_year": dom_year, "year_min": min(years) if years else None, "year_max": max(years) if years else None, }) print(f"[anchor] {era}: n={len(em.get('faces', []))} emb_used={len(emb_list)} " f"emb_miss={n_missing_emb} dom_year={dom_year} years=[{min(years) if years else '-'}..{max(years) if years else '-'}]", file=sys.stderr) # Find unbucketed faces in parent unbucketed = [] for f in parent_faces: src = f.get("source") bbox = f.get("bbox") if not src or not bbox: continue bbox_t = tuple(int(x) for x in bbox) key1 = (alias_map.get(src, src), bbox_t) key2 = (src, bbox_t) if key1 in in_bucket or key2 in in_bucket: continue unbucketed.append(f) print(f"[parent] {len(unbucketed)} unbucketed face entries (in {PARENT} but no era bucket)", file=sys.stderr) # Score each unbucketed face against every anchor proposals = [] skipped_no_emb = 0 skipped_no_year = 0 for f in unbucketed: src = f["source"] bbox = f["bbox"] v = lookup_emb(rec_index, alias_map, src, bbox) if v is None: skipped_no_emb += 1 continue y = get_year(src, exif_cache) if y is None: skipped_no_year += 1 exif_cache_dirty = True continue if src not in exif_cache: exif_cache_dirty = True # nearest anchor best = None # (dist, idx) for i, a in enumerate(anchors): d = 1.0 - float(np.dot(a["centroid"], v)) if best is None or d < best[0]: best = (d, i) if best is None: continue dist, bidx = best anchor = anchors[bidx] year_delta = abs(y - anchor["dom_year"]) if anchor["dom_year"] is not None else None accept = (dist <= DIST_MAX and year_delta is not None and year_delta <= YEAR_MAX) proposals.append({ "png": f["png"], "source": src, "bbox": [int(x) for x in bbox], "year": y, "rank_in_parent": f.get("rank"), "quality_composite": f.get("quality", {}).get("composite"), "quality": f.get("quality", {}), "best_anchor": anchor["name"], "best_anchor_dom_year": anchor["dom_year"], "centroid_dist": round(dist, 4), "year_delta": year_delta, "accept": bool(accept), "all_anchor_dists": { a["name"]: round(1.0 - float(np.dot(a["centroid"], v)), 4) for a in anchors }, }) if exif_cache_dirty: save_exif_cache(exif_cache) print(f"[exif] cache flushed ({len(exif_cache)} entries total)", file=sys.stderr) # Summarize accepted = [p for p in proposals if p["accept"]] rejected = [p for p in proposals if not p["accept"]] by_anchor = Counter(p["best_anchor"] for p in accepted) print(f"[summary] unbucketed={len(unbucketed)} scored={len(proposals)} " f"accepted={len(accepted)} rejected={len(rejected)} " f"skipped(no_emb={skipped_no_emb}, no_year={skipped_no_year})", file=sys.stderr) for k, v in by_anchor.most_common(): print(f" {k}: +{v}", file=sys.stderr) out = { "thresholds": {"dist_max": DIST_MAX, "year_max": YEAR_MAX}, "anchors": [ {k: v for k, v in a.items() if k != "centroid"} for a in anchors ], "n_unbucketed": len(unbucketed), "skipped": {"no_emb": skipped_no_emb, "no_year": skipped_no_year}, "proposals": sorted(proposals, key=lambda p: (not p["accept"], p["best_anchor"], -1 * (p["quality_composite"] or 0))), "by_anchor": dict(by_anchor), } op = Path(args.out) op.parent.mkdir(parents=True, exist_ok=True) op.write_text(json.dumps(out, indent=2)) print(f"[done] {len(proposals)} proposals -> {op}", file=sys.stderr) # ----------------------------- report ----------------------------- def cmd_report(args): cand = json.loads(Path(args.candidates).read_text()) out_dir = Path(args.out) thumbs_dir = out_dir / "thumbs" thumbs_dir.mkdir(parents=True, exist_ok=True) THUMB = 140 def make_thumb(png_relpath: str) -> str: # png_relpath looks like "faces/0042.png" src = ROOT / PARENT / png_relpath name = Path(png_relpath).stem dst = thumbs_dir / f"{name}.jpg" if not dst.exists(): try: img = Image.open(src).convert("RGB") img.thumbnail((THUMB, THUMB), Image.LANCZOS) img.save(dst, "JPEG", quality=82) except Exception as e: print(f"[thumb-skip] {src}: {e}", file=sys.stderr) return "" return f"thumbs/{name}.jpg" # group accepted proposals by target anchor by_anchor: dict[str, list] = {} rejected = [] for p in cand["proposals"]: if p["accept"]: by_anchor.setdefault(p["best_anchor"], []).append(p) else: rejected.append(p) rows = [] rows.append("

faceset_001 age extension — review

") rows.append(f"

{cand['n_unbucketed']} unbucketed faces in {PARENT}; " f"{sum(len(v) for v in by_anchor.values())} accepted / {len(rejected)} rejected; " f"thresholds dist≤{cand['thresholds']['dist_max']} AND |year_delta|≤{cand['thresholds']['year_max']}.

") nav = " · ".join(f"{a} (+{len(by_anchor[a])})" for a in by_anchor) + " · rejected" rows.append(f"") for anchor_name in ACTIVE_ERAS: if anchor_name not in by_anchor: continue items = by_anchor[anchor_name] anchor_meta = next((a for a in cand["anchors"] if a["name"] == anchor_name), {}) rows.append(f"
") rows.append(f"

{anchor_name} (dom_year={anchor_meta.get('dom_year')}; " f"existing n={anchor_meta.get('n_faces')}; +{len(items)} new)

") rows.append("
") for p in sorted(items, key=lambda x: (x["centroid_dist"], -1 * (x["quality_composite"] or 0))): thumb = make_thumb(p["png"]) cls = "hi" if p["centroid_dist"] <= 0.30 else "mid" rows.append( f"
" f"" f"
{p['png']}
year {p['year']} (Δ{p['year_delta']})
" f"dist {p['centroid_dist']:.3f}
" f"
" ) rows.append("
") if rejected: rows.append("
") rows.append(f"

rejected ({len(rejected)} faces don't fit any anchor)

") rows.append("
") for p in sorted(rejected, key=lambda x: x["centroid_dist"])[:200]: thumb = make_thumb(p["png"]) why = [] if p["centroid_dist"] > cand['thresholds']['dist_max']: why.append(f"dist {p['centroid_dist']:.2f}>{cand['thresholds']['dist_max']}") if p["year_delta"] is None or p["year_delta"] > cand['thresholds']['year_max']: why.append(f"yΔ{p['year_delta']}>{cand['thresholds']['year_max']}") rows.append( f"
" f"" f"
{p['png']}
year {p['year']} → best {p['best_anchor']}
" f"{'; '.join(why)}
" f"
" ) if len(rejected) > 200: rows.append(f"

...{len(rejected)-200} more truncated.

") rows.append("
") html = f""" faceset_001 age extension {''.join(rows)} """ out_html = out_dir / "index.html" out_html.write_text(html) print(f"[done] {out_html}", file=sys.stderr) # ----------------------------- apply ----------------------------- def _zip_png_list(pngs: list[Path], zip_path: Path) -> None: import zipfile with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=4) as zf: for i, p in enumerate(pngs): zf.write(p, arcname=f"{i:04d}.png") def cmd_apply(args): cand = json.loads(Path(args.candidates).read_text()) accepted = [p for p in cand["proposals"] if p["accept"]] if args.dry_run: from collections import Counter as C by = C(p["best_anchor"] for p in accepted) print(f"=== dry-run: {len(accepted)} assignments across {len(by)} anchors ===") for k, v in by.most_common(): print(f" {k}: +{v}") return parent_dir = ROOT / PARENT master_path = ROOT / "manifest.json" master = json.loads(master_path.read_text()) facesets_by_name = {f["name"]: f for f in master.get("facesets", [])} by_anchor: dict[str, list] = {} for p in accepted: by_anchor.setdefault(p["best_anchor"], []).append(p) total_added = 0 for anchor_name, props in by_anchor.items(): ed = ROOT / anchor_name em_path = ed / "manifest.json" em = json.loads(em_path.read_text()) existing = list(em.get("faces", [])) # gather new entries with their source PNG paths in faceset_001/faces/ new_with_src = [] for p in props: src_png = parent_dir / p["png"] if not src_png.exists(): print(f"[warn] missing parent PNG {src_png}; skip", file=sys.stderr) continue face_entry = { "source": p["source"], "bbox": p["bbox"], "quality": p["quality"], "exif_year": p["year"], "centroid_dist_at_assign": p["centroid_dist"], "year_delta_at_assign": p["year_delta"], "extended_from_parent": True, } new_with_src.append((face_entry, src_png)) # combine; rank by quality.composite desc (existing entries already have rank, # but we re-rank globally so new entries slot in by quality) combined: list[tuple[dict, Path | None]] = [] for f in existing: combined.append((f, None)) combined.extend(new_with_src) combined.sort(key=lambda x: -x[0].get("quality", {}).get("composite", 0)) # stage fresh staging = ed / "_faces_new" if staging.exists(): shutil.rmtree(staging) staging.mkdir() new_face_entries = [] for new_rank, (face, src_png_or_none) in enumerate(combined, start=1): new_name = f"{new_rank:04d}.png" if src_png_or_none is None: # existing entry: copy from current era bucket faces/ old_name = Path(face["png"]).name src = ed / "faces" / old_name if not src.exists(): print(f"[warn] {anchor_name}: missing existing PNG {src}; skip", file=sys.stderr) continue shutil.copy2(src, staging / new_name) else: shutil.copy2(src_png_or_none, staging / new_name) face = dict(face) face["rank"] = new_rank face["png"] = f"faces/{new_name}" new_face_entries.append(face) # swap dirs old_holding = ed / "_faces_old" if old_holding.exists(): shutil.rmtree(old_holding) (ed / "faces").rename(old_holding) staging.rename(ed / "faces") shutil.rmtree(old_holding) # re-zip .fsz survivor_pngs = sorted((ed / "faces").glob("*.png")) top_n = em.get("top_n", 30) top_n_eff = min(top_n, len(survivor_pngs)) for old in ed.glob("*.fsz"): old.unlink() top_fsz_name = f"{anchor_name}_top{top_n_eff}.fsz" all_fsz_name = f"{anchor_name}_all.fsz" _zip_png_list(survivor_pngs[:top_n_eff], ed / top_fsz_name) if len(survivor_pngs) > top_n_eff: _zip_png_list(survivor_pngs, ed / all_fsz_name) all_fsz_used = all_fsz_name else: all_fsz_used = None # update local + master manifests em["faces"] = new_face_entries em["exported"] = len(new_face_entries) em["fsz_top"] = top_fsz_name em["fsz_all"] = all_fsz_used em["top_n"] = top_n_eff em.setdefault("age_extend_history", []).append({ "added": len(new_with_src), "thresholds": cand["thresholds"], }) em_path.write_text(json.dumps(em, indent=2)) if anchor_name in facesets_by_name: facesets_by_name[anchor_name]["exported"] = len(new_face_entries) facesets_by_name[anchor_name]["fsz_top"] = top_fsz_name facesets_by_name[anchor_name]["fsz_all"] = all_fsz_used facesets_by_name[anchor_name]["top_n"] = top_n_eff added_here = len(new_with_src) total_added += added_here print(f"[applied] {anchor_name}: +{added_here} (now {len(new_face_entries)} faces)", file=sys.stderr) # rewrite master with ordering preserved new_facesets = [] for entry in master.get("facesets", []): new_facesets.append(facesets_by_name.get(entry["name"], entry)) master["facesets"] = new_facesets master.setdefault("age_extend_runs", []).append({ "parent": PARENT, "thresholds": cand["thresholds"], "anchors": list(by_anchor.keys()), "added_total": total_added, }) tmp = master_path.with_suffix(".tmp.json") tmp.write_text(json.dumps(master, indent=2)) tmp.replace(master_path) print(f"[done] +{total_added} faces across {len(by_anchor)} anchors", file=sys.stderr) # ----------------------------- main ----------------------------- def main(): ap = argparse.ArgumentParser() sub = ap.add_subparsers(dest="cmd", required=True) a = sub.add_parser("analyze") a.add_argument("--out", required=True) a.set_defaults(func=cmd_analyze) r = sub.add_parser("report") r.add_argument("--candidates", required=True) r.add_argument("--out", required=True) r.set_defaults(func=cmd_report) p = sub.add_parser("apply") p.add_argument("--candidates", required=True) p.add_argument("--dry-run", action="store_true") p.set_defaults(func=cmd_apply) args = ap.parse_args() args.func(args) if __name__ == "__main__": main()