initial publish commit
This commit is contained in:
151
tdarr_requeue.mjs
Normal file
151
tdarr_requeue.mjs
Normal file
@ -0,0 +1,151 @@
|
||||
#!/usr/bin/env node
|
||||
/**
|
||||
* tdarr-requeue.mjs – v17 (2025-07-13)
|
||||
*
|
||||
* • Runs continuously; wakes up every TDARR_INTERVAL_MIN minutes.
|
||||
* • If staging count < TDARR_STAGING_LIMIT:
|
||||
* – Pull up to TDARR_BATCH_SIZE items from Status ▸ “Processed” (table2)
|
||||
* where newSize == '-'.
|
||||
* – Re-queue them via a single /bulk-update-files call
|
||||
* (same payload as the Tdarr GUI).
|
||||
*
|
||||
* All behaviour is configured via .env variables – see README.
|
||||
*/
|
||||
|
||||
import 'dotenv/config';
|
||||
import axios from 'axios';
|
||||
import pino from 'pino';
|
||||
import minimist from 'minimist';
|
||||
|
||||
/* ────────────────────────────────────────────────────────────
|
||||
* Environment / CLI
|
||||
* ─────────────────────────────────────────────────────────── */
|
||||
const cli = minimist(process.argv.slice(2));
|
||||
const API_BASE = (cli.url || process.env.TDARR_URL || 'http://localhost:8265/api/v2').replace(/\/?$/, '/');
|
||||
const API_KEY = (cli['api-key'] || process.env.TDARR_API_KEY || '').trim();
|
||||
|
||||
const STAGING_LIMIT = +process.env.TDARR_STAGING_LIMIT || 50;
|
||||
const BATCH_SIZE = +process.env.TDARR_BATCH_SIZE || 50;
|
||||
const INTERVAL_MIN = +process.env.TDARR_INTERVAL_MIN || 60; // scheduler interval
|
||||
const RETRIES = +process.env.TDARR_RETRIES || 4;
|
||||
const BACKOFF_MS = +process.env.TDARR_BACKOFF_MS || 2_000;
|
||||
const BULK_TIMEOUT_MS= +process.env.BULK_TIMEOUT_MS || 120_000; // generous timeout
|
||||
|
||||
if (!API_KEY) {
|
||||
console.error('❌ TDARR_API_KEY is missing'); process.exit(20);
|
||||
}
|
||||
|
||||
/* ─── Logger ----------------------------------------------------------- */
|
||||
const log = pino({
|
||||
level: process.env.LOG_LEVEL ?? 'info',
|
||||
transport: process.env.LOG_PRETTY === '1'
|
||||
? { target: 'pino-pretty', options: { translateTime: 'SYS:standard', colorize: true } }
|
||||
: undefined
|
||||
});
|
||||
|
||||
/* ─── Axios client ----------------------------------------------------- */
|
||||
const http = axios.create({
|
||||
baseURL: API_BASE,
|
||||
timeout: 30_000,
|
||||
headers: { 'content-type': 'application/json', 'x-api-key': API_KEY }
|
||||
});
|
||||
http.interceptors.response.use(r => r, e => {
|
||||
if ([401, 403].includes(e.response?.status)) {
|
||||
log.error('🔑 Authentication failed – check TDARR_API_KEY');
|
||||
process.exit(20);
|
||||
}
|
||||
return Promise.reject(e);
|
||||
});
|
||||
|
||||
/* ─── API helper with retry & back-off ------------------------------- */
|
||||
async function api(endpoint, payload, tries = RETRIES) {
|
||||
for (let attempt = 1; attempt <= tries; attempt++) {
|
||||
try {
|
||||
const { data } = await http.post(endpoint, { data: payload });
|
||||
return data.data ?? data; // Tdarr wraps its result in {data: …}
|
||||
} catch (err) {
|
||||
log.warn({ endpoint, attempt, status: err.response?.status, body: err.response?.data },
|
||||
'⏳ retrying…');
|
||||
if (attempt === tries) throw err;
|
||||
await new Promise(r => setTimeout(r, BACKOFF_MS * 2 ** attempt));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* ─── Tdarr helpers ---------------------------------------------------- */
|
||||
const stagingCount = () =>
|
||||
api('cruddb', { collection: 'StagedJSONDB', mode: 'getAll' })
|
||||
.then(arr => arr.length || 0);
|
||||
|
||||
const candidatePaths = () =>
|
||||
api('client/status-tables', {
|
||||
start: 0,
|
||||
pageSize: BATCH_SIZE,
|
||||
filters: [{ id: 'newSize', value: '-' }],
|
||||
sorts: [],
|
||||
opts: { table: 'table2' } // “Processed” table
|
||||
}).then(res => Array.isArray(res.array) ? res.array.map(f => f._id) : []);
|
||||
|
||||
/* ─── Robust re-queue -------------------------------------------------- */
|
||||
async function requeue(paths) {
|
||||
if (!paths.length) {
|
||||
log.warn('⚠️ No candidates found');
|
||||
return 0;
|
||||
}
|
||||
|
||||
const payload = {
|
||||
fileIds: paths,
|
||||
updatedObj:{ TranscodeDecisionMaker: 'Queued' }
|
||||
};
|
||||
log.debug({ payload }, '📨 bulk-update-files payload');
|
||||
|
||||
const before = await stagingCount();
|
||||
|
||||
try {
|
||||
await http.post('bulk-update-files', { data: payload }, { timeout: BULK_TIMEOUT_MS });
|
||||
} catch (err) {
|
||||
if (err.code === 'ECONNABORTED') {
|
||||
log.warn('⌛ bulk-update-files timed out – verifying outcome …');
|
||||
const after = await stagingCount();
|
||||
if (after > before) {
|
||||
log.info({ before, after }, '✅ bulk-update-files succeeded despite timeout');
|
||||
return paths.length;
|
||||
}
|
||||
}
|
||||
throw err; // propagate any other failure or verification miss
|
||||
}
|
||||
return paths.length;
|
||||
}
|
||||
|
||||
/* ─── One processing cycle -------------------------------------------- */
|
||||
async function runCycle() {
|
||||
log.info('🚀 Requeue cycle started');
|
||||
|
||||
const before = await stagingCount();
|
||||
log.info({ before }, '📦 items currently in staging');
|
||||
|
||||
if (before >= STAGING_LIMIT) {
|
||||
log.info('🟢 staging limit reached – nothing to do');
|
||||
return;
|
||||
}
|
||||
|
||||
const paths = await candidatePaths();
|
||||
log.debug({ sample: paths.slice(0, 3) }, '🔍 example candidates');
|
||||
|
||||
const queued = await requeue(paths);
|
||||
log.info({ queued }, '✅ files requeued');
|
||||
|
||||
const after = await stagingCount();
|
||||
log.info({ after }, '📈 staging count after requeue');
|
||||
}
|
||||
|
||||
/* ─── Built-in scheduler ---------------------------------------------- */
|
||||
(async () => {
|
||||
log.info(`📅 Scheduler running every ${INTERVAL_MIN} minute(s)`);
|
||||
while (true) {
|
||||
try { await runCycle(); }
|
||||
catch (err) { log.error({ err }, '💥 cycle failed'); }
|
||||
|
||||
await new Promise(r => setTimeout(r, INTERVAL_MIN * 60_000));
|
||||
}
|
||||
})();
|
Reference in New Issue
Block a user