Files
face-sets/work/cluster_immich.py
Peter 321fed01cc Add Immich import pipeline (WSL stage + Windows DML embed + cluster)
Three-piece workflow that imports a self-hosted Immich library and emits
new facesets without disturbing existing identity numbering:

- work/immich_stage.py (WSL): pages /search/metadata, parallel-fetches
  /faces?id= per asset, prefilters by face_short>=90 against bbox scaled
  to original-image coords, downloads originals, sha256-dedups against
  nl_full.npz and same-run staged files. 8-worker ThreadPoolExecutor
  doing the full /faces->filter->/original chain per asset; resumable
  via state.json. API URL + key come from IMMICH_URL / IMMICH_API_KEY
  env vars, label->UUID map from work/immich/users.json (gitignored).
- work/embed_worker.py (Windows venv at C:\face_embed_venv): runs
  insightface.FaceAnalysis(buffalo_l) with the DmlExecutionProvider on
  AMD Radeon Vega via onnxruntime-directml. Produces a cache file in
  the same .npz schema as sort_faces.cmd_embed (loadable via
  load_cache). ~7.5x speedup over CPU end-to-end; embeddings bit-
  identical to CPU (cosine similarity 1.0000 across 8 sample faces).
- work/cluster_immich.py (WSL): mirrors cluster_osrc.py against an
  immich_<user>.npz. Builds existing identity centroids from canonical
  faceset_NNN/ in facesets_swap_ready/, drops matches at <=0.45,
  clusters the rest at 0.55, applies refine gates, hands off to
  cmd_export_swap. Numbers new facesets past the existing maximum.
- work/finalize_immich.sh: chains queue->Windows embed->cache copy->
  cluster_immich, with logging.

The 2026-04-26 run on https://fotos.computerliebe.org (Immich v2.7.2)
processed 53,842 admin-accessible assets, staged 10,261, embedded
19,462 face records on Vega DML in 64.6 min, matched 8,103 (42%) to
existing identities, and emitted 185 new facesets (faceset_026..264
with gaps). facesets_swap_ready/ went from 31 to 216 substantive
facesets.

Important caveat surfaced: /search/metadata's userIds filter is
silently ignored when the API key is bound to a different user, so
this run can't enumerate other users' libraries from the admin key.
A per-user API key would be required for nic.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 18:14:26 +02:00

341 lines
13 KiB
Python

#!/usr/bin/env python3
"""Discover new identities in an Immich-sourced cache and emit them as facesets.
Mirrors `work/cluster_osrc.py`, but the source corpus is an arbitrary
Immich user's `immich_<user>.npz` cache produced by the Windows DML embed
worker. Existing identity centroids come from the union of every faceset
already in `facesets_swap_ready/` (faceset_001..NNN, both auto-clustered
and hand-sorted).
Pipeline:
1. Load immich_<user>.npz; restrict to face records (drop noface).
2. Build centroids of every existing canonical faceset in
facesets_swap_ready/ (skip era splits and _thin/).
3. Drop immich faces whose nearest existing centroid is within
EXISTING_MATCH_THRESHOLD; those are already covered by the canonical set.
4. Cluster the remaining among themselves at INITIAL_THRESHOLD.
5. Per cluster: refine-equivalent gates (face_short, blur, det_score),
plus outlier rejection at OUTLIER_THRESHOLD for clusters of size >= 4.
6. Keep clusters whose surviving unique source-path count is >= MIN_FACES.
7. Number kept clusters past the existing facesets_swap_ready/ max.
8. Synthesize a refine_manifest, hand off to cmd_export_swap, move dirs into
facesets_swap_ready/, drop a provenance marker, append to top-level
manifest.json (preserving facesets / thin_eras).
"""
from __future__ import annotations
import argparse
import json
import shutil
import sys
from pathlib import Path
import numpy as np
# Repository root: the grandparent of this file's resolved path (the script
# sits one directory below the root).
REPO = Path(__file__).resolve().parent.parent
# Put the repository root first on sys.path so the `sort_faces` import that
# follows resolves when this script is run directly.
sys.path.insert(0, str(REPO))
from sort_faces import ( # noqa: E402
_cluster_embeddings,
cmd_export_swap,
load_cache,
)
# ---- config -------------------------------------------------------------- #
# Working directory inside the repo: receives the synthetic manifest and is
# the base of the default --nl-cache path.
REPO_WORK = REPO / "work"
# Tree of canonical facesets: source of existing-identity centroids, basis for
# numbering new facesets, and final destination of exported directories.
SWAP_READY = Path("/mnt/e/temp_things/fcswp/nl_sorted/facesets_swap_ready")
# A face whose distance (1 - dot product) to its nearest existing centroid is
# <= this value is treated as already covered and is not clustered.
EXISTING_MATCH_THRESHOLD = 0.45
# Threshold handed to _cluster_embeddings for the unmatched faces.
INITIAL_THRESHOLD = 0.55
# Minimum number of unique source paths a cluster needs to be kept.
MIN_FACES = 6
# Per-face quality gates: records below any of these are dropped.
MIN_SHORT = 90
MIN_BLUR = 40.0
MIN_DET_SCORE = 0.6
# Max distance (1 - dot product) from the cluster centroid; applied only to
# clusters with at least 4 surviving faces.
OUTLIER_THRESHOLD = 0.55
# Values forwarded to cmd_export_swap as top_n, outlier_threshold, pad_ratio,
# out_size and min_face_short respectively.
TOP_N = 30
EXPORT_OUTLIER_THRESHOLD = 0.45
PAD_RATIO = 0.5
OUT_SIZE = 512
EXPORT_MIN_FACE_SHORT = 100
# ---- helpers ------------------------------------------------------------- #
def _normalize(v: np.ndarray) -> np.ndarray:
    """Scale *v* to unit Euclidean norm.

    An input whose norm is not strictly positive (e.g. the zero vector) is
    returned as-is, avoiding a division by zero.
    """
    length = np.linalg.norm(v)
    if length > 0:
        return v / length
    return v
