Add enrich + export-swap pipeline for downstream face-swap ready output

- enrich: re-detects each cached face with buffalo_l (detection +
  landmark_2d_106 + landmark_3d_68, recognition module skipped for speed)
  and persists landmarks + pose into the cache so per-face frontality and
  landmark-symmetry quality signals become available.
- compute_quality: composite score combining det_score, face short-edge,
  blur, frontality (from pose pitch/yaw), and 2D-landmark symmetry with
  tunable weights. Default weighting 0.20/0.15/0.15/0.30/0.20.
- export-swap: builds facesets_swap_ready/ from an existing refine
  manifest. Per identity: tighter outlier gate (default 0.45), visual-
  near-dupe collapse (keep best representative per group), multi-face-
  per-source-image collapse (keep best bbox), rank by composite score,
  single-face-per-PNG crops at 512x512 with 0.5 bbox padding, ready-to-
  drop .fsz bundles (top-N + full), per-faceset manifest.json, NAME.txt
  placeholder for the operator. The multi-face-per-PNG collapse is the
  critical fix: roop-unleashed's .fsz loader appends every detected face
  in each PNG to the FaceSet, so any multi-face crop would contaminate
  the averaged embedding.
- Optional --candidates rescues raw_full singletons: matches against the
  final per-faceset centroids and routes to _candidates/to_<faceset>/
  for manual review; orphaned singletons that still cluster among
  themselves land in _candidates/new_<NNN>/.
- docs/analysis/: evaluation document captures the evidence, downstream
  requirements (FaceSet averaging, inswapper_128), opportunity matrix
  (R1-R14), and the recommended target state this export implements.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-23 22:37:32 +02:00
parent 484278e70e
commit d53ab9fbfc
2 changed files with 774 additions and 0 deletions

View File

