Add Immich outage circuit breaker; document nic run + Tailscale quirk

work/immich_stage.py:
- Startup probe of /server/version (exit 2 if unreachable).
- Outage circuit breaker: after OUTAGE_FAIL_STREAK=12 consecutive
  faces_error/download_error results, run a quick probe; if the probe
  also fails, persist state and exit with code 2 so a long unattended
  run can pause rather than silently churning through tens of thousands
  of retries during an upstream outage. Resume by re-running the same
  command -- state.json + queue.json are intact.

README:
- Document the nic run (per-user API key necessary; second pipeline
  invocation confirmed expected behavior; cleaner library than peter's
  with 0 internal byte-dupes vs 2,976).
- Mention the circuit breaker as the mechanism that keeps long
  unattended runs safe under the known Tailscale flicker pattern at
  this site.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-26 23:36:11 +02:00
parent 321fed01cc
commit 62dba3ddb3
2 changed files with 52 additions and 0 deletions

View File

@@ -78,6 +78,12 @@ HTTP_TIMEOUT = 60 # seconds, conservative for big originals
HTTP_RETRIES = 3  # attempts http_get makes before raising
HTTP_BACKOFF = 2.0
# Circuit breaker: if this many consecutive worker results fail with network
# errors, probe Immich; if the probe also fails, exit cleanly with code 2 so
# the orchestrator can pause until the user says resume. State is preserved
# (resume-safe).
OUTAGE_FAIL_STREAK = 12  # consecutive-error count that triggers a probe
OUTAGE_PROBE_TIMEOUT = 8  # seconds; passed as the urlopen timeout of the probe
# ---- helpers ------------------------------------------------------------- #
def http_get(url: str, accept_bytes: bool = False) -> bytes | dict:
@@ -96,6 +102,16 @@ def http_get(url: str, accept_bytes: bool = False) -> bytes | dict:
raise RuntimeError(f"GET {url} failed after {HTTP_RETRIES} attempts: {last_err}")
def probe_immich() -> bool:
    """Quick connectivity probe (no retry). Used by the circuit breaker.

    Issues a single GET to ``{API}/server/version`` with a timeout of
    ``OUTAGE_PROBE_TIMEOUT`` seconds and reads the response body.

    Returns:
        True if the request completed and the body was read; False if any
        exception was raised (connection failure, timeout, HTTP error, ...).
    """
    try:
        req = urllib.request.Request(f"{API}/server/version", headers=HEADERS)
        # The context manager closes the response (and its socket) even when
        # read() raises, so repeated probes during an outage do not leak
        # connections.
        with urllib.request.urlopen(req, timeout=OUTAGE_PROBE_TIMEOUT) as resp:
            resp.read()
        return True
    except Exception:
        # Deliberately broad: for the breaker, any failure means "unreachable".
        return False
def http_post(url: str, payload: dict) -> dict:
last_err = None
body = json.dumps(payload).encode("utf-8")
@@ -241,6 +257,12 @@ def stage(user_label: str, limit: int | None, workers: int) -> None:
f"{len(queue)} in queue, {len(aliases)} aliased to existing cache")
seen = set(state["seen_asset_ids"])
# ---- startup connectivity probe ---- #
if not probe_immich():
print(f"[init] Immich probe failed at {API}/server/version -- exiting code 2")
sys.exit(2)
print("[init] Immich reachable")
# ---- load existing canonical cache hashes (sha256) ---- #
print(f"[init] loading existing cache hashes from {CACHE_PATH}")
_emb, meta, _src, _proc, _aliases = load_cache(CACHE_PATH)
@@ -280,6 +302,7 @@ def stage(user_label: str, limit: int | None, workers: int) -> None:
return asset, faces, blob, eligible, None
n = 0
err_streak = 0
last_flush = time.time()
t0 = time.time()
pool = ThreadPoolExecutor(max_workers=workers)
@@ -304,7 +327,22 @@ def stage(user_label: str, limit: int | None, workers: int) -> None:
detail = err.split(":", 1)[1][:160] if ":" in err else err
print(f"[err] {kind} {aid}: {detail}")
state["skipped_download_error"] += 1
err_streak += 1
# Circuit breaker: long streak -> probe; if down, save and exit.
if err_streak >= OUTAGE_FAIL_STREAK:
print(f"[breaker] {err_streak} consecutive errors; probing Immich...")
if probe_immich():
print("[breaker] probe ok, treating as transient; continuing")
err_streak = 0
else:
print("[breaker] probe FAILED -- pausing run; resume with same command")
queue_path.write_text(json.dumps(queue, indent=2))
state_path.write_text(json.dumps(state, indent=2))
aliases_path.write_text(json.dumps(aliases, indent=2))
sys.exit(2)
continue
else:
err_streak = 0
# Permanent classifications -> seen.
if err == "no_faces":