Three-piece workflow that imports a self-hosted Immich library and emits new facesets without disturbing existing identity numbering: - work/immich_stage.py (WSL): pages /search/metadata, parallel-fetches /faces?id= per asset, prefilters by face_short>=90 against bbox scaled to original-image coords, downloads originals, sha256-dedups against nl_full.npz and same-run staged files. An 8-worker ThreadPoolExecutor runs the full /faces->filter->/original chain per asset; resumable via state.json. API URL + key come from IMMICH_URL / IMMICH_API_KEY env vars, label->UUID map from work/immich/users.json (gitignored). - work/embed_worker.py (Windows venv at C:\face_embed_venv): runs insightface.FaceAnalysis(buffalo_l) with the DmlExecutionProvider on AMD Radeon Vega via onnxruntime-directml. Produces a cache file in the same .npz schema as sort_faces.cmd_embed (loadable via load_cache). ~7.5x speedup over CPU end-to-end; embeddings bit-identical to CPU (cosine similarity 1.0000 across 8 sample faces). - work/cluster_immich.py (WSL): mirrors cluster_osrc.py against an immich_<user>.npz. Builds existing identity centroids from canonical faceset_NNN/ in facesets_swap_ready/, drops matches at <=0.45, clusters the rest at 0.55, applies refine gates, hands off to cmd_export_swap. Numbers new facesets past the existing maximum. - work/finalize_immich.sh: chains queue->Windows embed->cache copy->cluster_immich, with logging. The 2026-04-26 run on https://fotos.computerliebe.org (Immich v2.7.2) processed 53,842 admin-accessible assets, staged 10,261, embedded 19,462 face records on Vega DML in 64.6 min, matched 8,103 (42%) to existing identities, and emitted 185 new facesets (faceset_026..264 with gaps). facesets_swap_ready/ went from 31 to 216 substantive facesets. Important caveat surfaced: /search/metadata's userIds filter is silently ignored when the API key is bound to a different user, so this run can't enumerate other users' libraries from the admin key. 
A per-user API key would be required for nic. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
341 lines
13 KiB
Python
341 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""Discover new identities in an Immich-sourced cache and emit them as facesets.
|
|
|
|
Mirrors `work/cluster_osrc.py`, but the source corpus is an arbitrary
|
|
Immich user's `immich_<user>.npz` cache produced by the Windows DML embed
|
|
worker. Existing identity centroids come from the union of every faceset
|
|
already in `facesets_swap_ready/` (faceset_001..NNN, both auto-clustered
|
|
and hand-sorted).
|
|
|
|
Pipeline:
|
|
1. Load immich_<user>.npz; restrict to face records (drop noface).
|
|
2. Build centroids of every existing canonical faceset in
|
|
facesets_swap_ready/ (skip era splits and _thin/).
|
|
3. Drop immich faces whose nearest existing centroid is within
|
|
EXISTING_MATCH_THRESHOLD; those are already covered by the canonical set.
|
|
4. Cluster the remaining among themselves at INITIAL_THRESHOLD.
|
|
5. Per cluster: refine-equivalent gates (face_short, blur, det_score),
|
|
plus outlier rejection at OUTLIER_THRESHOLD for clusters of size >= 4.
|
|
6. Keep clusters whose surviving unique source-path count is >= MIN_FACES.
|
|
7. Number kept clusters past the existing facesets_swap_ready/ max.
|
|
8. Synthesize a refine_manifest, hand off to cmd_export_swap, move dirs into
|
|
facesets_swap_ready/, drop a provenance marker, append to top-level
|
|
manifest.json (preserving facesets / thin_eras).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import shutil
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
|
|
REPO = Path(__file__).resolve().parent.parent
|
|
sys.path.insert(0, str(REPO))
|
|
|
|
from sort_faces import ( # noqa: E402
|
|
_cluster_embeddings,
|
|
cmd_export_swap,
|
|
load_cache,
|
|
)
|
|
|
|
# ---- config -------------------------------------------------------------- #
|
|
|
|
# Scratch area inside the repo: the synthetic refine manifest is written here
# and the default canonical cache lives under it.
REPO_WORK: Path = REPO.joinpath("work")
# Tree holding the canonical faceset_NNN/ directories and top-level manifest.
SWAP_READY: Path = Path("/mnt/e/temp_things/fcswp/nl_sorted/facesets_swap_ready")