@@ -943,6 +943,519 @@ def cmd_extend(
print(f"Updated refine manifest -> {refine_manifest_path}")
# ---------- enrich (landmarks + pose per face record) ---------- #
def _pick_face_for_bbox(faces: list, stored_bbox: list[int]):
"""Given freshly-detected faces and a stored bbox, return the detected face whose
bbox has the highest IoU with stored_bbox (or None if no overlap)."""
if not faces:
return None
sx1, sy1, sx2, sy2 = stored_bbox
sa = max(1, (sx2 - sx1) * (sy2 - sy1))
best = None
best_iou = 0.0
for f in faces:
x1, y1, x2, y2 = [int(round(v)) for v in f.bbox]
ix1, iy1 = max(sx1, x1), max(sy1, y1)
ix2, iy2 = min(sx2, x2), min(sy2, y2)
if ix2 <= ix1 or iy2 <= iy1:
continue
inter = (ix2 - ix1) * (iy2 - iy1)
fa = max(1, (x2 - x1) * (y2 - y1))
union = sa + fa - inter
iou = inter / union
if iou > best_iou:
best_iou = iou
best = f
return best if best_iou >= 0.3 else None
def cmd_enrich(cache_path: Path, force: bool, flush_every: int) -> None:
    """Re-detect every face record's source image to persist landmarks + pose.

    Skips the recognition module (embeddings are already cached) so detection
    plus the two landmark models are the only ones loaded.

    Args:
        cache_path: embedding/metadata cache readable by load_cache/save_cache.
        force: re-enrich records that already carry a "pose" entry.
        flush_every: persist the cache after this many enriched records.
    """
    emb, meta, src_root, processed, path_aliases = load_cache(cache_path)
    if src_root is None:
        src_root = Path("/")
    # Records still lacking pose (or every non-noface record when forcing).
    to_do: list[int] = []
    for i, m in enumerate(meta):
        if m.get("noface"):
            continue
        if force or not m.get("pose"):
            to_do.append(i)
    if not to_do:
        print("Enrich: nothing to do; every face record already has pose.")
        return
    # Group indices by source path so each image is decoded exactly once.
    path_to_indices: dict[str, list[int]] = {}
    for i in to_do:
        path_to_indices.setdefault(meta[i]["path"], []).append(i)
    print(f"Enrich: {len(to_do)} face records to enrich across {len(path_to_indices)} unique files")
    from insightface.app import FaceAnalysis
    app = FaceAnalysis(
        name="buffalo_l",
        providers=["CPUExecutionProvider"],
        allowed_modules=["detection", "landmark_2d_106", "landmark_3d_68"],
    )
    app.prepare(ctx_id=-1, det_size=(640, 640))
    since_flush = 0
    missing = 0
    ok = 0
    try:
        for path, idxs in tqdm(path_to_indices.items(), desc="enriching"):
            # Only the BGR frame is consumed by insightface; RGB is discarded.
            _, bgr = load_rgb_bgr(Path(path))
            if bgr is None:
                missing += len(idxs)
                continue
            faces = app.get(bgr)
            for i in idxs:
                match = _pick_face_for_bbox(faces, meta[i].get("bbox"))
                if match is None:
                    missing += 1
                    continue
                if match.landmark_2d_106 is not None:
                    meta[i]["landmark_2d_106"] = match.landmark_2d_106.astype(np.float32).tolist()
                if match.landmark_3d_68 is not None:
                    meta[i]["landmark_3d_68"] = match.landmark_3d_68.astype(np.float32).tolist()
                if match.pose is not None:
                    meta[i]["pose"] = match.pose.astype(np.float32).tolist()  # [pitch, yaw, roll]
                ok += 1
                since_flush += 1
                # Periodic checkpoint so a crash loses at most flush_every records.
                if since_flush >= flush_every:
                    save_cache(cache_path, emb, meta, src_root, processed, path_aliases)
                    since_flush = 0
    finally:
        # Always persist progress, even on interrupt/exception mid-run.
        save_cache(cache_path, emb, meta, src_root, processed, path_aliases)
    print(f"Enrich done: {ok} records enriched, {missing} could not be matched")
# ---------- quality scoring ---------- #
QUALITY_WEIGHTS = {
"det": 0.20,
"size": 0.15,
"sharp": 0.15,
"frontal": 0.30,
"symmetry": 0.20,
}
def _norm01(x: float, lo: float, hi: float) -> float:
if hi <= lo:
return 0.0
return max(0.0, min(1.0, (x - lo) / (hi - lo)))
def _landmark_symmetry(lm: list[list[float]] | None, bbox: list[int] | None) -> float:
"""Score [0,1] based on how symmetric the 2D 106 landmarks are about the bbox vertical center.
A head-on, un-occluded face has high symmetry; a strong profile or half-occluded face has low.
Returns 0.5 if landmarks unavailable (neutral)."""
if not lm or not bbox:
return 0.5
try:
arr = np.asarray(lm, dtype=np.float32)
cx = 0.5 * (bbox[0] + bbox[2])
width = max(1.0, bbox[2] - bbox[0])
# Mirror each landmark around cx and measure closest-landmark distance (normalized by bbox width).
mirrored = arr.copy()
mirrored[:, 0] = 2 * cx - mirrored[:, 0]
# For each mirrored point, find nearest real landmark.
d = np.linalg.norm(mirrored[:, None, :] - arr[None, :, :], axis=2).min(axis=1)
mean_err = d.mean() / width
# Empirically mean_err is ~0.02 for frontal, ~0.15 for strong profile.
score = 1.0 - _norm01(mean_err, 0.02, 0.15)
return float(score)
except Exception:
return 0.5
def _frontality(pose: list[float] | None) -> float:
if not pose or len(pose) < 2:
return 0.5
pitch, yaw = abs(pose[0]), abs(pose[1])
# yaw is the dominant signal for arcface-style embedding degradation.
yaw_score = 1.0 - _norm01(yaw, 10.0, 45.0)
pitch_score = 1.0 - _norm01(pitch, 10.0, 35.0)
return 0.7 * yaw_score + 0.3 * pitch_score
def compute_quality(rec: dict) -> dict:
"""Return dict with per-signal sub-scores and a composite score in [0,1]."""
det = _norm01(float(rec.get("det_score", 0.0)), 0.50, 0.95)
size = _norm01(float(rec.get("face_short", 0)), 90.0, 300.0)
sharp = _norm01(float(rec.get("blur", 0.0)), 40.0, 250.0)
frontal = _frontality(rec.get("pose"))
symmetry = _landmark_symmetry(rec.get("landmark_2d_106"), rec.get("bbox"))
w = QUALITY_WEIGHTS
composite = (
w["det"] * det + w["size"] * size + w["sharp"] * sharp
+ w["frontal"] * frontal + w["symmetry"] * symmetry
)
return {
"composite": float(composite),
"det": float(det), "size": float(size), "sharp": float(sharp),
"frontal": float(frontal), "symmetry": float(symmetry),
}
# ---------- export-swap ---------- #
def _crop_face_square(rgb: np.ndarray, bbox: list[int], pad_ratio: float, out_size: int) -> np.ndarray:
    """Expand *bbox* by `pad_ratio` per side, clamp to the image, letterbox to a
    square with black borders, and resize to out_size x out_size.

    Returns an all-black image if the clamped crop is empty.
    """
    import cv2
    img_h, img_w = rgb.shape[:2]
    x1, y1, x2, y2 = [int(v) for v in bbox]
    pad_x = int((x2 - x1) * pad_ratio)
    pad_y = int((y2 - y1) * pad_ratio)
    left = max(0, x1 - pad_x)
    top = max(0, y1 - pad_y)
    right = min(img_w, x2 + pad_x)
    bottom = min(img_h, y2 + pad_y)
    crop = rgb[top:bottom, left:right]
    ch, cw = crop.shape[:2]
    if ch == 0 or cw == 0:
        # Degenerate / fully out-of-frame bbox: emit a black placeholder.
        return np.zeros((out_size, out_size, 3), dtype=np.uint8)
    if ch != cw:
        # Center the crop inside a black square so aspect ratio is preserved.
        side = max(ch, cw)
        square = np.zeros((side, side, 3), dtype=crop.dtype)
        off_y = (side - ch) // 2
        off_x = (side - cw) // 2
        square[off_y:off_y + ch, off_x:off_x + cw] = crop
        crop = square
    if crop.shape[0] != out_size:
        crop = cv2.resize(crop, (out_size, out_size), interpolation=cv2.INTER_AREA)
    return crop
def _zip_png_list(pngs: list[Path], zip_path: Path) -> None:
"""Write a .fsz (zip) with the given PNGs named 0000.png, 0001.png, ..."""
import zipfile
with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=4) as zf:
for i, p in enumerate(pngs):
zf.write(p, arcname=f"{i:04d}.png")
def cmd_export_swap(
    cache_path: Path,
    refine_manifest_path: Path,
    raw_manifest_path: Path | None,
    out_dir: Path,
    top_n: int,
    outlier_threshold: float,
    pad_ratio: float,
    out_size: int,
    include_candidates: bool,
    candidate_match_threshold: float,
    candidate_min_score: float,
    min_face_short: int,
) -> None:
    """Build a swap-ready faceset tree under ``out_dir`` from a refine manifest.

    Per faceset: drop embedding outliers against the faceset centroid, drop
    small faces, collapse visual near-duplicates and multi-face source images
    to one best record each, rank by composite quality, then write square
    single-face PNG crops, .fsz zip bundles (top-N and, when larger, all
    faces), a per-faceset manifest.json, and a NAME.txt placeholder.  When
    ``include_candidates`` is set and a raw manifest is given, unplaced
    singletons are matched against the final centroids and routed into
    _candidates/.

    Args:
        cache_path: embedding/metadata cache readable by ``load_cache``.
        refine_manifest_path: refine manifest listing facesets and their images.
        raw_manifest_path: raw_full manifest; required for singleton rescue.
        out_dir: output root directory (created if missing).
        top_n: number of top-ranked faces bundled into the primary .fsz.
        outlier_threshold: max cosine distance from the initial centroid.
        pad_ratio: bbox padding ratio per side for the crops.
        out_size: output crop edge length in pixels.
        include_candidates: enable singleton rescue into _candidates/.
        candidate_match_threshold: max cosine distance to rescue a singleton.
        candidate_min_score: min composite quality for rescued singletons.
        min_face_short: min face short-edge (px) for export/rescue.
    """
    import cv2
    emb, meta, src_root, _processed, path_aliases = load_cache(cache_path)
    rm = json.loads(refine_manifest_path.read_text())
    # Visual near-duplicate groups live in a sidecar duplicates JSON; try both
    # naming schemes (suffix replacement, then stem + suffix concatenation).
    dup_path = cache_path.with_suffix(".duplicates.json")
    if not dup_path.exists():
        dup_path = cache_path.parent / (cache_path.stem + ".duplicates.json")
    visual_groups: list[list[str]] = []
    if dup_path.exists():
        visual_groups = json.loads(dup_path.read_text()).get("visual_groups", [])
    # Map each member path to a canonical (sorted-tuple) key of its group.
    path_to_vgroup: dict[str, tuple[str, ...]] = {}
    for g in visual_groups:
        key = tuple(sorted(g))
        for p in g:
            path_to_vgroup[p] = key
    # Embedding rows align 1:1 with the non-noface meta records, in order.
    face_records = [m for m in meta if not m.get("noface")]
    if len(face_records) != len(emb):
        raise SystemExit(f"meta/embedding mismatch: {len(face_records)} vs {len(emb)}")
    path_idx: dict[str, list[int]] = {}
    for i, m in enumerate(face_records):
        path_idx.setdefault(m["path"], []).append(i)
    out_dir.mkdir(parents=True, exist_ok=True)
    faceset_summary = []
    final_centroids: dict[str, np.ndarray] = {}
    placed_cache_indices: set[int] = set()
    for fs in rm.get("facesets", []):
        name = fs["name"]
        paths = set(fs.get("images", []))
        indices = [i for p in paths for i in path_idx.get(p, [])]
        if not indices:
            continue
        # Initial centroid for this faceset from all its current members.
        init_vecs = emb[indices]
        init_cent = init_vecs.mean(axis=0)
        nrm = np.linalg.norm(init_cent)
        if nrm > 0:
            init_cent = init_cent / nrm
        # Tight outlier filter + quality.
        # NOTE(review): emb[i] @ init_cent is treated as cosine similarity,
        # which assumes cached embeddings are L2-normalized — confirm upstream.
        ranked: list[dict] = []
        dropped_outlier = 0
        for i in indices:
            cosd = 1.0 - float(emb[i] @ init_cent)
            if cosd > outlier_threshold:
                dropped_outlier += 1
                continue
            rec = face_records[i]
            if rec.get("face_short", 0) < min_face_short:
                continue
            q = compute_quality(rec)
            ranked.append({"cache_idx": i, "rec": rec, "cosd": cosd, "quality": q})
        # Visual-dupe collapse: keep best score per group.
        groups_best: dict[tuple[str, ...], dict] = {}
        singletons: list[dict] = []
        for r in ranked:
            g = path_to_vgroup.get(r["rec"]["path"])
            if g is None:
                singletons.append(r)
                continue
            prev = groups_best.get(g)
            if prev is None or r["quality"]["composite"] > prev["quality"]["composite"]:
                groups_best[g] = r
        kept = singletons + list(groups_best.values())
        kept.sort(key=lambda r: -r["quality"]["composite"])
        dropped_vdupe = len(ranked) - len(kept)
        if not kept:
            print(f"[{name}] empty after filtering; skipping")
            continue
        # Recompute centroid from the kept embeddings (used for singleton rescue).
        kept_vecs = np.stack([emb[r["cache_idx"]] for r in kept])
        final_cent = kept_vecs.mean(axis=0)
        nrm = np.linalg.norm(final_cent)
        if nrm > 0:
            final_cent = final_cent / nrm
        final_centroids[name] = final_cent
        for r in kept:
            placed_cache_indices.add(r["cache_idx"])
        # Materialize.
        fs_out = out_dir / name
        faces_dir = fs_out / "faces"
        faces_dir.mkdir(parents=True, exist_ok=True)
        # Deduplicate by source path: within the same faceset, a multi-face photo could
        # have produced 2 records with different bboxes; we want the one with the best quality
        # to win, and only crop that face.
        seen_path = {}
        unique_kept: list[dict] = []
        for r in kept:
            p = r["rec"]["path"]
            if p not in seen_path or r["quality"]["composite"] > seen_path[p]["quality"]["composite"]:
                seen_path[p] = r
        unique_kept = sorted(seen_path.values(), key=lambda r: -r["quality"]["composite"])
        written_pngs: list[Path] = []
        manifest_faces: list[dict] = []
        for rank, r in enumerate(unique_kept, start=1):
            rec = r["rec"]
            src = Path(rec["path"])
            rgb = None
            if src.exists():
                rgb, _ = load_rgb_bgr(src)
            if rgb is None:
                # Source vanished or failed to decode; skip this record.
                continue
            crop = _crop_face_square(rgb, rec["bbox"], pad_ratio, out_size)
            png = faces_dir / f"{rank:04d}.png"
            # Crops are RGB in memory; cv2.imwrite expects BGR.
            cv2.imwrite(str(png), cv2.cvtColor(crop, cv2.COLOR_RGB2BGR))
            written_pngs.append(png)
            manifest_faces.append({
                "rank": rank,
                "png": f"faces/{rank:04d}.png",
                "source": rec["path"],
                "aliases": path_aliases.get(rec["path"], []),
                "bbox": rec["bbox"],
                "face_short": rec.get("face_short"),
                "det_score": rec.get("det_score"),
                "blur": rec.get("blur"),
                "pose": rec.get("pose"),
                "cosd_centroid": float(r["cosd"]),
                "quality": r["quality"],
            })
        if not written_pngs:
            continue
        # Emit .fsz bundles.
        top_n_eff = min(top_n, len(written_pngs))
        _zip_png_list(written_pngs[:top_n_eff], fs_out / f"{name}_top{top_n_eff}.fsz")
        if len(written_pngs) > top_n_eff:
            _zip_png_list(written_pngs, fs_out / f"{name}_all.fsz")
        # Per-faceset manifest.
        manifest = {
            "name": name,
            "input_face_records": len(indices),
            "dropped_outlier": dropped_outlier,
            "dropped_visual_dupes": dropped_vdupe,
            "dropped_multi_face_same_source": len(kept) - len(unique_kept),
            "exported": len(written_pngs),
            "top_n": top_n_eff,
            "fsz_top": f"{name}_top{top_n_eff}.fsz",
            "fsz_all": f"{name}_all.fsz" if len(written_pngs) > top_n_eff else None,
            "quality_weights": QUALITY_WEIGHTS,
            "faces": manifest_faces,
        }
        (fs_out / "manifest.json").write_text(json.dumps(manifest, indent=2))
        # Convenience name placeholder.
        name_file = fs_out / "NAME.txt"
        if not name_file.exists():
            name_file.write_text(
                "# Optional: write the identity's name on the first line.\n"
                "# This file is for operator reference only - roop-unleashed ignores it.\n\n"
            )
        faceset_summary.append(manifest)
        print(
            f"[{name}] in={len(indices)} outlier_drop={dropped_outlier} vdupe_drop={dropped_vdupe} "
            f"multiface_drop={len(kept) - len(unique_kept)} exported={len(written_pngs)} "
            f"(top{top_n_eff}.fsz)"
        )
    # Singleton rescue -> _candidates/
    if include_candidates and raw_manifest_path is not None:
        raw = json.loads(raw_manifest_path.read_text())
        # Index singletons: face records in _singletons by (path, bbox) => cache index
        bbox_key_to_cache = {
            (m["path"], tuple(m["bbox"]) if m.get("bbox") else None): i
            for i, m in enumerate(face_records)
        }
        singleton_cache_indices: list[int] = []
        for e in raw:
            if e.get("folder") != "_singletons":
                continue
            key = (e["source"], tuple(e["bbox"]) if e.get("bbox") else None)
            ci = bbox_key_to_cache.get(key)
            # Skip records already exported into a faceset above.
            if ci is not None and ci not in placed_cache_indices:
                singleton_cache_indices.append(ci)
        if not final_centroids:
            print("No final centroids; skipping candidates.")
        elif not singleton_cache_indices:
            print("No singletons to rescue.")
        else:
            cand_root = out_dir / "_candidates"
            cand_root.mkdir(parents=True, exist_ok=True)
            cent_names = list(final_centroids.keys())
            cent_mat = np.stack([final_centroids[n] for n in cent_names])
            to_faceset: dict[str, list[int]] = {}
            unmatched: list[int] = []
            rescued_report: list[dict] = []
            for ci in singleton_cache_indices:
                rec = face_records[ci]
                if rec.get("face_short", 0) < min_face_short:
                    continue
                q = compute_quality(rec)
                if q["composite"] < candidate_min_score:
                    continue
                # Nearest final centroid by similarity; convert to distance.
                sims = cent_mat @ emb[ci]
                best = int(np.argmax(sims))
                dist = 1.0 - float(sims[best])
                if dist <= candidate_match_threshold:
                    to_faceset.setdefault(cent_names[best], []).append(ci)
                    rescued_report.append({
                        "cache_idx": ci, "source": rec["path"], "assigned": cent_names[best],
                        "cosd": dist, "quality": q,
                    })
                else:
                    unmatched.append(ci)
            # Cluster unmatched among themselves into new_NNN buckets.
            if len(unmatched) > 1:
                u_vecs = np.stack([emb[i] for i in unmatched])
                labels = _cluster_embeddings(u_vecs, 0.55)
                groups: dict[int, list[int]] = {}
                for ci, lbl in zip(unmatched, labels):
                    groups.setdefault(int(lbl), []).append(ci)
                # Largest clusters get the lowest new_NNN numbers.
                groups_sorted = sorted(groups.items(), key=lambda kv: -len(kv[1]))
                new_buckets = {}
                rank = 0
                for _gid, members in groups_sorted:
                    if len(members) == 1:
                        continue  # still a singleton, skip
                    rank += 1
                    new_buckets[f"new_{rank:03d}"] = members
                to_new = new_buckets
            else:
                to_new = {}
            # Materialize candidates
            def materialize(bucket_name: str, ci_list: list[int]):
                # Write quality-ranked crops + a manifest for one candidate bucket.
                bd = cand_root / bucket_name
                fd = bd / "faces"
                fd.mkdir(parents=True, exist_ok=True)
                written = []
                entries = []
                ranked_cis = sorted(ci_list, key=lambda i: -compute_quality(face_records[i])["composite"])
                for rk, ci in enumerate(ranked_cis, 1):
                    rec = face_records[ci]
                    src = Path(rec["path"])
                    if not src.exists():
                        continue
                    rgb, _ = load_rgb_bgr(src)
                    if rgb is None:
                        continue
                    crop = _crop_face_square(rgb, rec["bbox"], pad_ratio, out_size)
                    png = fd / f"{rk:04d}.png"
                    cv2.imwrite(str(png), cv2.cvtColor(crop, cv2.COLOR_RGB2BGR))
                    written.append(png)
                    entries.append({
                        "rank": rk,
                        "png": f"faces/{rk:04d}.png",
                        "source": rec["path"],
                        "bbox": rec["bbox"],
                        "quality": compute_quality(rec),
                    })
                if written:
                    (bd / "manifest.json").write_text(json.dumps({
                        "bucket": bucket_name,
                        "faces": entries,
                    }, indent=2))
            for fs_name, cis in to_faceset.items():
                materialize(f"to_{fs_name}", cis)
            for bname, cis in to_new.items():
                materialize(bname, cis)
            (cand_root / "rescue_report.json").write_text(json.dumps({
                "rescued_to_existing": len(rescued_report),
                "new_clusters": len(to_new),
                "unmatched_singletons_kept_as_singleton": len(unmatched) - sum(len(v) for v in to_new.values()),
                "assignments": rescued_report,
            }, indent=2))
            print(f"Candidates: rescued={len(rescued_report)} to existing facesets; new_clusters={len(to_new)}")
    # Top-level manifest
    (out_dir / "manifest.json").write_text(json.dumps({
        "facesets": [{k: v for k, v in m.items() if k != "faces"} for m in faceset_summary],
        "quality_weights": QUALITY_WEIGHTS,
        "outlier_threshold": outlier_threshold,
        "top_n": top_n,
        "pad_ratio": pad_ratio,
        "out_size": out_size,
    }, indent=2))
    print(f"Wrote top-level manifest -> {out_dir / 'manifest.json'}")
