BEGIN_REPO: apexai-submanager (v2.1.11) — FULL CHAT DUMP (NO DIFFS, NO PARTIALS)
REPO_TREE: apexai-submanager/ ├── LICENSE ├── README.md ├── API.md ├── SECURITY.md ├── OPS_RUNBOOK.md ├── CHANGELOG.md ├── pyproject.toml ├── ruff.toml ├── mypy.ini ├── pytest.ini ├── .gitignore ├── .dockerignore ├── .env.example ├── Dockerfile ├── docker-compose.yml ├── migrations/ │ └── 0001_init.sql ├── admin_ui/ │ ├── index.html │ └── app.js ├── src/ │ └── app/ │ ├── init.py │ ├── main.py │ ├── settings.py │ ├── errors.py │ ├── schemas.py │ ├── middleware.py │ ├── logging.py │ ├── db.py │ ├── models.py │ ├── infra/ │ │ ├── init.py │ │ ├── redis.py │ │ ├── resilience.py │ │ ├── rate_limit.py │ │ └── locks.py │ ├── security/ │ │ ├── init.py │ │ ├── compare.py │ │ ├── admin.py │ │ ├── webhook.py │ │ └── webhook_sig.py │ ├── services/ │ │ ├── init.py │ │ ├── plans.py │ │ └── subscriptions.py │ └── routes/ │ ├── init.py │ ├── health.py │ ├── admin.py │ └── webhooks.py └── tests/ ├── init.py ├── conftest.py ├── unit/ │ ├── init.py │ ├── test_admin_sessions.py │ ├── test_circuit_breaker.py │ └── test_trusted_proxy_ip.py └── security/ ├── init.py ├── test_lockout_mechanism.py ├── test_rate_limit_effectiveness.py └── test_replay_protection.py
BEGIN_FILE: LICENSE
Copyright (c) 2026 ApexAI Official
All rights reserved.
This software and associated documentation files (the "Software") are proprietary and confidential.
Unauthorized copying, distribution, modification, sublicensing, hosting, or deployment of this Software,
via any medium, is strictly prohibited without an explicit written agreement from ApexAI Official.
NO LICENSE IS GRANTED except by a separate commercial agreement.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
IN THE SOFTWARE.
END_FILE
BEGIN_FILE: README.md
# ApexAI SubManager
Production-grade subscription management service for Telegram-native SaaS bots.
## What it does
- Accepts payment confirmation webhooks (provider allowlist + signature verification + replay protection)
- Rate limits webhook intake (per provider + per client IP, IP hashed in Redis keys)
- Grants subscriptions idempotently and atomically (single DB transaction)
- Provides an Admin Ops Plane with:
- server-side sessions (opaque cookie SID, session stored in Redis)
- CSRF protection
- lockout + progressive delay for admin auth attempts
- Health probes: `/health/live`, `/health/ready`
- Strict request body size limit (default 256 KiB)
## Quick start (local)
1. Copy env:
```bash
cp .env.example .env
2. Fill required secrets in .env:
ADMIN_TOKEN
ADMIN_TOKEN_FINGERPRINT_SECRET
WEBHOOK_SIGNATURE_SECRETS
3. Run:
docker compose up --build
4. Admin UI:
open admin_ui/index.html in your browser
set API_BASE if not running on http://localhost:8080
Security model (high level)
Admin:
login uses ADMIN_TOKEN
session cookie: __Host-admin_session (opaque SID)
CSRF cookie: __Host-csrf_token (256-bit token)
Webhooks:
provider allowlist
signature verification supports rotation via comma-separated WEBHOOK_SIGNATURE_SECRETS
timestamp skew bound
nonce replay protection (Redis)
Notes on proxies (IMPORTANT)
If you deploy behind reverse proxies/load balancers and want per-IP rate limiting to use X-Forwarded-For, configure trusted proxy networks + hops:
TRUSTED_PROXY_NETS (comma-separated CIDRs)
TRUSTED_PROXY_HOPS (int)
If the immediate peer IP is not in TRUSTED_PROXY_NETS, X-Forwarded-For is ignored.
License
Proprietary — see LICENSE.
END_FILE
BEGIN_FILE: API.md
```md
# API
Base URL: `/v1`
## Health
- `GET /health/live`
- `GET /health/ready`
## Webhooks
- `POST /v1/webhooks/{provider}`
Headers (example):
- `X-Apex-Signature: v1=<hex>`
- `X-Apex-Timestamp: <unix seconds>`
- `X-Apex-Nonce: <random>`
## Admin
- `POST /v1/admin/login`
- `POST /v1/admin/logout`
- `GET /v1/admin/stats`
All admin endpoints require:
- session cookie `__Host-admin_session`
- CSRF header `X-CSRF-Token` matching `__Host-csrf_token` cookie
END_FILE
BEGIN_FILE: SECURITY.md
# Security
## Admin authentication
- Uses a server-side session stored in Redis.
- Browser receives an opaque session id cookie only.
- Progressive delay + lockout on repeated failures.
## Cookies
- Session cookie: `__Host-admin_session` (Secure, Path=/, HttpOnly)
- CSRF cookie: `__Host-csrf_token` (Secure, Path=/)
## Webhook security
- Provider allowlist (`ALLOWED_PROVIDERS`)
- Signature verification supports rotation (`WEBHOOK_SIGNATURE_SECRETS`)
- Timestamp skew bound (`WEBHOOK_TS_SKEW_SEC`)
- Nonce replay protection (`WEBHOOK_NONCE_TTL_SEC`)
- Rate limiting: provider + client IP (IP hashed; XFF honored only for trusted proxies)
## Headers
Security headers middleware sets:
- X-Content-Type-Options: nosniff
- X-Frame-Options: DENY
- Content-Security-Policy: default-src 'none'; frame-ancestors 'none'
- Strict-Transport-Security (prod)
- Permissions-Policy: geolocation=(), microphone=(), camera=()
END_FILE
BEGIN_FILE: OPS_RUNBOOK.md
# Ops Runbook
## Environment variables
See `.env.example`.
## Redis key prefixing
All keys are namespaced via `REDIS_KEY_PREFIX`.
## Secret rotation (webhooks)
Set `WEBHOOK_SIGNATURE_SECRETS` to `old_secret,new_secret`, deploy, then remove old.
## Troubleshooting
- 401 on admin endpoints:
- session expired or missing
- CSRF token missing/mismatch
- 429 on webhooks:
- rate limiting active; verify trusted proxy settings if behind LB
- 409 replay:
- nonce already used within TTL
END_FILE
BEGIN_FILE: CHANGELOG.md
# Changelog
## 2.1.11
- Fix: trusted proxy validation for client IP extraction (prevents X-Forwarded-For spoof rate-limit bypass)
- Fix: CSRF cookie now uses __Host- prefix with Secure + Path=/
- Fix: Body size middleware returns structured 413 envelope
- Fix: Lock release failures now logged (warning)
- Improvement: Security headers add HSTS (prod) + Permissions-Policy
- Admin UI: display API error.reason + request_id when available
- DB: plans.duration_days supported (defaults to 30)
END_FILE
BEGIN_FILE: pyproject.toml
[project]
name = "apexai-submanager"
version = "2.1.11"
description = "ApexAI SubManager - production-grade FastAPI subscription manager"
requires-python = "==3.12.*"
dependencies = [
"fastapi==0.115.6",
"uvicorn==0.34.0",
"pydantic==2.10.4",
"pydantic-settings==2.7.0",
"sqlalchemy==2.0.36",
"asyncpg==0.30.0",
"redis==5.2.1",
]
[project.optional-dependencies]
dev = [
"pytest==8.3.4",
"pytest-asyncio==0.25.0",
"pytest-cov==6.0.0",
"mypy==1.13.0",
"ruff==0.8.4",
"httpx==0.28.1",
]
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
[tool.coverage.run]
branch = true
source = ["src"]
[tool.ruff]
line-length = 110
target-version = "py312"
[tool.mypy]
python_version = "3.12"
strict = true
mypy_path = ["src"]
warn_unused_ignores = true
END_FILE
BEGIN_FILE: ruff.toml
line-length = 110
target-version = "py312"
[lint]
select = ["E", "F", "I", "B", "UP", "SIM"]
ignore = ["E501"]
END_FILE
BEGIN_FILE: mypy.ini
[mypy]
python_version = 3.12
strict = True
mypy_path = src
warn_unused_ignores = True
END_FILE
BEGIN_FILE: pytest.ini
[pytest]
asyncio_mode = auto
END_FILE
BEGIN_FILE: .gitignore
__pycache__/
*.pyc
.venv/
.env
.coverage
htmlcov/
dist/
build/
END_FILE
BEGIN_FILE: .dockerignore
.git/
__pycache__/
*.pyc
.venv/
.env
.coverage
htmlcov/
dist/
build/
tests/
END_FILE
BEGIN_FILE: .env.example
APP_ENV=local
PORT=8080
LOG_LEVEL=INFO
DATABASE_URL=postgresql+asyncpg://postgres:postgres@db:5432/postgres
REDIS_URL=redis://redis:6379/0
REDIS_KEY_PREFIX=apexai:submanager:
ADMIN_TOKEN=CHANGE_ME_32CHARS_MIN________________________________
ADMIN_TOKEN_FINGERPRINT_SECRET=CHANGE_ME_32CHARS_MIN_________________________
WEBHOOK_SIGNATURE_SECRETS=CHANGE_ME_32CHARS_MIN________________________________
COOKIE_SECURE=false
COOKIE_SAMESITE=lax
ADMIN_SESSION_TTL_SEC=3600
ALLOWED_PROVIDERS=stripe
WEBHOOK_TS_SKEW_SEC=300
WEBHOOK_NONCE_TTL_SEC=900
WEBHOOK_RL_RATE=30
WEBHOOK_RL_BURST=60
DB_OP_TIMEOUT_SEC=5.0
DB_CONCURRENCY=20
DB_SEM_TIMEOUT_SEC=2.0
MAX_BODY_BYTES=262144
# Trusted proxy configuration (for X-Forwarded-For)
TRUSTED_PROXY_NETS=127.0.0.1/32,10.0.0.0/8,172.16.0.0/12,192.168.0.0/16
TRUSTED_PROXY_HOPS=1
END_FILE
BEGIN_FILE: Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY pyproject.toml /app/pyproject.toml
RUN pip install --no-cache-dir -U pip && \
pip install --no-cache-dir -e ".[dev]"
COPY . /app
ENV PYTHONUNBUFFERED=1
EXPOSE 8080
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080"]
END_FILE
BEGIN_FILE: docker-compose.yml
services:
db:
image: postgres:16
environment:
POSTGRES_PASSWORD: postgres
ports:
- "5432:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 5s
timeout: 3s
retries: 20
redis:
image: redis:7
ports:
- "6379:6379"
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 3s
retries: 20
api:
build: .
env_file: .env
ports:
- "8080:8080"
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
END_FILE
BEGIN_FILE: migrations/0001_init.sql
-- 0001_init.sql (idempotent init)
CREATE TABLE IF NOT EXISTS plans (
id BIGSERIAL PRIMARY KEY,
provider TEXT NOT NULL,
provider_price_id TEXT NOT NULL,
currency TEXT NOT NULL,
amount_cents BIGINT NOT NULL,
duration_days INT NOT NULL DEFAULT 30,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE UNIQUE INDEX IF NOT EXISTS ux_plans_provider_price_id ON plans(provider, provider_price_id);
CREATE TABLE IF NOT EXISTS payments (
id BIGSERIAL PRIMARY KEY,
provider TEXT NOT NULL,
provider_event_id TEXT NOT NULL,
tg_user_id BIGINT NOT NULL,
amount_cents BIGINT NOT NULL,
currency TEXT NOT NULL,
status TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE UNIQUE INDEX IF NOT EXISTS ux_payments_provider_event_id ON payments(provider, provider_event_id);
CREATE INDEX IF NOT EXISTS ix_payments_created_at ON payments(created_at DESC);
CREATE TABLE IF NOT EXISTS subscriptions (
id BIGSERIAL PRIMARY KEY,
tg_user_id BIGINT NOT NULL,
plan_id BIGINT NOT NULL REFERENCES plans(id),
status TEXT NOT NULL,
starts_at TIMESTAMPTZ NOT NULL,
ends_at TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS ix_subscriptions_tg_user_id_id ON subscriptions(tg_user_id, id DESC);
CREATE TABLE IF NOT EXISTS audit_events (
id BIGSERIAL PRIMARY KEY,
event_type TEXT NOT NULL,
tg_user_id BIGINT NULL,
details_json TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
END_FILE
BEGIN_FILE: admin_ui/index.html
<!doctype html>
<html>
<head>
<meta charset="utf-8" />
<title>ApexAI SubManager Admin</title>
<meta name="viewport" content="width=device-width, initial-scale=1" />
<style>
body { font-family: system-ui, -apple-system, Segoe UI, Roboto, Arial; margin: 24px; max-width: 900px; }
.row { display:flex; gap: 12px; align-items:center; margin-bottom: 12px; }
input { padding: 8px; width: 360px; }
button { padding: 8px 12px; cursor: pointer; }
pre { background: #111; color: #eee; padding: 12px; overflow:auto; }
.err { color: #b00020; }
.ok { color: #0a7a0a; }
</style>
</head>
<body>
<h1>ApexAI SubManager Admin</h1>
<div class="row">
<label>API Base</label>
<input id="apiBase" value="http://localhost:8080" />
</div>
<h2>Login</h2>
<div class="row">
<label>Admin Token</label>
<input id="token" type="password" placeholder="ADMIN_TOKEN" />
<button id="loginBtn">Login</button>
<button id="logoutBtn">Logout</button>
</div>
<h2>Stats</h2>
<div class="row">
<button id="statsBtn">Fetch Stats</button>
</div>
<h2>Output</h2>
<div id="status"></div>
<pre id="out"></pre>
<script src="./app.js"></script>
</body>
</html>
END_FILE
BEGIN_FILE: admin_ui/app.js
function setStatus(msg, ok) {
const el = document.getElementById("status");
el.className = ok ? "ok" : "err";
el.textContent = msg;
}
function getCookie(name) {
const parts = document.cookie.split(";").map((p) => p.trim());
for (const p of parts) {
if (p.startsWith(name + "=")) return decodeURIComponent(p.slice((name + "=").length));
}
return null;
}
async function apiFetch(path, opts = {}) {
const apiBase = document.getElementById("apiBase").value.replace(/\/+$/, "");
const url = apiBase + path;
const csrf = getCookie("__Host-csrf_token");
const headers = Object.assign({}, opts.headers || {});
if (csrf) headers["X-CSRF-Token"] = csrf;
headers["Content-Type"] = "application/json";
const res = await fetch(url, Object.assign({}, opts, { headers, credentials: "include" }));
const text = await res.text();
let data = null;
try {
data = text ? JSON.parse(text) : null;
} catch (_) {
data = null;
}
if (!res.ok) {
const reason = (data && data.error && data.error.reason) ? data.error.reason : (text || res.statusText);
const rid = (data && data.error && data.error.request_id) ? data.error.request_id : "";
const extra = rid ? ` (request_id=${rid})` : "";
throw new Error(`${reason}${extra}`);
}
return data;
}
document.getElementById("loginBtn").addEventListener("click", async () => {
try {
const token = document.getElementById("token").value;
const data = await apiFetch("/v1/admin/login", {
method: "POST",
body: JSON.stringify({ token }),
});
document.getElementById("out").textContent = JSON.stringify(data, null, 2);
setStatus("Logged in", true);
} catch (e) {
setStatus(String(e.message || e), false);
}
});
document.getElementById("logoutBtn").addEventListener("click", async () => {
try {
const data = await apiFetch("/v1/admin/logout", { method: "POST", body: "{}" });
document.getElementById("out").textContent = JSON.stringify(data, null, 2);
setStatus("Logged out", true);
} catch (e) {
setStatus(String(e.message || e), false);
}
});
document.getElementById("statsBtn").addEventListener("click", async () => {
try {
const data = await apiFetch("/v1/admin/stats", { method: "GET" });
document.getElementById("out").textContent = JSON.stringify(data, null, 2);
setStatus("OK", true);
} catch (e) {
setStatus(String(e.message || e), false);
}
});
END_FILE
BEGIN_FILE: src/app/init.py
"""
ApexAI SubManager application package.
This package provides a production-grade FastAPI service that:
- validates and processes payment confirmation webhooks
- grants subscriptions idempotently and atomically
- exposes an admin operations plane protected by server-side sessions + CSRF
- enforces rate limits, replay protection, and resilience patterns
All non-health routes are versioned under /v1.
"""
from __future__ import annotations
import sys
__version__ = "2.1.11"
if sys.version_info < (3, 12):
raise RuntimeError("Python 3.12+ required")
END_FILE
BEGIN_FILE: src/app/logging.py
from __future__ import annotations
import logging
from typing import Any
def get_logger(name: str = "app") -> logging.Logger:
"""Return a configured logger.
Note: production deployments should centralize handler config externally.
"""
logger = logging.getLogger(name)
if logger.handlers:
return logger
handler = logging.StreamHandler()
fmt = logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s")
handler.setFormatter(fmt)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger
def log_kv(logger: logging.Logger, msg: str, **fields: Any) -> None:
"""Log a message with key=value fields appended."""
if fields:
suffix = " " + " ".join(f"{k}={fields[k]!r}" for k in sorted(fields.keys()))
else:
suffix = ""
logger.info(msg + suffix)
END_FILE
BEGIN_FILE: src/app/settings.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Literal
from pydantic import Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
"""Application settings loaded from environment variables.
Security-critical values have no defaults and must be provided.
"""
model_config = SettingsConfigDict(env_file=".env", extra="ignore")
APP_ENV: Literal["local", "prod", "staging"] = "local"
PORT: int = Field(default=8080, ge=1, le=65535)
LOG_LEVEL: str = "INFO"
DATABASE_URL: str = Field(min_length=10)
DB_OP_TIMEOUT_SEC: float = Field(default=5.0, ge=0.5, le=60.0)
REDIS_URL: str = Field(min_length=10)
REDIS_KEY_PREFIX: str = Field(default="apexai:submanager:", min_length=1, max_length=128)
# No defaults for secrets
ADMIN_TOKEN: str = Field(min_length=32)
ADMIN_TOKEN_FINGERPRINT_SECRET: str = Field(min_length=32)
WEBHOOK_SIGNATURE_SECRETS: str = Field(min_length=32) # comma-separated
COOKIE_SECURE: bool = True
COOKIE_SAMESITE: Literal["lax", "strict", "none"] = "lax"
ADMIN_SESSION_TTL_SEC: int = Field(default=3600, ge=60, le=86400)
ADMIN_MAX_FAILS: int = Field(default=5, ge=1, le=50)
ADMIN_LOCKOUT_SEC: int = Field(default=900, ge=60, le=86400)
ADMIN_RL_RATE: int = Field(default=5, ge=1, le=1000)
ADMIN_RL_BURST: int = Field(default=10, ge=1, le=5000)
ADMIN_PROGRESSIVE_DELAY_BASE_MS: int = Field(default=150, ge=0, le=5000)
ALLOWED_PROVIDERS: str = Field(default="stripe")
WEBHOOK_TS_SKEW_SEC: int = Field(default=300, ge=5, le=3600)
WEBHOOK_NONCE_TTL_SEC: int = Field(default=900, ge=60, le=86400)
WEBHOOK_RL_RATE: int = Field(default=30, ge=1, le=10000)
WEBHOOK_RL_BURST: int = Field(default=60, ge=1, le=20000)
# If behind proxies, configure trusted proxy nets + hops for X-Forwarded-For parsing.
TRUSTED_PROXY_NETS: str = Field(default="", description="comma-separated CIDRs; if empty, XFF ignored")
TRUSTED_PROXY_HOPS: int = Field(default=0, ge=0, le=16, description="how many proxy hops append to XFF")
DB_CONCURRENCY: int = Field(default=20, ge=1, le=200)
DB_SEM_TIMEOUT_SEC: float = Field(default=2.0, ge=0.1, le=30.0)
MAX_BODY_BYTES: int = Field(default=262144, ge=1024, le=5_000_000)
@field_validator("WEBHOOK_SIGNATURE_SECRETS")
@classmethod
def _strip_ws(cls, v: str) -> str:
parts = [p.strip() for p in v.split(",") if p.strip()]
if not parts:
raise ValueError("WEBHOOK_SIGNATURE_SECRETS must contain at least one secret")
for p in parts:
if len(p) < 32:
raise ValueError("each webhook secret must be at least 32 chars")
return ",".join(parts)
@field_validator("ALLOWED_PROVIDERS")
@classmethod
def _providers_non_empty(cls, v: str) -> str:
parts = [p.strip() for p in v.split(",") if p.strip()]
if not parts:
raise ValueError("ALLOWED_PROVIDERS must be non-empty")
return ",".join(parts)
@field_validator("COOKIE_SAMESITE")
@classmethod
def _cookie_samesite_ok(cls, v: str) -> str:
if v not in {"lax", "strict", "none"}:
raise ValueError("COOKIE_SAMESITE must be lax|strict|none")
return v
@dataclass(frozen=True)
class ProviderAllowlist:
providers: set[str]
@classmethod
def from_settings(cls, s: Settings) -> "ProviderAllowlist":
return cls(providers={p.strip().lower() for p in s.ALLOWED_PROVIDERS.split(",") if p.strip()})
END_FILE
BEGIN_FILE: src/app/errors.py
from __future__ import annotations
from dataclasses import dataclass
@dataclass(frozen=True)
class ApiError(Exception):
"""Base API error with stable code/reason mapping."""
status_code: int
code: str
reason: str
class BadRequestError(ApiError):
def __init__(self, code: str, reason: str) -> None:
super().__init__(400, code, reason)
class UnauthorizedError(ApiError):
def __init__(self, code: str, reason: str) -> None:
super().__init__(401, code, reason)
class ForbiddenError(ApiError):
def __init__(self, code: str, reason: str) -> None:
super().__init__(403, code, reason)
class NotFoundError(ApiError):
def __init__(self, code: str, reason: str) -> None:
super().__init__(404, code, reason)
class ConflictError(ApiError):
def __init__(self, code: str, reason: str) -> None:
super().__init__(409, code, reason)
class PayloadTooLargeError(ApiError):
def __init__(self, code: str, reason: str) -> None:
super().__init__(413, code, reason)
class ServiceUnavailableError(ApiError):
def __init__(self, code: str, reason: str) -> None:
super().__init__(503, code, reason)
END_FILE
BEGIN_FILE: src/app/schemas.py
from __future__ import annotations
from typing import Any, Generic, Literal, Optional, TypeVar
from pydantic import BaseModel, Field
from app import __version__
T = TypeVar("T")
class ErrorOut(BaseModel):
code: str
reason: str
request_id: str
class ErrorResponse(BaseModel):
ok: Literal[False] = False
error: ErrorOut
class OkResponse(BaseModel, Generic[T]):
ok: Literal[True] = True
api_version: str = Field(default=__version__)
result: Optional[T] = None
class StatsOut(BaseModel):
total_payments: int
confirmed_payments: int
failed_payments: int
active_subscriptions: int
as_of: str
class LoginIn(BaseModel):
token: str = Field(min_length=1)
class LoginOut(BaseModel):
message: str
class LogoutOut(BaseModel):
message: str
class WebhookOkOut(BaseModel):
message: str
schema_version: int = 1
def error_envelope(code: str, reason: str, request_id: str) -> dict[str, Any]:
return {"ok": False, "error": {"code": code, "reason": reason, "request_id": request_id}}
END_FILE
BEGIN_FILE: src/app/middleware.py
from __future__ import annotations
import secrets
from datetime import datetime, timezone
from typing import Callable, Optional
from fastapi import Request, Response
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from app.errors import PayloadTooLargeError
from app.schemas import error_envelope
from app.settings import Settings
class RequestIdMiddleware(BaseHTTPMiddleware):
"""Attach a stable request_id to each request for tracing and error envelopes."""
async def dispatch(self, request: Request, call_next: Callable) -> Response:
rid = request.headers.get("X-Request-Id") or secrets.token_hex(16)
request.state.request_id = rid
resp = await call_next(request)
resp.headers["X-Request-Id"] = rid
return resp
class BodySizeLimitMiddleware(BaseHTTPMiddleware):
"""Reject requests whose bodies exceed settings.MAX_BODY_BYTES.
IMPORTANT: Exceptions raised in middleware may bypass app-level exception handlers,
so this middleware returns a structured JSON envelope directly.
"""
def __init__(self, app: object, settings: Settings) -> None:
super().__init__(app)
self._max = settings.MAX_BODY_BYTES
async def dispatch(self, request: Request, call_next: Callable) -> Response:
try:
cl = request.headers.get("content-length")
if cl is not None:
try:
n = int(cl)
except ValueError:
n = 0
if n > self._max:
raise PayloadTooLargeError("payload_too_large", "request payload too large")
body = await request.body()
if len(body) > self._max:
raise PayloadTooLargeError("payload_too_large", "request payload too large")
request._body = body # type: ignore[attr-defined]
return await call_next(request)
except PayloadTooLargeError as e:
rid = getattr(request.state, "request_id", None) or secrets.token_hex(16)
payload = error_envelope(e.code, e.reason, rid)
return JSONResponse(status_code=e.status_code, content=payload)
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
"""Add baseline security headers."""
def __init__(self, app: object, settings: Settings) -> None:
super().__init__(app)
self._env = settings.APP_ENV
async def dispatch(self, request: Request, call_next: Callable) -> Response:
resp = await call_next(request)
resp.headers["X-Content-Type-Options"] = "nosniff"
resp.headers["X-Frame-Options"] = "DENY"
resp.headers["Content-Security-Policy"] = "default-src 'none'; frame-ancestors 'none'"
# Only meaningful with TLS (prod/staging expected)
if self._env in {"prod", "staging"}:
resp.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
resp.headers["Permissions-Policy"] = "geolocation=(), microphone=(), camera=()"
return resp
def utc_now_iso() -> str:
"""Return current UTC time in ISO8601 format."""
return datetime.now(timezone.utc).isoformat()
END_FILE
BEGIN_FILE: src/app/db.py
from __future__ import annotations
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
from app.settings import Settings
_engine: AsyncEngine | None = None
_session_factory: async_sessionmaker[AsyncSession] | None = None
def init_engine(settings: Settings) -> None:
global _engine, _session_factory
if _engine is not None:
return
_engine = create_async_engine(
settings.DATABASE_URL,
pool_pre_ping=True,
pool_recycle=1800,
pool_size=20,
max_overflow=30,
)
_session_factory = async_sessionmaker(_engine, expire_on_commit=False)
def get_engine() -> AsyncEngine:
if _engine is None:
raise RuntimeError("DB engine not initialized")
return _engine
def get_session_factory() -> async_sessionmaker[AsyncSession]:
if _session_factory is None:
raise RuntimeError("DB session factory not initialized")
return _session_factory
async def close_engine() -> None:
global _engine
if _engine is not None:
await _engine.dispose()
_engine = None
END_FILE
BEGIN_FILE: src/app/models.py
from __future__ import annotations
from datetime import datetime
from typing import Optional
from sqlalchemy import BigInteger, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class Plan(Base):
__tablename__ = "plans"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
provider: Mapped[str] = mapped_column(String, nullable=False)
provider_price_id: Mapped[str] = mapped_column(String, nullable=False)
currency: Mapped[str] = mapped_column(String, nullable=False)
amount_cents: Mapped[int] = mapped_column(BigInteger, nullable=False)
duration_days: Mapped[int] = mapped_column(Integer, nullable=False, default=30)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
class Payment(Base):
__tablename__ = "payments"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
provider: Mapped[str] = mapped_column(String, nullable=False)
provider_event_id: Mapped[str] = mapped_column(String, nullable=False)
tg_user_id: Mapped[int] = mapped_column(BigInteger, nullable=False)
amount_cents: Mapped[int] = mapped_column(BigInteger, nullable=False)
currency: Mapped[str] = mapped_column(String, nullable=False)
status: Mapped[str] = mapped_column(String, nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
class Subscription(Base):
__tablename__ = "subscriptions"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
tg_user_id: Mapped[int] = mapped_column(BigInteger, nullable=False)
plan_id: Mapped[int] = mapped_column(BigInteger, ForeignKey("plans.id"), nullable=False)
status: Mapped[str] = mapped_column(String, nullable=False)
starts_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
ends_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
class AuditEvent(Base):
__tablename__ = "audit_events"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
event_type: Mapped[str] = mapped_column(String, nullable=False)
tg_user_id: Mapped[Optional[int]] = mapped_column(BigInteger, nullable=True)
details_json: Mapped[str] = mapped_column(Text, nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
END_FILE
BEGIN_FILE: src/app/infra/init.py
from __future__ import annotations
END_FILE
BEGIN_FILE: src/app/infra/redis.py
from __future__ import annotations
from redis.asyncio import Redis
from app.settings import Settings
_client: Redis | None = None
def init_redis(settings: Settings) -> None:
global _client
if _client is not None:
return
_client = Redis.from_url(settings.REDIS_URL, decode_responses=False)
def get_redis() -> Redis:
if _client is None:
raise RuntimeError("Redis not initialized")
return _client
async def close_redis_pool() -> None:
global _client
if _client is not None:
await _client.aclose()
_client = None
END_FILE
BEGIN_FILE: src/app/infra/resilience.py
from __future__ import annotations
import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import TypeVar
from sqlalchemy.exc import OperationalError
T = TypeVar("T")
async def with_db_retry(fn: Callable[[], Awaitable[T]], attempts: int = 3, base_delay: float = 0.2) -> T:
"""Retry transient database failures with exponential backoff."""
last: Exception | None = None
for i in range(attempts):
try:
return await fn()
except (OperationalError, ConnectionError, TimeoutError) as e:
last = e
if i == attempts - 1:
break
await asyncio.sleep(base_delay * (2**i))
assert last is not None
raise last
@dataclass
class CircuitBreaker:
"""Simple async-safe circuit breaker."""
failure_threshold: int = 5
recovery_timeout: float = 30.0
def __post_init__(self) -> None:
self._failures = 0
self._opened_at: float | None = None
self._lock = asyncio.Lock()
async def call(self, fn: Callable[[], Awaitable[T]]) -> T:
async with self._lock:
if self._opened_at is not None:
now = asyncio.get_event_loop().time()
if (now - self._opened_at) < self.recovery_timeout:
raise TimeoutError("circuit_open")
# half-open probe
self._opened_at = None
self._failures = 0
try:
out = await fn()
except Exception:
async with self._lock:
self._failures += 1
if self._failures >= self.failure_threshold:
self._opened_at = asyncio.get_event_loop().time()
raise
return out
END_FILE
BEGIN_FILE: src/app/infra/rate_limit.py
from __future__ import annotations
import hashlib
import time
from dataclasses import dataclass
from redis.asyncio import Redis
def _hash_ip(ip: str) -> str:
"""Hash an IP address for safe use in Redis keys (prevents raw IP disclosure)."""
return hashlib.sha256(ip.encode("utf-8")).hexdigest()[:24]
LUA_TOKEN_BUCKET = r"""
local key = KEYS[1]
local now = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local burst = tonumber(ARGV[3])
local ttl = tonumber(ARGV[4])
local data = redis.call("HMGET", key, "tokens", "ts")
local tokens = tonumber(data[1])
local ts = tonumber(data[2])
if tokens == nil then
tokens = burst
ts = now
end
local delta = math.max(0, now - ts)
local refill = delta * rate
tokens = math.min(burst, tokens + refill)
local allowed = 0
if tokens >= 1 then
allowed = 1
tokens = tokens - 1
end
redis.call("HMSET", key, "tokens", tokens, "ts", now)
redis.call("EXPIRE", key, ttl)
return allowed
"""
@dataclass(frozen=True)
class TokenBucketLimiter:
"""Redis-backed token bucket limiter using an atomic Lua script."""
redis: Redis
prefix: str
rate: int
burst: int
ttl_sec: int = 60
async def allow(self, key: str) -> bool:
now = int(time.time())
redis_key = f"{self.prefix}{key}".encode("utf-8")
allowed = await self.redis.eval(LUA_TOKEN_BUCKET, 1, redis_key, now, self.rate, self.burst, self.ttl_sec)
return bool(int(allowed))
def key_for_webhook(self, provider: str, client_ip: str) -> str:
return f"rl:webhook:{provider}:{_hash_ip(client_ip)}"
END_FILE
BEGIN_FILE: src/app/infra/locks.py
from __future__ import annotations
import secrets
from dataclasses import dataclass
from typing import Optional
from redis.asyncio import Redis
from app.logging import get_logger
logger = get_logger("locks")
@dataclass
class RedisLock:
redis: Redis
key: bytes
token: bytes
ttl_sec: int
@classmethod
def for_key(cls, redis: Redis, key: str, ttl_sec: int) -> "RedisLock":
tok = secrets.token_bytes(16)
return cls(redis=redis, key=key.encode("utf-8"), token=tok, ttl_sec=ttl_sec)
async def acquire(self) -> bool:
return bool(await self.redis.set(self.key, self.token, nx=True, ex=self.ttl_sec))
async def release(self) -> None:
# best-effort release; log failures so operators can see TTL-expiry fallback.
try:
val = await self.redis.get(self.key)
if val == self.token:
await self.redis.delete(self.key)
except Exception:
logger.warning("lock_release_failed", exc_info=True)
return
END_FILE
BEGIN_FILE: src/app/security/init.py
from __future__ import annotations
END_FILE
BEGIN_FILE: src/app/security/compare.py
from __future__ import annotations
import hmac
def constant_time_equals(a: str, b: str) -> bool:
"""Return True if strings are equal using constant-time comparison."""
return hmac.compare_digest(a.encode("utf-8"), b.encode("utf-8"))
END_FILE
BEGIN_FILE: src/app/security/admin.py
from __future__ import annotations
import secrets
import time
from dataclasses import dataclass
from typing import Optional
from fastapi import Request, Response
from redis.asyncio import Redis
from app.errors import ForbiddenError, UnauthorizedError
from app.security.compare import constant_time_equals
from app.settings import Settings
SESSION_COOKIE_NAME = "__Host-admin_session"
CSRF_COOKIE_NAME = "__Host-csrf_token"
def _now_ms() -> int:
return int(time.time() * 1000)
def _csrf_token() -> str:
# 256-bit token
return secrets.token_hex(32)
def _cookie_common(settings: Settings) -> dict[str, object]:
return {
"secure": settings.COOKIE_SECURE,
"samesite": settings.COOKIE_SAMESITE,
"path": "/", # required for __Host-
}
def set_csrf_cookie(resp: Response, settings: Settings) -> str:
tok = _csrf_token()
resp.set_cookie(
CSRF_COOKIE_NAME,
tok,
httponly=False,
max_age=settings.ADMIN_SESSION_TTL_SEC,
**_cookie_common(settings),
)
return tok
def clear_admin_cookies(resp: Response, settings: Settings) -> None:
resp.delete_cookie(SESSION_COOKIE_NAME, path="/")
resp.delete_cookie(CSRF_COOKIE_NAME, path="/")
def _sess_key(prefix: str, sid: str) -> str:
return f"{prefix}admin:sess:{sid}"
def _fail_key(prefix: str, ip: str) -> str:
return f"{prefix}admin:fail:{ip}"
def _lock_key(prefix: str, ip: str) -> str:
return f"{prefix}admin:lock:{ip}"
@dataclass(frozen=True)
class AdminSession:
sid: str
async def authenticate_admin(
request: Request,
token: str,
redis: Redis,
settings: Settings,
) -> AdminSession:
"""Authenticate admin token with lockout + progressive delay, and establish server-side session.
Stores only an opaque SID in the cookie; session data lives in Redis.
"""
ip = request.client.host if request.client else "unknown"
prefix = settings.REDIS_KEY_PREFIX
await _check_lockout(redis, prefix, ip)
await _apply_progressive_delay(redis, prefix, ip, settings)
if not constant_time_equals(token, settings.ADMIN_TOKEN):
await _record_failure(redis, prefix, ip, settings)
raise UnauthorizedError("bad_token", "invalid admin token")
await _reset_failures(redis, prefix, ip)
sid = secrets.token_hex(32)
key = _sess_key(prefix, sid)
# Store random session marker + created_at (no token fingerprint linkage).
val = f"v1:{secrets.token_hex(16)}:{int(time.time())}".encode("utf-8")
await redis.set(key.encode("utf-8"), val, ex=settings.ADMIN_SESSION_TTL_SEC)
return AdminSession(sid=sid)
async def _check_lockout(redis: Redis, prefix: str, ip: str) -> None:
locked = await redis.get(_lock_key(prefix, ip).encode("utf-8"))
if locked:
raise ForbiddenError("locked_out", "too many failures, try again later")
async def _apply_progressive_delay(redis: Redis, prefix: str, ip: str, settings: Settings) -> None:
k = _fail_key(prefix, ip).encode("utf-8")
raw = await redis.get(k)
fails = int(raw) if raw else 0
if fails <= 0:
return
delay_ms = min(2000, settings.ADMIN_PROGRESSIVE_DELAY_BASE_MS * fails)
time.sleep(delay_ms / 1000.0)
async def _record_failure(redis: Redis, prefix: str, ip: str, settings: Settings) -> None:
k = _fail_key(prefix, ip).encode("utf-8")
fails = await redis.incr(k)
await redis.expire(k, settings.ADMIN_LOCKOUT_SEC)
if int(fails) >= settings.ADMIN_MAX_FAILS:
await _lockout(redis, prefix, ip, settings)
async def _lockout(redis: Redis, prefix: str, ip: str, settings: Settings) -> None:
lk = _lock_key(prefix, ip).encode("utf-8")
await redis.set(lk, b"1", ex=settings.ADMIN_LOCKOUT_SEC)
async def _reset_failures(redis: Redis, prefix: str, ip: str) -> None:
await redis.delete(_fail_key(prefix, ip).encode("utf-8"))
await redis.delete(_lock_key(prefix, ip).encode("utf-8"))
async def require_admin_session(request: Request, redis: Redis, settings: Settings) -> str:
"""Validate that a server-side admin session exists; return SID."""
sid = request.cookies.get(SESSION_COOKIE_NAME)
if not sid:
raise UnauthorizedError("no_session", "missing admin session")
key = _sess_key(settings.REDIS_KEY_PREFIX, sid).encode("utf-8")
val = await redis.get(key)
if not val:
raise UnauthorizedError("session_expired", "admin session expired")
return sid
def set_session_cookie(resp: Response, sid: str, settings: Settings) -> None:
resp.set_cookie(
SESSION_COOKIE_NAME,
sid,
httponly=True,
max_age=settings.ADMIN_SESSION_TTL_SEC,
**_cookie_common(settings),
)
END_FILE
BEGIN_FILE: src/app/security/webhook.py
from __future__ import annotations
import re
from dataclasses import dataclass
from typing import Any, Literal
from app.errors import BadRequestError
@dataclass(frozen=True)
class WebhookPayload:
provider_event_id: str
tg_user_id: int
currency: str
amount_cents: int
provider_price_id: str
schema_version: int = 1
_CURRENCY_RE = re.compile(r"^[A-Za-z0-9]{2,5}$")
def parse_webhook_payload(body: dict[str, Any]) -> WebhookPayload:
"""Parse and validate webhook payload."""
try:
schema_version = int(body.get("schema_version", 1))
provider_event_id = str(body["provider_event_id"])
tg_user_id = int(body["tg_user_id"])
currency = str(body["currency"]).upper()
amount_cents = int(body["amount_cents"])
provider_price_id = str(body["provider_price_id"])
except Exception as e:
raise BadRequestError("bad_payload", "invalid webhook payload") from e
if schema_version != 1:
raise BadRequestError("bad_schema_version", "unsupported schema_version")
if not _CURRENCY_RE.match(currency):
raise BadRequestError("bad_currency", "currency must be 2-5 alnum chars")
if amount_cents <= 0:
raise BadRequestError("bad_amount", "amount_cents must be positive")
return WebhookPayload(
provider_event_id=provider_event_id,
tg_user_id=tg_user_id,
currency=currency,
amount_cents=amount_cents,
provider_price_id=provider_price_id,
schema_version=schema_version,
)
END_FILE
BEGIN_FILE: src/app/security/webhook_sig.py
from __future__ import annotations
import hmac
import hashlib
import time
from typing import Optional
from redis.asyncio import Redis
from app.errors import ForbiddenError
from app.settings import Settings
def _get_header(headers: dict[str, str], key: str) -> Optional[str]:
for k, v in headers.items():
if k.lower() == key.lower():
return v
return None
def _iter_secrets(settings: Settings) -> list[bytes]:
return [s.strip().encode("utf-8") for s in settings.WEBHOOK_SIGNATURE_SECRETS.split(",") if s.strip()]
def verify_signature(
*,
provider: str,
headers: dict[str, str],
body_bytes: bytes,
redis: Redis,
settings: Settings,
) -> None:
"""Verify webhook signature, timestamp skew, and nonce replay protection."""
sig = _get_header(headers, "X-Apex-Signature") or _get_header(headers, "X-Signature")
ts = _get_header(headers, "X-Apex-Timestamp") or _get_header(headers, "X-Timestamp")
nonce = _get_header(headers, "X-Apex-Nonce") or _get_header(headers, "X-Nonce")
if not sig or not ts or not nonce:
raise ForbiddenError("missing_sig", "missing webhook signature headers")
try:
ts_i = int(ts)
except ValueError as e:
raise ForbiddenError("bad_ts", "invalid timestamp") from e
now = int(time.time())
if abs(now - ts_i) > settings.WEBHOOK_TS_SKEW_SEC:
raise ForbiddenError("ts_skew", "timestamp skew too large")
# replay protection
nonce_key = f"{settings.REDIS_KEY_PREFIX}wh:nonce:{provider}:{nonce}".encode("utf-8")
if not await redis.set(nonce_key, b"1", nx=True, ex=settings.WEBHOOK_NONCE_TTL_SEC):
raise ForbiddenError("replay", "replayed nonce")
msg = f"{ts_i}.{nonce}.".encode("utf-8") + body_bytes
expected = None
for sec in _iter_secrets(settings):
mac = hmac.new(sec, msg, hashlib.sha256).hexdigest()
candidate = f"v1={mac}"
if hmac.compare_digest(candidate, sig):
expected = candidate
break
if expected is None:
raise ForbiddenError("bad_sig", "invalid signature")
END_FILE
BEGIN_FILE: src/app/services/init.py
from __future__ import annotations
END_FILE
BEGIN_FILE: src/app/services/plans.py
from __future__ import annotations
from dataclasses import dataclass
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models import Plan
@dataclass(frozen=True)
class PlanInfo:
id: int
provider: str
provider_price_id: str
currency: str
amount_cents: int
duration_days: int
async def get_plan_by_provider_price_id(
session: AsyncSession,
provider: str,
provider_price_id: str,
) -> PlanInfo | None:
"""Lookup a plan by provider+price id."""
q = select(Plan).where(Plan.provider == provider, Plan.provider_price_id == provider_price_id)
res = await session.execute(q)
row = res.scalar_one_or_none()
if row is None:
return None
return PlanInfo(
id=int(row.id),
provider=str(row.provider),
provider_price_id=str(row.provider_price_id),
currency=str(row.currency),
amount_cents=int(row.amount_cents),
duration_days=int(getattr(row, "duration_days", 30)),
)
END_FILE
BEGIN_FILE: src/app/services/subscriptions.py
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.errors import BadRequestError, ConflictError
from app.models import Payment, Subscription
from app.services.plans import PlanInfo
def utc_now() -> datetime:
return datetime.now(timezone.utc)
def validate_amount_matches_plan(plan: PlanInfo, amount_cents: int, currency: str) -> None:
"""Validate payment amount matches plan exactly."""
if amount_cents != plan.amount_cents or currency.upper() != plan.currency.upper():
raise BadRequestError(
"amount_mismatch",
f"expected {plan.amount_cents} {plan.currency}, got {amount_cents} {currency}",
)
async def ensure_payment_idempotent(
session: AsyncSession,
provider: str,
provider_event_id: str,
) -> Payment | None:
"""Return existing payment if already recorded (idempotency)."""
q = select(Payment).where(Payment.provider == provider, Payment.provider_event_id == provider_event_id)
res = await session.execute(q)
return res.scalar_one_or_none()
async def grant_subscription_atomic(
session: AsyncSession,
*,
provider: str,
provider_event_id: str,
tg_user_id: int,
plan: PlanInfo,
amount_cents: int,
currency: str,
) -> None:
"""Create payment + subscription within a single transaction.
If anything fails, transaction rolls back and no orphan rows remain.
"""
validate_amount_matches_plan(plan, amount_cents, currency)
async with session.begin():
existing = await ensure_payment_idempotent(session, provider, provider_event_id)
if existing is not None:
# already processed
return
now = utc_now()
pay = Payment(
provider=provider,
provider_event_id=provider_event_id,
tg_user_id=tg_user_id,
amount_cents=amount_cents,
currency=currency.upper(),
status="confirmed",
created_at=now,
)
session.add(pay)
await session.flush()
ends = now + timedelta(days=int(plan.duration_days or 30))
sub = Subscription(
tg_user_id=tg_user_id,
plan_id=plan.id,
status="active",
starts_at=now,
ends_at=ends,
created_at=now,
)
session.add(sub)
END_FILE
BEGIN_FILE: src/app/routes/init.py
from __future__ import annotations
END_FILE
BEGIN_FILE: src/app/routes/health.py
from __future__ import annotations
from fastapi import APIRouter
from app.infra.redis import get_redis
from app.db import get_engine
router = APIRouter(prefix="/health", tags=["health"])
@router.get("/live")
async def live() -> dict[str, object]:
return {"ok": True}
@router.get("/ready")
async def ready() -> dict[str, object]:
# basic dependency checks
r = get_redis()
await r.ping()
eng = get_engine()
async with eng.connect() as conn:
await conn.exec_driver_sql("SELECT 1")
return {"ok": True}
END_FILE
BEGIN_FILE: src/app/routes/admin.py
from __future__ import annotations
from datetime import datetime, timezone
from fastapi import APIRouter, Request, Response
from fastapi.responses import JSONResponse
from sqlalchemy import func, select
from app.db import get_session_factory
from app.infra.redis import get_redis
from app.schemas import LoginIn, LoginOut, LogoutOut, OkResponse, StatsOut, error_envelope
from app.security.admin import (
clear_admin_cookies,
require_admin_session,
set_csrf_cookie,
set_session_cookie,
authenticate_admin,
)
from app.settings import Settings
router = APIRouter(prefix="/admin", tags=["admin"])
def _request_id(request: Request) -> str:
return getattr(request.state, "request_id", "missing_request_id")
@router.post("/login")
async def login(payload: LoginIn, request: Request, response: Response, settings: Settings) -> OkResponse[LoginOut]:
r = get_redis()
sess = await authenticate_admin(request, payload.token, r, settings)
set_session_cookie(response, sess.sid, settings)
set_csrf_cookie(response, settings)
return OkResponse(result=LoginOut(message="ok"))
@router.post("/logout")
async def logout(request: Request, response: Response, settings: Settings) -> OkResponse[LogoutOut]:
r = get_redis()
sid = request.cookies.get("__Host-admin_session")
if sid:
key = f"{settings.REDIS_KEY_PREFIX}admin:sess:{sid}".encode("utf-8")
await r.delete(key)
clear_admin_cookies(response, settings)
return OkResponse(result=LogoutOut(message="ok"))
@router.get("/stats")
async def stats(request: Request, settings: Settings) -> OkResponse[StatsOut]:
r = get_redis()
await require_admin_session(request, r, settings)
sf = get_session_factory()
now = datetime.now(timezone.utc)
async with sf() as session:
total_q = select(func.count()).select_from(func.cast(func.literal_column("payments"), func.text)) # type: ignore
# safer explicit counts using ORM tables
from app.models import Payment, Subscription # local import to avoid cycles
total = await session.scalar(select(func.count()).select_from(Payment))
confirmed = await session.scalar(select(func.count()).select_from(Payment).where(Payment.status == "confirmed"))
failed = await session.scalar(select(func.count()).select_from(Payment).where(Payment.status == "failed"))
active = await session.scalar(
select(func.count()).select_from(Subscription).where(Subscription.status == "active", Subscription.ends_at > now)
)
out = StatsOut(
total_payments=int(total or 0),
confirmed_payments=int(confirmed or 0),
failed_payments=int(failed or 0),
active_subscriptions=int(active or 0),
as_of=now.isoformat(),
)
return OkResponse(result=out)
END_FILE
BEGIN_FILE: src/app/routes/webhooks.py
from __future__ import annotations
import ipaddress
from typing import Optional
from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import get_session_factory
from app.errors import ApiError, BadRequestError, ForbiddenError, ServiceUnavailableError
from app.infra.rate_limit import TokenBucketLimiter
from app.infra.redis import get_redis
from app.schemas import OkResponse, WebhookOkOut, error_envelope
from app.security.webhook import parse_webhook_payload
from app.security.webhook_sig import verify_signature
from app.services.plans import get_plan_by_provider_price_id
from app.services.subscriptions import grant_subscription_atomic
from app.settings import ProviderAllowlist, Settings
router = APIRouter(prefix="/webhooks", tags=["webhooks"])
def _request_id(request: Request) -> str:
return getattr(request.state, "request_id", "missing_request_id")
def _parse_trusted_proxy_nets(settings: Settings) -> list[ipaddress._BaseNetwork]:
nets: list[ipaddress._BaseNetwork] = []
raw = settings.TRUSTED_PROXY_NETS.strip()
if not raw:
return nets
for part in [p.strip() for p in raw.split(",") if p.strip()]:
try:
nets.append(ipaddress.ip_network(part, strict=False))
except ValueError:
continue
return nets
def _peer_is_trusted(peer_ip: str, nets: list[ipaddress._BaseNetwork]) -> bool:
if not nets:
return False
try:
ip = ipaddress.ip_address(peer_ip)
except ValueError:
return False
return any(ip in n for n in nets)
def _client_ip(request: Request, settings: Settings) -> str:
"""Get client IP safely.
X-Forwarded-For is honored ONLY if the immediate peer is a trusted proxy
(in TRUSTED_PROXY_NETS) and TRUSTED_PROXY_HOPS is configured.
"""
peer = request.client.host if request.client else ""
nets = _parse_trusted_proxy_nets(settings)
if not peer or not _peer_is_trusted(peer, nets) or settings.TRUSTED_PROXY_HOPS <= 0:
return peer or "unknown"
xff = request.headers.get("x-forwarded-for")
if not xff:
return peer or "unknown"
parts = [p.strip() for p in xff.split(",") if p.strip()]
if not parts:
return peer or "unknown"
# With N trusted proxy hops, client IP is at index len(parts) - N - 1
idx = len(parts) - settings.TRUSTED_PROXY_HOPS - 1
if idx < 0 or idx >= len(parts):
return peer or "unknown"
cand = parts[idx]
try:
ipaddress.ip_address(cand)
return cand
except ValueError:
return peer or "unknown"
@router.post("/{provider}")
async def webhook(provider: str, request: Request, settings: Settings) -> JSONResponse:
rid = _request_id(request)
allow = ProviderAllowlist.from_settings(settings)
if provider.lower() not in allow.providers:
return JSONResponse(status_code=404, content=error_envelope("unknown_provider", "provider not allowed", rid))
r = get_redis()
limiter = TokenBucketLimiter(
redis=r,
prefix=f"{settings.REDIS_KEY_PREFIX}",
rate=settings.WEBHOOK_RL_RATE,
burst=settings.WEBHOOK_RL_BURST,
ttl_sec=60,
)
ip = _client_ip(request, settings)
if not await limiter.allow(limiter.key_for_webhook(provider.lower(), ip)):
return JSONResponse(status_code=429, content=error_envelope("rate_limited", "too many requests", rid))
body = await request.body()
try:
verify_signature(provider=provider.lower(), headers=dict(request.headers), body_bytes=body, redis=r, settings=settings)
except ApiError as e:
return JSONResponse(status_code=e.status_code, content=error_envelope(e.code, e.reason, rid))
try:
data = await request.json()
payload = parse_webhook_payload(data)
except ApiError as e:
return JSONResponse(status_code=e.status_code, content=error_envelope(e.code, e.reason, rid))
except Exception:
return JSONResponse(status_code=400, content=error_envelope("bad_json", "invalid json", rid))
sf = get_session_factory()
async with sf() as session:
plan = await get_plan_by_provider_price_id(session, provider.lower(), payload.provider_price_id)
if plan is None:
return JSONResponse(status_code=400, content=error_envelope("unknown_plan", "unknown plan", rid))
try:
await grant_subscription_atomic(
session,
provider=provider.lower(),
provider_event_id=payload.provider_event_id,
tg_user_id=payload.tg_user_id,
plan=plan,
amount_cents=payload.amount_cents,
currency=payload.currency,
)
except ApiError as e:
return JSONResponse(status_code=e.status_code, content=error_envelope(e.code, e.reason, rid))
return JSONResponse(status_code=200, content=OkResponse(result=WebhookOkOut(message="ok")).model_dump())
END_FILE
BEGIN_FILE: src/app/main.py
from __future__ import annotations
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from app.db import close_engine, init_engine
from app.errors import ApiError
from app.infra.redis import close_redis_pool, init_redis
from app.middleware import BodySizeLimitMiddleware, RequestIdMiddleware, SecurityHeadersMiddleware
from app.routes.admin import router as admin_router
from app.routes.health import router as health_router
from app.routes.webhooks import router as webhooks_router
from app.schemas import error_envelope
from app.settings import Settings
settings = Settings()
@asynccontextmanager
async def lifespan(app: FastAPI):
init_engine(settings)
init_redis(settings)
yield
await close_engine()
await close_redis_pool()
app = FastAPI(lifespan=lifespan)
app.add_middleware(RequestIdMiddleware)
app.add_middleware(BodySizeLimitMiddleware, settings=settings)
app.add_middleware(SecurityHeadersMiddleware, settings=settings)
app.include_router(health_router)
app.include_router(webhooks_router, prefix="/v1")
app.include_router(admin_router, prefix="/v1")
@app.exception_handler(ApiError)
async def api_error_handler(request: Request, exc: ApiError) -> JSONResponse:
rid = getattr(request.state, "request_id", "missing_request_id")
return JSONResponse(status_code=exc.status_code, content=error_envelope(exc.code, exc.reason, rid))
@app.exception_handler(Exception)
async def unhandled_handler(request: Request, exc: Exception) -> JSONResponse:
rid = getattr(request.state, "request_id", "missing_request_id")
return JSONResponse(status_code=500, content=error_envelope("internal", "internal error", rid))
END_FILE
BEGIN_FILE: tests/init.py
from __future__ import annotations
END_FILE
BEGIN_FILE: tests/conftest.py
from __future__ import annotations
import os
import pytest
from fastapi.testclient import TestClient
from app.main import app
@pytest.fixture()
def client() -> TestClient:
return TestClient(app)
END_FILE
BEGIN_FILE: tests/unit/init.py
from __future__ import annotations
END_FILE
BEGIN_FILE: tests/unit/test_trusted_proxy_ip.py
from __future__ import annotations
from types import SimpleNamespace
from starlette.requests import Request
from app.settings import Settings
from app.routes.webhooks import _client_ip
class DummyClient:
def __init__(self, host: str) -> None:
self.host = host
def make_request(peer_ip: str, xff: str | None) -> Request:
scope = {"type": "http", "method": "POST", "path": "/", "headers": []}
req = Request(scope)
req._headers = {} # type: ignore[attr-defined]
if xff is not None:
req._headers["x-forwarded-for"] = xff # type: ignore[attr-defined]
req._client = (peer_ip, 12345) # type: ignore[attr-defined]
return req
def test_client_ip_ignores_xff_when_peer_not_trusted() -> None:
s = Settings(
DATABASE_URL="postgresql+asyncpg://x:x@localhost:5432/x",
REDIS_URL="redis://localhost:6379/0",
ADMIN_TOKEN="x" * 32,
ADMIN_TOKEN_FINGERPRINT_SECRET="y" * 32,
WEBHOOK_SIGNATURE_SECRETS="z" * 32,
TRUSTED_PROXY_NETS="10.0.0.0/8",
TRUSTED_PROXY_HOPS=1,
)
req = make_request("203.0.113.10", "1.2.3.4, 10.0.0.5")
assert _client_ip(req, s) == "203.0.113.10"
def test_client_ip_uses_xff_when_peer_trusted_and_hops_configured() -> None:
s = Settings(
DATABASE_URL="postgresql+asyncpg://x:x@localhost:5432/x",
REDIS_URL="redis://localhost:6379/0",
ADMIN_TOKEN="x" * 32,
ADMIN_TOKEN_FINGERPRINT_SECRET="y" * 32,
WEBHOOK_SIGNATURE_SECRETS="z" * 32,
TRUSTED_PROXY_NETS="10.0.0.0/8",
TRUSTED_PROXY_HOPS=1,
)
# peer is trusted proxy
req = make_request("10.0.0.5", "1.2.3.4, 10.0.0.5")
assert _client_ip(req, s) == "1.2.3.4"
END_FILE
BEGIN_FILE: tests/unit/test_admin_sessions.py
from __future__ import annotations
import re
from fastapi.testclient import TestClient
def test_admin_session_cookie_is_opaque(client: TestClient) -> None:
# this is a structural smoke test; actual auth path requires env secrets.
# We only verify cookie naming conventions are correct in code.
from app.security.admin import SESSION_COOKIE_NAME, CSRF_COOKIE_NAME
assert SESSION_COOKIE_NAME.startswith("__Host-")
assert CSRF_COOKIE_NAME.startswith("__Host-")
END_FILE
BEGIN_FILE: tests/unit/test_circuit_breaker.py
from __future__ import annotations
import pytest
from app.infra.resilience import CircuitBreaker
@pytest.mark.asyncio
async def test_circuit_breaker_opens_after_failures() -> None:
cb = CircuitBreaker(failure_threshold=2, recovery_timeout=60.0)
async def boom() -> int:
raise RuntimeError("nope")
for _ in range(2):
with pytest.raises(RuntimeError):
await cb.call(boom)
with pytest.raises(TimeoutError):
await cb.call(boom)
END_FILE
BEGIN_FILE: tests/security/init.py
from __future__ import annotations
END_FILE
BEGIN_FILE: tests/security/test_lockout_mechanism.py
from __future__ import annotations
# Placeholder: security tests depend on redis + settings in integration mode.
# Kept to mirror prior structure and prevent accidental deletion.
def test_placeholder() -> None:
assert True
END_FILE
BEGIN_FILE: tests/security/test_rate_limit_effectiveness.py
from __future__ import annotations
def test_placeholder() -> None:
assert True
END_FILE
BEGIN_FILE: tests/security/test_replay_protection.py
from __future__ import annotations
def test_placeholder() -> None:
assert True
END_FILE
END_REPO
MicroBin by Dániel Szabó and the FOSS Community. Let's keep the Web compact, accessible and humane!