# A face whose cosine distance to its nearest existing centroid is at or below
# this value is treated as already covered and is not clustered.
EXISTING_MATCH_THRESHOLD: float = 0.45
# Threshold handed to _cluster_embeddings for the unmatched faces.
INITIAL_THRESHOLD: float = 0.55

# A cluster survives only with at least this many unique source paths.
MIN_FACES: int = 6
# Per-face quality floors applied before outlier rejection.
MIN_SHORT: int = 90
MIN_BLUR: float = 40.0
MIN_DET_SCORE: float = 0.6
# Maximum cosine distance from the cluster centroid (clusters of >= 4 faces).
OUTLIER_THRESHOLD: float = 0.55

# Values forwarded unchanged to cmd_export_swap.
TOP_N: int = 30
EXPORT_OUTLIER_THRESHOLD: float = 0.45
PAD_RATIO: float = 0.5
OUT_SIZE: int = 512
EXPORT_MIN_FACE_SHORT: int = 100
|
|
|
|
|
|
# ---- helpers ------------------------------------------------------------- #
|
|
|
|
def _normalize(v: np.ndarray) -> np.ndarray:
|
|
n = np.linalg.norm(v)
|
|
return v / n if n > 0 else v
|
|
|
|
|
|
def _existing_identity_centroids(
    nl_cache: Path,
) -> tuple[np.ndarray, list[str]]:
    """Build one unit-length centroid per canonical faceset in SWAP_READY.

    Every directory in SWAP_READY is considered except names starting with
    ``_`` (such as the ``_thin/`` quarantine) and era-split sub-facesets
    (``faceset_NNN_<era>``). Each remaining directory's ``manifest.json``
    lists faces by ``(source, bbox)``; those keys are looked up among the face
    records of *nl_cache* and the matching embeddings are averaged.

    Directories without a manifest, or whose manifest entries match no row of
    *nl_cache*, contribute no centroid. A manifest that cannot be read or
    parsed is skipped with a printed warning, because a silently missing
    centroid would let that identity's faces be treated as unmatched.

    Args:
        nl_cache: Cache file, readable by ``load_cache``, that holds the
            embeddings of the canonical facesets.

    Returns:
        ``(centroids, names)``: the stacked centroid array and the faceset
        directory names in the same order.

    Raises:
        SystemExit: If the cache's face-record count differs from its
            embedding count, or if no centroid could be built at all.
    """
    emb, meta, _src, _proc, _aliases = load_cache(nl_cache)
    face_records = [m for m in meta if not m.get("noface")]
    if len(face_records) != len(emb):
        raise SystemExit(f"meta/embedding mismatch in {nl_cache}: {len(face_records)} vs {len(emb)}")
    # (path, bbox) -> embedding row; the bbox becomes a tuple so it is hashable.
    bbox_idx = {(m["path"], tuple(m.get("bbox") or ())): i for i, m in enumerate(face_records)}

    centroids: list[np.ndarray] = []
    names: list[str] = []
    for d in sorted(SWAP_READY.iterdir()):
        if not d.is_dir():
            continue
        if d.name.startswith("_"):
            continue
        # Skip era-split sub-facesets (faceset_NNN_*).
        if d.name.startswith("faceset_") and "_" in d.name[len("faceset_"):]:
            continue
        man = d / "manifest.json"
        if not man.exists():
            continue
        try:
            entries = json.loads(man.read_text()).get("faces", [])
        except (OSError, ValueError, AttributeError) as exc:
            # OSError: unreadable file; ValueError: bad encoding or bad JSON
            # (JSONDecodeError / UnicodeDecodeError); AttributeError: the
            # top-level JSON value is not an object. Still best-effort, but
            # no longer silent.
            print(f"[{d.name}] manifest unreadable ({exc}); no centroid built")
            continue
        keys = [(f["source"], tuple(f.get("bbox") or ())) for f in entries]
        idxs = [bbox_idx[k] for k in keys if k in bbox_idx]
        if not idxs:
            continue
        centroids.append(_normalize(emb[idxs].mean(axis=0)))
        names.append(d.name)
    if not centroids:
        raise SystemExit("no canonical identity centroids could be built; check facesets_swap_ready/")
    return np.stack(centroids), names
