Files
Claude VM a6c03a091e initial: split from gov-agreg — vreau.digital standalone platform
Moved from gov-agreg/src/pages/achizitii/* to root (drop prefix).
- 22 pages migrated, 127 files total
- All internal links: /achizitii/X → /X (176 occurrences fixed)
- AchizitiiLayout subnav rewritten: /X paths, top-right link to vreaudigital.ro hub
- BaseLayout new (vreau.digital branding, OG tags, site URL)
- astro.config.mjs: site https://vreau.digital, server output (was static)
- docker-compose: port 5096 (vreaudigital is 5095), container vreau-digital
- deploy.sh: paths /opt/vreau-digital, log /var/log/vreau-digital-deploy.log

Backend shared with gov-agreg:
- PostgreSQL satra (same schemas: seap, firms, anaf, anre, ...)
- Photon, Martin tiles
- Infisical /vreaudigital path (DATABASE_URL etc. shared)

build: PASS (npx astro check 0 errors, npm run build 5s vite + 10s server)
2026-05-13 00:10:32 +03:00

337 lines
11 KiB
Python

"""DB sinks — idempotent UPSERT for each parsed row type."""
from __future__ import annotations

import json
import logging
import os
from contextlib import contextmanager
from datetime import date, datetime
from decimal import Decimal
from typing import Any, Iterable

import psycopg2
from psycopg2.extras import Json, execute_values
# Module-level logger named after this module.
log = logging.getLogger(__name__)
def get_db_url() -> str:
    """Return the database connection string from the ``DATABASE_URL`` env var.

    Raises:
        RuntimeError: when the variable is unset or set to an empty string.
    """
    db_url = os.environ.get('DATABASE_URL')
    if db_url:
        return db_url
    raise RuntimeError('DATABASE_URL not set in env')
def connect():
    """Open and return a new psycopg2 connection using ``get_db_url()``."""
    dsn = get_db_url()
    return psycopg2.connect(dsn)
@contextmanager
def connection():
    """Yield a fresh connection and manage its transaction.

    Commits after the ``with`` body completes normally, rolls back and re-raises
    if the body (or the commit itself) raises an ``Exception``, and closes the
    connection in every case.
    """
    db = connect()
    try:
        try:
            yield db
            db.commit()
        except Exception:
            # A failed commit lands here too, so it is rolled back as well.
            db.rollback()
            raise
    finally:
        db.close()
def _json_safe(obj):
    """Recursively convert ``obj`` into values that ``json.dumps`` can encode.

    * ``Decimal`` -> ``str``, which keeps the exact digits (a float could round them).
    * ``datetime`` / ``date`` -> ISO-8601 string via ``isoformat()``.
    * ``dict`` -> new ``dict`` with converted values (keys are left untouched).
    * ``list`` / ``tuple`` -> new ``list`` with converted items.

    Any other value is returned unchanged.
    """
    if isinstance(obj, Decimal):
        return str(obj)
    # datetime is a subclass of date, so this single check covers both types.
    # Previously plain date values slipped through and broke JSON encoding.
    if isinstance(obj, date):
        return obj.isoformat()
    if isinstance(obj, dict):
        return {k: _json_safe(v) for k, v in obj.items()}
    # Tuples are recursed as well; json.dumps would emit them as arrays anyway,
    # but unconverted Decimal/date items inside them would make it fail.
    if isinstance(obj, (list, tuple)):
        return [_json_safe(v) for v in obj]
    return obj
# ── seap.announcements (public feeds) ──
# Column order matters: insert_announcements uses this list both for the INSERT
# column list and for the order of every row's value tuple. 'type' and
# 'ref_number' form the ON CONFLICT key and are never updated.
ANNOUNCEMENTS_COLS = [
    'type', 'ref_number',
    'authority_name', 'authority_cui', 'authority_address',
    'authority_email', 'authority_phone', 'authority_url',
    'authority_type', 'authority_main_activity', 'authority_entity_id',
    'title', 'cpv_code', 'contract_type',
    'publication_date', 'estimated_value', 'awarded_value', 'currency',
    'supplier_name', 'supplier_cui', 'supplier_address', 'supplier_is_sme',
    'procedure_type', 'procedure_state', 'legislation',
    'has_lots', 'contract_has_lots', 'lots_count',
    'joue', 'county_code', 'notice_state', 'notice_state_id',
    'framework_agreement', 'notice_id_internal',
    'deadline_submission', 'opening_date', 'duration_months',
    'documents', 'award_criteria', 'lots', 'details',
    'seap_url', 'source',
]
# Columns sent as JSON (wrapped in psycopg2 ``Json`` after ``_json_safe``). On
# conflict they are replaced outright instead of COALESCE-d with the stored value.
_JSONB_COLS = {'documents', 'award_criteria', 'lots', 'details'}
def _collapse_duplicate_announcements(rows, cols, jsonb_cols):
    """Merge rows that share the same non-NULL (type, ref_number) key.

    PostgreSQL rejects an ``INSERT ... ON CONFLICT DO UPDATE`` whose VALUES list
    hits the same conflict key twice ("command cannot affect row a second
    time"), so duplicates have to be resolved before sending. The merge
    reproduces what two sequential upserts would leave behind:

    * a column not in ``jsonb_cols`` takes the later non-None value, and a later
      None keeps the earlier value (mirrors the COALESCE in the UPDATE clause);
    * a column in ``jsonb_cols`` takes the later value as-is, even None.

    Rows whose type or ref_number is None are never merged, because NULL keys do
    not collide under a default unique index. Each merged row keeps the position
    where its key was first seen. The input dicts are not mutated.
    """
    merged = {}
    for idx, row in enumerate(rows):
        ann_type, ref = row.get('type'), row.get('ref_number')
        if ann_type is None or ref is None:
            merged[('row', idx)] = row
            continue
        slot = ('key', ann_type, ref)
        previous = merged.get(slot)
        if previous is None:
            merged[slot] = row
            continue
        combined = dict(row)
        for col in cols:
            if col not in jsonb_cols and row.get(col) is None:
                combined[col] = previous.get(col)
        merged[slot] = combined
    return list(merged.values())


def insert_announcements(cur, rows: Iterable[dict]) -> int:
    """Bulk upsert ``rows`` into seap.announcements, keyed on (type, ref_number).

    Each row is a dict, and a column missing from it is sent as NULL. Columns in
    ``_JSONB_COLS`` are wrapped in ``Json`` after ``_json_safe`` conversion.

    On conflict:

    * ordinary columns are COALESCE-d, so a NULL in the new row keeps the stored value;
    * JSONB columns are replaced outright;
    * ``enriched_at`` is set to now().

    Rows sharing a key are merged first, because PostgreSQL refuses to update the
    same row twice in one statement.

    Returns:
        Number of distinct rows sent to the database (0 for empty input).
    """
    rows = list(rows)
    if not rows:
        return 0
    rows = _collapse_duplicate_announcements(rows, ANNOUNCEMENTS_COLS, _JSONB_COLS)
    values = [
        tuple(
            Json(_json_safe(r.get(c))) if c in _JSONB_COLS and r.get(c) is not None
            else r.get(c)
            for c in ANNOUNCEMENTS_COLS
        )
        for r in rows
    ]
    cols_sql = ', '.join(ANNOUNCEMENTS_COLS)
    update_cols = [c for c in ANNOUNCEMENTS_COLS if c not in ('type', 'ref_number')]
    # NOTE(review): JSONB columns overwrite unconditionally, so a re-import that
    # lacks e.g. 'documents' clears the stored value — confirm this is intended.
    update_sql = ', '.join(
        f'{c} = COALESCE(EXCLUDED.{c}, seap.announcements.{c})' if c not in _JSONB_COLS
        else f'{c} = EXCLUDED.{c}'
        for c in update_cols
    )
    sql = f"""
        INSERT INTO seap.announcements ({cols_sql})
        VALUES %s
        ON CONFLICT (type, ref_number) DO UPDATE SET
            {update_sql},
            enriched_at = now()
    """
    execute_values(cur, sql, values, page_size=100)
    return len(rows)
# ── Beletage tables ──
# All four Beletage sinks share one upsert shape: every non-key column is
# overwritten from EXCLUDED (no COALESCE), and only the 'details' column is sent
# as JSON. The private helpers below hold that shared logic.


def _dedupe_last_wins(rows: Iterable[dict], key: str) -> list[dict]:
    """Collapse rows sharing a non-None ``key`` value, keeping the last occurrence.

    PostgreSQL rejects an ``INSERT ... ON CONFLICT DO UPDATE`` whose VALUES list
    contains the same conflict key twice ("command cannot affect row a second
    time"). The Beletage upserts overwrite every column, so keeping only the last
    occurrence leaves the table as sequential upserts would. Each kept row sits
    at the position where its key was first seen.

    Rows whose key is missing or None are all kept, because NULL keys do not
    collide under a default unique index.
    """
    latest: dict[tuple, dict] = {}
    for idx, row in enumerate(rows):
        value = row.get(key)
        slot = ('row', idx) if value is None else ('key', value)
        latest[slot] = row
    return list(latest.values())


def _beletage_upsert_sql(table: str, cols: list[str], key: str, *,
                         touch_enriched: bool = False) -> str:
    """Build the ``execute_values`` upsert statement for one Beletage table.

    Every column except ``key`` is set from EXCLUDED. When ``touch_enriched`` is
    true, ``enriched_at = now()`` is appended to the SET list.
    """
    assignments = [f'{c} = EXCLUDED.{c}' for c in cols if c != key]
    if touch_enriched:
        assignments.append('enriched_at = now()')
    set_clause = ',\n    '.join(assignments)
    return (
        f'INSERT INTO {table} ({", ".join(cols)})\n'
        'VALUES %s\n'
        f'ON CONFLICT ({key}) DO UPDATE SET\n'
        f'    {set_clause}'
    )


def _upsert_beletage(cur, table: str, cols: list[str], key: str,
                     rows: Iterable[dict], *, touch_enriched: bool = False) -> int:
    """Upsert ``rows`` into ``table`` on ``key``.

    Returns the number of distinct rows sent (0 for empty input).
    """
    unique_rows = _dedupe_last_wins(rows, key)
    if not unique_rows:
        return 0
    values = [
        tuple(
            Json(_json_safe(row.get(c))) if c == 'details' and row.get(c) is not None
            else row.get(c)
            for c in cols
        )
        for row in unique_rows
    ]
    sql = _beletage_upsert_sql(table, cols, key, touch_enriched=touch_enriched)
    execute_values(cur, sql, values, page_size=100)
    return len(unique_rows)


BELETAGE_CONTRACTS_COLS = [
    'contract_id', 'contract_no', 'contract_title', 'contract_type',
    'contract_phase', 'contract_state',
    'awarding_date', 'contract_date', 'publication_date',
    'duration_months', 'contract_value', 'default_currency_value', 'currency',
    'ca_notice_id', 'ca_notice_no', 'authority_name', 'authority_cui',
    'is_current_version', 'is_rejected', 'version_no', 'version_date',
    'justification', 'additional_information', 'details',
]


def insert_beletage_contracts(cur, rows):
    """Upsert into seap.beletage_contracts on contract_id; also stamps enriched_at.

    Returns the number of distinct rows sent.
    """
    return _upsert_beletage(cur, 'seap.beletage_contracts', BELETAGE_CONTRACTS_COLS,
                            'contract_id', rows, touch_enriched=True)


BELETAGE_INVOICES_COLS = [
    'invoice_id', 'invoice_no', 'invoice_date', 'due_date',
    'contract_id', 'contract_no', 'authority_name', 'authority_cui',
    'total_value', 'total_value_no_vat', 'vat_value', 'currency',
    'state', 'paid_value', 'paid_at', 'details',
]


def insert_beletage_invoices(cur, rows):
    """Upsert into seap.beletage_invoices on invoice_id.

    Returns the number of distinct rows sent.
    """
    return _upsert_beletage(cur, 'seap.beletage_invoices', BELETAGE_INVOICES_COLS,
                            'invoice_id', rows)


BELETAGE_DA_COLS = [
    'da_id', 'da_name', 'unique_identification_code',
    'cpv_code', 'cpv_name', 'contract_type',
    'publication_date', 'finalization_date',
    'estimated_value', 'closing_value', 'currency',
    'da_state', 'authority_id', 'authority_name', 'authority_cui',
    'details',
]


def insert_beletage_da(cur, rows):
    """Upsert into seap.beletage_direct_acquisitions on da_id.

    Returns the number of distinct rows sent.
    """
    return _upsert_beletage(cur, 'seap.beletage_direct_acquisitions', BELETAGE_DA_COLS,
                            'da_id', rows)


BELETAGE_CATALOG_COLS = ['item_code', 'item_name', 'cpv_code', 'unit_price',
                         'currency', 'last_updated', 'details']


def insert_beletage_catalog(cur, rows):
    """Upsert into seap.beletage_catalog on item_code.

    Returns the number of distinct rows sent.
    """
    return _upsert_beletage(cur, 'seap.beletage_catalog', BELETAGE_CATALOG_COLS,
                            'item_code', rows)
# Sink dispatch: maps a sink name to the upsert function for its rows. Each
# function takes (cur, rows) and returns an int row count.
SINK_FUNCS = dict(
    announcements=insert_announcements,
    beletage_contracts=insert_beletage_contracts,
    beletage_invoices=insert_beletage_invoices,
    beletage_da=insert_beletage_da,
    beletage_catalog=insert_beletage_catalog,
)
# ── Sync state ──
def get_cursor(cur, feed: str) -> dict | None:
    """Read the stored sync progress for ``feed`` from seap.wsp_sync_state.

    Returns a dict with ``last_cursor_date``, ``items_imported_total`` and
    ``consecutive_errors``, or None when the feed has no row yet.
    """
    cur.execute('SELECT last_cursor_date, items_imported_total, consecutive_errors '
                'FROM seap.wsp_sync_state WHERE feed = %s', (feed,))
    record = cur.fetchone()
    if not record:
        return None
    field_names = ('last_cursor_date', 'items_imported_total', 'consecutive_errors')
    return dict(zip(field_names, record))
def update_sync_state(cur, feed: str, *, cursor_date=None, window_start=None,
                      window_end=None, items_added=0, error: str | None = None):
    """Record the outcome of one sync run for ``feed`` in seap.wsp_sync_state.

    Inserts the feed's row on first use and otherwise updates it:

    * ``last_run_at`` is set to now();
    * ``last_cursor_date`` only changes when ``cursor_date`` is not None (COALESCE);
    * ``last_window_start`` / ``last_window_end`` / ``last_error`` are overwritten,
      becoming NULL when the argument is None;
    * ``items_added`` is added to ``items_imported_total`` and written to
      ``items_imported_24h``;
    * ``consecutive_errors`` resets to 0 when ``error`` is None and increments otherwise;
    * ``last_error_at`` is stamped with now() only when ``error`` is not None.
    """
    # Named placeholders: items_added and error are each referenced several times.
    params = {
        'feed': feed,
        'cursor_date': cursor_date,
        'window_start': window_start,
        'window_end': window_end,
        'items': items_added,
        'error': error,
    }
    # NOTE(review): items_imported_24h receives only this run's count rather than
    # a rolling 24h sum — confirm that is the intended meaning of the column.
    statement = """
        INSERT INTO seap.wsp_sync_state (feed, last_run_at, last_cursor_date,
            last_window_start, last_window_end, items_imported_total,
            items_imported_24h, consecutive_errors, last_error, last_error_at)
        VALUES (%(feed)s, now(), %(cursor_date)s, %(window_start)s, %(window_end)s,
            %(items)s, %(items)s,
            CASE WHEN %(error)s IS NULL THEN 0 ELSE 1 END,
            %(error)s,
            CASE WHEN %(error)s IS NULL THEN NULL ELSE now() END)
        ON CONFLICT (feed) DO UPDATE SET
            last_run_at = now(),
            last_cursor_date = COALESCE(EXCLUDED.last_cursor_date, seap.wsp_sync_state.last_cursor_date),
            last_window_start = EXCLUDED.last_window_start,
            last_window_end = EXCLUDED.last_window_end,
            items_imported_total = seap.wsp_sync_state.items_imported_total + %(items)s,
            items_imported_24h = %(items)s,
            consecutive_errors = CASE WHEN %(error)s IS NULL THEN 0
                ELSE seap.wsp_sync_state.consecutive_errors + 1 END,
            last_error = EXCLUDED.last_error,
            last_error_at = CASE WHEN %(error)s IS NULL THEN seap.wsp_sync_state.last_error_at
                ELSE now() END
    """
    cur.execute(statement, params)
def upsert_backfill_window(cur, feed: str, ws, we, county=None) -> int:
    """Register the backfill window (``ws``, ``we``, ``county``) for ``feed``.

    Returns the id of the newly inserted row, or 0 when ON CONFLICT skipped the
    insert because the window already exists.

    NOTE(review): under a default unique index NULL county_code values never
    conflict, so repeated calls with ``county=None`` may insert duplicate
    windows — confirm how the index on (feed, window_start, window_end,
    county_code) treats NULLs.
    """
    cur.execute("""
        INSERT INTO seap.wsp_backfill_windows (feed, window_start, window_end, county_code)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (feed, window_start, window_end, county_code) DO NOTHING
        RETURNING id
    """, (feed, ws, we, county))
    inserted = cur.fetchone()
    if not inserted:
        return 0
    return inserted[0]
def claim_backfill_window(cur, feed: str) -> dict | None:
    """Claim the oldest claimable backfill window of ``feed`` and mark it in progress.

    A window is claimable when its state is 'pending' or 'failed' and it has
    fewer than 5 attempts. Windows already in 'in_progress' are not selected.
    ``FOR UPDATE SKIP LOCKED`` skips rows locked by another transaction, so
    concurrent claimers do not block on each other. The claimed row gets
    ``state = 'in_progress'``, ``started_at = now()`` and ``attempts + 1``.

    Returns a dict with ``id``, ``window_start``, ``window_end``,
    ``county_code`` and ``attempts`` (as returned by RETURNING), or None when
    nothing could be claimed.
    """
    cur.execute("""
        UPDATE seap.wsp_backfill_windows
        SET state = 'in_progress', started_at = now(),
            attempts = attempts + 1
        WHERE id = (
            SELECT id FROM seap.wsp_backfill_windows
            WHERE feed = %s AND state IN ('pending', 'failed') AND attempts < 5
            ORDER BY window_start
            LIMIT 1
            FOR UPDATE SKIP LOCKED
        )
        RETURNING id, window_start, window_end, county_code, attempts
    """, (feed,))
    claimed = cur.fetchone()
    if not claimed:
        return None
    field_names = ('id', 'window_start', 'window_end', 'county_code', 'attempts')
    return dict(zip(field_names, claimed))
def complete_backfill_window(cur, win_id: int, items: int, page_total: int,
                             error: str | None = None):
    """Finish backfill window ``win_id`` and record its result.

    A truthy ``error`` marks the window 'failed'; otherwise it is marked
    'completed'. In both cases ``items_imported``, ``page_total``,
    ``completed_at = now()`` and ``last_error`` (the given ``error``) are written.
    """
    if error:
        new_state = 'failed'
    else:
        new_state = 'completed'
    cur.execute("""
        UPDATE seap.wsp_backfill_windows
        SET state = %s, items_imported = %s, page_total = %s,
            completed_at = now(), last_error = %s
        WHERE id = %s
    """, (new_state, items, page_total, error, win_id))