/**
 * ePay batch order queue.
 *
 * ePay has a GLOBAL cart per account — only one order can be processed
 * at a time. This queue ensures sequential execution.
 *
 * BATCH flow (correct — ONE order for N parcels):
 *   1. Check credits (need >= N)
 *   2. For each parcel: addToCart → saveMetadata
 *   3. ONE EditCartSubmit → ONE orderId
 *   4. Poll that orderId until "Finalizata"
 *   5. Download ALL documents from that order
 *   6. Store each in MinIO, update DB records
 */

import { prisma } from "@/core/storage/prisma";
import { EpayClient } from "./epay-client";
import { getEpayCredentials, updateEpayCredits } from "./epay-session-store";
import { storeCfExtract } from "./epay-storage";
import type { CfExtractCreateInput } from "./epay-types";

/* ------------------------------------------------------------------ */
/*  Types                                                              */
/* ------------------------------------------------------------------ */

type QueueItem = {
  extractId: string; // CfExtract.id in DB
  input: CfExtractCreateInput;
  basketRowId?: number; // set during cart phase
};

type BatchJob = {
  items: QueueItem[];
};

/** A recently enqueued batch, remembered for duplicate suppression. */
type DedupEntry = {
  timestamp: number; // Date.now() at enqueue time
  extractIds: string[]; // CfExtract IDs created for that batch
};

/**
 * Order identifier as produced by `EpayClient.submitOrder`.
 * Derived from the client so this module never disagrees with it.
 */
type OrderId = Awaited<ReturnType<EpayClient["submitOrder"]>>;

/* ------------------------------------------------------------------ */
/*  Global singleton queue                                             */
/* ------------------------------------------------------------------ */

// State lives on globalThis so every evaluation of this module in the
// same process shares a single queue.
const g = globalThis as {
  __epayBatchQueue?: BatchJob[];
  __epayQueueProcessing?: boolean;
  __epayDedupMap?: Map<string, DedupEntry>;
  __epayInflightBatches?: Map<string, Promise<string[]>>;
};
if (!g.__epayBatchQueue) g.__epayBatchQueue = [];
if (!g.__epayDedupMap) g.__epayDedupMap = new Map();
if (!g.__epayInflightBatches) g.__epayInflightBatches = new Map();

/** TTL for dedup entries in milliseconds (60 seconds). */
const DEDUP_TTL_MS = 60_000;

/**
 * Build a dedup key from a list of cadastral numbers.
 * Sorted and joined so order doesn't matter.
 *
 * NOTE(review): the key uses `nrCadastral` only; if the same number can
 * occur under different `siruta` values the key may need to include it.
 */
function batchDedupKey(inputs: CfExtractCreateInput[]): string {
  return inputs
    .map((i) => i.nrCadastral)
    .sort()
    .join(",");
}

/**
 * Remove expired entries from the dedup map.
 */
function cleanupDedupMap(): void {
  const now = Date.now();
  const map = g.__epayDedupMap!;
  for (const [key, entry] of map) {
    if (now - entry.timestamp > DEDUP_TTL_MS) {
      map.delete(key);
    }
  }
}

/** Return the message of an `Error`, or `fallback` for any other thrown value. */
function describeError(error: unknown, fallback: string): string {
  return error instanceof Error ? error.message : fallback;
}

/* ------------------------------------------------------------------ */
/*  Public API                                                         */
/* ------------------------------------------------------------------ */

/**
 * Enqueue a single CF extract order (backwards compatible).
 * Wraps the input as a batch of 1 and delegates to {@link enqueueBatch}.
 *
 * @param input - The extract to order.
 * @returns The CfExtract ID of the created (or deduplicated) record.
 * @throws Error if `enqueueBatch` unexpectedly returns no IDs.
 */
export async function enqueueOrder(
  input: CfExtractCreateInput,
): Promise<string> {
  const ids = await enqueueBatch([input]);
  const first = ids[0];
  if (!first) throw new Error("enqueueBatch returned empty array");
  return first;
}