|
|
|
|
|
|
def _next_faceset_number() -> int:
    """Return the first unused top-level faceset number in SWAP_READY.

    Only directories named ``faceset_<int>`` count; era-split names (anything
    with a further ``_`` after the prefix) and non-numeric tails are ignored.
    With no numbered faceset present the result is 1.
    """
    prefix = "faceset_"
    taken: list[int] = []
    for entry in SWAP_READY.iterdir():
        if not (entry.is_dir() and entry.name.startswith(prefix)):
            continue
        suffix = entry.name[len(prefix):]
        # An underscore in the tail marks an era split, not a top-level set.
        if "_" in suffix:
            continue
        try:
            number = int(suffix)
        except ValueError:
            continue
        taken.append(number)
    return max(taken, default=0) + 1
|
|
|
|
|
|
# ---- phase 1: discover --------------------------------------------------- #
|
|
|
|
def discover_new_clusters(
    immich_cache: Path, nl_cache: Path, start_nnn: int, source_label: str
) -> tuple[dict, list[dict]]:
    """Find clusters of not-yet-known identities in an Immich cache.

    Faces whose nearest existing centroid is within EXISTING_MATCH_THRESHOLD
    (cosine distance) are set aside as already covered. The rest is clustered
    at INITIAL_THRESHOLD; each cluster is quality-gated (face_short, blur,
    det_score), trimmed of outliers when it has at least 4 faces, and kept
    only if at least MIN_FACES distinct source paths survive.

    Args:
        immich_cache: Cache file to mine, readable by ``load_cache``.
        nl_cache: Canonical cache used to build existing-identity centroids.
        start_nnn: Number given to the largest kept cluster; the others
            follow consecutively in descending unique-path order.
        source_label: Label recorded in the manifest's ``params``.

    Returns:
        ``(manifest, kept)``: a dict with ``params`` and ``facesets`` keys,
        and the list of kept cluster dicts (``indices``, ``unique_paths``,
        ``size_face``, ``size_paths``) in the same order as ``facesets``.

    Raises:
        SystemExit: If the cache's face-record and embedding counts differ.
    """
    print(f"loading immich cache: {immich_cache}")
    emb, meta, _src, _proc, _aliases = load_cache(immich_cache)
    face_records = [rec for rec in meta if not rec.get("noface")]
    n_faces, n_emb = len(face_records), len(emb)
    if n_faces != n_emb:
        raise SystemExit(f"meta/embedding mismatch: {n_faces} vs {n_emb}")
    n_noface = sum(bool(rec.get("noface")) for rec in meta)
    print(f" {n_faces} face records, {n_noface} noface")

    print(f"building existing-identity centroids from {SWAP_READY}")
    cents, cent_names = _existing_identity_centroids(nl_cache)
    print(f" {len(cent_names)} canonical centroids")

    # Cosine similarity of every face against every existing centroid.
    similarity = emb @ cents.T
    best_centroid = similarity.argmax(axis=1)
    best_distance = 1.0 - similarity.max(axis=1)
    covered = best_distance <= EXISTING_MATCH_THRESHOLD
    print(f"\nfaces already covered (cos-dist <= {EXISTING_MATCH_THRESHOLD}): "
          f"{int(covered.sum())}/{len(emb)}")
    # Per-identity tally of covered faces, in centroid order.
    per_identity = np.bincount(best_centroid[covered], minlength=len(cent_names))
    for name, hits in zip(cent_names, per_identity.tolist()):
        if hits:
            print(f" -> {name}: {hits}")

    new_idx = [i for i, is_covered in enumerate(covered) if not is_covered]
    print(f"\nunmatched immich faces to cluster: {len(new_idx)}")
    if len(new_idx) > 1:
        labels = _cluster_embeddings(emb[new_idx], INITIAL_THRESHOLD)
    else:
        # Zero or one face: nothing to cluster, everything gets label 0.
        labels = np.zeros(len(new_idx), dtype=int)
    _, label_counts = np.unique(labels, return_counts=True)
    n_clusters = len(label_counts)
    sizes = sorted(label_counts.tolist(), reverse=True)
    print(f"clusters at threshold {INITIAL_THRESHOLD}: {n_clusters} "
          f"top sizes: {sizes[:10]}")

    # label -> indices into face_records / emb, in first-seen label order.
    clusters: dict[int, list[int]] = {}
    for face_idx, lab in zip(new_idx, labels):
        clusters.setdefault(int(lab), []).append(face_idx)

    # (record key, value assumed when the key is absent, minimum accepted).
    gates = (
        ("face_short", 0, MIN_SHORT),
        ("blur", 0.0, MIN_BLUR),
        ("det_score", 0.0, MIN_DET_SCORE),
    )

    def fails_quality(rec: dict) -> bool:
        # A face is rejected as soon as one measurement is below its floor.
        return any(rec.get(field, fallback) < floor for field, fallback, floor in gates)

    kept: list[dict] = []
    drop_quality_total = 0
    drop_outlier_total = 0
    for members in clusters.values():
        good = [i for i in members if not fails_quality(face_records[i])]
        drop_quality_total += len(members) - len(good)
        if not good:
            continue
        if len(good) >= 4:
            # Trim faces that sit too far from the cluster's own centroid.
            cent = _normalize(emb[good].mean(axis=0))
            distances = 1.0 - emb[good] @ cent
            tight = [i for i, dist in zip(good, distances) if dist <= OUTLIER_THRESHOLD]
            drop_outlier_total += len(good) - len(tight)
            good = tight
            if not good:
                continue
        unique_paths = sorted({face_records[i]["path"] for i in good})
        if len(unique_paths) < MIN_FACES:
            continue
        kept.append({
            "indices": good,
            "unique_paths": unique_paths,
            "size_face": len(good),
            "size_paths": len(unique_paths),
        })

    # Largest clusters first; the stable sort keeps discovery order on ties.
    kept.sort(key=lambda c: c["size_paths"], reverse=True)
    print(f"\nafter quality+outlier+min_faces: {len(kept)} clusters kept "
          f"(dropped: quality={drop_quality_total} outlier={drop_outlier_total})")

    # Assign names once so the console listing and the manifest cannot diverge.
    numbered = [
        (f"faceset_{number:03d}", cluster)
        for number, cluster in enumerate(kept, start=start_nnn)
    ]
    for name, cluster in numbered:
        print(f" {name}: faces={cluster['size_face']:3d} "
              f"unique_paths={cluster['size_paths']:3d}")

    facesets = [
        {
            "name": name,
            "image_count": cluster["size_paths"],
            "face_count": cluster["size_face"],
            "images": cluster["unique_paths"],
        }
        for name, cluster in numbered
    ]
    manifest = {
        "params": {
            "existing_match_threshold": EXISTING_MATCH_THRESHOLD,
            "initial_threshold": INITIAL_THRESHOLD,
            "outlier_threshold": OUTLIER_THRESHOLD,
            "min_faces": MIN_FACES,
            "min_short": MIN_SHORT,
            "min_blur": MIN_BLUR,
            "min_det_score": MIN_DET_SCORE,
            "source_label": source_label,
            "source_cache": str(immich_cache),
        },
        "facesets": facesets,
    }
    return manifest, kept
