a6c03a091e
Moved from gov-agreg/src/pages/achizitii/* to root (drop prefix). - 22 pages migrated, 127 files total - All internal links: /achizitii/X → /X (176 occurrences fixed) - AchizitiiLayout subnav rewritten: /X paths, top-right link to vreaudigital.ro hub - BaseLayout new (vreau.digital branding, OG tags, site URL) - astro.config.mjs: site https://vreau.digital, server output (was static) - docker-compose: port 5096 (vreaudigital is 5095), container vreau-digital - deploy.sh: paths /opt/vreau-digital, log /var/log/vreau-digital-deploy.log Backend shared with gov-agreg: - PostgreSQL satra (same schemas: seap, firms, anaf, anre, ...) - Photon, Martin tiles - Infisical /vreaudigital path (DATABASE_URL etc. shared) build: PASS (npx astro check 0 errors, npm run build 5s vite + 10s server)
103 lines
3.5 KiB
Python
103 lines
3.5 KiB
Python
"""
|
|
In-memory mTLS certificate loader for SEAP WSP.
|
|
|
|
Decrypts the .p12 in memory using SEAP_CERT_KEY env var, builds an
|
|
ssl.SSLContext, and exposes it via a requests.HTTPAdapter. The private
|
|
key is only written to a short-lived temp file (/dev/shm when available).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import ssl
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
import requests
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.serialization import pkcs12
|
|
from requests.adapters import HTTPAdapter
|
|
from urllib3.poolmanager import PoolManager
|
|
|
|
|
|
def load_p12_to_ssl_context(p12_path: str | Path, password: str) -> ssl.SSLContext:
    """Load a PKCS#12 bundle and return an SSLContext with its client cert + key.

    ``SSLContext.load_cert_chain`` only accepts file paths, so the decrypted
    PEM material (certificate, any additional certificates from the bundle,
    then the unencrypted private key) is written to a short-lived temp file.
    The file is created in /dev/shm when that directory exists and is
    writable, otherwise in the default temp directory, and it is removed
    before this function returns — whether loading succeeded or failed.

    Args:
        p12_path: Location of the PKCS#12 (.p12) file.
        password: Passphrase protecting the bundle; encoded to bytes before
            being handed to the PKCS#12 parser.

    Returns:
        A default-configured ``ssl.SSLContext`` with the client certificate
        chain and private key loaded.

    Raises:
        OSError: If the bundle cannot be read or the temp file cannot be
            created/written.
        RuntimeError: If the bundle contains no private key or no certificate.
    """
    p12_data = Path(p12_path).read_bytes()
    private_key, cert, additional = pkcs12.load_key_and_certificates(
        p12_data, password.encode()
    )
    if private_key is None or cert is None:
        raise RuntimeError('p12 missing private key or certificate')

    key_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )
    # Leaf certificate first, then whatever extra certificates the bundle holds.
    cert_pem = b''.join(
        item.public_bytes(serialization.Encoding.PEM)
        for item in (cert, *(additional or ()))
    )

    # Use /dev/shm if available (RAM-backed); fallback to the default temp dir
    shm = Path('/dev/shm')
    tmp_dir = str(shm) if shm.is_dir() and os.access(shm, os.W_OK) else None

    # mkstemp creates the file readable/writable by the owner only, so no
    # chmod is needed before the secret is written.
    fd, path = tempfile.mkstemp(prefix='wsp_chain_', suffix='.pem', dir=tmp_dir)
    try:
        # The file object owns the descriptor: it is closed even when the
        # write fails, and the buffered write cannot stop short the way a
        # single raw os.write() call may.
        with os.fdopen(fd, 'wb') as pem_file:
            pem_file.write(cert_pem + b'\n' + key_pem)

        ctx = ssl.create_default_context()
        ctx.load_cert_chain(certfile=path)
        return ctx
    finally:
        # load_cert_chain has finished parsing the file by the time it
        # returns, so the context does not need the path afterwards. Removal
        # is best-effort: a failure here must not mask the real outcome.
        try:
            os.unlink(path)
        except OSError:
            pass
|
|
|
|
|
|
class _SSLContextAdapter(HTTPAdapter):
    """HTTPAdapter variant that hands a caller-supplied SSLContext to its pools.

    The context (expected to already carry the client certificate) is passed
    as the ``ssl_context`` keyword to both the direct pool manager and any
    proxy manager, replacing a value supplied under that keyword.
    """

    def __init__(self, ssl_context: ssl.SSLContext, **kwargs):
        # Assigned before delegating to the parent constructor.
        # NOTE(review): presumably HTTPAdapter.__init__ triggers
        # init_poolmanager, which reads this attribute — confirm.
        self._ssl_context = ssl_context
        super().__init__(**kwargs)

    def _with_context(self, options: dict) -> dict:
        """Return *options* with ``ssl_context`` set to the stored context."""
        return {**options, 'ssl_context': self._ssl_context}

    def init_poolmanager(self, *args, **kwargs):
        """Build the pool manager with the stored SSLContext applied."""
        return super().init_poolmanager(*args, **self._with_context(kwargs))

    def proxy_manager_for(self, *args, **kwargs):
        """Build a proxy manager with the stored SSLContext applied."""
        return super().proxy_manager_for(*args, **self._with_context(kwargs))
|
|
|
|
|
|
def make_mtls_session(p12_path: str | Path, password: str) -> requests.Session:
    """Return a requests.Session whose https:// traffic uses the .p12 client cert.

    The PKCS#12 bundle at *p12_path* is turned into an SSLContext via
    ``load_p12_to_ssl_context`` and mounted on a fresh session through
    ``_SSLContextAdapter``.
    """
    # Build the adapter first so a bad bundle/password fails before any
    # session object exists.
    mtls_adapter = _SSLContextAdapter(
        ssl_context=load_p12_to_ssl_context(p12_path, password)
    )
    mtls_session = requests.Session()
    mtls_session.mount('https://', mtls_adapter)
    return mtls_session
|
|
|
|
|
|
def make_mtls_session_from_env() -> requests.Session:
    """Build the mTLS session from environment configuration.

    Reads the bundle passphrase from ``SEAP_CERT_KEY`` and the bundle path
    from ``SEAP_P12_PATH``; when the path variable is unset, a .p12 file in
    the ``credentials`` directory two levels above this module is used.

    Raises:
        RuntimeError: If ``SEAP_CERT_KEY`` is unset or empty.
    """
    cert_password = os.environ.get('SEAP_CERT_KEY')
    if not cert_password:
        raise RuntimeError('SEAP_CERT_KEY not in env')

    bundled_p12 = (
        Path(__file__).parent.parent / 'credentials' / '50076FB3826FADA540ACFB19.p12'
    )
    p12_location = os.environ.get('SEAP_P12_PATH', str(bundled_p12))
    return make_mtls_session(p12_location, cert_password)
|