def _existing_identity_centroids(
    nl_cache: Path,
) -> tuple[np.ndarray, list[str]]:
    """Build one normalized centroid per canonical faceset in SWAP_READY.

    Directories whose name starts with "_" and era-split sub-facesets
    (``faceset_NNN_<suffix>``) are skipped. Each remaining directory's
    ``manifest.json`` lists faces as (source, bbox) pairs, which are looked
    up among the face records of *nl_cache*; the matching embeddings are
    averaged and normalized.

    Args:
        nl_cache: Cache file loaded through ``load_cache``; its non-noface
            meta records are expected to align one-to-one with its
            embedding rows.

    Returns:
        ``(centroids, names)``: the stacked centroid array and the faceset
        directory names in the same order.

    Raises:
        SystemExit: If the face-record count differs from the embedding
            count, or if no centroid could be built at all.
    """
    emb, meta, _src, _proc, _aliases = load_cache(nl_cache)
    face_records = [m for m in meta if not m.get("noface")]
    if len(face_records) != len(emb):
        raise SystemExit(f"meta/embedding mismatch in {nl_cache}: {len(face_records)} vs {len(emb)}")
    # (path, bbox) -> embedding row. bbox is turned into a tuple so it is
    # hashable and comparable with the JSON-decoded manifest value.
    bbox_idx = {(m["path"], tuple(m.get("bbox") or ())): i for i, m in enumerate(face_records)}

    centroids: list[np.ndarray] = []
    names: list[str] = []
    for d in sorted(SWAP_READY.iterdir()):
        if not d.is_dir() or d.name.startswith("_"):
            continue
        # Skip era-split sub-facesets (faceset_NNN_*).
        if d.name.startswith("faceset_") and "_" in d.name[len("faceset_"):]:
            continue
        man = d / "manifest.json"
        if not man.exists():
            continue
        # Best effort: a broken manifest must not abort the run, but it must
        # not disappear silently either, because a missing centroid lets that
        # identity's faces be re-discovered as "new".
        try:
            payload = json.loads(man.read_text())
        except (OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"  [warn] {d.name}: cannot read manifest.json ({exc}); skipping")
            continue
        if not isinstance(payload, dict):
            print(f"  [warn] {d.name}: manifest.json is not a JSON object; skipping")
            continue
        entries = payload.get("faces", [])
        keys = [(f["source"], tuple(f.get("bbox") or ())) for f in entries]
        idxs = [bbox_idx[k] for k in keys if k in bbox_idx]
        if not idxs:
            # None of this faceset's faces are present in nl_cache.
            continue
        centroids.append(_normalize(emb[idxs].mean(axis=0)))
        names.append(d.name)

    if not centroids:
        raise SystemExit("no canonical identity centroids could be built; check facesets_swap_ready/")
    return np.stack(centroids), names
