"""
Sync orchestrator — backfill (range, parallel) and incremental modes.

Backfill enqueues windows in seap.wsp_backfill_windows; workers claim
atomically (SKIP LOCKED), fetch, parse, write, mark complete. Resumable.

Incremental: reads cursor, fetches (cursor, now), updates cursor on success.
"""
from __future__ import annotations

import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta, timezone
from typing import Optional

from .client import WspClient, make_client_from_env
from . import db
from .operations import WspOp, ALL_OPS
from .pagination import fetch_window, split_into_windows

log = logging.getLogger(__name__)

# Endpoint used when a caller does not pass one explicitly.
DEFAULT_ENDPOINT = 'https://e-licitatie.ro:8883/Pub'


def enqueue_backfill_windows(op: WspOp, start: datetime, end: datetime) -> int:
    """Pre-populate seap.wsp_backfill_windows for [start, end) using op's max window.

    The range is cut with ``split_into_windows(start, end, op.max_window_days)``
    and every resulting window is passed to ``db.upsert_backfill_window``.

    Returns:
        The number of windows for which the upsert returned a truthy id
        (presumably only newly inserted rows — confirm against the db module).
    """
    windows = split_into_windows(start, end, op.max_window_days)
    queued = 0
    with db.connection() as conn:
        cur = conn.cursor()
        for ws, we in windows:
            new_id = db.upsert_backfill_window(cur, op.name, ws, we)
            if new_id:
                queued += 1
    log.info('Queued %d windows for %s [%s, %s)', queued, op.name, start, end)
    return queued


def process_one_window(client: WspClient, op: WspOp,
                       ws: datetime, we: datetime,
                       win_id: int | None = None) -> dict:
    """Fetch + parse + write one window. Returns stats.

    Args:
        client: WSP client used for the fetch.
        op: Operation descriptor; ``op.sink`` selects the writer in
            ``db.SINK_FUNCS``.
        ws: Window start.
        we: Window end.
        win_id: Backfill-window row id. When given, the row is marked via
            ``db.complete_backfill_window`` (with the DB-write error, if any).

    Returns:
        A dict with keys ``window_start``, ``window_end``, ``items_imported``,
        ``page_total``, ``pages_fetched``, ``errors``, ``sub_windows`` and
        ``error``. ``error`` is the (truncated) DB-write failure message or
        ``None``; ``errors`` is whatever ``fetch_window`` reported.

    Raises:
        Anything raised by ``fetch_window`` or by the ``db.SINK_FUNCS`` lookup;
        only the DB write and the completion marking are guarded here.
    """
    rows: list[dict] = []
    sink = db.SINK_FUNCS[op.sink]

    def collect(parsed, _el):
        # Keep only items the parser produced something for.
        if parsed:
            rows.append(parsed)

    result = fetch_window(client, op, ws, we, on_item=collect)
    # NOTE(review): result.errors is reported to the caller but does not mark
    # the window as failed here — confirm that is the intended contract.

    items_imported = 0
    error_msg = None
    try:
        with db.connection() as conn:
            cur = conn.cursor()
            items_imported = sink(cur, rows)
    except Exception as e:
        error_msg = str(e)[:500]
        log.exception('DB write failed for %s [%s, %s)', op.name, ws, we)

    if win_id is not None:
        # Best effort: a failure to record completion must not mask the
        # result of the window itself.
        try:
            with db.connection() as conn:
                cur = conn.cursor()
                db.complete_backfill_window(
                    cur, win_id, items_imported, result.page_total_first,
                    error=error_msg,
                )
        except Exception:
            log.exception('Failed to mark window %d complete', win_id)

    return {
        'window_start': ws,
        'window_end': we,
        'items_imported': items_imported,
        'page_total': result.page_total_first,
        'pages_fetched': result.pages_fetched,
        'errors': result.errors,
        'sub_windows': len(result.sub_windows),
        'error': error_msg,
    }


