a6c03a091e
Moved from gov-agreg/src/pages/achizitii/* to root (drop prefix). - 22 pages migrated, 127 files total - All internal links: /achizitii/X → /X (176 occurrences fixed) - AchizitiiLayout subnav rewritten: /X paths, top-right link to vreaudigital.ro hub - BaseLayout new (vreau.digital branding, OG tags, site URL) - astro.config.mjs: site https://vreau.digital, server output (was static) - docker-compose: port 5096 (vreaudigital is 5095), container vreau-digital - deploy.sh: paths /opt/vreau-digital, log /var/log/vreau-digital-deploy.log Backend shared with gov-agreg: - PostgreSQL satra (same schemas: seap, firms, anaf, anre, ...) - Photon, Martin tiles - Infisical /vreaudigital path (DATABASE_URL etc. shared) build: PASS (npx astro check 0 errors, npm run build 5s vite + 10s server)
194 lines
7.0 KiB
Python
194 lines
7.0 KiB
Python
"""
|
|
Sync orchestrator — backfill (range, parallel) and incremental modes.
|
|
|
|
Backfill enqueues windows in seap.wsp_backfill_windows; workers claim
|
|
atomically (SKIP LOCKED), fetch, parse, write, mark complete. Resumable.
|
|
|
|
Incremental: reads cursor, fetches (cursor, now), updates cursor on success.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import threading
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Optional
|
|
|
|
from .client import WspClient, make_client_from_env
|
|
from . import db
|
|
from .operations import WspOp, ALL_OPS
|
|
from .pagination import fetch_window, split_into_windows
|
|
|
|
# Module-level logger; progress and failures of the sync functions below are reported here.
log = logging.getLogger(__name__)
|
|
|
|
|
|
def enqueue_backfill_windows(op: WspOp, start: datetime, end: datetime) -> int:
    """Register every window of [start, end) in seap.wsp_backfill_windows.

    The range is cut into chunks no longer than ``op.max_window_days`` and each
    chunk is upserted inside a single DB connection.

    Returns:
        How many chunks db.upsert_backfill_window reported as new (i.e. for
        which it returned a truthy id).
    """
    chunks = split_into_windows(start, end, op.max_window_days)
    with db.connection() as conn:
        db_cursor = conn.cursor()
        inserted_ids = [
            db.upsert_backfill_window(db_cursor, op.name, chunk_start, chunk_end)
            for chunk_start, chunk_end in chunks
        ]
    queued = sum(1 for inserted_id in inserted_ids if inserted_id)
    log.info('Queued %d windows for %s [%s, %s)', queued, op.name, start, end)
    return queued
|
|
|
|
|
|
def process_one_window(client: WspClient, op: WspOp, ws: datetime, we: datetime,
                       win_id: int | None = None) -> dict:
    """Fetch, parse and persist a single [ws, we) window for ``op``.

    Truthy parsed items produced by fetch_window are buffered in memory and
    then written in one batch through the sink registered under ``op.sink``
    in db.SINK_FUNCS. A failing DB write is logged and reported through the
    'error' key instead of being raised; exceptions from fetch_window itself
    propagate to the caller. When ``win_id`` is given, the backfill queue row
    is marked complete (best effort: failures there are only logged).

    Returns:
        Stats dict with window_start, window_end, items_imported, page_total,
        pages_fetched, errors, sub_windows (a count) and error (message
        truncated to 500 chars, or None).
    """
    write_rows = db.SINK_FUNCS[op.sink]
    buffered: list[dict] = []

    def collect(parsed, _el):
        # Items the parser returned as empty/None are dropped.
        if not parsed:
            return
        buffered.append(parsed)

    fetched = fetch_window(client, op, ws, we, on_item=collect)

    imported = 0
    failure = None
    try:
        with db.connection() as conn:
            imported = write_rows(conn.cursor(), buffered)
    except Exception as exc:
        failure = str(exc)[:500]
        log.exception('DB write failed for %s [%s, %s)', op.name, ws, we)

    if win_id is not None:
        # Best effort: a failure here is logged, never raised.
        try:
            with db.connection() as conn:
                db.complete_backfill_window(
                    conn.cursor(), win_id, imported, fetched.page_total_first,
                    error=failure,
                )
        except Exception:
            log.exception('Failed to mark window %d complete', win_id)

    return {
        'window_start': ws,
        'window_end': we,
        'items_imported': imported,
        'page_total': fetched.page_total_first,
        'pages_fetched': fetched.pages_fetched,
        'errors': fetched.errors,
        'sub_windows': len(fetched.sub_windows),
        'error': failure,
    }
|
|
|
|
|
|
def run_backfill(op_name: str, start: datetime, end: datetime,
                 workers: int = 3, enqueue: bool = True,
                 endpoint: str | None = None) -> dict:
    """Backfill an operation across [start, end) with parallel workers.

    Each worker holds its own WspClient (own SSL session) for thread safety.
    Workers repeatedly claim a pending window via db.claim_backfill_window
    and process it with process_one_window. A worker exits when nothing is
    left to claim, or after finishing its current window once the run is
    interrupted. Note the claim only filters by ``op_name``; it is not
    restricted to [start, end).

    Args:
        op_name: Key into ALL_OPS selecting the operation.
        start: Inclusive start of the range to enqueue.
        end: Exclusive end of the range to enqueue.
        workers: Number of worker threads.
        enqueue: When True, enqueue windows for [start, end) first.
        endpoint: Service URL; defaults to 'https://e-licitatie.ro:8883/Pub'.

    Returns:
        Dict with 'op', 'windows_processed' (windows claimed and attempted,
        failed ones included), 'items_imported', and 'errors' (messages of
        workers that died with an unhandled exception).
    """
    op = ALL_OPS[op_name]
    if enqueue:
        enqueue_backfill_windows(op, start, end)

    log.info('Starting backfill: op=%s workers=%d range=[%s, %s)',
             op_name, workers, start, end)

    stats = {'op': op_name, 'windows_processed': 0, 'items_imported': 0,
             'errors': []}
    # Set by the main thread on interrupt so workers stop claiming new windows.
    stop_event = threading.Event()

    def worker_loop(worker_id: int) -> tuple[int, int]:
        # Returns (items imported, windows attempted) for this worker.
        client = make_client_from_env(endpoint or 'https://e-licitatie.ro:8883/Pub')
        local_count = 0
        local_windows = 0
        while not stop_event.is_set():
            with db.connection() as conn:
                cur = conn.cursor()
                claimed = db.claim_backfill_window(cur, op_name)
            if not claimed:
                log.info('Worker %d: no more pending windows, exiting', worker_id)
                break

            ws, we, win_id = claimed['window_start'], claimed['window_end'], claimed['id']
            log.info(' W%d: processing [%s → %s) (attempt %d)',
                     worker_id, ws, we, claimed['attempts'])
            local_windows += 1
            try:
                r = process_one_window(client, op, ws, we, win_id=win_id)
            except Exception as e:
                log.exception('Worker %d failed on window %s-%s', worker_id, ws, we)
                # Record the failure on the queue row. If even that fails, log
                # it and keep this worker alive (same best-effort policy as
                # process_one_window) instead of losing a worker thread.
                try:
                    with db.connection() as conn:
                        cur = conn.cursor()
                        db.complete_backfill_window(cur, win_id, 0, 0, error=str(e)[:500])
                except Exception:
                    log.exception('Failed to mark window %d complete', win_id)
                continue

            local_count += r['items_imported']
            log.info(' W%d: done [%s → %s) — items=%d page_total=%d',
                     worker_id, ws, we, r['items_imported'], r['page_total'])
        return local_count, local_windows

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(worker_loop, i) for i in range(workers)]
        try:
            for f in as_completed(futures):
                try:
                    count, windows = f.result()
                except Exception as e:
                    stats['errors'].append(str(e))
                    continue
                stats['items_imported'] += count
                stats['windows_processed'] += windows
        except BaseException:
            # e.g. KeyboardInterrupt: the executor's exit waits for all workers,
            # so tell them to stop after their current window instead of
            # draining the entire queue before the interrupt takes effect.
            stop_event.set()
            raise

    log.info('Backfill done: op=%s items=%d', op_name, stats['items_imported'])
    return stats
|
|
|
|
|
|
def run_incremental(op_name: str, lookback_hours: int = 36,
                    endpoint: str | None = None) -> dict:
    """Incremental sync: from (cursor or now-lookback) to now.

    With a stored cursor, the range starts 1 hour before it (overlap to catch
    late publishing); without one it starts ``lookback_hours`` before now.

    The cursor only advances over the *contiguous* run of successful windows
    from the start of the range. Once any window fails (an exception, or a
    DB write error reported by process_one_window), the cursor is frozen, so
    the failed window is re-fetched on the next run instead of being skipped.
    Later windows are still processed. This assumes split_into_windows yields
    windows in chronological order; confirm if that ever changes.

    Returns:
        Dict with 'op', 'items_imported', 'cursor_advanced_to' (new cursor,
        or the previous value / None if nothing advanced) and 'error' (last
        error message seen, or None).
    """
    op = ALL_OPS[op_name]
    client = make_client_from_env(endpoint or 'https://e-licitatie.ro:8883/Pub')

    # Use tz-aware UTC so we can compare with cursor (TIMESTAMPTZ from Postgres).
    end = datetime.now(timezone.utc)
    with db.connection() as conn:
        cur = conn.cursor()
        state = db.get_cursor(cur, op_name) or {}
    cursor = state.get('last_cursor_date')
    if cursor is None:
        start = end - timedelta(hours=lookback_hours)
    else:
        # Ensure cursor is tz-aware (assume UTC for any naive value from DB).
        if cursor.tzinfo is None:
            cursor = cursor.replace(tzinfo=timezone.utc)
        # Overlap by 1 hour to catch late-publishing items.
        start = cursor - timedelta(hours=1)

    windows = split_into_windows(start, end, op.max_window_days)
    log.info('Incremental %s: %d windows from %s to %s',
             op_name, len(windows), start, end)

    total_imported = 0
    last_error = None
    last_window_end = cursor
    # Becomes True at the first failed window; from then on the cursor must
    # not move, otherwise the failed window would be skipped forever.
    cursor_frozen = False
    for ws, we in windows:
        try:
            r = process_one_window(client, op, ws, we)
        except Exception as e:
            last_error = str(e)[:500]
            log.exception('Incremental window failed: %s [%s, %s)', op_name, ws, we)
            cursor_frozen = True
            continue

        total_imported += r['items_imported']
        if r.get('error'):
            # DB write failed inside process_one_window: surface it in sync state.
            last_error = r['error']
            cursor_frozen = True
        elif not cursor_frozen:
            last_window_end = we

    with db.connection() as conn:
        cur = conn.cursor()
        db.update_sync_state(
            cur, op_name,
            cursor_date=last_window_end,
            window_start=start,
            window_end=end,
            items_added=total_imported,
            error=last_error,
        )

    return {'op': op_name, 'items_imported': total_imported,
            'cursor_advanced_to': last_window_end, 'error': last_error}
|