Add Immich import pipeline (WSL stage + Windows DML embed + cluster)

Three-piece workflow that imports a self-hosted Immich library and emits
new facesets without disturbing existing identity numbering:

- work/immich_stage.py (WSL): pages /search/metadata, parallel-fetches
  /faces?id= per asset, prefilters by face_short>=90 against bbox scaled
  to original-image coords, downloads originals, sha256-dedups against
  nl_full.npz and same-run staged files. 8-worker ThreadPoolExecutor
  doing the full /faces->filter->/original chain per asset; resumable
  via state.json. API URL + key come from IMMICH_URL / IMMICH_API_KEY
  env vars, label->UUID map from work/immich/users.json (gitignored).
- work/embed_worker.py (Windows venv at C:\face_embed_venv): runs
  insightface.FaceAnalysis(buffalo_l) with the DmlExecutionProvider on
  AMD Radeon Vega via onnxruntime-directml. Produces a cache file in
  the same .npz schema as sort_faces.cmd_embed (loadable via
  load_cache). ~7.5x speedup over CPU end-to-end; embeddings bit-
  identical to CPU (cosine similarity 1.0000 across 8 sample faces).
- work/cluster_immich.py (WSL): mirrors cluster_osrc.py against an
  immich_<user>.npz. Builds existing identity centroids from canonical
  faceset_NNN/ in facesets_swap_ready/, drops matches at <=0.45,
  clusters the rest at 0.55, applies refine gates, hands off to
  cmd_export_swap. Numbers new facesets past the existing maximum.
- work/finalize_immich.sh: chains queue->Windows embed->cache copy->
  cluster_immich, with logging.

The 2026-04-26 run on https://fotos.computerliebe.org (Immich v2.7.2)
processed 53,842 admin-accessible assets, staged 10,261, embedded
19,462 face records on Vega DML in 64.6 min, matched 8,103 (42%) to
existing identities, and emitted 185 new facesets (faceset_026..264
with gaps). facesets_swap_ready/ went from 31 to 216 substantive
facesets.

Important caveat surfaced: /search/metadata's userIds filter is
silently ignored when the API key is bound to a different user, so
this run can't enumerate other users' libraries from the admin key.
A per-user API key would be required for nic.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-26 18:14:26 +02:00
parent 7ecbfae981
commit 321fed01cc
6 changed files with 1340 additions and 3 deletions

View File