/**
 * Enqueue a batch of CF extract orders.
 * Creates all DB records, then processes as ONE ePay order.
 * Returns the CfExtract IDs as soon as the records exist; the ePay
 * order itself runs in the background.
 *
 * Dedup protection: if the same set of cadastral numbers was enqueued
 * within the last 60 seconds — or is being enqueued right now by a
 * concurrent call — the existing extract IDs are returned instead of
 * creating duplicate DB records and orders.
 *
 * @param inputs - Extracts to order together; an empty array is a no-op.
 * @returns CfExtract IDs, in the same order as `inputs`.
 * @throws Whatever the DB layer throws while creating the records.
 */
export async function enqueueBatch(
  inputs: CfExtractCreateInput[],
): Promise<string[]> {
  if (inputs.length === 0) return [];

  // ── Dedup check ──
  cleanupDedupMap();
  const dedupKey = batchDedupKey(inputs);
  const existing = g.__epayDedupMap!.get(dedupKey);
  if (existing && Date.now() - existing.timestamp < DEDUP_TTL_MS) {
    console.log(
      `[epay-queue] Dedup hit: batch [${dedupKey}] was enqueued ${Math.round((Date.now() - existing.timestamp) / 1000)}s ago — returning existing IDs`,
    );
    return existing.extractIds;
  }

  // The dedup map is only written after the (awaited) DB inserts, so a
  // second request arriving in that window would slip past the check
  // above. The in-flight map is registered synchronously to close that gap.
  const inflight = g.__epayInflightBatches!.get(dedupKey);
  if (inflight) {
    console.log(
      `[epay-queue] Dedup hit: batch [${dedupKey}] is already being enqueued — returning its IDs`,
    );
    return inflight;
  }

  const pending = createAndEnqueueBatch(inputs, dedupKey);
  g.__epayInflightBatches!.set(dedupKey, pending);
  try {
    return await pending;
  } finally {
    g.__epayInflightBatches!.delete(dedupKey);
  }
}

/**
 * Create the DB records for a batch, remember it for dedup, push it on
 * the queue and kick the processor.
 */
async function createAndEnqueueBatch(
  inputs: CfExtractCreateInput[],
  dedupKey: string,
): Promise<string[]> {
  const items: QueueItem[] = [];

  try {
    for (const input of inputs) {
      // Create DB record in "queued" status — use transaction + advisory lock
      // to prevent duplicate version numbers from concurrent requests
      const record = await prisma.$transaction(async (tx) => {
        await tx.$executeRaw`SELECT pg_advisory_xact_lock(hashtext(${'cfextract:' + input.nrCadastral}))`;
        const agg = await tx.cfExtract.aggregate({
          where: { nrCadastral: input.nrCadastral },
          _max: { version: true },
        });
        return tx.cfExtract.create({
          data: {
            nrCadastral: input.nrCadastral,
            nrCF: input.nrCF ?? input.nrCadastral,
            siruta: input.siruta,
            judetIndex: input.judetIndex,
            judetName: input.judetName,
            uatId: input.uatId,
            uatName: input.uatName,
            gisFeatureId: input.gisFeatureId,
            prodId: input.prodId ?? 14200,
            status: "queued",
            version: (agg._max.version ?? 0) + 1,
          },
        });
      });

      items.push({ extractId: record.id, input });
    }
  } catch (error) {
    // Records created before the failure will never reach the queue;
    // mark them failed so they do not sit in "queued" forever.
    await markFailed(
      items.map((i) => i.extractId),
      describeError(error, "Eroare necunoscută"),
    );
    throw error;
  }

  const extractIds = items.map((i) => i.extractId);

  // ── Record in dedup map ──
  g.__epayDedupMap!.set(dedupKey, {
    timestamp: Date.now(),
    extractIds,
  });

  g.__epayBatchQueue!.push({ items });
  console.log(
    `[epay-queue] Enqueued batch of ${items.length}: ${items.map((i) => i.input.nrCadastral).join(", ")} (queue=${g.__epayBatchQueue!.length})`,
  );

  // Start processing if not already running
  void processQueue();

  return extractIds;
}

