/** * Sync engine — downloads eTerra features and stores them in PostgreSQL. * * Supports incremental sync: compares remote OBJECTIDs with local DB, * only downloads new features, marks removed ones. */ import { Prisma, PrismaClient } from "@prisma/client"; import { EterraClient } from "./eterra-client"; import type { LayerConfig } from "./eterra-client"; import { esriToGeojson } from "./esri-geojson"; import { findLayerById, type LayerCatalogItem } from "./eterra-layers"; import { fetchUatGeometry } from "./uat-geometry"; import { setProgress, getProgress, clearProgress, type SyncProgress, } from "./progress-store"; const prisma = new PrismaClient(); const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms)); export type SyncResult = { layerId: string; siruta: string; totalRemote: number; totalLocal: number; newFeatures: number; removedFeatures: number; status: "done" | "error"; error?: string; }; /** * Sync a single layer for a UAT into the local GIS database. * * 1. Count remote features * 2. Get local OBJECTIDs already stored * 3. Download only new OBJECTIDs (incremental) * 4. Mark removed ones (present local, absent remote) * 5. Store results + sync run metadata */ export async function syncLayer( username: string, password: string, siruta: string, layerId: string, options?: { uatName?: string; jobId?: string; forceFullSync?: boolean; /** When true, don't set terminal status (done/error) on progress store. * Used when syncLayer runs as a sub-step of a larger export flow. */ isSubStep?: boolean; /** Override the default HTTP timeout for the eTerra client (ms). */ timeoutMs?: number; }, ): Promise { const jobId = options?.jobId; const isSubStep = options?.isSubStep ?? false; const layer = findLayerById(layerId); if (!layer) throw new Error(`Layer ${layerId} not found`); const push = (partial: Partial) => { if (!jobId) return; if (isSubStep) { // When running as sub-step of a larger export, only write // phase/note/phaseCurrent/phaseTotal — preserve the parent's // downloaded/total weighted percentages. const existing = getProgress(jobId); setProgress({ jobId, downloaded: existing?.downloaded ?? 0, total: existing?.total, status: "running", ...existing, // Only overwrite informational fields phase: partial.phase ?? existing?.phase, note: partial.note, phaseCurrent: partial.downloaded, // map sync's downloaded → sub-detail phaseTotal: partial.total, // map sync's total → sub-detail message: partial.message, } as SyncProgress); return; } setProgress({ jobId, downloaded: 0, status: "running", ...partial, } as SyncProgress); }; // Create sync run record const syncRun = await prisma.gisSyncRun.create({ data: { siruta, uatName: options?.uatName, layerId, status: "running", }, }); try { push({ phase: "Conectare eTerra", downloaded: 0 }); const client = await EterraClient.create(username, password, { timeoutMs: options?.timeoutMs, }); // Get UAT geometry for spatial-filtered layers let uatGeometry; if (layer.spatialFilter) { push({ phase: "Obținere geometrie UAT" }); uatGeometry = await fetchUatGeometry(client, siruta); } // Count remote features push({ phase: "Numărare remote" }); let remoteCount: number; try { remoteCount = uatGeometry ? await client.countLayerByGeometry(layer, uatGeometry) : await client.countLayer(layer, siruta); } catch { remoteCount = 0; } push({ phase: "Verificare locală", total: remoteCount }); // Get local OBJECTIDs for this layer+siruta const localFeatures = await prisma.gisFeature.findMany({ where: { layerId, siruta }, select: { objectId: true }, }); const localObjIds = new Set(localFeatures.map((f) => f.objectId)); // Fetch all remote features push({ phase: "Descărcare features", downloaded: 0, total: remoteCount }); const allRemote = uatGeometry ? await client.fetchAllLayerByGeometry(layer, uatGeometry, { total: remoteCount > 0 ? remoteCount : undefined, onProgress: (dl, tot) => push({ phase: "Descărcare features", downloaded: dl, total: tot }), delayMs: 200, }) : await client.fetchAllLayerByWhere( layer, await buildWhere(client, layer, siruta), { total: remoteCount > 0 ? remoteCount : undefined, onProgress: (dl, tot) => push({ phase: "Descărcare features", downloaded: dl, total: tot, }), delayMs: 200, }, ); // Convert to GeoJSON for geometry storage const geojson = esriToGeojson(allRemote); const geojsonByObjId = new Map(); for (const f of geojson.features) { const objId = f.properties.OBJECTID as number | undefined; if (objId != null) geojsonByObjId.set(objId, f); } // Determine which OBJECTIDs are new const remoteObjIds = new Set(); for (const f of allRemote) { const objId = f.attributes.OBJECTID as number | undefined; if (objId != null) remoteObjIds.add(objId); } const newObjIds = options?.forceFullSync ? remoteObjIds : new Set([...remoteObjIds].filter((id) => !localObjIds.has(id))); const removedObjIds = [...localObjIds].filter( (id) => !remoteObjIds.has(id), ); push({ phase: "Salvare în baza de date", downloaded: 0, total: newObjIds.size, }); // Insert new features in batches let saved = 0; const BATCH_SIZE = 100; const newArray = [...newObjIds]; for (let i = 0; i < newArray.length; i += BATCH_SIZE) { const batch = newArray.slice(i, i + BATCH_SIZE); const creates = batch .map((objId) => { const feature = allRemote.find( (f) => (f.attributes.OBJECTID as number) === objId, ); if (!feature) return null; const geoFeature = geojsonByObjId.get(objId); const geom = geoFeature?.geometry; return { layerId, siruta, objectId: objId, inspireId: (feature.attributes.INSPIRE_ID as string | undefined) ?? null, cadastralRef: (feature.attributes.NATIONAL_CADASTRAL_REFERENCE as | string | undefined) ?? null, areaValue: typeof feature.attributes.AREA_VALUE === "number" ? feature.attributes.AREA_VALUE : null, isActive: feature.attributes.IS_ACTIVE !== 0, attributes: feature.attributes as Prisma.InputJsonValue, geometry: geom ? (geom as Prisma.InputJsonValue) : Prisma.JsonNull, syncRunId: syncRun.id, }; }) .filter(Boolean); // Use upsert to handle potential conflicts (force sync) for (const item of creates) { if (!item) continue; await prisma.gisFeature.upsert({ where: { layerId_objectId: { layerId: item.layerId, objectId: item.objectId, }, }, create: item, update: { ...item, updatedAt: new Date(), }, }); } saved += creates.length; push({ phase: "Salvare în baza de date", downloaded: saved, total: newObjIds.size, }); } // Populate native PostGIS geometry (safety net if trigger not installed) try { await prisma.$executeRaw` UPDATE "GisFeature" SET geom = ST_SetSRID(ST_GeomFromGeoJSON(geometry::text), 3844) WHERE "layerId" = ${layerId} AND siruta = ${siruta} AND geometry IS NOT NULL AND geom IS NULL`; } catch { // PostGIS not available yet — not critical, skip silently } // Mark removed features if (removedObjIds.length > 0) { push({ phase: "Marcare șterse" }); await prisma.gisFeature.deleteMany({ where: { layerId, siruta, objectId: { in: removedObjIds }, }, }); } // Update sync run const localCount = await prisma.gisFeature.count({ where: { layerId, siruta }, }); await prisma.gisSyncRun.update({ where: { id: syncRun.id }, data: { status: "done", totalRemote: remoteCount, totalLocal: localCount, newFeatures: newObjIds.size, removedFeatures: removedObjIds.length, completedAt: new Date(), }, }); push({ phase: "Sync finalizat", status: isSubStep ? "running" : "done", downloaded: remoteCount, total: remoteCount, }); if (jobId && !isSubStep) setTimeout(() => clearProgress(jobId), 60_000); return { layerId, siruta, totalRemote: remoteCount, totalLocal: localCount, newFeatures: newObjIds.size, removedFeatures: removedObjIds.length, status: "done", }; } catch (error) { const msg = error instanceof Error ? error.message : "Unknown error"; await prisma.gisSyncRun.update({ where: { id: syncRun.id }, data: { status: "error", errorMessage: msg, completedAt: new Date() }, }); push({ phase: "Eroare sync", status: isSubStep ? "running" : "error", message: msg, }); if (jobId && !isSubStep) setTimeout(() => clearProgress(jobId), 60_000); return { layerId, siruta, totalRemote: 0, totalLocal: 0, newFeatures: 0, removedFeatures: 0, status: "error", error: msg, }; } } /** Helper to build where clause outside the client */ async function buildWhere( client: EterraClient, layer: LayerConfig, siruta: string, ) { const fields = await client.getLayerFieldNames(layer); const preferred = [ "ADMIN_UNIT_ID", "SIRUTA", "UAT_ID", "SIRUTA_UAT", "UAT_SIRUTA", ]; const upper = fields.map((f) => f.toUpperCase()); let adminField: string | null = null; for (const key of preferred) { const idx = upper.indexOf(key); if (idx >= 0) { adminField = fields[idx] ?? null; break; } } if (!adminField) return "1=1"; if (!layer.whereTemplate) return `${adminField}=${siruta}`; const hasIsActive = fields.some((f) => f.toUpperCase() === "IS_ACTIVE"); if (layer.whereTemplate.includes("IS_ACTIVE") && !hasIsActive) return `${adminField}=${siruta}`; return layer.whereTemplate .replace(/\{\{adminField\}\}/g, adminField) .replace(/\{\{siruta\}\}/g, siruta); } /** * Get sync status for all layers for a given UAT. */ export async function getSyncStatus(siruta: string) { const runs = await prisma.gisSyncRun.findMany({ where: { siruta }, orderBy: { startedAt: "desc" }, }); const counts = await prisma.gisFeature.groupBy({ by: ["layerId"], where: { siruta }, _count: { id: true }, }); const countMap: Record = {}; for (const c of counts) { countMap[c.layerId] = c._count.id; } return { runs, localCounts: countMap }; }