"""
In-memory mTLS certificate loader for SEAP WSP.

Decrypts the .p12 in memory using the SEAP_CERT_KEY env var, builds an
ssl.SSLContext, and exposes it via a requests.HTTPAdapter.

The decrypted private key is never written out in plaintext. The ssl module
can only load a client certificate from a file path, so the PEM bundle is
staged in a short-lived temp file in which the key is re-encrypted under a
one-time random passphrase that exists only in this process's memory.
"""
from __future__ import annotations

import logging
import os
import secrets
import ssl
import tempfile
from pathlib import Path

import requests
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import pkcs12
from requests.adapters import HTTPAdapter
from urllib3.poolmanager import PoolManager

_log = logging.getLogger(__name__)


def load_p12_to_ssl_context(p12_path: str | Path, password: str) -> ssl.SSLContext:
    """Load a PKCS#12 bundle and return an SSLContext with client cert + key.

    The bundle is decrypted in memory. ``SSLContext.load_cert_chain`` only
    accepts file paths, so the certificate chain and key are staged as PEM in
    a ``mkstemp`` file (in /dev/shm when it is a writable directory, otherwise
    the default temp dir). The key inside that file is encrypted with a
    random passphrase generated for this call only, and the file is removed
    before the function returns, whether loading succeeded or not.

    Args:
        p12_path: Location of the .p12 / .pfx file.
        password: Password protecting the PKCS#12 bundle.

    Returns:
        A default-configured client SSLContext with the client certificate
        chain and private key loaded.

    Raises:
        OSError: If the bundle cannot be read or the temp file cannot be
            created or written.
        ValueError: Propagated from ``cryptography`` when the bundle cannot
            be decrypted or parsed (for example, a wrong password).
        RuntimeError: If the bundle has no private key or no certificate.
        ssl.SSLError: If OpenSSL rejects the staged certificate chain.
    """
    p12_data = Path(p12_path).read_bytes()
    private_key, cert, additional = pkcs12.load_key_and_certificates(
        p12_data, password.encode()
    )
    if private_key is None or cert is None:
        raise RuntimeError('p12 missing private key or certificate')

    # One-time passphrase: only ciphertext of the key ever reaches the
    # filesystem, and the passphrase itself is never persisted anywhere.
    staging_secret = secrets.token_urlsafe(32).encode('ascii')
    key_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.BestAvailableEncryption(staging_secret),
    )

    # Leaf certificate first, then any intermediates shipped in the bundle.
    chain = [cert, *(additional or ())]
    cert_pem = b''.join(
        entry.public_bytes(serialization.Encoding.PEM) for entry in chain
    )

    # Use /dev/shm if available (RAM-backed); fall back to the default temp dir.
    shm = Path('/dev/shm')
    tmp_dir = str(shm) if shm.is_dir() and os.access(shm, os.W_OK) else None
    # mkstemp creates the file readable/writable by the owner only, so no
    # follow-up chmod is needed.
    fd, path = tempfile.mkstemp(prefix='wsp_chain_', suffix='.pem', dir=tmp_dir)
    try:
        # The file object owns the descriptor: it is closed even when the
        # write fails, and write() does not stop short like os.write() can.
        with os.fdopen(fd, 'wb') as pem_file:
            pem_file.write(cert_pem + b'\n' + key_pem)
        ctx = ssl.create_default_context()
        ctx.load_cert_chain(certfile=path, password=staging_secret)
        return ctx
    finally:
        # load_cert_chain has already read the file into the context (or
        # failed), so nothing needs the file any more. Removal stays
        # best-effort, but a leftover key file is worth a warning.
        try:
            os.unlink(path)
        except OSError as exc:
            _log.warning('could not remove temporary PEM file %s: %s', path, exc)


class _SSLContextAdapter(HTTPAdapter):
    """Requests adapter that uses a pre-built SSLContext (with client cert)."""

    def __init__(self, ssl_context: ssl.SSLContext, **kwargs):
        # Must be assigned before super().__init__(), which calls
        # init_poolmanager() and therefore reads this attribute.
        self._ssl_context = ssl_context
        super().__init__(**kwargs)

    def init_poolmanager(self, *args, **kwargs):
        """Create the connection pool with the stored SSLContext injected."""
        kwargs['ssl_context'] = self._ssl_context
        return super().init_poolmanager(*args, **kwargs)

    def proxy_manager_for(self, *args, **kwargs):
        """Create a proxy manager with the stored SSLContext injected."""
        kwargs['ssl_context'] = self._ssl_context
        return super().proxy_manager_for(*args, **kwargs)


def make_mtls_session(p12_path: str | Path, password: str) -> requests.Session:
    """Build a requests.Session that presents the .p12 client cert over HTTPS.

    Args:
        p12_path: Location of the PKCS#12 bundle.
        password: Password protecting the bundle.

    Returns:
        A session whose ``https://`` adapter uses the SSLContext produced by
        ``load_p12_to_ssl_context``.
    """
    ctx = load_p12_to_ssl_context(p12_path, password)
    session = requests.Session()
    adapter = _SSLContextAdapter(ssl_context=ctx)
    session.mount('https://', adapter)
    return session


def make_mtls_session_from_env() -> requests.Session:
    """Convenience: read the p12 path and password from the environment.

    ``SEAP_P12_PATH`` selects the bundle; when unset, a bundled file under
    ``credentials/`` next to this module's parent directory is used.
    ``SEAP_CERT_KEY`` supplies the bundle password.

    Raises:
        RuntimeError: If ``SEAP_CERT_KEY`` is unset or empty.
    """
    p12 = os.environ.get(
        'SEAP_P12_PATH',
        str(Path(__file__).parent.parent / 'credentials' / '50076FB3826FADA540ACFB19.p12'),
    )
    pwd = os.environ.get('SEAP_CERT_KEY')
    if not pwd:
        raise RuntimeError('SEAP_CERT_KEY not in env')
    return make_mtls_session(p12, pwd)