def _next_faceset_number() -> int:
    """Return the first unused top-level faceset number in SWAP_READY.

    Only directories named ``faceset_<int>`` count; names with a further
    underscore after the prefix (era splits) or a non-integer tail are
    ignored. Returns 1 when no numbered faceset exists.
    """
    prefix = "faceset_"
    numbers = []
    for entry in SWAP_READY.iterdir():
        if not (entry.is_dir() and entry.name.startswith(prefix)):
            continue
        suffix = entry.name[len(prefix):]
        # A second underscore marks an era-split directory, not a top-level one.
        if "_" in suffix:
            continue
        try:
            numbers.append(int(suffix))
        except ValueError:
            continue
    if not numbers:
        return 1
    return max(numbers) + 1
# ---- phase 1: discover --------------------------------------------------- #
def _passes_quality_gates(record: dict) -> bool:
    """Return True unless the record falls below MIN_SHORT, MIN_BLUR or MIN_DET_SCORE."""
    return not (
        record.get("face_short", 0) < MIN_SHORT
        or record.get("blur", 0.0) < MIN_BLUR
        or record.get("det_score", 0.0) < MIN_DET_SCORE
    )


def _drop_centroid_outliers(emb: np.ndarray, indices: list[int]) -> list[int]:
    """Keep the indices whose distance to the group's normalized mean is <= OUTLIER_THRESHOLD."""
    centroid = _normalize(emb[indices].mean(axis=0))
    distances = 1.0 - emb[indices] @ centroid
    return [idx for idx, dist in zip(indices, distances) if dist <= OUTLIER_THRESHOLD]


def discover_new_clusters(
    immich_cache: Path, nl_cache: Path, start_nnn: int, source_label: str
) -> tuple[dict, list[dict]]:
    """Find clusters of faces in *immich_cache* not covered by existing identities.

    Steps: load the cache, drop faces whose nearest existing centroid is within
    EXISTING_MATCH_THRESHOLD, cluster the remainder at INITIAL_THRESHOLD, apply
    the per-face quality gates and (for clusters of >= 4 faces) centroid
    outlier rejection, then keep clusters with at least MIN_FACES unique
    source paths. Kept clusters are ordered by descending unique-path count
    and named ``faceset_NNN`` starting at *start_nnn*. Progress is printed.

    Args:
        immich_cache: Cache to scan; loaded through ``load_cache``.
        nl_cache: Cache used to build the existing-identity centroids.
        start_nnn: Number assigned to the first (largest) kept cluster.
        source_label: Recorded in the manifest ``params`` only.

    Returns:
        ``(manifest, kept)``: *manifest* holds ``params`` and a ``facesets``
        list (name, image_count, face_count, images); *kept* holds, per
        cluster, the face-record indices and unique paths.

    Raises:
        SystemExit: If face records and embeddings differ in count (also
            propagated from ``_existing_identity_centroids``).
    """
    print(f"loading immich cache: {immich_cache}")
    emb, meta, _src, _proc, _aliases = load_cache(immich_cache)
    face_records = [m for m in meta if not m.get("noface")]
    if len(face_records) != len(emb):
        raise SystemExit(f"meta/embedding mismatch: {len(face_records)} vs {len(emb)}")
    print(f" {len(face_records)} face records, {len(meta) - len(face_records)} noface")

    print(f"building existing-identity centroids from {SWAP_READY}")
    cents, cent_names = _existing_identity_centroids(nl_cache)
    print(f" {len(cent_names)} canonical centroids")

    # Dot products against every centroid; 1 - max is treated as the distance
    # to the nearest identity (assumes embeddings are unit-length -- TODO confirm).
    sims = emb @ cents.T
    nearest_d = 1.0 - sims.max(axis=1)
    nearest_id = sims.argmax(axis=1)
    covered = nearest_d <= EXISTING_MATCH_THRESHOLD
    print(f"\nfaces already covered (cos-dist <= {EXISTING_MATCH_THRESHOLD}): "
          f"{int(covered.sum())}/{len(emb)}")
    for j, name in enumerate(cent_names):
        matched = int(((nearest_id == j) & covered).sum())
        if matched:
            print(f" -> {name}: {matched}")

    new_idx = np.flatnonzero(~covered).tolist()
    print(f"\nunmatched immich faces to cluster: {len(new_idx)}")
    if len(new_idx) <= 1:
        # Zero or one face: nothing to cluster, everything gets label 0.
        labels = np.zeros(len(new_idx), dtype=int)
    else:
        labels = _cluster_embeddings(emb[new_idx], INITIAL_THRESHOLD)
    # One pass yields both the distinct labels and their sizes.
    label_values, label_counts = np.unique(labels, return_counts=True)
    n_clusters = len(label_values)
    sizes = sorted(label_counts.tolist(), reverse=True)
    print(f"clusters at threshold {INITIAL_THRESHOLD}: {n_clusters} "
          f"top sizes: {sizes[:10]}")

    # label -> indices into face_records / emb (not into new_idx).
    clusters: dict[int, list[int]] = {}
    for face_idx, lab in zip(new_idx, labels):
        clusters.setdefault(int(lab), []).append(face_idx)

    kept: list[dict] = []
    drop_quality_total = 0
    drop_outlier_total = 0
    for idxs in clusters.values():
        good = [i for i in idxs if _passes_quality_gates(face_records[i])]
        drop_quality_total += len(idxs) - len(good)
        if not good:
            continue
        if len(good) >= 4:
            tight = _drop_centroid_outliers(emb, good)
            drop_outlier_total += len(good) - len(tight)
            good = tight
            if not good:
                continue
        # Several faces can come from the same image; size is judged by images.
        unique_paths = sorted({face_records[i]["path"] for i in good})
        if len(unique_paths) < MIN_FACES:
            continue
        kept.append({
            "indices": good,
            "unique_paths": unique_paths,
            "size_face": len(good),
            "size_paths": len(unique_paths),
        })
    kept.sort(key=lambda c: -c["size_paths"])
    print(f"\nafter quality+outlier+min_faces: {len(kept)} clusters kept "
          f"(dropped: quality={drop_quality_total} outlier={drop_outlier_total})")

    facesets = []
    for rank, c in enumerate(kept, start=start_nnn):
        name = f"faceset_{rank:03d}"
        print(f" {name}: faces={c['size_face']:3d} "
              f"unique_paths={c['size_paths']:3d}")
        facesets.append({
            "name": name,
            "image_count": c["size_paths"],
            "face_count": c["size_face"],
            "images": c["unique_paths"],
        })

    manifest = {
        "params": {
            "existing_match_threshold": EXISTING_MATCH_THRESHOLD,
            "initial_threshold": INITIAL_THRESHOLD,
            "outlier_threshold": OUTLIER_THRESHOLD,
            "min_faces": MIN_FACES,
            "min_short": MIN_SHORT,
            "min_blur": MIN_BLUR,
            "min_det_score": MIN_DET_SCORE,
            "source_label": source_label,
            "source_cache": str(immich_cache),
        },
        "facesets": facesets,
    }
    return manifest, kept