# ---------- main ---------- #
def main() -> None:
@@ -992,6 +1505,25 @@ def main() -> None:
px.add_argument("--refine-min-det-score", type=float, default=0.6)
px.add_argument("--refine-centroid-threshold", type=float, default=0.55)
pn = sub.add_parser("enrich", help="Re-detect to persist landmark_2d_106, landmark_3d_68, pose into cache")
pn.add_argument("cache", type=Path)
pn.add_argument("--force", action="store_true", help="re-enrich even records that already have pose")
pn.add_argument("--flush-every", type=int, default=100)
pxs = sub.add_parser("export-swap", help="Build facesets_swap_ready/ with ranked single-face PNGs + .fsz per identity")
pxs.add_argument("cache", type=Path)
pxs.add_argument("refine_manifest", type=Path, help="path to refine_manifest.json of the source facesets dir")
pxs.add_argument("out_dir", type=Path)
pxs.add_argument("--raw-manifest", type=Path, default=None, help="raw_full/manifest.json (required for --candidates)")
pxs.add_argument("--top-n", type=int, default=30)
pxs.add_argument("--outlier-threshold", type=float, default=0.45)
pxs.add_argument("--pad-ratio", type=float, default=0.5)
pxs.add_argument("--out-size", type=int, default=512)
pxs.add_argument("--min-face-short", type=int, default=100)
pxs.add_argument("--candidates", action="store_true", help="rescue singletons into _candidates/")
pxs.add_argument("--candidate-match-threshold", type=float, default=0.55)
pxs.add_argument("--candidate-min-score", type=float, default=0.40)
args = p.parse_args()
if args.cmd == "embed":
cmd_embed(args.src_dir, args.cache, resume=not args.no_resume, flush_every=args.flush_every)
@@ -1013,6 +1545,15 @@ def main() -> None:
args.refine_min_short, args.refine_min_blur, args.refine_min_det_score,
args.refine_centroid_threshold,
)
elif args.cmd == "enrich":
cmd_enrich(args.cache, force=args.force, flush_every=args.flush_every)
elif args.cmd == "export-swap":
cmd_export_swap(
args.cache, args.refine_manifest, args.raw_manifest, args.out_dir,
args.top_n, args.outlier_threshold, args.pad_ratio, args.out_size,
args.candidates, args.candidate_match_threshold, args.candidate_min_score,
args.min_face_short,
)
if __name__ == "__main__":