BEGIN_REPO: apexai-submanager (v2.1.12) — FULL CHAT DUMP (NO DIFFS, NO PARTIALS)
REPO_TREE:
apexai-submanager/
├── LICENSE
├── README.md
├── API.md
├── SECURITY.md
├── OPS_RUNBOOK.md
├── CHANGELOG.md
├── pyproject.toml
├── ruff.toml
├── mypy.ini
├── pytest.ini
├── .gitignore
├── .dockerignore
├── .env.example
├── Dockerfile
├── docker-compose.yml
├── alembic.ini
├── migrations/
│   ├── env.py
│   ├── script.py.mako
│   └── versions/
│       └── 0001_initial_schema.py
├── admin_ui/
│   ├── index.html
│   └── app.js
├── src/
│   └── app/
│       ├── __init__.py
│       ├── main.py
│       ├── settings.py
│       ├── errors.py
│       ├── schemas.py
│       ├── middleware.py
│       ├── logging.py
│       ├── db.py
│       ├── models.py
│       ├── infra/
│       │   ├── __init__.py
│       │   ├── redis.py
│       │   ├── resilience.py
│       │   ├── rate_limit.py
│       │   └── locks.py
│       ├── security/
│       │   ├── __init__.py
│       │   ├── compare.py
│       │   ├── admin.py
│       │   ├── webhook.py
│       │   └── webhook_sig.py
│       ├── services/
│       │   ├── __init__.py
│       │   ├── plans.py
│       │   └── subscriptions.py
│       └── routes/
│           ├── __init__.py
│           ├── health.py
│           ├── admin.py
│           └── webhooks.py
└── tests/
    ├── __init__.py
    ├── conftest.py
    ├── unit/
    │   ├── __init__.py
    │   ├── test_admin_sessions.py
    │   ├── test_circuit_breaker.py
    │   └── test_trusted_proxy_ip.py
    └── integration/
        ├── __init__.py
        ├── test_webhook_secret_rotation.py
        └── test_body_size_limit_413.py
BEGIN_FILE: LICENSE
Copyright (c) 2026 ApexAI Official
All rights reserved.
This software and associated documentation files (the "Software") are proprietary and confidential.
Unauthorized copying, distribution, modification, sublicensing, hosting, or deployment of this Software,
via any medium, is strictly prohibited without an explicit written agreement from ApexAI Official.
NO LICENSE IS GRANTED except by a separate commercial agreement.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
IN THE SOFTWARE.
END_FILE
BEGIN_FILE: README.md
# ApexAI SubManager
Production-grade subscription management service for Telegram-native SaaS bots.
## What it does
- Accepts payment confirmation webhooks (provider allowlist + signature verification + replay protection)
- Rate limits webhook intake (per provider + per client IP; client IP hashed in Redis keys)
- Grants subscriptions idempotently and atomically (single DB transaction)
- Admin Ops Plane:
- server-side sessions (opaque cookie SID; session stored in Redis)
- CSRF protection
- progressive delay + lockout on repeated auth failures
- Health probes: `/v1/health/live`, `/v1/health/ready`
- Strict request body size limit (default 256 KiB)
## Quick start (local)
1. Copy env:
   ```bash
   cp .env.example .env
   ```
2. Fill required secrets:
   - `ADMIN_TOKEN`
   - `ADMIN_TOKEN_FINGERPRINT_SECRET`
   - `WEBHOOK_SIGNATURE_SECRETS`
3. Run:
   ```bash
   docker compose up --build
   ```
4. Admin UI:
   - open `admin_ui/index.html` in your browser
   - set API_BASE in the UI if not running on http://localhost:8080
## Notes on proxies (IMPORTANT)
If you deploy behind reverse proxies/load balancers and want per-IP rate limiting to use `X-Forwarded-For`, configure:
- `TRUSTED_PROXY_NETS` (comma-separated CIDRs)
- `TRUSTED_PROXY_HOPS` (int)

If the immediate peer IP is NOT within `TRUSTED_PROXY_NETS`, `X-Forwarded-For` is ignored.
## License
Proprietary — see LICENSE.
END_FILE
BEGIN_FILE: API.md
# API
Base prefix: `/v1`
## Health
- `GET /v1/health/live`
- `GET /v1/health/ready`
## Webhooks
- `POST /v1/webhooks/{provider}`
Headers:
- `X-Apex-Signature: v1=<hex>`
- `X-Apex-Timestamp: <unix seconds>`
- `X-Apex-Nonce: <random>`
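For illustration, a client could build these headers as below. The signed message layout (`timestamp.nonce.body`, HMAC-SHA256) is an assumption for this sketch; the authoritative scheme is whatever `src/app/security/webhook_sig.py` verifies.

```python
import hashlib
import hmac
import time
import uuid

def sign_webhook(secret: str, body: bytes) -> dict[str, str]:
    """Build the X-Apex-* headers (assumed layout: "<ts>.<nonce>.<raw body>")."""
    ts = str(int(time.time()))
    nonce = uuid.uuid4().hex
    message = ts.encode() + b"." + nonce.encode() + b"." + body
    digest = hmac.new(secret.encode(), message, hashlib.sha256).hexdigest()
    return {
        "X-Apex-Signature": f"v1={digest}",
        "X-Apex-Timestamp": ts,
        "X-Apex-Nonce": nonce,
    }

headers = sign_webhook("0" * 32, b'{"provider_payment_id":"pay_123"}')
```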
## Admin
- `POST /v1/admin/login`
- `POST /v1/admin/logout`
- `GET /v1/admin/stats`
Admin endpoints require:
- session cookie `__Host-admin_session`
- CSRF header `X-CSRF-Token` matching `__Host-csrf_token` cookie
END_FILE
BEGIN_FILE: SECURITY.md
# Security
## Admin authentication
- Server-side session stored in Redis.
- Browser receives only an opaque session id cookie.
- Progressive delay + lockout on repeated failures.
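The progressive delay can be pictured as exponential backoff with a ceiling. The constants below are illustrative only, not the values the service uses (those live in `src/app/security/admin.py`):

```python
def lockout_delay(failures: int, *, base: float = 0.5, cap: float = 60.0) -> float:
    """Delay (seconds) before the next login attempt is processed.
    Doubles per consecutive failure, capped so the delay stays bounded."""
    if failures <= 0:
        return 0.0
    return min(cap, base * (2 ** (failures - 1)))
```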
## Cookies
- Session cookie: `__Host-admin_session` (Secure, Path=/, HttpOnly)
- CSRF cookie: `__Host-csrf_token` (Secure, Path=/)
## Webhook security
- Provider allowlist (`ALLOWED_PROVIDERS`)
- Signature verification supports rotation (`WEBHOOK_SIGNATURE_SECRETS`)
- Timestamp skew bound (`WEBHOOK_TS_SKEW_SEC`)
- Nonce replay protection (`WEBHOOK_NONCE_TTL_SEC`)
- Rate limiting: provider + client IP (IP hashed; XFF honored only for trusted proxies)
## Headers
Security headers middleware sets:
- X-Content-Type-Options: nosniff
- X-Frame-Options: DENY
- Content-Security-Policy: default-src 'none'; frame-ancestors 'none'
- Strict-Transport-Security (prod only)
- Permissions-Policy: geolocation=(), microphone=(), camera=()
END_FILE
BEGIN_FILE: OPS_RUNBOOK.md
# Ops Runbook
## Local / compose
- `docker compose up --build`
- migrations run via `migrate` service automatically.
## Environment variables
See `.env.example`.
## Redis key prefixing
All keys are namespaced via `REDIS_KEY_PREFIX`.
## Secret rotation (webhooks)
Set `WEBHOOK_SIGNATURE_SECRETS` to `old_secret,new_secret` and deploy; once the provider signs only with the new secret, remove the old one and deploy again.
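Rotation works because verification accepts a signature made with any configured secret, along these lines (a sketch; the real verifier is in `src/app/security/webhook_sig.py`):

```python
import hashlib
import hmac

def verify_any(secrets: list[str], message: bytes, provided_hex: str) -> bool:
    """Accept the signature if ANY configured secret matches, so webhooks
    signed with the old secret keep verifying during the rotation window."""
    for secret in secrets:
        expected = hmac.new(secret.encode(), message, hashlib.sha256).hexdigest()
        if hmac.compare_digest(expected, provided_hex):
            return True
    return False
```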
## Troubleshooting
- 401 on admin endpoints:
- session expired or missing
- CSRF token missing/mismatch
- 429 on webhooks:
- rate limiting active; verify trusted proxy settings if behind LB
- 409 replay:
- nonce already used within TTL
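For reasoning about 429s: the limiter is a token bucket (Redis Lua in `src/app/infra/rate_limit.py`). Its refill math, mirrored in Python:

```python
def bucket_allow(tokens: float, last_ms: int, now_ms: int,
                 rate: int, burst: int) -> tuple[bool, float]:
    """Mirror of the Lua script's math: refill at `rate` tokens/second up to
    `burst`, then spend one token if at least one is available."""
    elapsed_s = max(0, now_ms - last_ms) / 1000.0
    tokens = min(float(burst), tokens + elapsed_s * rate)
    if tokens >= 1.0:
        return True, tokens - 1.0
    return False, tokens
```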
END_FILE
BEGIN_FILE: CHANGELOG.md
# Changelog
## 2.1.12
- Add: Alembic migrations (version tracking) replacing single raw SQL bootstrap
- Add: Multi-stage Dockerfile (runtime image excludes dev deps) + non-root user + HEALTHCHECK
- Fix: Trusted proxy/XFF model hardened (validated IPs, trusted net parsing, hop logic)
- Add: Integration tests (signature rotation + 413 envelope)
## 2.1.11
- Fix: trusted proxy validation for client IP extraction (prevents XFF spoof rate-limit bypass)
- Fix: CSRF cookie now uses __Host- prefix with Secure + Path=/
- Fix: Body size middleware returns structured 413 envelope
- Fix: Lock release failures now logged (warning)
- Improvement: Security headers add HSTS (prod) + Permissions-Policy
- Admin UI: display API error.reason + request_id when available
- DB: plans.duration_days supported (defaults to 30)
END_FILE
BEGIN_FILE: pyproject.toml
[project]
name = "apexai-submanager"
version = "2.1.12"
description = "ApexAI SubManager - production-grade FastAPI subscription manager"
requires-python = "==3.12.*"
dependencies = [
"fastapi==0.115.6",
"uvicorn==0.34.0",
"pydantic==2.10.4",
"pydantic-settings==2.7.0",
"sqlalchemy==2.0.36",
"asyncpg==0.30.0",
"redis==5.2.1",
"alembic==1.14.1",
]
[project.optional-dependencies]
dev = [
"pytest==8.3.4",
"pytest-asyncio==0.25.0",
"pytest-cov==6.0.0",
"mypy==1.13.0",
"ruff==0.8.4",
"httpx==0.28.1",
]
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
markers = [
"integration: requires live postgres+redis via env (DATABASE_URL/REDIS_URL)",
]
[tool.coverage.run]
branch = true
source = ["src"]
[tool.ruff]
line-length = 110
target-version = "py312"
[tool.mypy]
python_version = "3.12"
strict = true
mypy_path = ["src"]
warn_unused_ignores = true
END_FILE
BEGIN_FILE: ruff.toml
line-length = 110
target-version = "py312"
[lint]
select = ["E", "F", "I", "B", "UP", "SIM"]
ignore = ["E501"]
END_FILE
BEGIN_FILE: mypy.ini
[mypy]
python_version = 3.12
strict = True
mypy_path = src
warn_unused_ignores = True
END_FILE
BEGIN_FILE: pytest.ini
# pytest.ini takes precedence over [tool.pytest.ini_options] in pyproject.toml,
# so the testpaths/markers settings are duplicated here to stay in effect.
[pytest]
asyncio_mode = auto
testpaths = tests
markers =
    integration: requires live postgres+redis via env (DATABASE_URL/REDIS_URL)
BEGIN_FILE: .gitignore
__pycache__/
*.pyc
.venv/
.env
.coverage
htmlcov/
dist/
build/
END_FILE
BEGIN_FILE: .dockerignore
.git/
__pycache__/
*.pyc
.venv/
.env
.coverage
htmlcov/
dist/
build/
tests/
END_FILE
BEGIN_FILE: .env.example
APP_ENV=local
PORT=8080
LOG_LEVEL=INFO
DATABASE_URL=postgresql+asyncpg://postgres:postgres@db:5432/postgres
REDIS_URL=redis://redis:6379/0
REDIS_KEY_PREFIX=apexai:submanager:
ADMIN_TOKEN=CHANGE_ME_32CHARS_MIN________________________________
ADMIN_TOKEN_FINGERPRINT_SECRET=CHANGE_ME_32CHARS_MIN_________________________
WEBHOOK_SIGNATURE_SECRETS=CHANGE_ME_32CHARS_MIN________________________________
COOKIE_SECURE=false
COOKIE_SAMESITE=lax
ADMIN_SESSION_TTL_SEC=3600
ALLOWED_PROVIDERS=stripe
WEBHOOK_TS_SKEW_SEC=300
WEBHOOK_NONCE_TTL_SEC=900
WEBHOOK_RL_RATE=30
WEBHOOK_RL_BURST=60
DB_OP_TIMEOUT_SEC=5.0
DB_CONCURRENCY=20
DB_SEM_TIMEOUT_SEC=2.0
MAX_BODY_BYTES=262144
# Trusted proxy configuration (for X-Forwarded-For)
TRUSTED_PROXY_NETS=127.0.0.1/32,10.0.0.0/8,172.16.0.0/12,192.168.0.0/16
TRUSTED_PROXY_HOPS=1
END_FILE
BEGIN_FILE: Dockerfile
# syntax=docker/dockerfile:1
FROM python:3.12-slim AS builder
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
WORKDIR /app
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
COPY pyproject.toml /app/pyproject.toml
COPY src /app/src
COPY alembic.ini /app/alembic.ini
COPY migrations /app/migrations
RUN pip install --no-cache-dir -U pip && \
pip install --no-cache-dir .
FROM python:3.12-slim AS runtime
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
RUN addgroup --system --gid 1001 app && \
adduser --system --uid 1001 --gid 1001 --no-create-home app
WORKDIR /app
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
COPY admin_ui /app/admin_ui
COPY alembic.ini /app/alembic.ini
COPY migrations /app/migrations
COPY src /app/src
RUN chown -R app:app /app
USER app
EXPOSE 8080
HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://127.0.0.1:8080/v1/health/live', timeout=2).read()" || exit 1
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080"]
END_FILE
BEGIN_FILE: docker-compose.yml
services:
db:
image: postgres:16
environment:
POSTGRES_PASSWORD: postgres
ports:
- "5432:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 3s
timeout: 3s
retries: 20
redis:
image: redis:7
ports:
- "6379:6379"
healthcheck:
test: ["CMD", "redis-cli", "PING"]
interval: 3s
timeout: 3s
retries: 20
migrate:
build: .
env_file: .env
depends_on:
db:
condition: service_healthy
command: ["alembic", "upgrade", "head"]
restart: "no"
api:
build: .
env_file: .env
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
migrate:
condition: service_completed_successfully
ports:
- "8080:8080"
END_FILE
BEGIN_FILE: alembic.ini
[alembic]
script_location = migrations
prepend_sys_path = .
version_path_separator = os
sqlalchemy.url = driver://user:pass@localhost/dbname
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = INFO
handlers = console
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
END_FILE
BEGIN_FILE: migrations/env.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
import os
from logging.config import fileConfig
from alembic import context
from sqlalchemy import engine_from_config, pool
from app.models import Base
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def _get_db_url() -> str:
url = os.getenv("DATABASE_URL", "").strip()
if not url:
raise RuntimeError("DATABASE_URL is required for migrations")
return url
def run_migrations_offline() -> None:
url = _get_db_url()
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
compare_type=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
    # DATABASE_URL uses the asyncpg driver, so migrations must go through an
    # async engine: a synchronous engine_from_config cannot open asyncpg
    # connections and would fail at connect time.
    import asyncio

    from sqlalchemy.engine import Connection
    from sqlalchemy.ext.asyncio import async_engine_from_config

    section = dict(config.get_section(config.config_ini_section) or {})
    section["sqlalchemy.url"] = _get_db_url()
    connectable = async_engine_from_config(
        section,
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    def do_run_migrations(connection: Connection) -> None:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            compare_type=True,
        )
        with context.begin_transaction():
            context.run_migrations()

    async def _run() -> None:
        async with connectable.connect() as connection:
            await connection.run_sync(do_run_migrations)
        await connectable.dispose()

    asyncio.run(_run())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
END_FILE
BEGIN_FILE: migrations/script.py.mako
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from __future__ import annotations
from alembic import op
import sqlalchemy as sa
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}
END_FILE
BEGIN_FILE: migrations/versions/0001_initial_schema.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
from alembic import op
import sqlalchemy as sa
revision = "0001_initial_schema"
down_revision = None
branch_labels = None
depends_on = None
def upgrade() -> None:
op.create_table(
"plans",
sa.Column("id", sa.BigInteger(), primary_key=True, autoincrement=True),
sa.Column("code", sa.String(length=64), nullable=False),
sa.Column("currency", sa.String(length=8), nullable=False),
sa.Column("amount", sa.BigInteger(), nullable=False),
sa.Column("duration_days", sa.Integer(), nullable=False, server_default="30"),
sa.Column("is_active", sa.Boolean(), nullable=False, server_default=sa.text("true")),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")),
sa.UniqueConstraint("code", name="uq_plans_code"),
)
op.create_table(
"payments",
sa.Column("id", sa.BigInteger(), primary_key=True, autoincrement=True),
sa.Column("provider", sa.String(length=32), nullable=False),
sa.Column("provider_payment_id", sa.String(length=128), nullable=False),
sa.Column("tg_user_id", sa.BigInteger(), nullable=False),
sa.Column("plan_code", sa.String(length=64), nullable=False),
sa.Column("currency", sa.String(length=8), nullable=False),
sa.Column("amount", sa.BigInteger(), nullable=False),
sa.Column("status", sa.String(length=32), nullable=False),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")),
sa.UniqueConstraint("provider", "provider_payment_id", name="uq_payments_provider_payment"),
)
op.create_table(
"subscriptions",
sa.Column("id", sa.BigInteger(), primary_key=True, autoincrement=True),
sa.Column("tg_user_id", sa.BigInteger(), nullable=False),
sa.Column("plan_code", sa.String(length=64), nullable=False),
sa.Column("status", sa.String(length=32), nullable=False),
sa.Column("starts_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("ends_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")),
sa.Index("ix_subscriptions_tg_user_id", "tg_user_id"),
sa.Index("ix_subscriptions_status", "status"),
)
def downgrade() -> None:
op.drop_table("subscriptions")
op.drop_table("payments")
op.drop_table("plans")
END_FILE
BEGIN_FILE: admin_ui/index.html
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8" />
<title>ApexAI SubManager Admin</title>
<meta name="viewport" content="width=device-width, initial-scale=1" />
<style>
body { font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, Arial; margin: 24px; }
input, button { padding: 10px; margin: 6px 0; }
.row { display: flex; gap: 8px; align-items: center; }
.card { border: 1px solid #ddd; border-radius: 10px; padding: 14px; margin-top: 12px; }
.muted { color: #666; font-size: 12px; }
pre { background: #f7f7f7; padding: 12px; border-radius: 8px; overflow: auto; }
</style>
</head>
<body>
<h1>ApexAI SubManager Admin</h1>
<div class="card">
<div class="muted">API_BASE (default http://localhost:8080)</div>
<input id="apiBase" placeholder="http://localhost:8080" style="width: 420px" />
<div class="muted">Admin Token</div>
<input id="token" placeholder="ADMIN_TOKEN" style="width: 420px" />
<button id="loginBtn">Login</button>
<button id="logoutBtn">Logout</button>
<div id="loginStatus" class="muted"></div>
</div>
<div class="card">
<button id="statsBtn">Fetch Stats</button>
<pre id="statsOut"></pre>
</div>
<script src="app.js"></script>
</body>
</html>
END_FILE
BEGIN_FILE: admin_ui/app.js
/* Copyright (c) 2026 ApexAI Official. All rights reserved. */
function apiBase() {
const v = document.getElementById("apiBase").value.trim();
return v || "http://localhost:8080";
}
function setStatus(msg) {
document.getElementById("loginStatus").textContent = msg;
}
async function apiFetch(path, opts) {
const base = apiBase();
  // Include credentials so the session/CSRF cookies are sent even when this
  // static UI is served from a different origin than the API.
  const res = await fetch(base + path, { credentials: "include", ...opts });
let data = null;
try {
data = await res.json();
} catch (_) {
// ignore
}
if (!res.ok) {
const reason = (data && data.error && data.error.reason) ? data.error.reason : `HTTP ${res.status}`;
const rid = (data && data.error && data.error.request_id) ? data.error.request_id : "n/a";
throw new Error(`${reason} (request_id=${rid})`);
}
return data;
}
function getCsrf() {
const cookie = document.cookie.split(";").map(s => s.trim());
const kv = cookie.find(s => s.startsWith("__Host-csrf_token="));
if (!kv) return "";
return decodeURIComponent(kv.split("=").slice(1).join("="));
}
document.getElementById("loginBtn").onclick = async () => {
const tok = document.getElementById("token").value.trim();
try {
await apiFetch("/v1/admin/login", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ token: tok })
});
setStatus("Logged in (session cookie set).");
} catch (e) {
setStatus("Login failed: " + e.message);
}
};
document.getElementById("logoutBtn").onclick = async () => {
try {
const csrf = getCsrf();
await apiFetch("/v1/admin/logout", {
method: "POST",
headers: { "X-CSRF-Token": csrf }
});
setStatus("Logged out.");
} catch (e) {
setStatus("Logout failed: " + e.message);
}
};
document.getElementById("statsBtn").onclick = async () => {
const out = document.getElementById("statsOut");
out.textContent = "Loading...";
try {
const csrf = getCsrf();
const data = await apiFetch("/v1/admin/stats", {
method: "GET",
headers: { "X-CSRF-Token": csrf }
});
out.textContent = JSON.stringify(data, null, 2);
} catch (e) {
out.textContent = "Error: " + e.message;
}
};
END_FILE
BEGIN_FILE: src/app/__init__.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
__all__ = ["__version__"]
__version__ = "2.1.12"
END_FILE
BEGIN_FILE: src/app/settings.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
from pydantic import Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", extra="ignore")
APP_ENV: str = Field(default="local")
PORT: int = Field(default=8080, ge=1, le=65535)
LOG_LEVEL: str = Field(default="INFO")
DATABASE_URL: str
REDIS_URL: str
REDIS_KEY_PREFIX: str = Field(default="apexai:submanager:")
ADMIN_TOKEN: str
ADMIN_TOKEN_FINGERPRINT_SECRET: str
WEBHOOK_SIGNATURE_SECRETS: str
COOKIE_SECURE: bool = Field(default=False)
COOKIE_SAMESITE: str = Field(default="lax")
ADMIN_SESSION_TTL_SEC: int = Field(default=3600, ge=60, le=86400)
ALLOWED_PROVIDERS: str = Field(default="stripe")
WEBHOOK_TS_SKEW_SEC: int = Field(default=300, ge=30, le=3600)
WEBHOOK_NONCE_TTL_SEC: int = Field(default=900, ge=60, le=86400)
WEBHOOK_RL_RATE: int = Field(default=30, ge=1, le=100000)
WEBHOOK_RL_BURST: int = Field(default=60, ge=1, le=200000)
DB_OP_TIMEOUT_SEC: float = Field(default=5.0, gt=0.05, le=120.0)
DB_CONCURRENCY: int = Field(default=20, ge=1, le=500)
DB_SEM_TIMEOUT_SEC: float = Field(default=2.0, gt=0.05, le=30.0)
MAX_BODY_BYTES: int = Field(default=262144, ge=1024, le=2_097_152)
TRUSTED_PROXY_NETS: str = Field(default="")
TRUSTED_PROXY_HOPS: int = Field(default=0, ge=0, le=20)
@field_validator("ADMIN_TOKEN", "ADMIN_TOKEN_FINGERPRINT_SECRET")
@classmethod
def _min_admin_secret_len(cls, v: str) -> str:
v = v.strip()
if len(v) < 32:
raise ValueError("admin secrets must be at least 32 chars")
return v
@field_validator("WEBHOOK_SIGNATURE_SECRETS")
@classmethod
def _validate_webhook_secrets(cls, v: str) -> str:
parts = [p.strip() for p in v.split(",") if p.strip()]
if not parts:
raise ValueError("WEBHOOK_SIGNATURE_SECRETS must contain at least one secret")
for p in parts:
if len(p) < 32:
raise ValueError("each webhook secret must be at least 32 chars")
return ",".join(parts)
@field_validator("COOKIE_SAMESITE")
@classmethod
def _validate_samesite(cls, v: str) -> str:
vv = v.strip().lower()
if vv not in {"lax", "strict", "none"}:
raise ValueError("COOKIE_SAMESITE must be lax|strict|none")
return vv
def allowed_providers_set(self) -> set[str]:
return {p.strip().lower() for p in self.ALLOWED_PROVIDERS.split(",") if p.strip()}
def webhook_secrets_list(self) -> list[str]:
return [p.strip() for p in self.WEBHOOK_SIGNATURE_SECRETS.split(",") if p.strip()]
END_FILE
BEGIN_FILE: src/app/errors.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
from dataclasses import dataclass
@dataclass(frozen=True)
class AppError(Exception):
code: str
reason: str
http_status: int = 400
@dataclass(frozen=True)
class UnauthorizedError(AppError):
code: str = "unauthorized"
reason: str = "unauthorized"
http_status: int = 401
@dataclass(frozen=True)
class ForbiddenError(AppError):
code: str = "forbidden"
reason: str = "forbidden"
http_status: int = 403
@dataclass(frozen=True)
class NotFoundError(AppError):
code: str = "not_found"
reason: str = "not_found"
http_status: int = 404
@dataclass(frozen=True)
class ConflictError(AppError):
code: str = "conflict"
reason: str = "conflict"
http_status: int = 409
@dataclass(frozen=True)
class RateLimitedError(AppError):
code: str = "rate_limited"
reason: str = "rate_limited"
http_status: int = 429
@dataclass(frozen=True)
class PayloadTooLargeError(AppError):
code: str = "payload_too_large"
reason: str = "payload too large"
http_status: int = 413
@dataclass(frozen=True)
class BadRequestError(AppError):
code: str = "bad_request"
reason: str = "bad request"
http_status: int = 400
END_FILE
BEGIN_FILE: src/app/schemas.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
from pydantic import BaseModel, Field
from app import __version__
class ErrorInfo(BaseModel):
code: str
reason: str
request_id: str
class ErrorResponse(BaseModel):
ok: bool = Field(default=False)
error: ErrorInfo
class OkResponse(BaseModel):
ok: bool = Field(default=True)
api_version: str = Field(default=__version__)
class AdminLoginRequest(BaseModel):
token: str
class WebhookEvent(BaseModel):
schema_version: int = Field(default=1, ge=1, le=10)
provider_payment_id: str = Field(min_length=1, max_length=128)
tg_user_id: int = Field(ge=1)
plan_code: str = Field(min_length=1, max_length=64)
currency: str = Field(min_length=2, max_length=8)
amount: int = Field(ge=0)
END_FILE
BEGIN_FILE: src/app/logging.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
import json
import logging
import os
from datetime import datetime, timezone
from typing import Any
class JsonFormatter(logging.Formatter):
def format(self, record: logging.LogRecord) -> str:
payload: dict[str, Any] = {
"ts": datetime.now(timezone.utc).isoformat(timespec="milliseconds"),
"level": record.levelname,
"name": record.name,
"msg": record.getMessage(),
}
if record.exc_info:
payload["exc_info"] = self.formatException(record.exc_info)
return json.dumps(payload, separators=(",", ":"), ensure_ascii=False)
def setup_logging() -> None:
level = os.getenv("LOG_LEVEL", "INFO").strip().upper()
root = logging.getLogger()
root.setLevel(level)
handler = logging.StreamHandler()
fmt = os.getenv("APP_ENV", "local").strip().lower()
if fmt in {"prod", "production"}:
handler.setFormatter(JsonFormatter())
else:
handler.setFormatter(logging.Formatter("%(levelname)s [%(name)s] %(message)s"))
root.handlers = [handler]
END_FILE
BEGIN_FILE: src/app/db.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
from collections.abc import AsyncIterator
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
from app.settings import Settings
_engine: AsyncEngine | None = None
_sessionmaker: async_sessionmaker[AsyncSession] | None = None
def create_engine(settings: Settings) -> None:
global _engine, _sessionmaker
_engine = create_async_engine(
settings.DATABASE_URL,
pool_pre_ping=True,
pool_recycle=1800,
pool_size=20,
max_overflow=30,
)
_sessionmaker = async_sessionmaker(_engine, expire_on_commit=False)
def get_engine() -> AsyncEngine:
if _engine is None:
raise RuntimeError("DB engine not initialized")
return _engine
async def close_engine() -> None:
global _engine, _sessionmaker
if _engine is not None:
await _engine.dispose()
_engine = None
_sessionmaker = None
async def session_scope() -> AsyncIterator[AsyncSession]:
if _sessionmaker is None:
raise RuntimeError("DB sessionmaker not initialized")
async with _sessionmaker() as sess:
yield sess
END_FILE
BEGIN_FILE: src/app/models.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
from datetime import datetime
from sqlalchemy import BigInteger, Boolean, DateTime, Integer, String, UniqueConstraint, func, Index
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class Plan(Base):
__tablename__ = "plans"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
code: Mapped[str] = mapped_column(String(64), nullable=False, unique=True)
currency: Mapped[str] = mapped_column(String(8), nullable=False)
amount: Mapped[int] = mapped_column(BigInteger, nullable=False)
duration_days: Mapped[int] = mapped_column(Integer, nullable=False, default=30)
is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False)
class Payment(Base):
__tablename__ = "payments"
__table_args__ = (UniqueConstraint("provider", "provider_payment_id", name="uq_payments_provider_payment"),)
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
provider: Mapped[str] = mapped_column(String(32), nullable=False)
provider_payment_id: Mapped[str] = mapped_column(String(128), nullable=False)
tg_user_id: Mapped[int] = mapped_column(BigInteger, nullable=False)
plan_code: Mapped[str] = mapped_column(String(64), nullable=False)
currency: Mapped[str] = mapped_column(String(8), nullable=False)
amount: Mapped[int] = mapped_column(BigInteger, nullable=False)
status: Mapped[str] = mapped_column(String(32), nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False)
class Subscription(Base):
__tablename__ = "subscriptions"
__table_args__ = (
Index("ix_subscriptions_tg_user_id", "tg_user_id"),
Index("ix_subscriptions_status", "status"),
)
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
tg_user_id: Mapped[int] = mapped_column(BigInteger, nullable=False)
plan_code: Mapped[str] = mapped_column(String(64), nullable=False)
status: Mapped[str] = mapped_column(String(32), nullable=False)
starts_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
ends_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False)
END_FILE
BEGIN_FILE: src/app/middleware.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
import json
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response
from app.errors import PayloadTooLargeError
from app.schemas import ErrorInfo, ErrorResponse
class BodySizeLimitMiddleware(BaseHTTPMiddleware):
    def __init__(self, app: object, max_bytes: int) -> None:
        super().__init__(app)
        self._max = max_bytes
    async def dispatch(self, request: Request, call_next) -> Response:
        try:
            # Fast-path rejection on a well-formed Content-Length header; a
            # malformed header falls through to the actual body-length check.
            cl = request.headers.get("content-length")
            if cl is not None and cl.isdigit() and int(cl) > self._max:
                raise PayloadTooLargeError(reason=f"payload exceeds limit {self._max} bytes")
            body = await request.body()
            if len(body) > self._max:
                raise PayloadTooLargeError(reason=f"payload exceeds limit {self._max} bytes")
            # Cache the body so downstream handlers re-reading it get the same bytes.
            request._body = body  # type: ignore[attr-defined]
            return await call_next(request)
        except PayloadTooLargeError as e:
            req_id = request.headers.get("x-request-id", "unknown")
            payload = ErrorResponse(error=ErrorInfo(code=e.code, reason=e.reason, request_id=req_id)).model_dump()
            return Response(
                content=json.dumps(payload),
                status_code=e.http_status,
                media_type="application/json",
            )
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
def __init__(self, app: object, app_env: str) -> None:
super().__init__(app)
self._env = app_env.strip().lower()
async def dispatch(self, request: Request, call_next) -> Response:
resp: Response = await call_next(request)
resp.headers["X-Content-Type-Options"] = "nosniff"
resp.headers["X-Frame-Options"] = "DENY"
resp.headers["Content-Security-Policy"] = "default-src 'none'; frame-ancestors 'none'"
resp.headers["Permissions-Policy"] = "geolocation=(), microphone=(), camera=()"
if self._env in {"prod", "production"}:
resp.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
return resp
END_FILE
BEGIN_FILE: src/app/infra/__init__.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
END_FILE
BEGIN_FILE: src/app/infra/redis.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
from redis.asyncio import Redis
from app.settings import Settings
_redis: Redis | None = None
async def create_redis(settings: Settings) -> None:
global _redis
_redis = Redis.from_url(settings.REDIS_URL, decode_responses=False)
def get_redis() -> Redis:
if _redis is None:
raise RuntimeError("Redis not initialized")
return _redis
async def close_redis() -> None:
global _redis
if _redis is not None:
await _redis.aclose()
_redis = None
END_FILE
BEGIN_FILE: src/app/infra/resilience.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import TypeVar
from sqlalchemy.exc import OperationalError
T = TypeVar("T")
async def with_db_retry(fn: Callable[[], Awaitable[T]], *, attempts: int = 3, base_delay: float = 0.05) -> T:
last: Exception | None = None
for i in range(attempts):
try:
return await fn()
except (OperationalError, ConnectionError, TimeoutError) as e:
last = e
if i == attempts - 1:
raise
await asyncio.sleep(base_delay * (2**i))
raise RuntimeError("unreachable") from last
@dataclass
class CircuitBreaker:
    failure_threshold: int = 5
    recovery_timeout: float = 30.0

    def __post_init__(self) -> None:
        # Per-instance mutable state. A class-level `asyncio.Lock()` default
        # would be shared by every CircuitBreaker instance.
        self._failures = 0
        self._open_until = 0.0
        self._lock = asyncio.Lock()

    async def call(self, fn: Callable[[], Awaitable[T]]) -> T:
        async with self._lock:
            now = asyncio.get_running_loop().time()
            if self._open_until > now:
                raise RuntimeError("circuit_open")
            if self._open_until != 0.0 and self._open_until <= now:
                self._open_until = 0.0
                self._failures = 0
        try:
            res = await fn()
        except Exception:
            async with self._lock:
                self._failures += 1
                if self._failures >= self.failure_threshold:
                    self._open_until = asyncio.get_running_loop().time() + self.recovery_timeout
            raise
        async with self._lock:
            # A success resets the count, so only consecutive failures trip the breaker.
            self._failures = 0
        return res
END_FILE
BEGIN_FILE: src/app/infra/rate_limit.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
import hashlib
from dataclasses import dataclass
from redis.asyncio import Redis
_LUA = r"""
local key = KEYS[1]
local now_ms = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local burst = tonumber(ARGV[3])
local data = redis.call("HMGET", key, "tokens", "ts")
local tokens = tonumber(data[1])
local ts = tonumber(data[2])
if tokens == nil then tokens = burst end
if ts == nil then ts = now_ms end
local delta = math.max(0, now_ms - ts)
local refill = (delta / 1000.0) * rate
tokens = math.min(burst, tokens + refill)
ts = now_ms
local allowed = 0
if tokens >= 1.0 then
tokens = tokens - 1.0
allowed = 1
end
redis.call("HSET", key, "tokens", tokens, "ts", ts) -- HSET; HMSET is deprecated
redis.call("PEXPIRE", key, math.ceil((burst / rate) * 1000) + 1000)
return allowed
""".strip()
@dataclass(frozen=True)
class TokenBucketLimiter:
redis: Redis
key_prefix: str
rate: int
burst: int
async def allow(self, key: str, *, now_ms: int) -> bool:
full_key = (self.key_prefix + key).encode("utf-8")
allowed = await self.redis.eval(_LUA, 1, full_key, now_ms, self.rate, self.burst)
return bool(int(allowed))
@staticmethod
def hash_ip(ip: str) -> str:
return hashlib.sha256(ip.encode("utf-8")).hexdigest()[:24]
END_FILE
BEGIN_FILE: src/app/infra/locks.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
import logging
import secrets
from dataclasses import dataclass
from redis.asyncio import Redis
log = logging.getLogger("app.locks")
@dataclass(frozen=True)
class RedisLock:
redis: Redis
key: str
ttl_sec: int = 10
async def acquire(self) -> str | None:
token = secrets.token_hex(16)
ok = await self.redis.set(self.key, token.encode("utf-8"), nx=True, ex=self.ttl_sec)
return token if ok else None
async def release(self, token: str) -> None:
lua = r"""
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
end
return 0
""".strip()
try:
await self.redis.eval(lua, 1, self.key.encode("utf-8"), token.encode("utf-8"))
except Exception:
log.warning("lock_release_failed", exc_info=True)
END_FILE
BEGIN_FILE: src/app/security/__init__.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
END_FILE
BEGIN_FILE: src/app/security/compare.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
import hmac
def constant_time_eq(a: str, b: str) -> bool:
return hmac.compare_digest(a.encode("utf-8"), b.encode("utf-8"))
END_FILE
BEGIN_FILE: src/app/security/admin.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
import hashlib
import secrets
from dataclasses import dataclass
from redis.asyncio import Redis
from starlette.requests import Request
from starlette.responses import Response
from app.errors import ForbiddenError, UnauthorizedError
from app.security.compare import constant_time_eq
from app.settings import Settings
SESSION_COOKIE = "__Host-admin_session"
CSRF_COOKIE = "__Host-csrf_token"
@dataclass(frozen=True)
class AdminSession:
sid: str
fp: str
def _fp(token: str, secret: str) -> str:
h = hashlib.sha256()
h.update(secret.encode("utf-8"))
h.update(b":")
h.update(token.encode("utf-8"))
return h.hexdigest()
def _session_key(prefix: str, sid: str) -> str:
return f"{prefix}admin:sess:{sid}"
def _fails_key(prefix: str) -> str:
return f"{prefix}admin:fails"
def _locked_key(prefix: str) -> str:
return f"{prefix}admin:locked"
async def _check_lockout(r: Redis, prefix: str) -> None:
locked = await r.get(_locked_key(prefix).encode("utf-8"))
if locked:
raise ForbiddenError(code="locked", reason="admin locked out", http_status=403)
async def _record_failure(r: Redis, prefix: str, *, max_fails: int = 5, lock_ttl_sec: int = 600) -> None:
k = _fails_key(prefix).encode("utf-8")
fails = await r.incr(k)
await r.expire(k, 3600)
if int(fails) >= max_fails:
await r.set(_locked_key(prefix).encode("utf-8"), b"1", ex=lock_ttl_sec)
async def _reset_failures(r: Redis, prefix: str) -> None:
await r.delete(_fails_key(prefix).encode("utf-8"))
def _cookie_params(settings: Settings) -> dict[str, object]:
    # Browsers only honor the __Host- cookie prefix when the cookie is Secure
    # and Path=/, so COOKIE_SECURE must be true outside local development.
    return {
        "secure": bool(settings.COOKIE_SECURE),
        "samesite": settings.COOKIE_SAMESITE,
        "path": "/",
    }
async def authenticate_admin_token(r: Redis, settings: Settings, token: str) -> AdminSession:
await _check_lockout(r, settings.REDIS_KEY_PREFIX)
expected = settings.ADMIN_TOKEN.strip()
if not constant_time_eq(token.strip(), expected):
await _record_failure(r, settings.REDIS_KEY_PREFIX)
raise UnauthorizedError(reason="bad token")
await _reset_failures(r, settings.REDIS_KEY_PREFIX)
sid = secrets.token_hex(32)
fpv = _fp(expected, settings.ADMIN_TOKEN_FINGERPRINT_SECRET.strip())
key = _session_key(settings.REDIS_KEY_PREFIX, sid).encode("utf-8")
await r.set(key, fpv.encode("utf-8"), ex=settings.ADMIN_SESSION_TTL_SEC)
return AdminSession(sid=sid, fp=fpv)
async def get_admin_session(r: Redis, settings: Settings, request: Request) -> AdminSession:
    sid = request.cookies.get(SESSION_COOKIE, "")
    if not sid:
        raise UnauthorizedError(reason="missing session")
    key = _session_key(settings.REDIS_KEY_PREFIX, sid).encode("utf-8")
    raw = await r.get(key)
    if not raw:
        raise UnauthorizedError(reason="session expired")
    fpv = raw.decode("utf-8", errors="ignore")
    # Reject sessions minted under a previous ADMIN_TOKEN: the fingerprint
    # stored at login must match the currently configured token's fingerprint.
    expected = _fp(settings.ADMIN_TOKEN.strip(), settings.ADMIN_TOKEN_FINGERPRINT_SECRET.strip())
    if not constant_time_eq(fpv, expected):
        await r.delete(key)
        raise UnauthorizedError(reason="session invalidated")
    return AdminSession(sid=sid, fp=fpv)
def set_session_cookies(resp: Response, settings: Settings, sid: str, csrf: str) -> None:
params = _cookie_params(settings)
resp.set_cookie(SESSION_COOKIE, sid, httponly=True, **params)
resp.set_cookie(CSRF_COOKIE, csrf, httponly=False, **params)
async def clear_session(r: Redis, settings: Settings, request: Request, resp: Response) -> None:
sid = request.cookies.get(SESSION_COOKIE, "")
if sid:
await r.delete(_session_key(settings.REDIS_KEY_PREFIX, sid).encode("utf-8"))
params = _cookie_params(settings)
resp.delete_cookie(SESSION_COOKIE, path=str(params["path"]))
resp.delete_cookie(CSRF_COOKIE, path=str(params["path"]))
def new_csrf_token() -> str:
return secrets.token_hex(32)
END_FILE
BEGIN_FILE: src/app/security/webhook_sig.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
import hashlib
import hmac
import time
from redis.asyncio import Redis
from app.errors import ConflictError, UnauthorizedError
from app.security.compare import constant_time_eq
from app.settings import Settings
def _get_header(headers: dict[str, str], name: str) -> str:
for k, v in headers.items():
if k.lower() == name.lower():
return v
return ""
def _sig_v1(secret: str, ts: str, nonce: str, body: bytes) -> str:
mac = hmac.new(secret.encode("utf-8"), digestmod=hashlib.sha256)
mac.update(ts.encode("utf-8"))
mac.update(b":")
mac.update(nonce.encode("utf-8"))
mac.update(b":")
mac.update(body)
return mac.hexdigest()
async def verify_webhook_signature(r: Redis, settings: Settings, headers: dict[str, str], body: bytes) -> None:
sig = _get_header(headers, "x-apex-signature") or _get_header(headers, "x-signature")
ts = _get_header(headers, "x-apex-timestamp") or _get_header(headers, "x-timestamp")
nonce = _get_header(headers, "x-apex-nonce") or _get_header(headers, "x-nonce")
if not sig or not ts or not nonce:
raise UnauthorizedError(reason="missing signature headers")
if not sig.startswith("v1="):
raise UnauthorizedError(reason="bad signature format")
try:
ts_i = int(ts)
except ValueError as e:
raise UnauthorizedError(reason="bad timestamp") from e
    now = int(time.time())
    if abs(now - ts_i) > settings.WEBHOOK_TS_SKEW_SEC:
        raise UnauthorizedError(reason="timestamp skew")
    # Verify the signature before consuming the nonce; otherwise an
    # unauthenticated sender could burn a nonce and block the legitimate
    # delivery that uses it.
    want = sig.split("=", 1)[1]
    for secret in settings.webhook_secrets_list():
        if constant_time_eq(want, _sig_v1(secret, ts, nonce, body)):
            break
    else:
        raise UnauthorizedError(reason="bad signature")
    nonce_key = f"{settings.REDIS_KEY_PREFIX}wh:nonce:{nonce}".encode("utf-8")
    ok = await r.set(nonce_key, b"1", nx=True, ex=settings.WEBHOOK_NONCE_TTL_SEC)
    if not ok:
        raise ConflictError(reason="replay detected")
END_FILE
BEGIN_FILE: src/app/security/webhook.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
from app.errors import BadRequestError
def validate_currency(currency: str) -> str:
cur = currency.strip().upper()
if not (2 <= len(cur) <= 8) or not cur.isalnum():
raise BadRequestError(reason="invalid currency")
return cur
END_FILE
BEGIN_FILE: src/app/services/__init__.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
END_FILE
BEGIN_FILE: src/app/services/plans.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
import time
from dataclasses import dataclass
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models import Plan
@dataclass
class CachedPlan:
plan: Plan
expires_at: float
class PlanCache:
def __init__(self, ttl_sec: float = 60.0) -> None:
self._ttl = ttl_sec
self._cache: dict[str, CachedPlan] = {}
def get(self, code: str) -> Plan | None:
now = time.time()
c = self._cache.get(code)
if not c:
return None
if c.expires_at <= now:
self._cache.pop(code, None)
return None
return c.plan
def put(self, plan: Plan) -> None:
self._cache[plan.code] = CachedPlan(plan=plan, expires_at=time.time() + self._ttl)
async def get_plan_by_code(sess: AsyncSession, cache: PlanCache, code: str) -> Plan | None:
code = code.strip()
if not code:
return None
cached = cache.get(code)
if cached is not None:
return cached
res = await sess.execute(select(Plan).where(Plan.code == code, Plan.is_active.is_(True)))
plan = res.scalar_one_or_none()
if plan is not None:
cache.put(plan)
return plan
END_FILE
BEGIN_FILE: src/app/services/subscriptions.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from app.errors import BadRequestError, ConflictError
from app.models import Payment, Subscription
from app.security.webhook import validate_currency
from app.services.plans import PlanCache, get_plan_by_code
@dataclass(frozen=True)
class GrantResult:
created: bool
subscription_id: int
async def grant_subscription_atomic(
sess: AsyncSession,
cache: PlanCache,
*,
provider: str,
provider_payment_id: str,
tg_user_id: int,
plan_code: str,
currency: str,
amount: int,
) -> GrantResult:
provider = provider.strip().lower()
plan_code = plan_code.strip()
if not provider or not provider_payment_id or not plan_code:
raise BadRequestError(reason="missing fields")
cur = validate_currency(currency)
plan = await get_plan_by_code(sess, cache, plan_code)
if plan is None:
raise BadRequestError(reason="unknown plan")
if plan.currency.upper() != cur or int(plan.amount) != int(amount):
raise BadRequestError(reason=f"amount mismatch: expected {plan.amount} {plan.currency}, got {amount} {cur}")
now = datetime.now(timezone.utc)
async with sess.begin():
existing = await sess.execute(
select(Payment).where(Payment.provider == provider, Payment.provider_payment_id == provider_payment_id)
)
if existing.scalar_one_or_none() is not None:
raise ConflictError(reason="duplicate payment")
pay = Payment(
provider=provider,
provider_payment_id=provider_payment_id,
tg_user_id=tg_user_id,
plan_code=plan.code,
currency=cur,
amount=amount,
status="confirmed",
)
sess.add(pay)
sub = Subscription(
tg_user_id=tg_user_id,
plan_code=plan.code,
status="active",
starts_at=now,
ends_at=now + timedelta(days=int(plan.duration_days)),
)
sess.add(sub)
        try:
            await sess.flush()
        except IntegrityError as e:
            raise ConflictError(reason="idempotency conflict") from e
        # Capture the id while the transaction is still open; with
        # expire_on_commit enabled, reading sub.id after commit would trigger
        # an implicit refresh, which fails on AsyncSession.
        sub_id = int(sub.id)
    return GrantResult(created=True, subscription_id=sub_id)
END_FILE
BEGIN_FILE: src/app/routes/__init__.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
END_FILE
BEGIN_FILE: src/app/routes/health.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
import logging
from fastapi import APIRouter
from sqlalchemy import text
from app.db import get_engine
from app.infra.redis import get_redis
from app.schemas import OkResponse
log = logging.getLogger("app.health")
router = APIRouter(prefix="/v1/health", tags=["health"])
@router.get("/live", response_model=OkResponse)
async def live() -> OkResponse:
return OkResponse()
@router.get("/ready", response_model=OkResponse)
async def ready() -> OkResponse:
try:
eng = get_engine()
async with eng.connect() as conn:
await conn.execute(text("SELECT 1"))
r = get_redis()
await r.ping()
return OkResponse()
except Exception:
log.exception("readiness_failed")
raise
END_FILE
BEGIN_FILE: src/app/routes/admin.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, Request
from starlette.responses import JSONResponse
from app.errors import AppError, UnauthorizedError
from app.infra.redis import get_redis
from app.schemas import AdminLoginRequest, OkResponse
from app.security.admin import (
clear_session,
get_admin_session,
new_csrf_token,
set_session_cookies,
authenticate_admin_token,
)
from app.settings import Settings
router = APIRouter(prefix="/v1/admin", tags=["admin"])
def _settings(request: Request) -> Settings:
return request.app.state.settings # type: ignore[no-any-return]
def _csrf_or_401(req: Request) -> None:
    # Compare in constant time; a plain != would leak timing information.
    from app.security.compare import constant_time_eq
    want = req.cookies.get("__Host-csrf_token", "")
    got = req.headers.get("x-csrf-token", "")
    if not want or not got or not constant_time_eq(want, got):
        raise UnauthorizedError(reason="csrf mismatch")
async def _require_admin(req: Request) -> None:
_csrf_or_401(req)
s = _settings(req)
r = get_redis()
await get_admin_session(r, s, req)
@router.post("/login", response_model=OkResponse)
async def login(req: Request, body: AdminLoginRequest) -> JSONResponse:
s = _settings(req)
r = get_redis()
sess = await authenticate_admin_token(r, s, body.token)
csrf = new_csrf_token()
resp = JSONResponse(OkResponse().model_dump())
set_session_cookies(resp, s, sess.sid, csrf)
return resp
@router.post("/logout", response_model=OkResponse)
async def logout(req: Request) -> JSONResponse:
await _require_admin(req)
s = _settings(req)
r = get_redis()
resp = JSONResponse(OkResponse().model_dump())
await clear_session(r, s, req, resp)
return resp
@router.get("/stats", response_model=dict)
async def stats(req: Request) -> dict:
await _require_admin(req)
now = datetime.now(timezone.utc)
return {
"ok": True,
"now": now.isoformat(),
}
# AppError responses for these routes are produced by the application-level
# exception handler registered in app.main; APIRouter has no
# exception_handler() hook, so registering one here fails at import time.
END_FILE
BEGIN_FILE: src/app/routes/webhooks.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
import ipaddress
import time
from typing import Any
from fastapi import APIRouter, Request
from starlette.responses import JSONResponse
from app.errors import AppError, BadRequestError, RateLimitedError
from app.infra.rate_limit import TokenBucketLimiter
from app.infra.redis import get_redis
from app.schemas import OkResponse, WebhookEvent
from app.security.webhook_sig import verify_webhook_signature
from app.services.plans import PlanCache
from app.services.subscriptions import grant_subscription_atomic
from app.settings import Settings
from app.db import session_scope
router = APIRouter(prefix="/v1/webhooks", tags=["webhooks"])
_plan_cache = PlanCache(ttl_sec=60.0)
def _parse_trusted_proxy_nets(settings: Settings) -> list[ipaddress.IPv4Network | ipaddress.IPv6Network]:
    raw = settings.TRUSTED_PROXY_NETS.strip()
    if not raw:
        return []
    nets: list[ipaddress.IPv4Network | ipaddress.IPv6Network] = []
    for part in [p.strip() for p in raw.split(",") if p.strip()]:
        nets.append(ipaddress.ip_network(part, strict=False))
    return nets
def _peer_is_trusted(peer: str, nets: list[ipaddress.IPv4Network | ipaddress.IPv6Network]) -> bool:
    try:
        ip = ipaddress.ip_address(peer)
    except ValueError:
        return False
    return any(ip in n for n in nets)
def _valid_ip(s: str) -> str | None:
try:
ipaddress.ip_address(s)
return s
except ValueError:
return None
def _client_ip(request: Request, settings: Settings) -> str:
peer = request.client.host if request.client else ""
if not peer:
return "unknown"
nets = _parse_trusted_proxy_nets(settings)
if not nets:
return peer
if not _peer_is_trusted(peer, nets):
return peer
xff = request.headers.get("x-forwarded-for", "")
if not xff:
return peer
parts = [p.strip() for p in xff.split(",") if p.strip()]
    parts_valid = [q for q in (_valid_ip(p) for p in parts) if q is not None]
if not parts_valid:
return peer
hops = int(settings.TRUSTED_PROXY_HOPS)
if hops <= 0:
return peer
if len(parts_valid) <= hops:
return peer
cand = parts_valid[-hops - 1]
return cand
def _settings(req: Request) -> Settings:
return req.app.state.settings # type: ignore[no-any-return]
@router.post("/{provider}", response_model=OkResponse)
async def webhook(provider: str, req: Request) -> JSONResponse:
s = _settings(req)
prov = provider.strip().lower()
if prov not in s.allowed_providers_set():
raise BadRequestError(reason="provider not allowed")
body = await req.body()
headers = {k: v for (k, v) in req.headers.items()}
r = get_redis()
await verify_webhook_signature(r, s, headers, body)
ip = _client_ip(req, s)
ip_hash = TokenBucketLimiter.hash_ip(ip)
limiter = TokenBucketLimiter(
redis=r,
key_prefix=s.REDIS_KEY_PREFIX,
rate=s.WEBHOOK_RL_RATE,
burst=s.WEBHOOK_RL_BURST,
)
key = f"wh:rl:{prov}:{ip_hash}"
ok = await limiter.allow(key, now_ms=int(time.time() * 1000))
if not ok:
raise RateLimitedError(reason="rate limited")
    try:
        # Pydantic v2 parses raw JSON bytes directly, replacing the original
        # __import__("json") indirection.
        evt = WebhookEvent.model_validate_json(body)
    except Exception as e:
        raise BadRequestError(reason="invalid payload") from e
async for sess in session_scope():
res = await grant_subscription_atomic(
sess,
_plan_cache,
provider=prov,
provider_payment_id=evt.provider_payment_id,
tg_user_id=evt.tg_user_id,
plan_code=evt.plan_code,
currency=evt.currency,
amount=evt.amount,
)
return JSONResponse(OkResponse().model_dump() | {"subscription_id": res.subscription_id})
raise RuntimeError("unreachable")
# AppError responses for these routes (including the request_id echo) are
# produced by the application-level exception handler registered in app.main;
# APIRouter has no exception_handler() hook, so registering one here fails at
# import time.
END_FILE
BEGIN_FILE: src/app/main.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from starlette.responses import JSONResponse
from app.errors import AppError
from app.db import close_engine, create_engine
from app.infra.redis import close_redis, create_redis
from app.logging import setup_logging
from app.middleware import BodySizeLimitMiddleware, SecurityHeadersMiddleware
from app.routes.admin import router as admin_router
from app.routes.health import router as health_router
from app.routes.webhooks import router as webhooks_router
from app.settings import Settings
from app.schemas import ErrorInfo, ErrorResponse
@asynccontextmanager
async def lifespan(app: FastAPI):
setup_logging()
settings = Settings()
app.state.settings = settings
create_engine(settings)
await create_redis(settings)
yield
await close_redis()
await close_engine()
app = FastAPI(lifespan=lifespan)
_boot_settings = Settings()  # read env once at import; lifespan constructs its own copy
app.add_middleware(BodySizeLimitMiddleware, max_bytes=_boot_settings.MAX_BODY_BYTES)
app.add_middleware(SecurityHeadersMiddleware, app_env=_boot_settings.APP_ENV)
app.include_router(health_router)
app.include_router(webhooks_router)
app.include_router(admin_router)
@app.exception_handler(AppError) # type: ignore[misc]
async def app_error_handler(req: Request, exc: AppError) -> JSONResponse:
rid = req.headers.get("x-request-id", "unknown")
payload = ErrorResponse(error=ErrorInfo(code=exc.code, reason=exc.reason, request_id=rid)).model_dump()
return JSONResponse(status_code=exc.http_status, content=payload)
@app.get("/v1", include_in_schema=False)
async def root() -> dict[str, object]:
return {"ok": True}
END_FILE
BEGIN_FILE: tests/__init__.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
END_FILE
BEGIN_FILE: tests/conftest.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
import os
import pytest
def pytest_configure(config: pytest.Config) -> None:
config.addinivalue_line("markers", "integration: requires live postgres+redis via env")
@pytest.fixture(scope="session")
def integration_enabled() -> bool:
return os.getenv("INTEGRATION", "").strip() == "1"
END_FILE
BEGIN_FILE: tests/unit/__init__.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
END_FILE
BEGIN_FILE: tests/unit/test_admin_sessions.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
import pytest
from redis.asyncio import Redis
from app.security.admin import authenticate_admin_token
from app.settings import Settings
@pytest.mark.asyncio
async def test_admin_session_cookie_is_opaque() -> None:
s = Settings(
DATABASE_URL="postgresql+asyncpg://x:x@localhost:5432/x",
REDIS_URL="redis://localhost:6379/0",
ADMIN_TOKEN="a" * 32,
ADMIN_TOKEN_FINGERPRINT_SECRET="b" * 32,
WEBHOOK_SIGNATURE_SECRETS="c" * 32,
)
r = Redis.from_url("redis://localhost:6379/15", decode_responses=False)
try:
await r.flushdb()
sess = await authenticate_admin_token(r, s, "a" * 32)
assert len(sess.sid) == 64
assert sess.sid != "a" * 32
finally:
await r.aclose()
END_FILE
BEGIN_FILE: tests/unit/test_circuit_breaker.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
import pytest
from app.infra.resilience import CircuitBreaker
@pytest.mark.asyncio
async def test_circuit_breaker_opens_and_recovers() -> None:
cb = CircuitBreaker(failure_threshold=2, recovery_timeout=0.01)
calls = {"n": 0}
async def boom():
calls["n"] += 1
raise RuntimeError("x")
with pytest.raises(RuntimeError):
await cb.call(boom)
with pytest.raises(RuntimeError):
await cb.call(boom)
with pytest.raises(RuntimeError):
await cb.call(boom)
import asyncio
await asyncio.sleep(0.02)
async def ok():
return 1
assert await cb.call(ok) == 1
END_FILE
BEGIN_FILE: tests/unit/test_trusted_proxy_ip.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
from types import SimpleNamespace
from app.routes.webhooks import _client_ip
from app.settings import Settings
class DummyReq:
def __init__(self, peer: str, xff: str) -> None:
self.client = SimpleNamespace(host=peer)
self.headers = {"x-forwarded-for": xff} if xff else {}
def test_client_ip_ignores_xff_without_trusted_nets() -> None:
s = Settings(
DATABASE_URL="postgresql+asyncpg://x:x@localhost:5432/x",
REDIS_URL="redis://localhost:6379/0",
ADMIN_TOKEN="a" * 32,
ADMIN_TOKEN_FINGERPRINT_SECRET="b" * 32,
WEBHOOK_SIGNATURE_SECRETS="c" * 32,
TRUSTED_PROXY_NETS="",
TRUSTED_PROXY_HOPS=1,
)
req = DummyReq(peer="8.8.8.8", xff="1.2.3.4")
assert _client_ip(req, s) == "8.8.8.8"
def test_client_ip_uses_xff_when_peer_trusted_and_hops_configured() -> None:
s = Settings(
DATABASE_URL="postgresql+asyncpg://x:x@localhost:5432/x",
REDIS_URL="redis://localhost:6379/0",
ADMIN_TOKEN="a" * 32,
ADMIN_TOKEN_FINGERPRINT_SECRET="b" * 32,
WEBHOOK_SIGNATURE_SECRETS="c" * 32,
TRUSTED_PROXY_NETS="10.0.0.0/8",
TRUSTED_PROXY_HOPS=1,
)
req = DummyReq(peer="10.2.3.4", xff="1.1.1.1, 10.2.3.4")
assert _client_ip(req, s) == "1.1.1.1"
def test_client_ip_rejects_invalid_xff_ips() -> None:
s = Settings(
DATABASE_URL="postgresql+asyncpg://x:x@localhost:5432/x",
REDIS_URL="redis://localhost:6379/0",
ADMIN_TOKEN="a" * 32,
ADMIN_TOKEN_FINGERPRINT_SECRET="b" * 32,
WEBHOOK_SIGNATURE_SECRETS="c" * 32,
TRUSTED_PROXY_NETS="10.0.0.0/8",
TRUSTED_PROXY_HOPS=1,
)
req = DummyReq(peer="10.2.3.4", xff="not-an-ip, also-bad, 10.2.3.4")
assert _client_ip(req, s) == "10.2.3.4"
END_FILE
BEGIN_FILE: tests/integration/__init__.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
END_FILE
BEGIN_FILE: tests/integration/test_webhook_secret_rotation.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
import hashlib
import hmac
import json
import time
import pytest
from redis.asyncio import Redis
from app.security.webhook_sig import verify_webhook_signature
from app.settings import Settings
def _sign(secret: str, ts: str, nonce: str, body: bytes) -> str:
mac = hmac.new(secret.encode("utf-8"), digestmod=hashlib.sha256)
mac.update(ts.encode("utf-8"))
mac.update(b":")
mac.update(nonce.encode("utf-8"))
mac.update(b":")
mac.update(body)
return mac.hexdigest()
@pytest.mark.asyncio
@pytest.mark.integration
async def test_signature_rotation_accepts_old_and_new(integration_enabled: bool) -> None:
if not integration_enabled:
pytest.skip("INTEGRATION=1 required")
s = Settings(
DATABASE_URL="postgresql+asyncpg://x:x@localhost:5432/x",
REDIS_URL="redis://localhost:6379/0",
ADMIN_TOKEN="a" * 32,
ADMIN_TOKEN_FINGERPRINT_SECRET="b" * 32,
WEBHOOK_SIGNATURE_SECRETS=("o" * 32) + "," + ("n" * 32),
)
r = Redis.from_url(s.REDIS_URL, decode_responses=False)
try:
await r.flushdb()
body = json.dumps(
{
"schema_version": 1,
"provider_payment_id": "p1",
"tg_user_id": 1,
"plan_code": "pro",
"currency": "USD",
"amount": 100,
}
).encode("utf-8")
ts = str(int(time.time()))
nonce = "nonce1"
headers_old = {
"x-apex-signature": "v1=" + _sign("o" * 32, ts, nonce, body),
"x-apex-timestamp": ts,
"x-apex-nonce": nonce,
}
await verify_webhook_signature(r, s, headers_old, body)
nonce2 = "nonce2"
headers_new = {
"x-apex-signature": "v1=" + _sign("n" * 32, ts, nonce2, body),
"x-apex-timestamp": ts,
"x-apex-nonce": nonce2,
}
await verify_webhook_signature(r, s, headers_new, body)
finally:
await r.aclose()
END_FILE
BEGIN_FILE: tests/integration/test_body_size_limit_413.py
# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations
import pytest
from httpx import ASGITransport, AsyncClient
@pytest.mark.asyncio
async def test_body_size_limit_returns_structured_413(monkeypatch) -> None:
    # The middleware captures MAX_BODY_BYTES when app.main is imported, so the
    # env var must be set before the import; patching afterwards has no effect.
    monkeypatch.setenv("MAX_BODY_BYTES", "1024")
    from app.main import app
    # httpx >= 0.27 removed the app= shortcut; wire the ASGI app explicitly.
    client = AsyncClient(transport=ASGITransport(app=app), base_url="http://test")
    try:
        payload = b"x" * 2048
        r = await client.post("/v1/webhooks/stripe", content=payload, headers={"content-length": str(len(payload))})
assert r.status_code == 413
data = r.json()
assert data["ok"] is False
assert data["error"]["code"] == "payload_too_large"
finally:
await client.aclose()
END_FILE
END_REPO