# ---- phase 2: export + relocate ----------------------------------------- #
def export_and_relocate(manifest: dict, immich_cache: Path, source_label: str) -> None:
    """Export the discovered facesets and merge them into SWAP_READY.

    Writes *manifest* to ``REPO_WORK/synthetic_<label>_manifest.json``, runs
    ``cmd_export_swap`` into a fresh staging directory next to SWAP_READY,
    moves each exported faceset directory into SWAP_READY (never overwriting
    an existing one) after dropping an ``immich_<label>.txt`` provenance
    marker into it, and appends the entries of the faceset directories that
    were actually moved to ``SWAP_READY/manifest.json``. Other keys already
    present in that manifest are left untouched.

    Args:
        manifest: Synthetic refine manifest from ``discover_new_clusters``.
        immich_cache: Cache file passed to ``cmd_export_swap``.
        source_label: Used in the synthetic-manifest name, the staging
            directory name and the provenance marker.
    """
    synth_path = REPO_WORK / f"synthetic_{source_label}_manifest.json"
    synth_path.write_text(json.dumps(manifest, indent=2))
    print(f"\nsynthetic manifest -> {synth_path}")

    # Always start from an empty staging directory so stale exports from an
    # earlier run cannot be picked up.
    out_tmp = SWAP_READY.parent / f"facesets_swap_ready_{source_label}_new"
    if out_tmp.exists():
        shutil.rmtree(out_tmp)
    out_tmp.mkdir(parents=True)

    print(f"running cmd_export_swap -> {out_tmp}")
    cmd_export_swap(
        cache_path=immich_cache,
        refine_manifest_path=synth_path,
        raw_manifest_path=None,
        out_dir=out_tmp,
        top_n=TOP_N,
        outlier_threshold=EXPORT_OUTLIER_THRESHOLD,
        pad_ratio=PAD_RATIO,
        out_size=OUT_SIZE,
        include_candidates=False,
        candidate_match_threshold=0.55,
        candidate_min_score=0.40,
        min_face_short=EXPORT_MIN_FACE_SHORT,
    )

    new_top = json.loads((out_tmp / "manifest.json").read_text())
    new_entries = new_top.get("facesets", [])

    # Names whose directory really landed in SWAP_READY; only these may be
    # recorded in the top-level manifest.
    moved_names: set[str] = set()
    for fs_meta in new_entries:
        name = fs_meta["name"]
        src_dir = out_tmp / name
        if not src_dir.exists():
            print(f"[{name}] export dir missing; skipping")
            continue
        dst_dir = SWAP_READY / name
        if dst_dir.exists():
            print(f"[{name}] {dst_dir} already exists; refusing to overwrite")
            continue
        (src_dir / f"immich_{source_label}.txt").write_text(
            f"{name}\n\nSource: Immich user {source_label} cluster (auto-discovered).\n"
        )
        shutil.move(str(src_dir), str(dst_dir))
        moved_names.add(name)
        print(f"[{name}] -> {dst_dir}")
    moved = len(moved_names)

    final_manifest_path = SWAP_READY / "manifest.json"
    if final_manifest_path.exists():
        existing = json.loads(final_manifest_path.read_text())
    else:
        existing = {"facesets": []}
    existing.setdefault("facesets", [])
    existing_names = {fs["name"] for fs in existing["facesets"]}

    appended = 0
    for entry in new_entries:
        name = entry["name"]
        if name in existing_names:
            print(f"[manifest] {entry['name']} already present; not duplicating")
            continue
        if name not in moved_names:
            # The directory was skipped above; recording it would leave a
            # manifest entry with no matching faceset (or one describing a
            # different, pre-existing directory).
            print(f"[manifest] {name} was not moved into place; not recording")
            continue
        existing["facesets"].append(entry)
        appended += 1
    final_manifest_path.write_text(json.dumps(existing, indent=2))
    print(f"\nmerged manifest: appended {appended} entries -> {final_manifest_path}")
    print(f"moved {moved} faceset directories into {SWAP_READY}")

    # NOTE(review): the staged manifest.json read above is never moved or
    # deleted, so the staging directory is normally still non-empty here and
    # this cleanup does not fire; it is kept as-is to avoid deleting files.
    if out_tmp.exists() and not list(out_tmp.iterdir()):
        out_tmp.rmdir()
