""" Window-aware pagination + auto-split. Constraints discovered empirically: - PageSize is server-fixed at 100 items per response. - Server caps results at PageTotal == 1000 — when reached, the window truncated and we lose data. Auto-split window in halves until PageTotal < 1000. - Server silently returns PageTotal=0 for windows that are too wide (e.g. >7 days for SU_CaNotices) — also triggers auto-split. Yields parsed item dicts as they arrive; caller is responsible for batching DB writes. Cursor advancement (last successful publication_date) is tracked externally in seap.wsp_sync_state. """ from __future__ import annotations import importlib import logging from dataclasses import dataclass, field from datetime import datetime, timedelta from typing import Any, Iterator from lxml import etree from .client import WspClient from .operations import WspOp log = logging.getLogger(__name__) @dataclass class WindowResult: op_name: str window_start: datetime window_end: datetime items_imported: int = 0 pages_fetched: int = 0 page_total_first: int = 0 # PageTotal returned on page 1 sub_windows: list['WindowResult'] = field(default_factory=list) # if split errors: list[str] = field(default_factory=list) skipped: bool = False # set if window was outside cursor range @property def all_items_imported(self) -> int: return self.items_imported + sum(s.all_items_imported for s in self.sub_windows) def _load_parser(op: WspOp): """Dynamically import the parser for an operation.""" if not op.parser_module: return None try: mod = importlib.import_module(op.parser_module) return mod except ModuleNotFoundError: log.warning('Parser module %s not found — items will be skipped', op.parser_module) return None def _iter_items_xml(items_xml: bytes, op: WspOp): """Iterate item elements out of .""" if not items_xml: return try: items_el = etree.fromstring(items_xml) except etree.XMLSyntaxError as e: log.error('Cannot parse items XML for %s: %s', op.name, e) return for el in items_el.iter(): # Match the configured item tag (or fallback: any direct child) if op.item_xpath: qname = op.item_xpath.rsplit('}', 1)[-1] if etree.QName(el.tag).localname == qname: yield el else: # fallback: yield all children of Items if el.getparent() is items_el: yield el def fetch_window( client: WspClient, op: WspOp, window_start: datetime, window_end: datetime, extra_fields: dict | None = None, on_item: callable = None, ) -> WindowResult: """Fetch all pages for a single (start, end) window. Auto-splits if PageTotal == op.items_cap (server-side cap reached). Calls `on_item(parsed_dict, raw_xml_element)` for each item parsed. Returns WindowResult with stats; sub_windows populated if splits occurred. """ extra_fields = extra_fields or {} parser = _load_parser(op) result = WindowResult( op_name=op.name, window_start=window_start, window_end=window_end, ) # Build base fields fields_base = dict(extra_fields) if op.date_start_field: fields_base[op.date_start_field] = window_start.strftime('%Y-%m-%dT%H:%M:%S') if op.date_end_field: fields_base[op.date_end_field] = window_end.strftime('%Y-%m-%dT%H:%M:%S') # Page 1 fields = {**fields_base, 'PageIndex': 1} r = client.call(op, fields) result.pages_fetched = 1 result.page_total_first = r.page_total if r.status != 'Success': result.errors.append(f'page1: {r.status} — {r.description}') return result # Detect server cap and split if r.page_total >= op.items_cap and (window_end - window_start) > timedelta(hours=1): log.info(' %s: PageTotal=%d hit cap (>= %d) on %s → %s — splitting', op.name, r.page_total, op.items_cap, window_start, window_end) return _split_window(client, op, window_start, window_end, extra_fields, on_item) # Process page 1 items if r.items_xml and parser is not None: for el in _iter_items_xml(r.items_xml, op): try: parsed = parser.parse(el) if parsed and on_item: on_item(parsed, el) result.items_imported += 1 except Exception as e: log.exception('Parse error in %s: %s', op.name, e) result.errors.append(f'parse: {e}') # Remaining pages total_pages = r.num_pages for page in range(2, total_pages + 1): fields = {**fields_base, 'PageIndex': page} r2 = client.call(op, fields) result.pages_fetched += 1 if r2.status != 'Success': result.errors.append(f'page{page}: {r2.status} — {r2.description}') break if parser is None: continue for el in _iter_items_xml(r2.items_xml, op): try: parsed = parser.parse(el) if parsed and on_item: on_item(parsed, el) result.items_imported += 1 except Exception as e: log.exception('Parse error in %s page %d: %s', op.name, page, e) result.errors.append(f'parse p{page}: {e}') return result def _split_window(client: WspClient, op: WspOp, start: datetime, end: datetime, extra_fields: dict, on_item) -> WindowResult: """Recursively split window in half until each fits under cap.""" parent = WindowResult(op_name=op.name, window_start=start, window_end=end) span = end - start if span <= timedelta(hours=1): # Cannot split further — accept truncation log.warning('Cannot split below 1h for %s [%s, %s] — accepting cap', op.name, start, end) # Just fetch with single window and tolerate cap sub = fetch_window(client, op, start, end, extra_fields, on_item) parent.sub_windows.append(sub) return parent mid = start + span / 2 sub1 = fetch_window(client, op, start, mid, extra_fields, on_item) sub2 = fetch_window(client, op, mid, end, extra_fields, on_item) parent.sub_windows.extend([sub1, sub2]) return parent def split_into_windows(start: datetime, end: datetime, max_window_days: int) -> list[tuple[datetime, datetime]]: """Slice [start, end) into chunks of max_window_days each. Returns list of (window_start, window_end) tuples. """ if start >= end: return [] span = timedelta(days=max_window_days) windows = [] cur = start while cur < end: nxt = min(cur + span, end) windows.append((cur, nxt)) cur = nxt return windows