/**
 * Get queue status for UI display.
 *
 * @returns Number of batches still waiting and whether one is running.
 */
export function getQueueStatus(): {
  length: number;
  processing: boolean;
} {
  return {
    length: g.__epayBatchQueue?.length ?? 0,
    processing: g.__epayQueueProcessing ?? false,
  };
}

/* ------------------------------------------------------------------ */
/*  Queue Processor                                                    */
/* ------------------------------------------------------------------ */

/**
 * Drain the queue, one batch at a time. Re-entrant calls return
 * immediately while a drain is in progress.
 *
 * Callers fire-and-forget this function, so it must never reject: an
 * error escaping one batch is logged and the next batch still runs.
 */
async function processQueue(): Promise<void> {
  if (g.__epayQueueProcessing) return; // already running
  g.__epayQueueProcessing = true;

  // Track all orderIds from this session to avoid duplicates
  const knownOrderIds = new Set<OrderId>();

  try {
    while (g.__epayBatchQueue && g.__epayBatchQueue.length > 0) {
      const batch = g.__epayBatchQueue.shift()!;
      try {
        const orderId = await processBatch(batch.items, knownOrderIds);
        if (orderId) knownOrderIds.add(orderId);
      } catch (error) {
        console.error(
          "[epay-queue] Unexpected error while processing batch:",
          error,
        );
      }
    }
  } finally {
    g.__epayQueueProcessing = false;
  }
}

/**
 * Persist a status transition (plus optional extra columns) for one extract.
 *
 * @throws Whatever `prisma.cfExtract.update` throws.
 */
async function updateStatus(
  id: string,
  status: string,
  extra?: Record<string, unknown>,
): Promise<void> {
  await prisma.cfExtract.update({
    where: { id },
    data: { status, ...extra },
  });
}

/**
 * Best-effort: mark each extract as "failed" with `errorMessage`.
 * A DB error on one record is logged and does not stop the others,
 * so this function never throws.
 */
async function markFailed(
  ids: Iterable<string>,
  errorMessage: string,
): Promise<void> {
  for (const id of ids) {
    try {
      await updateStatus(id, "failed", { errorMessage });
    } catch (dbError) {
      console.error(
        `[epay-queue] Could not mark extract ${id} as failed:`,
        dbError,
      );
    }
  }
}

/**
 * Process a batch of items as ONE ePay order:
 *   1. Check credits (>= N)
 *   2. addToCart + saveMetadata for each
 *   3. ONE submitOrder
 *   4. Poll until complete
 *   5. Download + store ALL documents
 *
 * @param items - Queue items of the batch; `basketRowId` is filled in here.
 * @param knownOrderIds - Order IDs already seen this session, passed to
 *   `submitOrder`.
 * @returns The ePay order ID if an order was submitted (even when later
 *   steps failed), otherwise `null`.
 */