@@ -204,6 +204,77 @@ existing identities), this produced 6 new facesets (`faceset_020..025`,
sizes 426 exported PNGs; the 7th candidate cluster lost all 6 faces to sizes 426 exported PNGs; the 7th candidate cluster lost all 6 faces to
export-swap's tighter `min_face_short=100` gate). export-swap's tighter `min_face_short=100` gate).
### Importing identities from a self-hosted Immich library
`work/immich_stage.py` + `work/embed_worker.py` + `work/cluster_immich.py`
together import an Immich library at scale, with the embed step running on
a Windows AMD GPU via DirectML and everything else on WSL. Three pieces:
1. **`work/immich_stage.py` (WSL)** — pages every IMAGE asset via
`/search/metadata`, fetches each asset's `/faces?id=` to read Immich's
own ML-driven bboxes, scales each bbox to original-image coordinates,
and prefilters by `face_short ≥ 90`. For survivors it downloads the
original, sha256-deduplicates against the canonical `nl_full.npz` and
against same-run staged files, and saves to
`/mnt/x/src/immich/<user>/<rel>`. Writes a `queue.json` that the embed
worker consumes. 8 concurrent worker threads run the full per-asset
I/O chain (`/faces` → filter → `/original`) so 8 workers ≈ 8× the
serial throughput.
2. **`work/embed_worker.py` (Windows venv at `C:\face_embed_venv\`)** —
loads `insightface.FaceAnalysis(buffalo_l)` with the
`DmlExecutionProvider` and runs detection + landmarks + recognition
over the queue. Produces a `.npz` cache that's bit-identical in
schema to what `sort_faces.py:cmd_embed` writes, so the result is
directly loadable by `load_cache()`. The cache already includes the
post-`enrich` fields (`landmark_2d_106`, `landmark_3d_68`, `pose`)
because FaceAnalysis returns them for free. AMD Vega gives ~7.5×
real-pipeline speedup over CPU.
3. **`work/cluster_immich.py` (WSL)** — mirrors `cluster_osrc.py`'s
shape but reads from `immich_<user>.npz`. Builds existing-identity
centroids from every canonical `faceset_NNN/` in
`facesets_swap_ready/` (skipping era splits and `_thin/`), drops
immich faces matching at cos-dist ≤ 0.45, clusters the rest at 0.55,
applies refine gates, numbers new facesets past the existing maximum,
and feeds `cmd_export_swap` via a synthetic manifest.
`work/finalize_immich.sh <user>` chains queue → Windows embed → cache
copy back → cluster_immich, with logging.
The Immich admin API key + base URL come from environment variables:
```bash
export IMMICH_URL=https://your-immich.example.com
export IMMICH_API_KEY=... # admin or per-user key
python work/immich_stage.py --user peter --workers 8
bash work/finalize_immich.sh peter
```
For the 2026-04-26 run against `https://fotos.computerliebe.org` (Immich
v2.7.2), with the admin API key:
| step | result |
|------|------|
| stage | 53,842 assets seen, **10,261 staged** (~10 GB), 978 byte-deduped against `nl_full.npz`, 2,976 internal byte-duplicates, 39K skipped no-face / no-big-face |
| Windows DML embed | 19,462 face records + 1 noface in **64.6 min** (2.6 img/s end-to-end) |
| matched existing identities | **8,103 of 19,480 (42%)** at cos-dist ≤ 0.45; biggest hits faceset_002 (+2,666), faceset_001 (+1,856), faceset_003 (+670) |
| new clusters | 2,534 at threshold 0.55 → 239 surviving refine gates → **185 emitted** as `faceset_026..264` (gaps where export-swap's tighter outlier filter dropped clusters below the export quality bar) |
**Important caveats for Immich v2.7.2**:
- The `userIds` filter on `/search/metadata` is **silently ignored** when
the API key is bound to a different user. The "import everything the
API key can see" semantics are what you actually get; cross-user
isolation is enforced server-side.
- `/server/statistics` reports counts that under-count what
`/search/metadata` actually returns (e.g. external library
thumbnail-dirs that got indexed because the import path included them).
Don't trust the statistics number as a denominator.
- A meaningful fraction of `originalPath`-based assets are *Immich's own
thumbnails* (`<library_root>/thumbs/.../-preview.jpeg`) — included if
the external library's import path covers the thumbs directory and the
exclusion patterns don't list `**/thumbs/**`. For our run, 5,563 of
10,261 staged were thumbnails. They embed and cluster fine but the
resulting faces are lower-resolution.
## Key defaults ## Key defaults
`refine`: `refine`:
@@ -248,15 +319,22 @@ Highly recommended at swap time: enable **Select post-processing = GFPGAN** with
├─ docs/ ├─ docs/
│ └─ analysis/ │ └─ analysis/
│ └─ facesets-downstream-refinement-evaluation.md │ └─ facesets-downstream-refinement-evaluation.md
└─ work/ (gitignored except force-tracked .py) └─ work/ (gitignored except force-tracked .py / .sh)
├─ build_folders.py (hand-sorted-folder orchestration) ├─ build_folders.py (hand-sorted-folder orchestration)
├─ check_faceset001_age.py (age-split readiness probe) ├─ check_faceset001_age.py (age-split readiness probe)
├─ age_split_001.py (age-split orchestration; faceset_001) ├─ age_split_001.py (age-split orchestration; faceset_001)
├─ cluster_osrc.py (mixed-bucket identity discovery) ├─ cluster_osrc.py (mixed-bucket identity discovery)
├─ synthetic_refine_manifest.json (last build_folders.py output) ├─ immich_stage.py (Immich library staging, parallel)
├─ synthetic_osrc_manifest.json (last cluster_osrc.py output) ├─ embed_worker.py (Windows DML embed worker, runs from C:\face_embed_venv\)
├─ cluster_immich.py (Immich identity discovery + export)
├─ finalize_immich.sh (chains queue → embed → cluster)
├─ synthetic_*_manifest.json (per-run synthetic refine manifests)
├─ immich/
│ ├─ users.json (label -> userId map; gitignored)
│ └─ <user>/{queue,state,aliases}.json (per-user staging artifacts)
├─ cache/ ├─ cache/
│ ├─ nl_full.npz (canonical cache + duplicates.json) │ ├─ nl_full.npz (canonical cache + duplicates.json)
│ ├─ immich_<user>.npz (per-user immich embeddings)
│ └─ age_split_exif.json (path → EXIF-year cache) │ └─ age_split_exif.json (path → EXIF-year cache)
└─ logs/ └─ logs/
└─ *.log (every long step writes here) └─ *.log (every long step writes here)

View File

@@ -0,0 +1,216 @@
# Importing identities from a self-hosted Immich library
_Run date: 2026-04-26. Target: Immich v2.7.2 at `https://fotos.computerliebe.org`.
Driver scripts: `work/immich_stage.py`, `work/embed_worker.py`,
`work/cluster_immich.py`, `work/finalize_immich.sh`._
## 1. Why a split workflow
InsightFace `buffalo_l` on the WSL CPU runs the full detection + landmarks +
recognition stack at ~34 faces/second. Re-detecting all 79K Immich photos
would have taken ~1028 days. The available AMD Radeon RX Vega is unusable
under WSL (no `/dev/dri/`, no ROCm), but **DirectML on Windows native**
runs the same models bit-identically and ~7.5× faster end-to-end. The
pipeline therefore splits:
- **WSL side** (`/opt/face-sets/`) — orchestration: API listing, download,
sha256 dedup, file management, clustering, faceset emission.
- **Windows side** (`C:\face_embed_venv\`) — the embed step only. A fresh
Python 3.12 (installed via `winget install Python.Python.3.12`) with
`numpy`, `pillow`, `opencv-python-headless`, `onnxruntime-directml`,
`insightface`. Models copied from `/home/peter/.insightface/models/buffalo_l/`
to `C:\face_embed_venv\models\buffalo_l\`.
A 30-iteration synthetic benchmark on Vega:
| model | DML | CPU | speedup |
|-------------|----:|----:|--------:|
| `det_10g.onnx` (640×640) | 10.0 ms | 183.5 ms | 18.4× |
| `w600k_r50.onnx` (112×112) | 8.2 ms | 90.5 ms | 11.0× |
End-to-end FaceAnalysis on 5 real Immich-sourced images (excluding the
first-call DML JIT warmup): ~7.5× speedup post-warmup. Per-face cosine
similarity DML vs CPU was 1.0000 across all 8 detected faces — DML is
bit-identical to CPU for arcface inference.
## 2. Architecture
```
┌─────────────────────────────────────────────┐
│ WSL /opt/face-sets/work/immich_stage.py │
│ ┌──────────────────────────────────────────┐│
│ │ ThreadPoolExecutor.map(_fetch_for_asset, ││
│ │ list_assets(user)) ││
│ │ ─ /faces?id= (Immich, parallel x8) ││
│ │ ─ filter face_short >= 90 ││
│ │ ─ /assets/.../original (parallel x8) ││
│ └──────────────────────────────────────────┘│
│ consumer (main thread): │
│ sha256 → dedup vs nl_full.npz │
│ save to /mnt/x/src/immich/<user>/<rel>/ │
│ append to queue.json │
└────────────────┬────────────────────────────┘
▼ queue.json (with WSL + Windows paths)
┌─────────────────────────────────────────────┐
│ Windows embed_worker.py (C:\face_embed_venv) │
│ insightface.FaceAnalysis( │
│ providers=[DmlExecutionProvider, ...]) │
│ per image: detection + landmarks + arcface │
│ emit cache in sort_faces.py:cmd_embed │
│ schema with embeddings + meta + processed │
│ + path_aliases + schema=v2 │
└────────────────┬────────────────────────────┘
▼ immich_<user>.npz
┌─────────────────────────────────────────────┐
│ WSL cluster_immich.py │
│ build centroids of canonical │
│ faceset_NNN/ in facesets_swap_ready/ │
│ drop matches at cos-dist <= 0.45 │
│ cluster the rest at 0.55 │
│ refine gates -> synthetic refine_manifest │
│ cmd_export_swap -> facesets_swap_ready/ │
│ merge top-level manifest │
└─────────────────────────────────────────────┘
```
Cache artifacts stay separate (per the architecture choice on this run):
each user's results live in their own `immich_<user>.npz`. A future
one-shot merge can fold them into `nl_full.npz` if needed; the existing
`extend` command would do the right thing once schemas align.
## 3. Path mapping
`/mnt/x/``X:\`. Cache stores WSL form (matching `nl_full.npz`'s
existing convention). `wsl_to_win()` translates for the embed worker
which runs natively on Windows.
`work/cluster_immich.py` always uses the canonical `facesets_swap_ready/`
view to build identity centroids — meaning the comparison is against the
*current* set of canonical facesets in the swap-ready directory (skipping
era splits and `_thin/`), not against the older `facesets_full/` snapshot.
## 4. Result of the 2026-04-26 run (peter / admin)
### 4a. Stage
```
total_assets_seen: 53842
staged_count: 10261 (~10 GB on /mnt/x/)
deduped_against_existing: 978 (sha256 in nl_full.npz already)
deduped_against_staged: 2976 (internal byte-dupes inside Immich)
skipped_no_big_face: 9539 (Immich detected only sub-90px faces)
skipped_no_faces: 29390 (Immich detected zero faces)
skipped_download_error: 698 (transient DNS / TLS, not seen-marked)
elapsed: ~70 min (6.4 assets/s end-to-end at 8 workers)
```
The 698 transient errors are recoverable on a re-run because
`immich_stage.py` does not add them to the `seen` set. Each transient
asset would be retried.
### 4b. Embed (Windows DML)
```
queue: 10261 entries
new face records: 19462
new noface records: 1
load errors: 125 (likely HEIC / unreadable)
elapsed: 3878.0s (64.6 min, 2.6 img/s end-to-end)
```
The 2.6 img/s end-to-end includes CIFS-share image load, image decode,
DML inference (~50 ms/face), and JSON / NPZ flushing. Pure DML inference
is faster; the rest of the pipeline dominates at scale.
### 4c. Cluster
```
existing canonical centroids: 25
faces already covered (cos-dist <= 0.45): 8103/19480 (42%)
faceset_001: 1856
faceset_002: 2666
faceset_003: 670
faceset_004: 48
faceset_005: 40
... (smaller hits to the remaining 20)
unmatched faces to cluster: 11377
clusters at threshold 0.55: 2534 (top sizes [469, 444, 342, 338, 262, ...])
survived refine gates: 239
emitted as new facesets: 185 (54 dropped by export-swap's 0.45 outlier)
```
Top-level `facesets_swap_ready/manifest.json` after this run: **216
facesets** (up from 31; ~7× growth) + 68 thin_eras under `_thin/`.
## 5. Surprises and caveats
### 5a. `/search/metadata`'s `userIds` filter is silently ignored (Immich v2.7.2)
When the admin API key is used, passing `userIds=[<other-user-uuid>]`
returns admin's own assets, not the other user's. The filter is
silently dropped. Verified by sampling 200 returned items and
confirming `ownerId` was admin for all of them.
To process another user's library, **a separate API key issued by that
user is required** — the admin key cannot enumerate cross-user
libraries through any documented endpoint we tried. `/timeline/buckets`
with a `userId` query parameter returns
`Not found or no timeline.read access`.
### 5b. `/server/statistics` undercounts what the search returns
`/server/statistics` reported admin = 53,842 photos. Our
`/search/metadata` paginated through... **53,842** top-level. So the
header agrees with the body in this case. But `/server/statistics` does
NOT count items that live under external libraries' import paths —
yet `/search/metadata` does include them. For this Immich, two external
libraries (`/mnt/media/photos` and `/mnt/media/omv_photos`) are
configured but `/libraries` reports `assetCount=0` for both. Yet 80% of
our staged paths come from those library import paths. Don't trust
statistics-vs-search consistency.
### 5c. Indexed Immich thumbnails masquerading as assets
5,563 of our 10,261 staged paths are `<library>/thumbs/.../-preview.jpeg`
— Immich's own internally-generated thumbnails got indexed because the
external library import path included the thumbs subdirectory and the
exclusion patterns didn't list `**/thumbs/**`. They embed and cluster
fine but produce lower-resolution face records. The fix on the Immich
side is adding `**/thumbs/**` to the exclusion patterns.
### 5d. Internal byte-duplicates (2,976)
Many Immich assets are byte-identical to other Immich assets — typically
because the same photo was uploaded both from a phone and from a
synced cloud folder. sha256 dedup catches all of these on the second
download (we still pay the bandwidth, but skip the disk write and
embed work). With Immich v2.7.2's own `assets/duplicates` endpoint we
could catch this earlier, but it's not currently used.
## 6. Re-running and applying to other Immich instances
```bash
export IMMICH_URL=https://your-immich.example.com
export IMMICH_API_KEY=... # admin or per-user key
# Optional: populate work/immich/users.json with label -> UUID map.
# 1. Stage (parallel /faces + downloads, resumable).
python work/immich_stage.py --user peter --workers 8
# 2. End-to-end finalize: copy queue to /mnt/c/, run Windows embed worker,
# copy the cache back, run cluster_immich.py.
bash work/finalize_immich.sh peter
```
For a different Immich instance, the only configuration is the env vars
and the `users.json` sidecar. `cluster_immich.py`'s tunables (matching
threshold, clustering threshold, refine gates, MIN_FACES) are at the
top of the script.
To process a *second* user's library, issue a per-user API key in the
Immich admin UI for that user, set `IMMICH_API_KEY` to that key, and
re-run with their `--user <label>`. The admin key cannot impersonate
other users via the search API.

340
work/cluster_immich.py Normal file
View File

@@ -0,0 +1,340 @@
#!/usr/bin/env python3
"""Discover new identities in an Immich-sourced cache and emit them as facesets.
Mirrors `work/cluster_osrc.py`, but the source corpus is an arbitrary
Immich user's `immich_<user>.npz` cache produced by the Windows DML embed
worker. Existing identity centroids come from the union of every faceset
already in `facesets_swap_ready/` (faceset_001..NNN, both auto-clustered
and hand-sorted).
Pipeline:
1. Load immich_<user>.npz; restrict to face records (drop noface).
2. Build centroids of every existing canonical faceset in
facesets_swap_ready/ (skip era splits and _thin/).
3. Drop immich faces whose nearest existing centroid is within
EXISTING_MATCH_THRESHOLD; those are already covered by the canonical set.
4. Cluster the remaining among themselves at INITIAL_THRESHOLD.
5. Per cluster: refine-equivalent gates (face_short, blur, det_score),
plus outlier rejection at OUTLIER_THRESHOLD for clusters of size >= 4.
6. Keep clusters whose surviving unique source-path count is >= MIN_FACES.
7. Number kept clusters past the existing facesets_swap_ready/ max.
8. Synthesize a refine_manifest, hand off to cmd_export_swap, move dirs into
facesets_swap_ready/, drop a provenance marker, append to top-level
manifest.json (preserving facesets / thin_eras).
"""
from __future__ import annotations
import argparse
import json
import shutil
import sys
from pathlib import Path
import numpy as np
REPO = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(REPO))
from sort_faces import ( # noqa: E402
_cluster_embeddings,
cmd_export_swap,
load_cache,
)
# ---- config -------------------------------------------------------------- #
REPO_WORK = REPO / "work"
SWAP_READY = Path("/mnt/e/temp_things/fcswp/nl_sorted/facesets_swap_ready")
EXISTING_MATCH_THRESHOLD = 0.45
INITIAL_THRESHOLD = 0.55
MIN_FACES = 6
MIN_SHORT = 90
MIN_BLUR = 40.0
MIN_DET_SCORE = 0.6
OUTLIER_THRESHOLD = 0.55
TOP_N = 30
EXPORT_OUTLIER_THRESHOLD = 0.45
PAD_RATIO = 0.5
OUT_SIZE = 512
EXPORT_MIN_FACE_SHORT = 100
# ---- helpers ------------------------------------------------------------- #
def _normalize(v: np.ndarray) -> np.ndarray:
n = np.linalg.norm(v)
return v / n if n > 0 else v
def _existing_identity_centroids(
nl_cache: Path,
) -> tuple[np.ndarray, list[str]]:
"""Build identity centroids from every canonical faceset_NNN/ in
facesets_swap_ready/. Era-split sub-dirs (faceset_001_<era>) and the
_thin/ quarantine are skipped. Each faceset's manifest.json provides
(source, bbox) keys we use to look up rows in nl_full.npz."""
emb, meta, _src, _proc, _aliases = load_cache(nl_cache)
face_records = [m for m in meta if not m.get("noface")]
if len(face_records) != len(emb):
raise SystemExit(f"meta/embedding mismatch in {nl_cache}: {len(face_records)} vs {len(emb)}")
bbox_idx = {(m["path"], tuple(m.get("bbox") or ())): i for i, m in enumerate(face_records)}
centroids: list[np.ndarray] = []
names: list[str] = []
for d in sorted(SWAP_READY.iterdir()):
if not d.is_dir():
continue
if d.name.startswith("_"):
continue
# Skip era-split sub-facesets (faceset_NNN_*).
if d.name.startswith("faceset_") and "_" in d.name[len("faceset_"):]:
continue
man = d / "manifest.json"
if not man.exists():
continue
try:
entries = json.loads(man.read_text()).get("faces", [])
except Exception:
continue
keys = [(f["source"], tuple(f.get("bbox") or ())) for f in entries]
idxs = [bbox_idx[k] for k in keys if k in bbox_idx]
if not idxs:
continue
centroids.append(_normalize(emb[idxs].mean(axis=0)))
names.append(d.name)
if not centroids:
raise SystemExit("no canonical identity centroids could be built; check facesets_swap_ready/")
return np.stack(centroids), names
def _next_faceset_number() -> int:
nums = []
for d in SWAP_READY.iterdir():
if not d.is_dir() or not d.name.startswith("faceset_"):
continue
tail = d.name[len("faceset_"):]
# Take only top-level numbered facesets (no era suffix).
if "_" in tail:
continue
try:
nums.append(int(tail))
except ValueError:
continue
return (max(nums) + 1) if nums else 1
# ---- phase 1: discover --------------------------------------------------- #
def discover_new_clusters(
immich_cache: Path, nl_cache: Path, start_nnn: int, source_label: str
) -> tuple[dict, list[dict]]:
print(f"loading immich cache: {immich_cache}")
emb, meta, _src, _proc, _aliases = load_cache(immich_cache)
face_records = [m for m in meta if not m.get("noface")]
if len(face_records) != len(emb):
raise SystemExit(f"meta/embedding mismatch: {len(face_records)} vs {len(emb)}")
print(f" {len(face_records)} face records, {sum(1 for m in meta if m.get('noface'))} noface")
print(f"building existing-identity centroids from {SWAP_READY}")
cents, cent_names = _existing_identity_centroids(nl_cache)
print(f" {len(cent_names)} canonical centroids")
sims = emb @ cents.T
nearest_d = 1.0 - sims.max(axis=1)
nearest_id = sims.argmax(axis=1)
covered = nearest_d <= EXISTING_MATCH_THRESHOLD
print(f"\nfaces already covered (cos-dist <= {EXISTING_MATCH_THRESHOLD}): "
f"{int(covered.sum())}/{len(emb)}")
for j, name in enumerate(cent_names):
c = int(((nearest_id == j) & covered).sum())
if c:
print(f" -> {name}: {c}")
new_idx = [i for i in range(len(emb)) if not covered[i]]
print(f"\nunmatched immich faces to cluster: {len(new_idx)}")
if len(new_idx) <= 1:
labels = np.zeros(len(new_idx), dtype=int)
else:
labels = _cluster_embeddings(emb[new_idx], INITIAL_THRESHOLD)
n_clusters = len(set(int(l) for l in labels))
sizes = sorted([int((labels == l).sum()) for l in set(labels)], reverse=True)
print(f"clusters at threshold {INITIAL_THRESHOLD}: {n_clusters} "
f"top sizes: {sizes[:10]}")
clusters: dict[int, list[int]] = {}
for k, lab in enumerate(labels):
clusters.setdefault(int(lab), []).append(new_idx[k])
kept: list[dict] = []
drop_quality_total = 0
drop_outlier_total = 0
for cid, idxs in clusters.items():
good: list[int] = []
for i in idxs:
r = face_records[i]
if r.get("face_short", 0) < MIN_SHORT:
drop_quality_total += 1; continue
if r.get("blur", 0.0) < MIN_BLUR:
drop_quality_total += 1; continue
if r.get("det_score", 0.0) < MIN_DET_SCORE:
drop_quality_total += 1; continue
good.append(i)
if not good:
continue
if len(good) >= 4:
cent = _normalize(emb[good].mean(axis=0))
d = 1.0 - emb[good] @ cent
tight = [good[k] for k, dist in enumerate(d) if dist <= OUTLIER_THRESHOLD]
drop_outlier_total += len(good) - len(tight)
good = tight
if not good:
continue
unique_paths = sorted({face_records[i]["path"] for i in good})
if len(unique_paths) < MIN_FACES:
continue
kept.append({
"indices": good,
"unique_paths": unique_paths,
"size_face": len(good),
"size_paths": len(unique_paths),
})
kept.sort(key=lambda c: -c["size_paths"])
print(f"\nafter quality+outlier+min_faces: {len(kept)} clusters kept "
f"(dropped: quality={drop_quality_total} outlier={drop_outlier_total})")
for rank, c in enumerate(kept, start=start_nnn):
print(f" faceset_{rank:03d}: faces={c['size_face']:3d} "
f"unique_paths={c['size_paths']:3d}")
facesets = [
{
"name": f"faceset_{rank:03d}",
"image_count": c["size_paths"],
"face_count": c["size_face"],
"images": c["unique_paths"],
}
for rank, c in enumerate(kept, start=start_nnn)
]
manifest = {
"params": {
"existing_match_threshold": EXISTING_MATCH_THRESHOLD,
"initial_threshold": INITIAL_THRESHOLD,
"outlier_threshold": OUTLIER_THRESHOLD,
"min_faces": MIN_FACES,
"min_short": MIN_SHORT,
"min_blur": MIN_BLUR,
"min_det_score": MIN_DET_SCORE,
"source_label": source_label,
"source_cache": str(immich_cache),
},
"facesets": facesets,
}
return manifest, kept
# ---- phase 2: export + relocate ----------------------------------------- #
def export_and_relocate(manifest: dict, immich_cache: Path, source_label: str) -> None:
synth_path = REPO_WORK / f"synthetic_{source_label}_manifest.json"
synth_path.write_text(json.dumps(manifest, indent=2))
print(f"\nsynthetic manifest -> {synth_path}")
out_tmp = SWAP_READY.parent / f"facesets_swap_ready_{source_label}_new"
if out_tmp.exists():
shutil.rmtree(out_tmp)
out_tmp.mkdir(parents=True)
print(f"running cmd_export_swap -> {out_tmp}")
cmd_export_swap(
cache_path=immich_cache,
refine_manifest_path=synth_path,
raw_manifest_path=None,
out_dir=out_tmp,
top_n=TOP_N,
outlier_threshold=EXPORT_OUTLIER_THRESHOLD,
pad_ratio=PAD_RATIO,
out_size=OUT_SIZE,
include_candidates=False,
candidate_match_threshold=0.55,
candidate_min_score=0.40,
min_face_short=EXPORT_MIN_FACE_SHORT,
)
new_top = json.loads((out_tmp / "manifest.json").read_text())
new_entries = new_top.get("facesets", [])
moved = 0
for fs_meta in new_entries:
name = fs_meta["name"]
src_dir = out_tmp / name
if not src_dir.exists():
print(f"[{name}] export dir missing; skipping")
continue
dst_dir = SWAP_READY / name
if dst_dir.exists():
print(f"[{name}] {dst_dir} already exists; refusing to overwrite")
continue
(src_dir / f"immich_{source_label}.txt").write_text(
f"{name}\n\nSource: Immich user {source_label} cluster (auto-discovered).\n"
)
shutil.move(str(src_dir), str(dst_dir))
moved += 1
print(f"[{name}] -> {dst_dir}")
final_manifest_path = SWAP_READY / "manifest.json"
if final_manifest_path.exists():
existing = json.loads(final_manifest_path.read_text())
else:
existing = {"facesets": []}
existing.setdefault("facesets", [])
existing_names = {fs["name"] for fs in existing["facesets"]}
appended = 0
for entry in new_entries:
if entry["name"] in existing_names:
print(f"[manifest] {entry['name']} already present; not duplicating")
continue
existing["facesets"].append(entry)
appended += 1
final_manifest_path.write_text(json.dumps(existing, indent=2))
print(f"\nmerged manifest: appended {appended} entries -> {final_manifest_path}")
print(f"moved {moved} faceset directories into {SWAP_READY}")
if out_tmp.exists() and not list(out_tmp.iterdir()):
out_tmp.rmdir()
# ---- main ---------------------------------------------------------------- #
def main() -> None:
p = argparse.ArgumentParser()
p.add_argument("immich_cache", type=Path,
help="path to immich_<user>.npz produced by the embed worker")
p.add_argument("--nl-cache", type=Path, default=REPO_WORK / "cache" / "nl_full.npz",
help="canonical cache for existing identity centroids")
p.add_argument("--source-label", default=None,
help="short label used in marker filenames; default = stem of immich_cache")
p.add_argument("--start-nnn", type=int, default=None,
help="first faceset number to assign; default = current max+1 in facesets_swap_ready/")
p.add_argument("--dry-run", action="store_true")
args = p.parse_args()
label = args.source_label or args.immich_cache.stem.removeprefix("immich_") or args.immich_cache.stem
start_nnn = args.start_nnn if args.start_nnn is not None else _next_faceset_number()
print(f"source label: {label!r}; first faceset number: {start_nnn:03d}")
manifest, kept = discover_new_clusters(args.immich_cache, args.nl_cache, start_nnn, label)
if args.dry_run:
print("\n--dry-run: stopping after cluster discovery (no exports written).")
return
if not manifest.get("facesets"):
print("no new facesets to build.")
return
export_and_relocate(manifest, args.immich_cache, label)
print("\nDone.")
if __name__ == "__main__":
main()

244
work/embed_worker.py Executable file
View File

@@ -0,0 +1,244 @@
"""Windows / DirectML embed worker.
Reads a queue.json staged by /opt/face-sets/work/immich_stage.py (WSL side),
runs InsightFace's FaceAnalysis on each image with the DmlExecutionProvider
backed by the AMD Vega, and writes a cache file in the schema produced by
sort_faces.py:cmd_embed (so it can be merged into nl_full.npz).
CLI:
py -3.12 embed_worker.py <queue.json> <out_cache.npz> [--limit N]
The queue.json entry shape (each item) is:
{
"asset_id": "...",
"sha256": "...",
"wsl_path": "/mnt/x/src/immich/<user>/<rel>", # canonical path stored
"win_path": "X:\\src\\immich\\<user>\\<rel>", # what we read from
"size_bytes": int,
"width": int, "height": int,
...
}
Per face record matches cmd_embed's schema:
path, face_idx, det_score, bbox, face_short, face_area, blur, noface=False, hash
plus landmark_2d_106, landmark_3d_68, pose (FaceAnalysis returns these for
free and the existing cache already carries them after `enrich`).
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
from pathlib import Path
import numpy as np
from PIL import Image, ImageOps
from insightface.app import FaceAnalysis
MODEL_ROOT = r"C:\face_embed_venv\models"
MIN_DET_SCORE = 0.5
MIN_FACE_PIX = 40
FLUSH_EVERY = 50
def load_rgb_bgr(path: Path):
try:
with Image.open(path) as im:
im = ImageOps.exif_transpose(im)
im = im.convert("RGB")
rgb = np.array(im)
bgr = rgb[:, :, ::-1].copy()
return rgb, bgr
except Exception as e:
print(f"[warn] failed to load {path}: {e}", file=sys.stderr)
return None, None
def laplacian_variance(gray: np.ndarray) -> float:
g = gray.astype(np.float32)
lap = (
-4.0 * g[1:-1, 1:-1]
+ g[:-2, 1:-1] + g[2:, 1:-1]
+ g[1:-1, :-2] + g[1:-1, 2:]
)
return float(lap.var())
def save_cache(out_path: Path, emb_chunks: list, meta: list, processed: set, src_root: str):
emb = np.concatenate(emb_chunks) if emb_chunks else np.zeros((0, 512), dtype=np.float32)
tmp = out_path.with_suffix(".tmp.npz")
np.savez(
str(tmp),
embeddings=emb,
meta=json.dumps(meta),
src_root=str(src_root),
processed_paths=json.dumps(sorted(processed)),
path_aliases=json.dumps({}),
schema="v2",
)
os.replace(tmp, out_path)
def load_cache_if_exists(out_path: Path):
"""Resume helper. Returns (emb_chunks, meta, processed_set)."""
if not out_path.exists():
return [], [], set()
data = np.load(out_path, allow_pickle=True)
emb = data["embeddings"]
meta = json.loads(str(data["meta"]))
processed = set(json.loads(str(data["processed_paths"])))
return [emb] if len(emb) else [], list(meta), processed
def main():
p = argparse.ArgumentParser()
p.add_argument("queue", type=Path)
p.add_argument("out", type=Path)
p.add_argument("--limit", type=int, default=None)
args = p.parse_args()
queue = json.loads(args.queue.read_text())
print(f"queue: {len(queue)} entries from {args.queue}")
args.out.parent.mkdir(parents=True, exist_ok=True)
emb_chunks, meta, processed = load_cache_if_exists(args.out)
n_existing_records = len(meta)
n_existing_emb = sum(e.shape[0] for e in emb_chunks)
if n_existing_records:
print(f"resume: {n_existing_records} existing meta records "
f"({n_existing_emb} embeddings, {len(processed)} processed paths)")
print("initializing FaceAnalysis with DmlExecutionProvider")
app = FaceAnalysis(
name="buffalo_l",
root=MODEL_ROOT,
providers=["DmlExecutionProvider", "CPUExecutionProvider"],
)
app.prepare(ctx_id=0, det_size=(640, 640))
src_root = "/mnt/x/src/immich"
n_done = 0
n_face_records_added = 0
n_noface_added = 0
n_skipped = 0
n_load_err = 0
t0 = time.perf_counter()
last_flush = time.perf_counter()
new_emb_chunks: list[np.ndarray] = []
new_meta: list[dict] = []
def flush():
nonlocal new_emb_chunks, new_meta, last_flush
if not new_emb_chunks and not new_meta:
return
if new_emb_chunks:
emb_chunks.append(np.concatenate(new_emb_chunks))
new_emb_chunks = []
for r in new_meta:
meta.append(r)
new_meta = []
save_cache(args.out, emb_chunks, meta, processed, src_root)
last_flush = time.perf_counter()
for i, entry in enumerate(queue):
if args.limit is not None and n_done >= args.limit:
break
wsl_path = entry["wsl_path"]
win_path = entry["win_path"]
sha = entry["sha256"]
if wsl_path in processed:
n_skipped += 1
continue
rgb, bgr = load_rgb_bgr(Path(win_path))
if bgr is None:
new_meta.append({
"path": wsl_path, "face_idx": -1, "noface": True,
"hash": sha, "error": "load",
})
processed.add(wsl_path)
n_load_err += 1
n_done += 1
continue
faces = app.get(bgr)
kept_any = False
for j, f in enumerate(faces):
if float(f.det_score) < MIN_DET_SCORE:
continue
x1, y1, x2, y2 = [int(round(v)) for v in f.bbox]
x1 = max(x1, 0); y1 = max(y1, 0)
x2 = min(x2, rgb.shape[1]); y2 = min(y2, rgb.shape[0])
w, h = x2 - x1, y2 - y1
short = min(w, h)
if short < MIN_FACE_PIX:
continue
crop = rgb[y1:y2, x1:x2]
if crop.size == 0:
continue
gray = crop.mean(axis=2)
blur = laplacian_variance(gray) if min(gray.shape) > 3 else 0.0
emb = f.normed_embedding.astype(np.float32)
new_emb_chunks.append(emb[None, :])
rec = {
"path": wsl_path,
"face_idx": j,
"det_score": float(f.det_score),
"bbox": [x1, y1, x2, y2],
"face_short": int(short),
"face_area": int(w * h),
"blur": blur,
"noface": False,
"hash": sha,
}
# Enrichment-equivalent fields (FaceAnalysis returns these for free)
if hasattr(f, "landmark_2d_106") and f.landmark_2d_106 is not None:
rec["landmark_2d_106"] = f.landmark_2d_106.astype(np.float32).tolist()
if hasattr(f, "landmark_3d_68") and f.landmark_3d_68 is not None:
rec["landmark_3d_68"] = f.landmark_3d_68.astype(np.float32).tolist()
if hasattr(f, "pose") and f.pose is not None:
rec["pose"] = [float(x) for x in f.pose]
new_meta.append(rec)
kept_any = True
n_face_records_added += 1
if not kept_any:
new_meta.append({
"path": wsl_path, "face_idx": -1, "noface": True, "hash": sha,
})
n_noface_added += 1
processed.add(wsl_path)
n_done += 1
if (n_done % FLUSH_EVERY == 0) or (time.perf_counter() - last_flush) > 30.0:
flush()
elapsed = time.perf_counter() - t0
rate = n_done / max(0.1, elapsed)
print(
f"[embed] done={n_done:5d}/{len(queue)} faces+={n_face_records_added:5d} "
f"noface+={n_noface_added:4d} skipped={n_skipped:4d} "
f"load_err={n_load_err:3d} rate={rate:.1f} img/s "
f"({elapsed:.1f}s elapsed)"
)
flush()
elapsed = time.perf_counter() - t0
print()
print("=== embed done ===")
print(f" done: {n_done}")
print(f" new face records: {n_face_records_added}")
print(f" new noface records: {n_noface_added}")
print(f" skipped (already done): {n_skipped}")
print(f" load errors: {n_load_err}")
print(f" elapsed: {elapsed:.1f}s ({n_done/max(0.1,elapsed):.1f} img/s)")
print(f" cache: {args.out}")
if __name__ == "__main__":
main()

50
work/finalize_immich.sh Executable file
View File

@@ -0,0 +1,50 @@
#!/usr/bin/env bash
# Finalize an Immich user's stage:
# 1. Copy queue.json to /mnt/c so the Windows embed worker can read it
# 2. Run the embed worker on Windows (DML)
# 3. Copy the resulting cache back to /opt/face-sets/work/cache/
# 4. Run cluster_immich.py to discover + emit new facesets
#
# Usage: ./work/finalize_immich.sh <user-label>
set -euo pipefail
USER_LABEL="${1:?usage: $0 <user-label>}"
REPO="$(cd "$(dirname "$0")/.." && pwd)"
WSL_QUEUE="$REPO/work/immich/$USER_LABEL/queue.json"
WIN_QUEUE_DIR="/mnt/c/face_embed_venv/work/immich/$USER_LABEL"
WIN_QUEUE="$WIN_QUEUE_DIR/queue.json"
WIN_QUEUE_FOR_PS="C:\\face_embed_venv\\work\\immich\\$USER_LABEL\\queue.json"
WIN_CACHE_DIR="/mnt/c/face_embed_venv/work/cache"
WIN_CACHE="$WIN_CACHE_DIR/immich_${USER_LABEL}.npz"
WIN_CACHE_FOR_PS="C:\\face_embed_venv\\work\\cache\\immich_${USER_LABEL}.npz"
WSL_CACHE="$REPO/work/cache/immich_${USER_LABEL}.npz"
LOG="$REPO/work/logs/immich_finalize_${USER_LABEL}.log"
[ -f "$WSL_QUEUE" ] || { echo "missing queue: $WSL_QUEUE" >&2; exit 1; }
echo "=== finalize: $USER_LABEL ===" | tee -a "$LOG"
date | tee -a "$LOG"
mkdir -p "$WIN_QUEUE_DIR" "$WIN_CACHE_DIR" "$REPO/work/cache"
echo "[1/4] copying queue: $WSL_QUEUE -> $WIN_QUEUE" | tee -a "$LOG"
cp "$WSL_QUEUE" "$WIN_QUEUE"
echo " $(wc -c < "$WIN_QUEUE") bytes; $(/home/peter/face_sort_env/bin/python3 -c "import json,sys; print(len(json.load(open('$WIN_QUEUE'))))") entries"
echo "[2/4] running Windows DML embed worker" | tee -a "$LOG"
powershell.exe -NoProfile -Command "C:\\face_embed_venv\\Scripts\\python.exe C:\\face_embed_venv\\bench\\embed_worker.py '$WIN_QUEUE_FOR_PS' '$WIN_CACHE_FOR_PS'" 2>&1 | tee -a "$LOG"
[ -f "$WIN_CACHE" ] || { echo "embed produced no cache file at $WIN_CACHE" | tee -a "$LOG"; exit 1; }
echo "[3/4] copying cache back: $WIN_CACHE -> $WSL_CACHE" | tee -a "$LOG"
cp "$WIN_CACHE" "$WSL_CACHE"
echo " $(/home/peter/face_sort_env/bin/python3 -c "import sys,json; sys.path.insert(0,'$REPO'); from sort_faces import load_cache; e,m,_,_,_=load_cache('$WSL_CACHE'); print(f'{len(e)} embeddings, {sum(1 for x in m if x.get(\"noface\"))} noface, {sum(1 for x in m if not x.get(\"noface\"))} faces')")"
echo "[4/4] running cluster_immich.py" | tee -a "$LOG"
/home/peter/face_sort_env/bin/python3 "$REPO/work/cluster_immich.py" "$WSL_CACHE" 2>&1 | tee -a "$LOG"
echo "=== finalize done: $USER_LABEL ===" | tee -a "$LOG"
date | tee -a "$LOG"

409
work/immich_stage.py Normal file
View File

@@ -0,0 +1,409 @@
#!/usr/bin/env python3
"""Stage Immich assets for embedding (WSL side of the split workflow).
For one Immich user:
1. Page through `/search/metadata` listing every IMAGE asset the user owns.
2. For each asset, fetch `/faces?id=` and decide if any detected face has a
scaled short side >= MIN_FACE_SHORT on the original. Skip assets that
don't.
3. Download the original. Compute sha256.
4. Dedup against (a) the existing canonical cache `nl_full.npz` and
(b) sha256s already staged in this run / earlier runs. If duplicate,
do NOT save to disk; record the alias.
5. Save survivors to /mnt/x/src/immich/<user>/<rel> mirroring the structure
after Immich's `/upload/library/<owner>/` prefix.
6. Write a queue file with WSL + Windows paths so the Windows DML embed
worker can find them.
7. Persist staging state continuously so the run is resumable.
Output artifacts:
work/immich/<user>/queue.json - what the Windows worker should embed
work/immich/<user>/state.json - resume state
work/immich/<user>/aliases.json - asset_id -> existing canonical path
when sha256 matched something already
in nl_full.npz
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import sys
import time
import urllib.error
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
import numpy as np
REPO = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(REPO))
from sort_faces import load_cache # noqa: E402
# ---- config -------------------------------------------------------------- #
API = os.environ.get("IMMICH_URL", "").rstrip("/") + "/api" if os.environ.get("IMMICH_URL") else None
KEY = os.environ.get("IMMICH_API_KEY")
if not API or not KEY:
raise SystemExit(
"set IMMICH_URL and IMMICH_API_KEY env vars before running, e.g.\n"
" export IMMICH_URL=https://fotos.example.org\n"
" export IMMICH_API_KEY=... # admin API key"
)
HEADERS = {"x-api-key": KEY, "Accept": "application/json"}
# Short-label -> Immich userId. The user is responsible for filling this in for
# their own Immich instance. NOTE: as of Immich v2.7.2, /search/metadata's
# `userIds` filter is silently ignored when the API key is bound to a different
# user, so changing this label/UUID does not actually change which assets the
# API returns; we keep it here for naming output dirs and as future-proofing.
USERS_FILE = REPO / "work" / "immich" / "users.json"
USERS: dict[str, str] = {}
if USERS_FILE.exists():
USERS = json.loads(USERS_FILE.read_text())
CACHE_PATH = REPO / "work" / "cache" / "nl_full.npz" # for sha256 dedup
STAGE_DIR = REPO / "work" / "immich"
DEST_ROOT = Path("/mnt/x/src/immich")
WIN_DEST_ROOT = "X:\\src\\immich" # equivalent on the Windows side
PAGE_SIZE = 1000
MIN_FACE_SHORT = 90 # match refine's gate
MIN_DET_SCORE = 0.5 # weaker than refine's 0.6, since Immich's score scale differs
HTTP_TIMEOUT = 60 # seconds, conservative for big originals
HTTP_RETRIES = 3
HTTP_BACKOFF = 2.0
# ---- helpers ------------------------------------------------------------- #
def http_get(url: str, accept_bytes: bool = False) -> bytes | dict:
"""GET with retries. Returns parsed JSON unless accept_bytes is True."""
last_err = None
for attempt in range(HTTP_RETRIES):
try:
req = urllib.request.Request(url, headers=HEADERS)
with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT) as resp:
data = resp.read()
return data if accept_bytes else json.loads(data)
except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError) as e:
last_err = e
if attempt + 1 < HTTP_RETRIES:
time.sleep(HTTP_BACKOFF * (attempt + 1))
raise RuntimeError(f"GET {url} failed after {HTTP_RETRIES} attempts: {last_err}")
def http_post(url: str, payload: dict) -> dict:
last_err = None
body = json.dumps(payload).encode("utf-8")
for attempt in range(HTTP_RETRIES):
try:
req = urllib.request.Request(
url, data=body, headers={**HEADERS, "Content-Type": "application/json"}
)
with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT) as resp:
return json.loads(resp.read())
except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError) as e:
last_err = e
if attempt + 1 < HTTP_RETRIES:
time.sleep(HTTP_BACKOFF * (attempt + 1))
raise RuntimeError(f"POST {url} failed after {HTTP_RETRIES} attempts: {last_err}")
def sha256_bytes(b: bytes) -> str:
return hashlib.sha256(b).hexdigest()
def derive_relpath(original_path: str) -> str:
"""Return a relative subpath rooted at the user dir, mirroring Immich.
/usr/src/app/upload/library/admin/2026/2026-02-18/foo.jpg
-> 2026/2026-02-18/foo.jpg
Anything that doesn't match the expected prefix falls back to the basename
only.
"""
marker = "/upload/library/"
i = original_path.find(marker)
if i < 0:
return Path(original_path).name
rest = original_path[i + len(marker):]
parts = rest.split("/", 1)
return parts[1] if len(parts) == 2 else parts[0]
def wsl_to_win(p: Path) -> str:
"""Convert /mnt/x/.. -> X:\\.. for the embed worker that runs on Windows."""
s = str(p)
if s.startswith("/mnt/"):
drive = s[5]
rest = s[6:].lstrip("/")
return f"{drive.upper()}:\\{rest.replace('/', chr(92))}"
if s.startswith("/opt/face-sets/"):
# /opt/face-sets/work/... is on the WSL ext4 filesystem; reachable from
# Windows as \\wsl$\Ubuntu\opt\face-sets\... (slower than C:). For our
# use we keep all stage outputs under /mnt/x or /mnt/c so this branch
# should not be hit, but fall back rather than fail.
return f"\\\\wsl$\\Ubuntu\\opt\\face-sets\\{s[len('/opt/face-sets/'):].replace('/', chr(92))}"
return s
def keep_asset(asset: dict, faces: list) -> tuple[bool, list[dict]]:
"""Return (keep, eligible_face_records). A face is 'eligible' iff its
scaled-to-original short side >= MIN_FACE_SHORT and source-type is
machine-learning."""
W, H = asset.get("width"), asset.get("height")
if not W or not H:
return False, []
eligible = []
for f in faces:
if f.get("sourceType") and f["sourceType"] != "machine-learning":
continue
iw = f.get("imageWidth") or W
ih = f.get("imageHeight") or H
sx = (W / iw) if iw else 1.0
sy = (H / ih) if ih else 1.0
bw = (f["boundingBoxX2"] - f["boundingBoxX1"]) * sx
bh = (f["boundingBoxY2"] - f["boundingBoxY1"]) * sy
if min(bw, bh) >= MIN_FACE_SHORT:
eligible.append({
"id": f["id"],
"x1": int(round(f["boundingBoxX1"] * sx)),
"y1": int(round(f["boundingBoxY1"] * sy)),
"x2": int(round(f["boundingBoxX2"] * sx)),
"y2": int(round(f["boundingBoxY2"] * sy)),
"person": (f.get("person") or {}).get("name") or None,
})
return (len(eligible) > 0), eligible
# ---- main staging loop --------------------------------------------------- #
def list_assets(user_id: str):
"""Yield every IMAGE asset owned by user_id, paginated."""
page = 1
while True:
resp = http_post(f"{API}/search/metadata", {
"size": PAGE_SIZE,
"type": "IMAGE",
"page": page,
"userIds": [user_id],
})
items = resp["assets"]["items"]
if not items:
return
for a in items:
yield a
nxt = resp["assets"].get("nextPage")
if not nxt:
return
page = int(nxt)
def stage(user_label: str, limit: int | None, workers: int) -> None:
user_id = USERS[user_label]
user_dir = STAGE_DIR / user_label
user_dir.mkdir(parents=True, exist_ok=True)
state_path = user_dir / "state.json"
queue_path = user_dir / "queue.json"
aliases_path = user_dir / "aliases.json"
# ---- load existing state for resume ---- #
state = {
"started_at": time.strftime("%Y-%m-%dT%H:%M:%S"),
"user_label": user_label,
"user_id": user_id,
"seen_asset_ids": [],
"staged_count": 0,
"deduped_against_existing": 0,
"deduped_against_staged": 0,
"skipped_no_big_face": 0,
"skipped_no_faces": 0,
"skipped_download_error": 0,
"total_assets_seen": 0,
}
queue: list[dict] = []
aliases: dict[str, dict] = {} # asset_id -> {sha, canonical_path}
staged_hashes: set[str] = set()
if state_path.exists():
prior = json.loads(state_path.read_text())
state.update(prior)
state["resumed_at"] = time.strftime("%Y-%m-%dT%H:%M:%S")
if queue_path.exists():
queue = json.loads(queue_path.read_text())
staged_hashes = {q["sha256"] for q in queue}
if aliases_path.exists():
aliases = json.loads(aliases_path.read_text())
print(f"[resume] {len(state['seen_asset_ids'])} asset_ids already seen, "
f"{len(queue)} in queue, {len(aliases)} aliased to existing cache")
seen = set(state["seen_asset_ids"])
# ---- load existing canonical cache hashes (sha256) ---- #
print(f"[init] loading existing cache hashes from {CACHE_PATH}")
_emb, meta, _src, _proc, _aliases = load_cache(CACHE_PATH)
canonical_by_hash: dict[str, str] = {}
for m in meta:
h = m.get("hash")
if h:
canonical_by_hash.setdefault(h, m["path"])
print(f"[init] {len(canonical_by_hash)} unique sha256s in nl_full.npz")
# ---- iterate assets ---- #
# Each worker does the entire I/O chain for an asset: /faces -> filter ->
# /original. That way 8 workers translate to ~8x parallelism end-to-end.
# Main thread does sha256, dedup decisions, and writes (which are CPU/SMB
# bound but cheap relative to two HTTPS round-trips per asset).
# Worker result tuple:
# (asset, faces|None, blob|None, eligible|None, error|None)
def _fetch_for_asset(asset: dict):
if asset.get("type") != "IMAGE":
return asset, None, None, None, "not_image"
aid = asset["id"]
if aid in seen:
return asset, None, None, None, "already_seen"
try:
faces = http_get(f"{API}/faces?id={aid}")
except Exception as e:
return asset, None, None, None, f"faces_error:{e}"
if not faces:
return asset, [], None, [], "no_faces"
keep, eligible = keep_asset(asset, faces)
if not keep:
return asset, faces, None, eligible, "no_big_face"
try:
blob = http_get(f"{API}/assets/{aid}/original", accept_bytes=True)
except Exception as e:
return asset, faces, None, eligible, f"download_error:{e}"
return asset, faces, blob, eligible, None
n = 0
last_flush = time.time()
t0 = time.time()
pool = ThreadPoolExecutor(max_workers=workers)
try:
for asset, faces, blob, eligible, err in pool.map(_fetch_for_asset, list_assets(user_id)):
if asset.get("type") != "IMAGE":
continue
n += 1
state["total_assets_seen"] = n
if limit is not None and n > limit:
print(f"[stop] hit --limit {limit}")
break
aid = asset["id"]
# Already-seen / non-image: silently skip.
if err == "already_seen":
continue
# Transient: count, but DON'T mark as seen so resume retries.
if err and (err.startswith("faces_error") or err.startswith("download_error")):
kind = err.split(":", 1)[0]
detail = err.split(":", 1)[1][:160] if ":" in err else err
print(f"[err] {kind} {aid}: {detail}")
state["skipped_download_error"] += 1
continue
# Permanent classifications -> seen.
if err == "no_faces":
state["skipped_no_faces"] += 1
seen.add(aid); state["seen_asset_ids"] = sorted(seen)
continue
if err == "no_big_face":
state["skipped_no_big_face"] += 1
seen.add(aid); state["seen_asset_ids"] = sorted(seen)
continue
# Have faces + blob -> dedup + save.
h = sha256_bytes(blob)
if h in canonical_by_hash:
aliases[aid] = {"sha256": h, "canonical": canonical_by_hash[h]}
state["deduped_against_existing"] += 1
seen.add(aid); state["seen_asset_ids"] = sorted(seen)
continue
if h in staged_hashes:
state["deduped_against_staged"] += 1
seen.add(aid); state["seen_asset_ids"] = sorted(seen)
continue
rel = derive_relpath(asset.get("originalPath", asset.get("originalFileName", aid)))
wsl_path = DEST_ROOT / user_label / rel
wsl_path.parent.mkdir(parents=True, exist_ok=True)
wsl_path.write_bytes(blob)
staged_hashes.add(h)
queue.append({
"asset_id": aid,
"sha256": h,
"wsl_path": str(wsl_path),
"win_path": wsl_to_win(wsl_path),
"size_bytes": len(blob),
"width": asset.get("width"),
"height": asset.get("height"),
"originalPath": asset.get("originalPath"),
"originalFileName": asset.get("originalFileName"),
"localDateTime": asset.get("localDateTime"),
"immich_eligible_faces": eligible,
})
state["staged_count"] += 1
seen.add(aid)
state["seen_asset_ids"] = sorted(seen)
if time.time() - last_flush > 5.0 or len(queue) % 25 == 0:
queue_path.write_text(json.dumps(queue, indent=2))
state_path.write_text(json.dumps(state, indent=2))
aliases_path.write_text(json.dumps(aliases, indent=2))
last_flush = time.time()
elapsed = time.time() - t0
rate = state["total_assets_seen"] / max(0.1, elapsed)
print(f"[stage] seen={state['total_assets_seen']:6d} "
f"staged={state['staged_count']:5d} "
f"dedup-existing={state['deduped_against_existing']:5d} "
f"dedup-staged={state['deduped_against_staged']:5d} "
f"no-big-face={state['skipped_no_big_face']:6d} "
f"no-faces={state['skipped_no_faces']:6d} "
f"errs={state['skipped_download_error']:3d} "
f"({rate:.1f} assets/s)")
finally:
pool.shutdown(wait=False, cancel_futures=True)
# final flush
queue_path.write_text(json.dumps(queue, indent=2))
state_path.write_text(json.dumps(state, indent=2))
aliases_path.write_text(json.dumps(aliases, indent=2))
print()
print(f"=== final state for user {user_label} ===")
for k in [
"total_assets_seen", "staged_count", "deduped_against_existing",
"deduped_against_staged", "skipped_no_big_face", "skipped_no_faces",
"skipped_download_error",
]:
print(f" {k}: {state[k]}")
total_bytes = sum(q["size_bytes"] for q in queue)
print(f" staged bytes: {total_bytes/1e9:.2f} GB across {len(queue)} files")
print(f" queue: {queue_path}")
print(f" state: {state_path}")
print(f" aliases: {aliases_path}")
# ---- cli ----------------------------------------------------------------- #
def main() -> None:
p = argparse.ArgumentParser()
if not USERS:
p.add_argument("--user", required=True,
help=f"label for output dir (USERS map empty; populate {USERS_FILE} to constrain)")
else:
p.add_argument("--user", choices=list(USERS.keys()), required=True)
p.add_argument("--limit", type=int, default=None,
help="stop after seeing N assets total (for testing)")
p.add_argument("--workers", type=int, default=8,
help="concurrent /faces fetches (default 8)")
args = p.parse_args()
stage(args.user, args.limit, args.workers)
if __name__ == "__main__":
main()