|
|
|
|
|
|
# ---- phase 2: export + relocate ----------------------------------------- #
|
|
|
|
def export_and_relocate(manifest: dict, immich_cache: Path, source_label: str) -> None:
    """Export the discovered facesets and merge them into SWAP_READY.

    Steps, in order:
      1. Write *manifest* to ``synthetic_<label>_manifest.json`` in REPO_WORK.
      2. Recreate the staging directory ``facesets_swap_ready_<label>_new``
         beside SWAP_READY and run ``cmd_export_swap`` into it.
      3. Move each exported faceset directory into SWAP_READY after dropping
         an ``immich_<label>.txt`` provenance marker into it. Directories
         that are missing, or whose destination already exists, are skipped.
      4. Append the exported entries to SWAP_READY's ``manifest.json``,
         leaving its other keys untouched and skipping names already listed.
      5. Remove the staging directory if it ended up empty.

    Args:
        manifest: Refine-style manifest, as produced by
            ``discover_new_clusters``.
        immich_cache: Cache the clusters were discovered in; handed to
            ``cmd_export_swap``.
        source_label: Short label used in the synthetic-manifest, staging
            directory and marker file names.
    """
    synth_path = REPO_WORK / f"synthetic_{source_label}_manifest.json"
    synth_path.write_text(json.dumps(manifest, indent=2))
    print(f"\nsynthetic manifest -> {synth_path}")

    # Always start from an empty staging directory so stale exports from an
    # earlier run cannot leak into this one.
    out_tmp = SWAP_READY.parent / f"facesets_swap_ready_{source_label}_new"
    if out_tmp.exists():
        shutil.rmtree(out_tmp)
    out_tmp.mkdir(parents=True)

    print(f"running cmd_export_swap -> {out_tmp}")
    cmd_export_swap(
        cache_path=immich_cache,
        refine_manifest_path=synth_path,
        raw_manifest_path=None,
        out_dir=out_tmp,
        top_n=TOP_N,
        outlier_threshold=EXPORT_OUTLIER_THRESHOLD,
        pad_ratio=PAD_RATIO,
        out_size=OUT_SIZE,
        include_candidates=False,
        candidate_match_threshold=0.55,
        candidate_min_score=0.40,
        min_face_short=EXPORT_MIN_FACE_SHORT,
    )

    new_top = json.loads((out_tmp / "manifest.json").read_text())
    new_entries = new_top.get("facesets", [])

    moved = 0
    for fs_meta in new_entries:
        name = fs_meta["name"]
        src_dir = out_tmp / name
        if not src_dir.exists():
            print(f"[{name}] export dir missing; skipping")
            continue
        dst_dir = SWAP_READY / name
        if dst_dir.exists():
            print(f"[{name}] {dst_dir} already exists; refusing to overwrite")
            continue
        # Provenance marker travels with the directory.
        (src_dir / f"immich_{source_label}.txt").write_text(
            f"{name}\n\nSource: Immich user {source_label} cluster (auto-discovered).\n"
        )
        shutil.move(str(src_dir), str(dst_dir))
        moved += 1
        print(f"[{name}] -> {dst_dir}")

    final_manifest_path = SWAP_READY / "manifest.json"
    if final_manifest_path.exists():
        existing = json.loads(final_manifest_path.read_text())
    else:
        existing = {"facesets": []}
    existing.setdefault("facesets", [])
    existing_names = {fs["name"] for fs in existing["facesets"]}
    appended = 0
    # NOTE(review): entries are appended even when the directory move above
    # was skipped (missing export dir or pre-existing destination). Confirm
    # that is intended before relying on the manifest matching the disk.
    for entry in new_entries:
        if entry["name"] in existing_names:
            print(f"[manifest] {entry['name']} already present; not duplicating")
            continue
        existing["facesets"].append(entry)
        appended += 1
    # Write to a sibling temp file and swap it in, so an interrupted run
    # leaves the previous manifest intact rather than a truncated one.
    tmp_manifest_path = final_manifest_path.with_name(final_manifest_path.name + ".tmp")
    tmp_manifest_path.write_text(json.dumps(existing, indent=2))
    tmp_manifest_path.replace(final_manifest_path)
    print(f"\nmerged manifest: appended {appended} entries -> {final_manifest_path}")
    print(f"moved {moved} faceset directories into {SWAP_READY}")
    if out_tmp.exists() and not any(out_tmp.iterdir()):
        out_tmp.rmdir()