async function processBatch(
  items: QueueItem[],
  knownOrderIds: Set<OrderId>,
): Promise<OrderId | null> {
  const extractIds = items.map((i) => i.extractId);
  const count = items.length;

  // Extracts that already reached a terminal state (completed / failed /
  // cancelled). The outer catch must not overwrite these.
  const settled = new Set<string>();
  const settle = async (
    id: string,
    status: string,
    extra?: Record<string, unknown>,
  ): Promise<void> => {
    await updateStatus(id, status, extra);
    settled.add(id);
  };

  // Set once ePay has accepted the order, so it is reported to the caller
  // even if polling or downloading fails afterwards.
  let submittedOrderId: OrderId | null = null;

  try {
    // Get ePay credentials
    const creds = getEpayCredentials();
    if (!creds) {
      for (const id of extractIds) {
        await settle(id, "failed", {
          errorMessage: "Nu ești conectat la ePay.",
        });
      }
      return null;
    }

    const client = await EpayClient.create(creds.username, creds.password);

    // Step 1: Check credits (need >= count)
    const credits = await client.getCredits();
    updateEpayCredits(credits);
    if (credits < count) {
      for (const id of extractIds) {
        await settle(id, "failed", {
          errorMessage: `Credite insuficiente: ${credits} disponibile, ${count} necesare.`,
        });
      }
      return null;
    }

    // Step 2: addToCart + saveMetadata for EACH item
    for (const item of items) {
      const { extractId, input } = item;

      await updateStatus(extractId, "cart");
      const basketRowId = await client.addToCart(input.prodId ?? 14200);
      item.basketRowId = basketRowId;
      await updateStatus(extractId, "cart", { basketRowId });

      // Resolve county/UAT from SIRUTA if available
      let countyInternalId = input.judetIndex;
      let uatInternalId = input.uatId;
      let countyName = input.judetName;
      let uatName = input.uatName;

      if (input.siruta) {
        const uat = await prisma.gisUat.findUnique({
          where: { siruta: input.siruta },
          select: { workspacePk: true, county: true, name: true },
        });
        if (uat?.workspacePk) {
          countyInternalId = uat.workspacePk;
          uatInternalId = Number(input.siruta);
          if (uat.county) countyName = uat.county;
          if (uat.name) uatName = uat.name;
        }
      }

      const nrCF = input.nrCF ?? input.nrCadastral;
      await updateStatus(extractId, "ordering");

      const saved = await client.saveMetadata(
        basketRowId,
        countyInternalId,
        countyName,
        uatInternalId,
        uatName,
        nrCF,
        input.nrCadastral,
        process.env.ANCPI_DEFAULT_SOLICITANT_ID || "14452",
      );

      if (!saved) {
        await settle(extractId, "failed", {
          errorMessage: "Salvarea metadatelor în ePay a eșuat.",
        });
        // Continue with remaining items — the cart still has them
        // but this one won't get metadata. Remove from batch.
        item.basketRowId = undefined;
      }
    }

    // Filter to only items that had successful metadata saves
    const validItems = items.filter((i) => i.basketRowId !== undefined);
    if (validItems.length === 0) {
      return null;
    }

    // Step 3: ONE submitOrder for ALL items
    console.log(
      `[epay-queue] Submitting order for ${validItems.length} items...`,
    );
    const orderId = await client.submitOrder(knownOrderIds);
    submittedOrderId = orderId;

    // Update all valid items with the shared orderId
    for (const item of validItems) {
      await updateStatus(item.extractId, "polling", { orderId });
    }

    // Step 4: Poll until complete
    const finalStatus = await client.pollUntilComplete(
      orderId,
      async (attempt, status) => {
        for (const item of validItems) {
          await updateStatus(item.extractId, "polling", {
            epayStatus: status,
            pollAttempts: attempt,
          });
        }
      },
    );

    if (
      finalStatus.status === "Anulata" ||
      finalStatus.status === "Plata refuzata"
    ) {
      for (const item of validItems) {
        await settle(item.extractId, "cancelled", {
          epayStatus: finalStatus.status,
          errorMessage: `Comanda ${finalStatus.status.toLowerCase()}.`,
        });
      }
      // The order exists on ePay even though it was cancelled; report it
      // so it is recorded among the known order IDs.
      return orderId;
    }

    // Step 5: Download ALL documents and match to items
    // The order may contain multiple documents — match by filename/nrCadastral
    const downloadableDocs = finalStatus.documents.filter(
      (d) => d.downloadValabil && d.contentType === "application/pdf",
    );

    console.log(
      `[epay-queue] Order ${orderId}: ${downloadableDocs.length} documents for ${validItems.length} items`,
    );

    if (downloadableDocs.length === 0) {
      for (const item of validItems) {
        await settle(item.extractId, "failed", {
          epayStatus: finalStatus.status,
          errorMessage: "Nu s-au găsit documente PDF în comanda finalizată.",
        });
      }
      return orderId;
    }

    // Match documents to items by CF number (from documentsByCadastral).
    // This is the CORRECT way — ePay returns docs in its own order, not ours.
    const findCfMatch = (item: QueueItem) => {
      const nrCF = item.input.nrCF ?? item.input.nrCadastral;
      return (
        finalStatus.documentsByCadastral.get(nrCF) ??
        // Also try nrCadastral if different from nrCF
        (item.input.nrCadastral !== nrCF
          ? finalStatus.documentsByCadastral.get(item.input.nrCadastral)
          : undefined)
      );
    };

    // First pass: reserve every document that has a proper CF match, so the
    // positional fallback below can never hand the same document to a
    // different parcel.
    const claimedDocIds = new Set<unknown>();
    for (const item of validItems) {
      const matched = findCfMatch(item);
      if (matched) claimedDocIds.add(matched.idDocument);
    }

    for (const item of validItems) {
      let doc = findCfMatch(item);

      // Last resort: take the next downloadable document nobody has claimed
      if (!doc) {
        doc = downloadableDocs.find((d) => !claimedDocIds.has(d.idDocument));
        if (doc) {
          claimedDocIds.add(doc.idDocument);
          console.warn(
            `[epay-queue] Could not match by CF for ${item.input.nrCadastral}, using unclaimed document ${doc.idDocument}`,
          );
        }
      }

      if (!doc) {
        await settle(item.extractId, "failed", {
          epayStatus: finalStatus.status,
          errorMessage: `Document lipsă (${downloadableDocs.length} documente pentru ${validItems.length} parcele).`,
        });
        continue;
      }

      try {
        await updateStatus(item.extractId, "downloading", {
          idDocument: doc.idDocument,
          documentName: doc.nume,
          documentDate: doc.dataDocument ? new Date(doc.dataDocument) : null,
        });

        const pdfBuffer = await client.downloadDocument(doc.idDocument, 4);

        // Step 6: Store in MinIO
        const { path, index } = await storeCfExtract(
          pdfBuffer,
          item.input.nrCadastral,
          {
            "ancpi-order-id": orderId,
            "nr-cadastral": item.input.nrCadastral,
            judet: item.input.judetName,
            uat: item.input.uatName,
            "data-document": doc.dataDocument ?? "",
            stare: finalStatus.status,
            produs: "EXI_ONLINE",
          },
        );

        // Complete — require document date from ANCPI for accurate expiry
        if (!doc.dataDocument) {
          console.warn(`[epay-queue] Missing dataDocument for extract ${item.extractId}, using download date`);
        }
        const documentDate = doc.dataDocument
          ? new Date(doc.dataDocument)
          : new Date();
        // Expiry is computed as 30 days after the document date
        const expiresAt = new Date(documentDate);
        expiresAt.setDate(expiresAt.getDate() + 30);

        await settle(item.extractId, "completed", {
          minioPath: path,
          minioIndex: index,
          epayStatus: finalStatus.status,
          completedAt: new Date(),
          documentDate,
          expiresAt,
        });

        console.log(
          `[epay-queue] Completed: ${item.input.nrCadastral} → ${path}`,
        );
      } catch (error) {
        // A failure on one document must not abort the rest of the batch
        await settle(item.extractId, "failed", {
          errorMessage: describeError(error, "Eroare download/stocare"),
        });
      }
    }

    // Update credits after successful order. Best-effort: every item is
    // already settled, so a failure here must not be reported as a failed
    // batch.
    try {
      const newCredits = await client.getCredits();
      updateEpayCredits(newCredits);
      console.log(
        `[epay-queue] Batch complete: orderId=${orderId}, credits remaining=${newCredits}`,
      );
    } catch (error) {
      console.warn(
        `[epay-queue] Batch complete: orderId=${orderId}, but refreshing credits failed:`,
        describeError(error, "Eroare necunoscută"),
      );
    }

    return orderId;
  } catch (error) {
    const message = describeError(error, "Eroare necunoscută");
    console.error(`[epay-queue] Batch failed:`, message);
    // Only touch extracts that have not reached a terminal state yet —
    // completed or specifically-failed items keep their status.
    await markFailed(
      extractIds.filter((id) => !settled.has(id)),
      message,
    );
    return submittedOrderId;
  }
}