"""DB sinks — idempotent UPSERT for each parsed row type.""" from __future__ import annotations import json import logging import os from contextlib import contextmanager from datetime import datetime from decimal import Decimal from typing import Any, Iterable import psycopg2 from psycopg2.extras import Json, execute_values log = logging.getLogger(__name__) def get_db_url() -> str: url = os.environ.get('DATABASE_URL') if not url: raise RuntimeError('DATABASE_URL not set in env') return url def connect(): return psycopg2.connect(get_db_url()) @contextmanager def connection(): conn = connect() try: yield conn conn.commit() except Exception: conn.rollback() raise finally: conn.close() def _json_safe(obj): """Convert Decimal/datetime/date to JSON-friendly types recursively.""" if isinstance(obj, Decimal): return str(obj) if isinstance(obj, datetime): return obj.isoformat() if isinstance(obj, dict): return {k: _json_safe(v) for k, v in obj.items()} if isinstance(obj, list): return [_json_safe(v) for v in obj] return obj # ── seap.announcements (public feeds) ── ANNOUNCEMENTS_COLS = [ 'type', 'ref_number', 'authority_name', 'authority_cui', 'authority_address', 'authority_email', 'authority_phone', 'authority_url', 'authority_type', 'authority_main_activity', 'authority_entity_id', 'title', 'cpv_code', 'contract_type', 'publication_date', 'estimated_value', 'awarded_value', 'currency', 'supplier_name', 'supplier_cui', 'supplier_address', 'supplier_is_sme', 'procedure_type', 'procedure_state', 'legislation', 'has_lots', 'contract_has_lots', 'lots_count', 'joue', 'county_code', 'notice_state', 'notice_state_id', 'framework_agreement', 'notice_id_internal', 'deadline_submission', 'opening_date', 'duration_months', 'documents', 'award_criteria', 'lots', 'details', 'seap_url', 'source', ] _JSONB_COLS = {'documents', 'award_criteria', 'lots', 'details'} def insert_announcements(cur, rows: Iterable[dict]) -> int: """Bulk upsert into seap.announcements. 
    Returns the number of rows submitted (each is inserted or updated)."""
    rows = list(rows)
    if not rows:
        return 0
    values = []
    for r in rows:
        values.append(tuple(
            Json(_json_safe(r.get(c))) if c in _JSONB_COLS and r.get(c) is not None
            else r.get(c)
            for c in ANNOUNCEMENTS_COLS
        ))
    cols_sql = ', '.join(ANNOUNCEMENTS_COLS)
    update_cols = [c for c in ANNOUNCEMENTS_COLS if c not in ('type', 'ref_number')]
    # Scalar columns keep the stored value when the incoming one is NULL;
    # JSONB columns are always overwritten, even with NULL.
    update_sql = ', '.join(
        f'{c} = COALESCE(EXCLUDED.{c}, seap.announcements.{c})'
        if c not in _JSONB_COLS else f'{c} = EXCLUDED.{c}'
        for c in update_cols
    )
    sql = f"""
        INSERT INTO seap.announcements ({cols_sql}) VALUES %s
        ON CONFLICT (type, ref_number) DO UPDATE SET
            {update_sql},
            enriched_at = now()
    """
    execute_values(cur, sql, values, page_size=100)
    return len(rows)


# ── Beletage tables ──

BELETAGE_CONTRACTS_COLS = [
    'contract_id', 'contract_no', 'contract_title', 'contract_type',
    'contract_phase', 'contract_state',
    'awarding_date', 'contract_date', 'publication_date', 'duration_months',
    'contract_value', 'default_currency_value', 'currency',
    'ca_notice_id', 'ca_notice_no', 'authority_name', 'authority_cui',
    'is_current_version', 'is_rejected', 'version_no', 'version_date',
    'justification', 'additional_information', 'details',
]


def insert_beletage_contracts(cur, rows):
    """Bulk upsert into seap.beletage_contracts, keyed on contract_id."""
    rows = list(rows)
    if not rows:
        return 0
    values = [
        tuple(
            Json(_json_safe(r.get(c))) if c == 'details' and r.get(c) is not None
            else r.get(c)
            for c in BELETAGE_CONTRACTS_COLS
        )
        for r in rows
    ]
    cols_sql = ', '.join(BELETAGE_CONTRACTS_COLS)
    update_cols = [c for c in BELETAGE_CONTRACTS_COLS if c != 'contract_id']
    update_sql = ', '.join(f'{c} = EXCLUDED.{c}' for c in update_cols)
    sql = f"""
        INSERT INTO seap.beletage_contracts ({cols_sql}) VALUES %s
        ON CONFLICT (contract_id) DO UPDATE SET
            {update_sql},
            enriched_at = now()
    """
    execute_values(cur, sql, values, page_size=100)
    return len(rows)


BELETAGE_INVOICES_COLS = [
    'invoice_id', 'invoice_no', 'invoice_date', 'due_date',
    'contract_id', 'contract_no', 'authority_name', 'authority_cui',
    'total_value',
    'total_value_no_vat', 'vat_value', 'currency', 'state',
    'paid_value', 'paid_at', 'details',
]


def insert_beletage_invoices(cur, rows):
    rows = list(rows)
    if not rows:
        return 0
    values = [
        tuple(
            Json(_json_safe(r.get(c))) if c == 'details' and r.get(c) is not None
            else r.get(c)
            for c in BELETAGE_INVOICES_COLS
        )
        for r in rows
    ]
    cols_sql = ', '.join(BELETAGE_INVOICES_COLS)
    update_cols = [c for c in BELETAGE_INVOICES_COLS if c != 'invoice_id']
    update_sql = ', '.join(f'{c} = EXCLUDED.{c}' for c in update_cols)
    sql = f"""
        INSERT INTO seap.beletage_invoices ({cols_sql}) VALUES %s
        ON CONFLICT (invoice_id) DO UPDATE SET {update_sql}
    """
    execute_values(cur, sql, values, page_size=100)
    return len(rows)


BELETAGE_DA_COLS = [
    'da_id', 'da_name', 'unique_identification_code', 'cpv_code', 'cpv_name',
    'contract_type', 'publication_date', 'finalization_date',
    'estimated_value', 'closing_value', 'currency', 'da_state',
    'authority_id', 'authority_name', 'authority_cui', 'details',
]


def insert_beletage_da(cur, rows):
    rows = list(rows)
    if not rows:
        return 0
    values = [
        tuple(
            Json(_json_safe(r.get(c))) if c == 'details' and r.get(c) is not None
            else r.get(c)
            for c in BELETAGE_DA_COLS
        )
        for r in rows
    ]
    cols_sql = ', '.join(BELETAGE_DA_COLS)
    update_cols = [c for c in BELETAGE_DA_COLS if c != 'da_id']
    update_sql = ', '.join(f'{c} = EXCLUDED.{c}' for c in update_cols)
    sql = f"""
        INSERT INTO seap.beletage_direct_acquisitions ({cols_sql}) VALUES %s
        ON CONFLICT (da_id) DO UPDATE SET {update_sql}
    """
    execute_values(cur, sql, values, page_size=100)
    return len(rows)


BELETAGE_CATALOG_COLS = ['item_code', 'item_name', 'cpv_code', 'unit_price',
                         'currency', 'last_updated', 'details']


def insert_beletage_catalog(cur, rows):
    rows = list(rows)
    if not rows:
        return 0
    values = [
        tuple(
            Json(_json_safe(r.get(c))) if c == 'details' and r.get(c) is not None
            else r.get(c)
            for c in BELETAGE_CATALOG_COLS
        )
        for r in rows
    ]
    cols_sql = ', '.join(BELETAGE_CATALOG_COLS)
    update_cols = [c for c in BELETAGE_CATALOG_COLS
                   if c != 'item_code']
    update_sql = ', '.join(f'{c} = EXCLUDED.{c}' for c in update_cols)
    sql = f"""
        INSERT INTO seap.beletage_catalog ({cols_sql}) VALUES %s
        ON CONFLICT (item_code) DO UPDATE SET {update_sql}
    """
    execute_values(cur, sql, values, page_size=100)
    return len(rows)


# Sink dispatch
SINK_FUNCS = {
    'announcements': insert_announcements,
    'beletage_contracts': insert_beletage_contracts,
    'beletage_invoices': insert_beletage_invoices,
    'beletage_da': insert_beletage_da,
    'beletage_catalog': insert_beletage_catalog,
}


# ── Sync state ──

def get_cursor(cur, feed: str) -> dict | None:
    """Return the stored sync state for `feed`, or None if it has never run."""
    cur.execute('SELECT last_cursor_date, items_imported_total, consecutive_errors '
                'FROM seap.wsp_sync_state WHERE feed = %s', (feed,))
    row = cur.fetchone()
    if not row:
        return None
    return {
        'last_cursor_date': row[0],
        'items_imported_total': row[1],
        'consecutive_errors': row[2],
    }


def update_sync_state(cur, feed: str, *, cursor_date=None, window_start=None,
                      window_end=None, items_added=0, error: str | None = None):
    """Record the outcome of one sync run for `feed`.

    A non-None `error` increments consecutive_errors; a successful run resets
    it to 0. When `cursor_date` is None the stored cursor is kept.
    Note: items_imported_24h is overwritten with this run's `items_added`;
    it is not a rolling 24-hour sum.
    """
    # The parameter tuple below must stay in the same order as the 13 %s
    # placeholders: 9 in VALUES, then 4 in the DO UPDATE clause.
    cur.execute("""
        INSERT INTO seap.wsp_sync_state
            (feed, last_run_at, last_cursor_date, last_window_start, last_window_end,
             items_imported_total, items_imported_24h, consecutive_errors,
             last_error, last_error_at)
        VALUES (%s, now(), %s, %s, %s, %s, %s,
                CASE WHEN %s IS NULL THEN 0 ELSE 1 END,
                %s,
                CASE WHEN %s IS NULL THEN NULL ELSE now() END)
        ON CONFLICT (feed) DO UPDATE SET
            last_run_at = now(),
            last_cursor_date = COALESCE(EXCLUDED.last_cursor_date,
                                        seap.wsp_sync_state.last_cursor_date),
            last_window_start = EXCLUDED.last_window_start,
            last_window_end = EXCLUDED.last_window_end,
            items_imported_total = seap.wsp_sync_state.items_imported_total + %s,
            items_imported_24h = %s,
            consecutive_errors = CASE WHEN %s IS NULL THEN 0
                                      ELSE seap.wsp_sync_state.consecutive_errors + 1 END,
            last_error = EXCLUDED.last_error,
            last_error_at = CASE WHEN %s IS NULL THEN seap.wsp_sync_state.last_error_at
                                 ELSE now() END
    """, (feed, cursor_date, window_start, window_end, items_added, items_added, error,
          error, error, items_added, items_added, error, error))


def upsert_backfill_window(cur, feed: str, ws, we, county=None) -> int:
    """Register a backfill window; return its id, or 0 if it already existed.

    Caveat: when `county` is None, a plain unique index treats NULLs as
    distinct, so ON CONFLICT will not fire and a duplicate row is inserted,
    unless the index is declared NULLS NOT DISTINCT (PostgreSQL 15+).
    """
    cur.execute("""
        INSERT INTO seap.wsp_backfill_windows (feed, window_start, window_end, county_code)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (feed, window_start, window_end, county_code) DO NOTHING
        RETURNING id
    """, (feed, ws, we, county))
    row = cur.fetchone()
    return row[0] if row else 0


def claim_backfill_window(cur, feed: str) -> dict | None:
    """Atomically claim the oldest pending or failed window for processing.

    Windows with 5 or more attempts are skipped. FOR UPDATE SKIP LOCKED lets
    concurrent workers claim different windows without blocking each other.
    """
    cur.execute("""
        UPDATE seap.wsp_backfill_windows
        SET state = 'in_progress', started_at = now(), attempts = attempts + 1
        WHERE id = (
            SELECT id FROM seap.wsp_backfill_windows
            WHERE feed = %s AND state IN ('pending', 'failed') AND attempts < 5
            ORDER BY window_start
            LIMIT 1
            FOR UPDATE SKIP LOCKED
        )
        RETURNING id, window_start, window_end, county_code, attempts
    """, (feed,))
    row = cur.fetchone()
    if not row:
        return None
    return {
        'id': row[0],
        'window_start': row[1],
        'window_end': row[2],
        'county_code': row[3],
        'attempts': row[4],
    }


def complete_backfill_window(cur, win_id: int, items: int, page_total: int,
                             error: str | None = None):
    """Mark a claimed window 'completed', or 'failed' when `error` is set."""
    state = 'failed' if error else 'completed'
    cur.execute("""
        UPDATE seap.wsp_backfill_windows
        SET state = %s, items_imported = %s, page_total = %s,
            completed_at = now(), last_error = %s
        WHERE id = %s
    """, (state, items, page_total, error, win_id))