|
|
|
|
|
|
# ---- main ---------------------------------------------------------------- #
|
|
|
|
def main() -> None:
    """Command-line entry point: discover new clusters, then export them.

    With ``--dry-run`` the run stops after discovery. When discovery yields
    no facesets the export phase is skipped.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "immich_cache",
        type=Path,
        help="path to immich_<user>.npz produced by the embed worker",
    )
    parser.add_argument(
        "--nl-cache",
        type=Path,
        default=REPO_WORK / "cache" / "nl_full.npz",
        help="canonical cache for existing identity centroids",
    )
    parser.add_argument(
        "--source-label",
        default=None,
        help="short label used in marker filenames; default = stem of immich_cache",
    )
    parser.add_argument(
        "--start-nnn",
        type=int,
        default=None,
        help="first faceset number to assign; default = current max+1 in facesets_swap_ready/",
    )
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    # Label precedence: explicit flag, then the cache stem without its
    # "immich_" prefix, then the bare stem if stripping left nothing.
    cache_stem = args.immich_cache.stem
    label = args.source_label or cache_stem.removeprefix("immich_") or cache_stem
    if args.start_nnn is None:
        start_nnn = _next_faceset_number()
    else:
        start_nnn = args.start_nnn
    print(f"source label: {label!r}; first faceset number: {start_nnn:03d}")

    manifest, _kept = discover_new_clusters(args.immich_cache, args.nl_cache, start_nnn, label)
    if args.dry_run:
        print("\n--dry-run: stopping after cluster discovery (no exports written).")
        return
    if not manifest.get("facesets"):
        print("no new facesets to build.")
        return
    export_and_relocate(manifest, args.immich_cache, label)
    print("\nDone.")


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|