# ---- main ---------------------------------------------------------------- #
def main() -> None:
    """Command-line entry point: discover new clusters, then export them.

    The source label defaults to the cache file's stem with a leading
    ``immich_`` removed (falling back to the full stem if that leaves
    nothing); the first faceset number defaults to the next free number in
    SWAP_READY. With ``--dry-run`` the run stops after discovery.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "immich_cache",
        type=Path,
        help="path to immich_<user>.npz produced by the embed worker",
    )
    parser.add_argument(
        "--nl-cache",
        type=Path,
        default=REPO_WORK / "cache" / "nl_full.npz",
        help="canonical cache for existing identity centroids",
    )
    parser.add_argument(
        "--source-label",
        default=None,
        help="short label used in marker filenames; default = stem of immich_cache",
    )
    parser.add_argument(
        "--start-nnn",
        type=int,
        default=None,
        help="first faceset number to assign; default = current max+1 in facesets_swap_ready/",
    )
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    cache_stem = args.immich_cache.stem
    label = args.source_label or cache_stem.removeprefix("immich_") or cache_stem
    if args.start_nnn is None:
        start_nnn = _next_faceset_number()
    else:
        start_nnn = args.start_nnn
    print(f"source label: {label!r}; first faceset number: {start_nnn:03d}")

    # Only the manifest is needed here; the per-cluster index lists are unused.
    manifest, _kept = discover_new_clusters(args.immich_cache, args.nl_cache, start_nnn, label)
    if args.dry_run:
        print("\n--dry-run: stopping after cluster discovery (no exports written).")
        return
    if not manifest.get("facesets"):
        print("no new facesets to build.")
        return
    export_and_relocate(manifest, args.immich_cache, label)
    print("\nDone.")


if __name__ == "__main__":
    main()