def run_backfill(op_name: str, start: datetime, end: datetime,
                 workers: int = 3, enqueue: bool = True,
                 endpoint: str | None = None) -> dict:
    """Backfill an operation across [start, end) with parallel workers.

    Each worker holds its own WspClient (own SSL session) for thread safety.

    Args:
        op_name: Key into ``ALL_OPS``.
        start: Inclusive start of the range to backfill.
        end: Exclusive end of the range to backfill.
        workers: Number of worker threads.
        enqueue: When true, windows for the range are enqueued first.
        endpoint: Service endpoint; falls back to ``DEFAULT_ENDPOINT``.

    Returns:
        A dict with ``op``, ``windows_processed`` (windows for which
        ``process_one_window`` returned), ``items_imported`` and ``errors``
        (messages for failed windows and for crashed workers).

    Raises:
        KeyError: if ``op_name`` is not in ``ALL_OPS``.
    """
    op = ALL_OPS[op_name]
    if enqueue:
        enqueue_backfill_windows(op, start, end)

    log.info('Starting backfill: op=%s workers=%d range=[%s, %s)',
             op_name, workers, start, end)

    stats = {'op': op_name, 'windows_processed': 0, 'items_imported': 0, 'errors': []}
    stop_event = threading.Event()

    def worker_loop(worker_id: int) -> tuple[int, int, list[str]]:
        """Claim and process windows until none are left or a stop is requested.

        Returns (windows processed, items imported, per-window error messages).
        """
        client = make_client_from_env(endpoint or DEFAULT_ENDPOINT)
        windows_done = 0
        items_done = 0
        window_errors: list[str] = []

        while not stop_event.is_set():
            # Claim on a short-lived connection so it is released before the
            # (potentially long) fetch starts.
            with db.connection() as conn:
                cur = conn.cursor()
                claimed = db.claim_backfill_window(cur, op_name)
            if not claimed:
                log.info('Worker %d: no more pending windows, exiting', worker_id)
                break

            ws, we, win_id = claimed['window_start'], claimed['window_end'], claimed['id']
            log.info('  W%d: processing [%s → %s) (attempt %d)',
                     worker_id, ws, we, claimed['attempts'])
            try:
                r = process_one_window(client, op, ws, we, win_id=win_id)
            except Exception as e:
                log.exception('Worker %d failed on window %s-%s', worker_id, ws, we)
                window_errors.append(str(e)[:500])
                # Record the failure on the window row, but do not let a
                # failure to do so kill the worker and discard its counts.
                try:
                    with db.connection() as conn:
                        cur = conn.cursor()
                        db.complete_backfill_window(cur, win_id, 0, 0, error=str(e)[:500])
                except Exception:
                    log.exception('Failed to mark window %d complete', win_id)
                continue

            windows_done += 1
            items_done += r['items_imported']
            if r['error']:
                # DB write failed inside process_one_window; surface it.
                window_errors.append(r['error'])
            log.info('  W%d: done [%s → %s) — items=%d page_total=%d',
                     worker_id, ws, we, r['items_imported'], r['page_total'])

        return windows_done, items_done, window_errors

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(worker_loop, i) for i in range(workers)]
        try:
            for f in as_completed(futures):
                try:
                    windows_done, items_done, window_errors = f.result()
                except Exception as e:
                    # The worker itself crashed (e.g. claiming a window failed).
                    stats['errors'].append(str(e))
                else:
                    stats['windows_processed'] += windows_done
                    stats['items_imported'] += items_done
                    stats['errors'].extend(window_errors)
        except BaseException:
            # Interrupted while waiting (e.g. KeyboardInterrupt): ask workers
            # to stop after their current window; leaving the ``with`` block
            # still waits for them to finish.
            stop_event.set()
            raise

    log.info('Backfill done: op=%s items=%d', op_name, stats['items_imported'])
    return stats


def run_incremental(op_name: str, lookback_hours: int = 36,
                    endpoint: str | None = None) -> dict:
    """Incremental sync: from (cursor or now-lookback) to now.

    The cursor advances to the end of the last window of the contiguous run
    of successful windows starting at the first one. Once a window fails, the
    cursor is frozen so that the failed range is fetched again on the next
    run; later windows are still processed.

    Args:
        op_name: Key into ``ALL_OPS``.
        lookback_hours: How far back to start when no cursor is stored.
        endpoint: Service endpoint; falls back to ``DEFAULT_ENDPOINT``.

    Returns:
        A dict with ``op``, ``items_imported``, ``cursor_advanced_to`` and
        ``error`` (message of the last failed window, or ``None``).

    Raises:
        KeyError: if ``op_name`` is not in ``ALL_OPS``.
    """
    op = ALL_OPS[op_name]
    client = make_client_from_env(endpoint or DEFAULT_ENDPOINT)

    # Use tz-aware UTC so we can compare with cursor (TIMESTAMPTZ from Postgres).
    end = datetime.now(timezone.utc)

    with db.connection() as conn:
        cur = conn.cursor()
        state = db.get_cursor(cur, op_name) or {}

    cursor = state.get('last_cursor_date')
    if cursor is None:
        start = end - timedelta(hours=lookback_hours)
    else:
        # Ensure cursor is tz-aware (assume UTC for any naive value from DB).
        if cursor.tzinfo is None:
            cursor = cursor.replace(tzinfo=timezone.utc)
        # Overlap by 1 hour to catch late-publishing.
        start = cursor - timedelta(hours=1)

    # NOTE(review): the cursor logic below assumes split_into_windows returns
    # windows in chronological order — confirm against the pagination module.
    windows = split_into_windows(start, end, op.max_window_days)
    log.info('Incremental %s: %d windows from %s to %s', op_name, len(windows), start, end)

    total_imported = 0
    last_error = None
    last_window_end = cursor
    cursor_frozen = False  # set once any window fails; stops cursor advancement

    for ws, we in windows:
        try:
            r = process_one_window(client, op, ws, we)
        except Exception as e:
            last_error = str(e)[:500]
            cursor_frozen = True
            log.exception('Incremental window failed: %s [%s, %s)', op_name, ws, we)
            continue

        total_imported += r['items_imported']
        if r.get('error'):
            # DB write failed inside process_one_window: record it and make
            # sure the cursor never moves past this window.
            last_error = r['error']
            cursor_frozen = True
        elif not cursor_frozen:
            last_window_end = we

    with db.connection() as conn:
        cur = conn.cursor()
        db.update_sync_state(
            cur, op_name,
            cursor_date=last_window_end,
            window_start=start, window_end=end,
            items_added=total_imported,
            error=last_error,
        )

    return {'op': op_name, 'items_imported': total_imported,
            'cursor_advanced_to': last_window_end, 'error': last_error}