Raw Text Content Remove
snake-jaguar-bird



BEGIN_REPO: apexai-submanager (v2.1.12) — FULL CHAT DUMP (NO DIFFS, NO PARTIALS)

REPO_TREE: apexai-submanager/ ├── LICENSE ├── README.md ├── API.md ├── SECURITY.md ├── OPS_RUNBOOK.md ├── CHANGELOG.md ├── pyproject.toml ├── ruff.toml ├── mypy.ini ├── pytest.ini ├── .gitignore ├── .dockerignore ├── .env.example ├── Dockerfile ├── docker-compose.yml ├── alembic.ini ├── migrations/ │   ├── env.py │   ├── script.py.mako │   └── versions/ │       └── 0001_initial_schema.py ├── admin_ui/ │   ├── index.html │   └── app.js ├── src/ │   └── app/ │       ├── init.py │       ├── main.py │       ├── settings.py │       ├── errors.py │       ├── schemas.py │       ├── middleware.py │       ├── logging.py │       ├── db.py │       ├── models.py │       ├── infra/ │       │   ├── init.py │       │   ├── redis.py │       │   ├── resilience.py │       │   ├── rate_limit.py │       │   └── locks.py │       ├── security/ │       │   ├── init.py │       │   ├── compare.py │       │   ├── admin.py │       │   ├── webhook.py │       │   └── webhook_sig.py │       ├── services/ │       │   ├── init.py │       │   ├── plans.py │       │   └── subscriptions.py │       └── routes/ │           ├── init.py │           ├── health.py │           ├── admin.py │           └── webhooks.py └── tests/ ├── init.py ├── conftest.py ├── unit/ │   ├── init.py │   ├── test_admin_sessions.py │   ├── test_circuit_breaker.py │   └── test_trusted_proxy_ip.py └── integration/ ├── init.py ├── test_webhook_secret_rotation.py └── test_body_size_limit_413.py

BEGIN_FILE: LICENSE

Copyright (c) 2026 ApexAI Official

All rights reserved.

This software and associated documentation files (the "Software") are proprietary and confidential.
Unauthorized copying, distribution, modification, sublicensing, hosting, or deployment of this Software,
via any medium, is strictly prohibited without an explicit written agreement from ApexAI Official.

NO LICENSE IS GRANTED except by a separate commercial agreement.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
IN THE SOFTWARE.

END_FILE

BEGIN_FILE: README.md

# ApexAI SubManager

Production-grade subscription management service for Telegram-native SaaS bots.

## What it does
- Accepts payment confirmation webhooks (provider allowlist + signature verification + replay protection)
- Rate limits webhook intake (per provider + per client IP; client IP hashed in Redis keys)
- Grants subscriptions idempotently and atomically (single DB transaction)
- Admin Ops Plane:
  - server-side sessions (opaque cookie SID; session stored in Redis)
  - CSRF protection
  - progressive delay + lockout on repeated auth failures
- Health probes: `/v1/health/live`, `/v1/health/ready`
- Strict request body size limit (default 256 KiB)

## Quick start (local)
1. Copy env:
   ```bash
   cp .env.example .env

2. Fill required secrets:

ADMIN_TOKEN

ADMIN_TOKEN_FINGERPRINT_SECRET

WEBHOOK_SIGNATURE_SECRETS



3. Run:

docker compose up --build


4. Admin UI:

open admin_ui/index.html in your browser

set API_BASE in the UI if not running on http://localhost:8080




Notes on proxies (IMPORTANT)

If you deploy behind reverse proxies/load balancers and want per-IP rate limiting to use X-Forwarded-For, configure:

TRUSTED_PROXY_NETS (comma-separated CIDRs)

TRUSTED_PROXY_HOPS (int)


If the immediate peer IP is NOT within TRUSTED_PROXY_NETS, X-Forwarded-For is ignored.

License

Proprietary — see LICENSE.

END_FILE

BEGIN_FILE: API.md
```md
# API

Base prefix: `/v1`

## Health
- `GET /v1/health/live`
- `GET /v1/health/ready`

## Webhooks
- `POST /v1/webhooks/{provider}`

Headers:
- `X-Apex-Signature: v1=<hex>`
- `X-Apex-Timestamp: <unix seconds>`
- `X-Apex-Nonce: <random>`

## Admin
- `POST /v1/admin/login`
- `POST /v1/admin/logout`
- `GET  /v1/admin/stats`

Admin endpoints require:
- session cookie `__Host-admin_session`
- CSRF header `X-CSRF-Token` matching `__Host-csrf_token` cookie

END_FILE

BEGIN_FILE: SECURITY.md

# Security

## Admin authentication
- Server-side session stored in Redis.
- Browser receives only an opaque session id cookie.
- Progressive delay + lockout on repeated failures.

## Cookies
- Session cookie: `__Host-admin_session` (Secure, Path=/, HttpOnly)
- CSRF cookie: `__Host-csrf_token` (Secure, Path=/)

## Webhook security
- Provider allowlist (`ALLOWED_PROVIDERS`)
- Signature verification supports rotation (`WEBHOOK_SIGNATURE_SECRETS`)
- Timestamp skew bound (`WEBHOOK_TS_SKEW_SEC`)
- Nonce replay protection (`WEBHOOK_NONCE_TTL_SEC`)
- Rate limiting: provider + client IP (IP hashed; XFF honored only for trusted proxies)

## Headers
Security headers middleware sets:
- X-Content-Type-Options: nosniff
- X-Frame-Options: DENY
- Content-Security-Policy: default-src 'none'; frame-ancestors 'none'
- Strict-Transport-Security (prod only)
- Permissions-Policy: geolocation=(), microphone=(), camera=()

END_FILE

BEGIN_FILE: OPS_RUNBOOK.md

# Ops Runbook

## Local / compose
- `docker compose up --build`
- migrations run via `migrate` service automatically.

## Environment variables
See `.env.example`.

## Redis key prefixing
All keys are namespaced via `REDIS_KEY_PREFIX`.

## Secret rotation (webhooks)
Set `WEBHOOK_SIGNATURE_SECRETS` to `old_secret,new_secret`, deploy, then remove old.

## Troubleshooting
- 401 on admin endpoints:
  - session expired or missing
  - CSRF token missing/mismatch
- 429 on webhooks:
  - rate limiting active; verify trusted proxy settings if behind LB
- 409 replay:
  - nonce already used within TTL

END_FILE

BEGIN_FILE: CHANGELOG.md

# Changelog

## 2.1.12
- Add: Alembic migrations (version tracking) replacing single raw SQL bootstrap
- Add: Multi-stage Dockerfile (runtime image excludes dev deps) + non-root user + HEALTHCHECK
- Fix: Trusted proxy/XFF model hardened (validated IPs, trusted net parsing, hop logic)
- Add: Integration tests (signature rotation + 413 envelope)

## 2.1.11
- Fix: trusted proxy validation for client IP extraction (prevents XFF spoof rate-limit bypass)
- Fix: CSRF cookie now uses __Host- prefix with Secure + Path=/
- Fix: Body size middleware returns structured 413 envelope
- Fix: Lock release failures now logged (warning)
- Improvement: Security headers add HSTS (prod) + Permissions-Policy
- Admin UI: display API error.reason + request_id when available
- DB: plans.duration_days supported (defaults to 30)

END_FILE

BEGIN_FILE: pyproject.toml

[project]
name = "apexai-submanager"
version = "2.1.12"
description = "ApexAI SubManager - production-grade FastAPI subscription manager"
requires-python = "==3.12.*"
dependencies = [
  "fastapi==0.115.6",
  "uvicorn==0.34.0",
  "pydantic==2.10.4",
  "pydantic-settings==2.7.0",
  "sqlalchemy==2.0.36",
  "asyncpg==0.30.0",
  "redis==5.2.1",
  "alembic==1.14.1",
]

[project.optional-dependencies]
dev = [
  "pytest==8.3.4",
  "pytest-asyncio==0.25.0",
  "pytest-cov==6.0.0",
  "mypy==1.13.0",
  "ruff==0.8.4",
  "httpx==0.28.1",
]

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
markers = [
  "integration: requires live postgres+redis via env (DATABASE_URL/REDIS_URL)",
]

[tool.coverage.run]
branch = true
source = ["src"]

[tool.ruff]
line-length = 110
target-version = "py312"

[tool.mypy]
python_version = "3.12"
strict = true
mypy_path = ["src"]
warn_unused_ignores = true

END_FILE

BEGIN_FILE: ruff.toml

line-length = 110
target-version = "py312"

[lint]
select = ["E", "F", "I", "B", "UP", "SIM"]
ignore = ["E501"]

END_FILE

BEGIN_FILE: mypy.ini

[mypy]
python_version = 3.12
strict = True
mypy_path = src
warn_unused_ignores = True

END_FILE

BEGIN_FILE: pytest.ini

[pytest]
asyncio_mode = auto

END_FILE

BEGIN_FILE: .gitignore

__pycache__/
*.pyc
.venv/
.env
.coverage
htmlcov/
dist/
build/

END_FILE

BEGIN_FILE: .dockerignore

.git/
__pycache__/
*.pyc
.venv/
.env
.coverage
htmlcov/
dist/
build/
tests/

END_FILE

BEGIN_FILE: .env.example

APP_ENV=local
PORT=8080
LOG_LEVEL=INFO

DATABASE_URL=postgresql+asyncpg://postgres:postgres@db:5432/postgres
REDIS_URL=redis://redis:6379/0
REDIS_KEY_PREFIX=apexai:submanager:

ADMIN_TOKEN=CHANGE_ME_32CHARS_MIN________________________________
ADMIN_TOKEN_FINGERPRINT_SECRET=CHANGE_ME_32CHARS_MIN_________________________
WEBHOOK_SIGNATURE_SECRETS=CHANGE_ME_32CHARS_MIN________________________________

COOKIE_SECURE=false
COOKIE_SAMESITE=lax
ADMIN_SESSION_TTL_SEC=3600

ALLOWED_PROVIDERS=stripe
WEBHOOK_TS_SKEW_SEC=300
WEBHOOK_NONCE_TTL_SEC=900
WEBHOOK_RL_RATE=30
WEBHOOK_RL_BURST=60

DB_OP_TIMEOUT_SEC=5.0
DB_CONCURRENCY=20
DB_SEM_TIMEOUT_SEC=2.0
MAX_BODY_BYTES=262144

# Trusted proxy configuration (for X-Forwarded-For)
TRUSTED_PROXY_NETS=127.0.0.1/32,10.0.0.0/8,172.16.0.0/12,192.168.0.0/16
TRUSTED_PROXY_HOPS=1

END_FILE

BEGIN_FILE: Dockerfile

# syntax=docker/dockerfile:1

FROM python:3.12-slim AS builder

ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

WORKDIR /app

RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

COPY pyproject.toml /app/pyproject.toml
COPY src /app/src
COPY alembic.ini /app/alembic.ini
COPY migrations /app/migrations

RUN pip install --no-cache-dir -U pip && \
    pip install --no-cache-dir .

FROM python:3.12-slim AS runtime

ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

RUN addgroup --system --gid 1001 app && \
    adduser --system --uid 1001 --gid 1001 --no-create-home app

WORKDIR /app

COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

COPY admin_ui /app/admin_ui
COPY alembic.ini /app/alembic.ini
COPY migrations /app/migrations
COPY src /app/src

RUN chown -R app:app /app

USER app

EXPOSE 8080

HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 \
  CMD python -c "import urllib.request; urllib.request.urlopen('http://127.0.0.1:8080/v1/health/live', timeout=2).read()" || exit 1

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080"]

END_FILE

BEGIN_FILE: docker-compose.yml

services:
  db:
    image: postgres:16
    environment:
      POSTGRES_PASSWORD: postgres
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 3s
      timeout: 3s
      retries: 20

  redis:
    image: redis:7
    ports:
      - "6379:6379"
    healthcheck:
      test: ["CMD", "redis-cli", "PING"]
      interval: 3s
      timeout: 3s
      retries: 20

  migrate:
    build: .
    env_file: .env
    depends_on:
      db:
        condition: service_healthy
    command: ["alembic", "upgrade", "head"]
    restart: "no"

  api:
    build: .
    env_file: .env
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_healthy
      migrate:
        condition: service_completed_successfully
    ports:
      - "8080:8080"

END_FILE

BEGIN_FILE: alembic.ini

[alembic]
script_location = migrations
prepend_sys_path = .
version_path_separator = os
sqlalchemy.url = driver://user:pass@localhost/dbname

[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = INFO
handlers = console

[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s

END_FILE

BEGIN_FILE: migrations/env.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

import os
from logging.config import fileConfig
from typing import Any

from alembic import context
from sqlalchemy import engine_from_config, pool

from app.models import Base

config = context.config
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

target_metadata = Base.metadata


def _get_db_url() -> str:
    url = os.getenv("DATABASE_URL", "").strip()
    if not url:
        raise RuntimeError("DATABASE_URL is required for migrations")
    return url


def run_migrations_offline() -> None:
    url = _get_db_url()
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        compare_type=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online() -> None:
    section = config.get_section(config.config_ini_section) or {}
    section = dict(section)
    section["sqlalchemy.url"] = _get_db_url()

    connectable = engine_from_config(
        section,
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            compare_type=True,
        )

        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

END_FILE

BEGIN_FILE: migrations/script.py.mako

"""${message}

Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}

"""
from __future__ import annotations

from alembic import op
import sqlalchemy as sa


revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}


def upgrade() -> None:
    ${upgrades if upgrades else "pass"}


def downgrade() -> None:
    ${downgrades if downgrades else "pass"}

END_FILE

BEGIN_FILE: migrations/versions/0001_initial_schema.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

from alembic import op
import sqlalchemy as sa


revision = "0001_initial_schema"
down_revision = None
branch_labels = None
depends_on = None


def upgrade() -> None:
    op.create_table(
        "plans",
        sa.Column("id", sa.BigInteger(), primary_key=True, autoincrement=True),
        sa.Column("code", sa.String(length=64), nullable=False),
        sa.Column("currency", sa.String(length=8), nullable=False),
        sa.Column("amount", sa.BigInteger(), nullable=False),
        sa.Column("duration_days", sa.Integer(), nullable=False, server_default="30"),
        sa.Column("is_active", sa.Boolean(), nullable=False, server_default=sa.text("true")),
        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")),
        sa.UniqueConstraint("code", name="uq_plans_code"),
    )

    op.create_table(
        "payments",
        sa.Column("id", sa.BigInteger(), primary_key=True, autoincrement=True),
        sa.Column("provider", sa.String(length=32), nullable=False),
        sa.Column("provider_payment_id", sa.String(length=128), nullable=False),
        sa.Column("tg_user_id", sa.BigInteger(), nullable=False),
        sa.Column("plan_code", sa.String(length=64), nullable=False),
        sa.Column("currency", sa.String(length=8), nullable=False),
        sa.Column("amount", sa.BigInteger(), nullable=False),
        sa.Column("status", sa.String(length=32), nullable=False),
        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")),
        sa.UniqueConstraint("provider", "provider_payment_id", name="uq_payments_provider_payment"),
    )

    op.create_table(
        "subscriptions",
        sa.Column("id", sa.BigInteger(), primary_key=True, autoincrement=True),
        sa.Column("tg_user_id", sa.BigInteger(), nullable=False),
        sa.Column("plan_code", sa.String(length=64), nullable=False),
        sa.Column("status", sa.String(length=32), nullable=False),
        sa.Column("starts_at", sa.DateTime(timezone=True), nullable=False),
        sa.Column("ends_at", sa.DateTime(timezone=True), nullable=False),
        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")),
        sa.Index("ix_subscriptions_tg_user_id", "tg_user_id"),
        sa.Index("ix_subscriptions_status", "status"),
    )


def downgrade() -> None:
    op.drop_table("subscriptions")
    op.drop_table("payments")
    op.drop_table("plans")

END_FILE

BEGIN_FILE: admin_ui/index.html

<!doctype html>
<html lang="en">
  <head>
    <meta charset="utf-8" />
    <title>ApexAI SubManager Admin</title>
    <meta name="viewport" content="width=device-width, initial-scale=1" />
    <style>
      body { font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, Arial; margin: 24px; }
      input, button { padding: 10px; margin: 6px 0; }
      .row { display: flex; gap: 8px; align-items: center; }
      .card { border: 1px solid #ddd; border-radius: 10px; padding: 14px; margin-top: 12px; }
      .muted { color: #666; font-size: 12px; }
      pre { background: #f7f7f7; padding: 12px; border-radius: 8px; overflow: auto; }
    </style>
  </head>
  <body>
    <h1>ApexAI SubManager Admin</h1>

    <div class="card">
      <div class="muted">API_BASE (default http://localhost:8080)</div>
      <input id="apiBase" placeholder="http://localhost:8080" style="width: 420px" />
      <div class="muted">Admin Token</div>
      <input id="token" placeholder="ADMIN_TOKEN" style="width: 420px" />
      <button id="loginBtn">Login</button>
      <button id="logoutBtn">Logout</button>
      <div id="loginStatus" class="muted"></div>
    </div>

    <div class="card">
      <button id="statsBtn">Fetch Stats</button>
      <pre id="statsOut"></pre>
    </div>

    <script src="app.js"></script>
  </body>
</html>

END_FILE

BEGIN_FILE: admin_ui/app.js

/* Copyright (c) 2026 ApexAI Official. All rights reserved. */
function apiBase() {
  const v = document.getElementById("apiBase").value.trim();
  return v || "http://localhost:8080";
}

function setStatus(msg) {
  document.getElementById("loginStatus").textContent = msg;
}

async function apiFetch(path, opts) {
  const base = apiBase();
  const res = await fetch(base + path, opts);
  let data = null;
  try {
    data = await res.json();
  } catch (_) {
    // ignore
  }
  if (!res.ok) {
    const reason = (data && data.error && data.error.reason) ? data.error.reason : `HTTP ${res.status}`;
    const rid = (data && data.error && data.error.request_id) ? data.error.request_id : "n/a";
    throw new Error(`${reason} (request_id=${rid})`);
  }
  return data;
}

function getCsrf() {
  const cookie = document.cookie.split(";").map(s => s.trim());
  const kv = cookie.find(s => s.startsWith("__Host-csrf_token="));
  if (!kv) return "";
  return decodeURIComponent(kv.split("=").slice(1).join("="));
}

document.getElementById("loginBtn").onclick = async () => {
  const tok = document.getElementById("token").value.trim();
  try {
    await apiFetch("/v1/admin/login", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ token: tok })
    });
    setStatus("Logged in (session cookie set).");
  } catch (e) {
    setStatus("Login failed: " + e.message);
  }
};

document.getElementById("logoutBtn").onclick = async () => {
  try {
    const csrf = getCsrf();
    await apiFetch("/v1/admin/logout", {
      method: "POST",
      headers: { "X-CSRF-Token": csrf }
    });
    setStatus("Logged out.");
  } catch (e) {
    setStatus("Logout failed: " + e.message);
  }
};

document.getElementById("statsBtn").onclick = async () => {
  const out = document.getElementById("statsOut");
  out.textContent = "Loading...";
  try {
    const csrf = getCsrf();
    const data = await apiFetch("/v1/admin/stats", {
      method: "GET",
      headers: { "X-CSRF-Token": csrf }
    });
    out.textContent = JSON.stringify(data, null, 2);
  } catch (e) {
    out.textContent = "Error: " + e.message;
  }
};

END_FILE

BEGIN_FILE: src/app/init.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

__all__ = ["__version__"]

__version__ = "2.1.12"

END_FILE

BEGIN_FILE: src/app/settings.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

from pydantic import Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", extra="ignore")

    APP_ENV: str = Field(default="local")
    PORT: int = Field(default=8080, ge=1, le=65535)
    LOG_LEVEL: str = Field(default="INFO")

    DATABASE_URL: str
    REDIS_URL: str
    REDIS_KEY_PREFIX: str = Field(default="apexai:submanager:")

    ADMIN_TOKEN: str
    ADMIN_TOKEN_FINGERPRINT_SECRET: str
    WEBHOOK_SIGNATURE_SECRETS: str

    COOKIE_SECURE: bool = Field(default=False)
    COOKIE_SAMESITE: str = Field(default="lax")
    ADMIN_SESSION_TTL_SEC: int = Field(default=3600, ge=60, le=86400)

    ALLOWED_PROVIDERS: str = Field(default="stripe")
    WEBHOOK_TS_SKEW_SEC: int = Field(default=300, ge=30, le=3600)
    WEBHOOK_NONCE_TTL_SEC: int = Field(default=900, ge=60, le=86400)
    WEBHOOK_RL_RATE: int = Field(default=30, ge=1, le=100000)
    WEBHOOK_RL_BURST: int = Field(default=60, ge=1, le=200000)

    DB_OP_TIMEOUT_SEC: float = Field(default=5.0, gt=0.05, le=120.0)
    DB_CONCURRENCY: int = Field(default=20, ge=1, le=500)
    DB_SEM_TIMEOUT_SEC: float = Field(default=2.0, gt=0.05, le=30.0)

    MAX_BODY_BYTES: int = Field(default=262144, ge=1024, le=2_097_152)

    TRUSTED_PROXY_NETS: str = Field(default="")
    TRUSTED_PROXY_HOPS: int = Field(default=0, ge=0, le=20)

    @field_validator("ADMIN_TOKEN", "ADMIN_TOKEN_FINGERPRINT_SECRET")
    @classmethod
    def _min_admin_secret_len(cls, v: str) -> str:
        v = v.strip()
        if len(v) < 32:
            raise ValueError("admin secrets must be at least 32 chars")
        return v

    @field_validator("WEBHOOK_SIGNATURE_SECRETS")
    @classmethod
    def _validate_webhook_secrets(cls, v: str) -> str:
        parts = [p.strip() for p in v.split(",") if p.strip()]
        if not parts:
            raise ValueError("WEBHOOK_SIGNATURE_SECRETS must contain at least one secret")
        for p in parts:
            if len(p) < 32:
                raise ValueError("each webhook secret must be at least 32 chars")
        return ",".join(parts)

    @field_validator("COOKIE_SAMESITE")
    @classmethod
    def _validate_samesite(cls, v: str) -> str:
        vv = v.strip().lower()
        if vv not in {"lax", "strict", "none"}:
            raise ValueError("COOKIE_SAMESITE must be lax|strict|none")
        return vv

    def allowed_providers_set(self) -> set[str]:
        return {p.strip().lower() for p in self.ALLOWED_PROVIDERS.split(",") if p.strip()}

    def webhook_secrets_list(self) -> list[str]:
        return [p.strip() for p in self.WEBHOOK_SIGNATURE_SECRETS.split(",") if p.strip()]

END_FILE

BEGIN_FILE: src/app/errors.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class AppError(Exception):
    code: str
    reason: str
    http_status: int = 400


@dataclass(frozen=True)
class UnauthorizedError(AppError):
    code: str = "unauthorized"
    reason: str = "unauthorized"
    http_status: int = 401


@dataclass(frozen=True)
class ForbiddenError(AppError):
    code: str = "forbidden"
    reason: str = "forbidden"
    http_status: int = 403


@dataclass(frozen=True)
class NotFoundError(AppError):
    code: str = "not_found"
    reason: str = "not_found"
    http_status: int = 404


@dataclass(frozen=True)
class ConflictError(AppError):
    code: str = "conflict"
    reason: str = "conflict"
    http_status: int = 409


@dataclass(frozen=True)
class RateLimitedError(AppError):
    code: str = "rate_limited"
    reason: str = "rate_limited"
    http_status: int = 429


@dataclass(frozen=True)
class PayloadTooLargeError(AppError):
    code: str = "payload_too_large"
    reason: str = "payload too large"
    http_status: int = 413


@dataclass(frozen=True)
class BadRequestError(AppError):
    code: str = "bad_request"
    reason: str = "bad request"
    http_status: int = 400

END_FILE

BEGIN_FILE: src/app/schemas.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

from pydantic import BaseModel, Field

from app import __version__


class ErrorInfo(BaseModel):
    code: str
    reason: str
    request_id: str


class ErrorResponse(BaseModel):
    ok: bool = Field(default=False)
    error: ErrorInfo


class OkResponse(BaseModel):
    ok: bool = Field(default=True)
    api_version: str = Field(default=__version__)


class AdminLoginRequest(BaseModel):
    token: str


class WebhookEvent(BaseModel):
    schema_version: int = Field(default=1, ge=1, le=10)
    provider_payment_id: str = Field(min_length=1, max_length=128)
    tg_user_id: int = Field(ge=1)
    plan_code: str = Field(min_length=1, max_length=64)
    currency: str = Field(min_length=2, max_length=8)
    amount: int = Field(ge=0)

END_FILE

BEGIN_FILE: src/app/logging.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

import json
import logging
import os
from datetime import datetime, timezone
from typing import Any


class JsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        payload: dict[str, Any] = {
            "ts": datetime.now(timezone.utc).isoformat(timespec="milliseconds"),
            "level": record.levelname,
            "name": record.name,
            "msg": record.getMessage(),
        }
        if record.exc_info:
            payload["exc_info"] = self.formatException(record.exc_info)
        return json.dumps(payload, separators=(",", ":"), ensure_ascii=False)


def setup_logging() -> None:
    level = os.getenv("LOG_LEVEL", "INFO").strip().upper()
    root = logging.getLogger()
    root.setLevel(level)

    handler = logging.StreamHandler()
    fmt = os.getenv("APP_ENV", "local").strip().lower()
    if fmt in {"prod", "production"}:
        handler.setFormatter(JsonFormatter())
    else:
        handler.setFormatter(logging.Formatter("%(levelname)s [%(name)s] %(message)s"))
    root.handlers = [handler]

END_FILE

BEGIN_FILE: src/app/db.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

from collections.abc import AsyncIterator

from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine

from app.settings import Settings

_engine: AsyncEngine | None = None
_sessionmaker: async_sessionmaker[AsyncSession] | None = None


def create_engine(settings: Settings) -> None:
    global _engine, _sessionmaker
    _engine = create_async_engine(
        settings.DATABASE_URL,
        pool_pre_ping=True,
        pool_recycle=1800,
        pool_size=20,
        max_overflow=30,
    )
    _sessionmaker = async_sessionmaker(_engine, expire_on_commit=False)


def get_engine() -> AsyncEngine:
    if _engine is None:
        raise RuntimeError("DB engine not initialized")
    return _engine


async def close_engine() -> None:
    global _engine, _sessionmaker
    if _engine is not None:
        await _engine.dispose()
    _engine = None
    _sessionmaker = None


async def session_scope() -> AsyncIterator[AsyncSession]:
    if _sessionmaker is None:
        raise RuntimeError("DB sessionmaker not initialized")
    async with _sessionmaker() as sess:
        yield sess

END_FILE

BEGIN_FILE: src/app/models.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

from datetime import datetime

from sqlalchemy import BigInteger, Boolean, DateTime, Integer, String, UniqueConstraint, func, Index
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class Plan(Base):
    __tablename__ = "plans"

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
    code: Mapped[str] = mapped_column(String(64), nullable=False, unique=True)
    currency: Mapped[str] = mapped_column(String(8), nullable=False)
    amount: Mapped[int] = mapped_column(BigInteger, nullable=False)
    duration_days: Mapped[int] = mapped_column(Integer, nullable=False, default=30)
    is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False)


class Payment(Base):
    __tablename__ = "payments"
    __table_args__ = (UniqueConstraint("provider", "provider_payment_id", name="uq_payments_provider_payment"),)

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
    provider: Mapped[str] = mapped_column(String(32), nullable=False)
    provider_payment_id: Mapped[str] = mapped_column(String(128), nullable=False)
    tg_user_id: Mapped[int] = mapped_column(BigInteger, nullable=False)
    plan_code: Mapped[str] = mapped_column(String(64), nullable=False)
    currency: Mapped[str] = mapped_column(String(8), nullable=False)
    amount: Mapped[int] = mapped_column(BigInteger, nullable=False)
    status: Mapped[str] = mapped_column(String(32), nullable=False)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False)


class Subscription(Base):
    __tablename__ = "subscriptions"
    __table_args__ = (
        Index("ix_subscriptions_tg_user_id", "tg_user_id"),
        Index("ix_subscriptions_status", "status"),
    )

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
    tg_user_id: Mapped[int] = mapped_column(BigInteger, nullable=False)
    plan_code: Mapped[str] = mapped_column(String(64), nullable=False)
    status: Mapped[str] = mapped_column(String(32), nullable=False)
    starts_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
    ends_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False)

END_FILE

BEGIN_FILE: src/app/middleware.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response

from app.errors import PayloadTooLargeError
from app.schemas import ErrorInfo, ErrorResponse


class BodySizeLimitMiddleware(BaseHTTPMiddleware):
    def __init__(self, app: object, max_bytes: int) -> None:
        super().__init__(app)
        self._max = max_bytes

    async def dispatch(self, request: Request, call_next) -> Response:
        try:
            cl = request.headers.get("content-length")
            if cl is not None and int(cl) > self._max:
                raise PayloadTooLargeError(reason=f"payload exceeds limit {self._max} bytes")
            body = await request.body()
            if len(body) > self._max:
                raise PayloadTooLargeError(reason=f"payload exceeds limit {self._max} bytes")
            request._body = body  # type: ignore[attr-defined]
            return await call_next(request)
        except PayloadTooLargeError as e:
            req_id = request.headers.get("x-request-id", "unknown")
            payload = ErrorResponse(error=ErrorInfo(code=e.code, reason=e.reason, request_id=req_id)).model_dump()
            return Response(
                content=__import__("json").dumps(payload),
                status_code=e.http_status,
                media_type="application/json",
            )


class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    def __init__(self, app: object, app_env: str) -> None:
        super().__init__(app)
        self._env = app_env.strip().lower()

    async def dispatch(self, request: Request, call_next) -> Response:
        resp: Response = await call_next(request)
        resp.headers["X-Content-Type-Options"] = "nosniff"
        resp.headers["X-Frame-Options"] = "DENY"
        resp.headers["Content-Security-Policy"] = "default-src 'none'; frame-ancestors 'none'"
        resp.headers["Permissions-Policy"] = "geolocation=(), microphone=(), camera=()"
        if self._env in {"prod", "production"}:
            resp.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
        return resp

END_FILE

BEGIN_FILE: src/app/infra/init.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.

END_FILE

BEGIN_FILE: src/app/infra/redis.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

from redis.asyncio import Redis

from app.settings import Settings

_redis: Redis | None = None


async def create_redis(settings: Settings) -> None:
    global _redis
    _redis = Redis.from_url(settings.REDIS_URL, decode_responses=False)


def get_redis() -> Redis:
    if _redis is None:
        raise RuntimeError("Redis not initialized")
    return _redis


async def close_redis() -> None:
    global _redis
    if _redis is not None:
        await _redis.aclose()
    _redis = None

END_FILE

BEGIN_FILE: src/app/infra/resilience.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import TypeVar

from sqlalchemy.exc import OperationalError

T = TypeVar("T")


async def with_db_retry(fn: Callable[[], Awaitable[T]], *, attempts: int = 3, base_delay: float = 0.05) -> T:
    last: Exception | None = None
    for i in range(attempts):
        try:
            return await fn()
        except (OperationalError, ConnectionError, TimeoutError) as e:
            last = e
            if i == attempts - 1:
                raise
            await asyncio.sleep(base_delay * (2**i))
    raise RuntimeError("unreachable") from last


@dataclass
class CircuitBreaker:
    failure_threshold: int = 5
    recovery_timeout: float = 30.0

    _failures: int = 0
    _open_until: float = 0.0
    _lock: asyncio.Lock = asyncio.Lock()

    async def call(self, fn: Callable[[], Awaitable[T]]) -> T:
        async with self._lock:
            now = asyncio.get_running_loop().time()
            if self._open_until > now:
                raise RuntimeError("circuit_open")
            if self._open_until != 0.0 and self._open_until <= now:
                self._open_until = 0.0
                self._failures = 0

        try:
            res = await fn()
        except Exception:
            async with self._lock:
                self._failures += 1
                if self._failures >= self.failure_threshold:
                    self._open_until = asyncio.get_running_loop().time() + self.recovery_timeout
            raise
        return res

END_FILE

BEGIN_FILE: src/app/infra/rate_limit.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

import hashlib
from dataclasses import dataclass

from redis.asyncio import Redis


_LUA = r"""
local key = KEYS[1]
local now_ms = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local burst = tonumber(ARGV[3])

local data = redis.call("HMGET", key, "tokens", "ts")
local tokens = tonumber(data[1])
local ts = tonumber(data[2])

if tokens == nil then tokens = burst end
if ts == nil then ts = now_ms end

local delta = math.max(0, now_ms - ts)
local refill = (delta / 1000.0) * rate
tokens = math.min(burst, tokens + refill)
ts = now_ms

local allowed = 0
if tokens >= 1.0 then
  tokens = tokens - 1.0
  allowed = 1
end

redis.call("HMSET", key, "tokens", tokens, "ts", ts)
redis.call("PEXPIRE", key, math.ceil((burst / rate) * 1000) + 1000)

return allowed
""".strip()


@dataclass(frozen=True)
class TokenBucketLimiter:
    redis: Redis
    key_prefix: str
    rate: int
    burst: int

    async def allow(self, key: str, *, now_ms: int) -> bool:
        full_key = (self.key_prefix + key).encode("utf-8")
        allowed = await self.redis.eval(_LUA, 1, full_key, now_ms, self.rate, self.burst)
        return bool(int(allowed))

    @staticmethod
    def hash_ip(ip: str) -> str:
        return hashlib.sha256(ip.encode("utf-8")).hexdigest()[:24]

END_FILE

BEGIN_FILE: src/app/infra/locks.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

import logging
import secrets
from dataclasses import dataclass

from redis.asyncio import Redis

log = logging.getLogger("app.locks")


@dataclass(frozen=True)
class RedisLock:
    redis: Redis
    key: str
    ttl_sec: int = 10

    async def acquire(self) -> str | None:
        token = secrets.token_hex(16)
        ok = await self.redis.set(self.key, token.encode("utf-8"), nx=True, ex=self.ttl_sec)
        return token if ok else None

    async def release(self, token: str) -> None:
        lua = r"""
        if redis.call("GET", KEYS[1]) == ARGV[1] then
          return redis.call("DEL", KEYS[1])
        end
        return 0
        """.strip()
        try:
            await self.redis.eval(lua, 1, self.key.encode("utf-8"), token.encode("utf-8"))
        except Exception:
            log.warning("lock_release_failed", exc_info=True)

END_FILE

BEGIN_FILE: src/app/security/init.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.

END_FILE

BEGIN_FILE: src/app/security/compare.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

import hmac


def constant_time_eq(a: str, b: str) -> bool:
    return hmac.compare_digest(a.encode("utf-8"), b.encode("utf-8"))

END_FILE

BEGIN_FILE: src/app/security/admin.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

import hashlib
import secrets
from dataclasses import dataclass

from redis.asyncio import Redis
from starlette.requests import Request
from starlette.responses import Response

from app.errors import ForbiddenError, UnauthorizedError
from app.security.compare import constant_time_eq
from app.settings import Settings

SESSION_COOKIE = "__Host-admin_session"
CSRF_COOKIE = "__Host-csrf_token"


@dataclass(frozen=True)
class AdminSession:
    sid: str
    fp: str


def _fp(token: str, secret: str) -> str:
    h = hashlib.sha256()
    h.update(secret.encode("utf-8"))
    h.update(b":")
    h.update(token.encode("utf-8"))
    return h.hexdigest()


def _session_key(prefix: str, sid: str) -> str:
    return f"{prefix}admin:sess:{sid}"


def _fails_key(prefix: str) -> str:
    return f"{prefix}admin:fails"


def _locked_key(prefix: str) -> str:
    return f"{prefix}admin:locked"


async def _check_lockout(r: Redis, prefix: str) -> None:
    locked = await r.get(_locked_key(prefix).encode("utf-8"))
    if locked:
        raise ForbiddenError(code="locked", reason="admin locked out", http_status=403)


async def _record_failure(r: Redis, prefix: str, *, max_fails: int = 5, lock_ttl_sec: int = 600) -> None:
    k = _fails_key(prefix).encode("utf-8")
    fails = await r.incr(k)
    await r.expire(k, 3600)
    if int(fails) >= max_fails:
        await r.set(_locked_key(prefix).encode("utf-8"), b"1", ex=lock_ttl_sec)


async def _reset_failures(r: Redis, prefix: str) -> None:
    await r.delete(_fails_key(prefix).encode("utf-8"))


def _cookie_params(settings: Settings) -> dict[str, object]:
    return {
        "secure": bool(settings.COOKIE_SECURE),
        "samesite": settings.COOKIE_SAMESITE,
        "path": "/",
    }


async def authenticate_admin_token(r: Redis, settings: Settings, token: str) -> AdminSession:
    await _check_lockout(r, settings.REDIS_KEY_PREFIX)

    expected = settings.ADMIN_TOKEN.strip()
    if not constant_time_eq(token.strip(), expected):
        await _record_failure(r, settings.REDIS_KEY_PREFIX)
        raise UnauthorizedError(reason="bad token")

    await _reset_failures(r, settings.REDIS_KEY_PREFIX)

    sid = secrets.token_hex(32)
    fpv = _fp(expected, settings.ADMIN_TOKEN_FINGERPRINT_SECRET.strip())
    key = _session_key(settings.REDIS_KEY_PREFIX, sid).encode("utf-8")
    await r.set(key, fpv.encode("utf-8"), ex=settings.ADMIN_SESSION_TTL_SEC)
    return AdminSession(sid=sid, fp=fpv)


async def get_admin_session(r: Redis, settings: Settings, request: Request) -> AdminSession:
    sid = request.cookies.get(SESSION_COOKIE, "")
    if not sid:
        raise UnauthorizedError(reason="missing session")

    key = _session_key(settings.REDIS_KEY_PREFIX, sid).encode("utf-8")
    raw = await r.get(key)
    if not raw:
        raise UnauthorizedError(reason="session expired")

    fpv = raw.decode("utf-8", errors="ignore")
    return AdminSession(sid=sid, fp=fpv)


def set_session_cookies(resp: Response, settings: Settings, sid: str, csrf: str) -> None:
    params = _cookie_params(settings)
    resp.set_cookie(SESSION_COOKIE, sid, httponly=True, **params)
    resp.set_cookie(CSRF_COOKIE, csrf, httponly=False, **params)


async def clear_session(r: Redis, settings: Settings, request: Request, resp: Response) -> None:
    sid = request.cookies.get(SESSION_COOKIE, "")
    if sid:
        await r.delete(_session_key(settings.REDIS_KEY_PREFIX, sid).encode("utf-8"))
    params = _cookie_params(settings)
    resp.delete_cookie(SESSION_COOKIE, path=str(params["path"]))
    resp.delete_cookie(CSRF_COOKIE, path=str(params["path"]))


def new_csrf_token() -> str:
    return secrets.token_hex(32)

END_FILE

BEGIN_FILE: src/app/security/webhook_sig.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

import hashlib
import hmac
import time

from redis.asyncio import Redis

from app.errors import ConflictError, UnauthorizedError
from app.security.compare import constant_time_eq
from app.settings import Settings


def _get_header(headers: dict[str, str], name: str) -> str:
    for k, v in headers.items():
        if k.lower() == name.lower():
            return v
    return ""


def _sig_v1(secret: str, ts: str, nonce: str, body: bytes) -> str:
    mac = hmac.new(secret.encode("utf-8"), digestmod=hashlib.sha256)
    mac.update(ts.encode("utf-8"))
    mac.update(b":")
    mac.update(nonce.encode("utf-8"))
    mac.update(b":")
    mac.update(body)
    return mac.hexdigest()


async def verify_webhook_signature(r: Redis, settings: Settings, headers: dict[str, str], body: bytes) -> None:
    sig = _get_header(headers, "x-apex-signature") or _get_header(headers, "x-signature")
    ts = _get_header(headers, "x-apex-timestamp") or _get_header(headers, "x-timestamp")
    nonce = _get_header(headers, "x-apex-nonce") or _get_header(headers, "x-nonce")

    if not sig or not ts or not nonce:
        raise UnauthorizedError(reason="missing signature headers")

    if not sig.startswith("v1="):
        raise UnauthorizedError(reason="bad signature format")

    try:
        ts_i = int(ts)
    except ValueError as e:
        raise UnauthorizedError(reason="bad timestamp") from e

    now = int(time.time())
    if abs(now - ts_i) > settings.WEBHOOK_TS_SKEW_SEC:
        raise UnauthorizedError(reason="timestamp skew")

    nonce_key = f"{settings.REDIS_KEY_PREFIX}wh:nonce:{nonce}".encode("utf-8")
    ok = await r.set(nonce_key, b"1", nx=True, ex=settings.WEBHOOK_NONCE_TTL_SEC)
    if not ok:
        raise ConflictError(reason="replay detected")

    want = sig.split("=", 1)[1]
    for secret in settings.webhook_secrets_list():
        got = _sig_v1(secret, ts, nonce, body)
        if constant_time_eq(want, got):
            return

    raise UnauthorizedError(reason="bad signature")

END_FILE

BEGIN_FILE: src/app/security/webhook.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

from app.errors import BadRequestError


def validate_currency(currency: str) -> str:
    cur = currency.strip().upper()
    if not (2 <= len(cur) <= 8) or not cur.isalnum():
        raise BadRequestError(reason="invalid currency")
    return cur

END_FILE

BEGIN_FILE: src/app/services/init.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.

END_FILE

BEGIN_FILE: src/app/services/plans.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

import time
from dataclasses import dataclass

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models import Plan


@dataclass
class CachedPlan:
    plan: Plan
    expires_at: float


class PlanCache:
    def __init__(self, ttl_sec: float = 60.0) -> None:
        self._ttl = ttl_sec
        self._cache: dict[str, CachedPlan] = {}

    def get(self, code: str) -> Plan | None:
        now = time.time()
        c = self._cache.get(code)
        if not c:
            return None
        if c.expires_at <= now:
            self._cache.pop(code, None)
            return None
        return c.plan

    def put(self, plan: Plan) -> None:
        self._cache[plan.code] = CachedPlan(plan=plan, expires_at=time.time() + self._ttl)


async def get_plan_by_code(sess: AsyncSession, cache: PlanCache, code: str) -> Plan | None:
    code = code.strip()
    if not code:
        return None
    cached = cache.get(code)
    if cached is not None:
        return cached

    res = await sess.execute(select(Plan).where(Plan.code == code, Plan.is_active.is_(True)))
    plan = res.scalar_one_or_none()
    if plan is not None:
        cache.put(plan)
    return plan

END_FILE

BEGIN_FILE: src/app/services/subscriptions.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

from app.errors import BadRequestError, ConflictError
from app.models import Payment, Subscription
from app.security.webhook import validate_currency
from app.services.plans import PlanCache, get_plan_by_code


@dataclass(frozen=True)
class GrantResult:
    created: bool
    subscription_id: int


async def grant_subscription_atomic(
    sess: AsyncSession,
    cache: PlanCache,
    *,
    provider: str,
    provider_payment_id: str,
    tg_user_id: int,
    plan_code: str,
    currency: str,
    amount: int,
) -> GrantResult:
    provider = provider.strip().lower()
    plan_code = plan_code.strip()
    if not provider or not provider_payment_id or not plan_code:
        raise BadRequestError(reason="missing fields")

    cur = validate_currency(currency)

    plan = await get_plan_by_code(sess, cache, plan_code)
    if plan is None:
        raise BadRequestError(reason="unknown plan")

    if plan.currency.upper() != cur or int(plan.amount) != int(amount):
        raise BadRequestError(reason=f"amount mismatch: expected {plan.amount} {plan.currency}, got {amount} {cur}")

    now = datetime.now(timezone.utc)

    async with sess.begin():
        existing = await sess.execute(
            select(Payment).where(Payment.provider == provider, Payment.provider_payment_id == provider_payment_id)
        )
        if existing.scalar_one_or_none() is not None:
            raise ConflictError(reason="duplicate payment")

        pay = Payment(
            provider=provider,
            provider_payment_id=provider_payment_id,
            tg_user_id=tg_user_id,
            plan_code=plan.code,
            currency=cur,
            amount=amount,
            status="confirmed",
        )
        sess.add(pay)

        sub = Subscription(
            tg_user_id=tg_user_id,
            plan_code=plan.code,
            status="active",
            starts_at=now,
            ends_at=now + timedelta(days=int(plan.duration_days)),
        )
        sess.add(sub)

        try:
            await sess.flush()
        except IntegrityError as e:
            raise ConflictError(reason="idempotency conflict") from e

        return GrantResult(created=True, subscription_id=int(sub.id))

END_FILE

BEGIN_FILE: src/app/routes/init.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.

END_FILE

BEGIN_FILE: src/app/routes/health.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

import logging

from fastapi import APIRouter
from sqlalchemy import text

from app.db import get_engine
from app.infra.redis import get_redis
from app.schemas import OkResponse

log = logging.getLogger("app.health")
router = APIRouter(prefix="/v1/health", tags=["health"])


@router.get("/live", response_model=OkResponse)
async def live() -> OkResponse:
    return OkResponse()


@router.get("/ready", response_model=OkResponse)
async def ready() -> OkResponse:
    try:
        eng = get_engine()
        async with eng.connect() as conn:
            await conn.execute(text("SELECT 1"))
        r = get_redis()
        await r.ping()
        return OkResponse()
    except Exception:
        log.exception("readiness_failed")
        raise

END_FILE

BEGIN_FILE: src/app/routes/admin.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

from datetime import datetime, timezone

from fastapi import APIRouter, Depends, Request
from starlette.responses import JSONResponse

from app.errors import AppError, UnauthorizedError
from app.infra.redis import get_redis
from app.schemas import AdminLoginRequest, OkResponse
from app.security.admin import (
    clear_session,
    get_admin_session,
    new_csrf_token,
    set_session_cookies,
    authenticate_admin_token,
)
from app.settings import Settings

router = APIRouter(prefix="/v1/admin", tags=["admin"])


def _settings(request: Request) -> Settings:
    return request.app.state.settings  # type: ignore[no-any-return]


def _csrf_or_401(req: Request) -> None:
    want = req.cookies.get("__Host-csrf_token", "")
    got = req.headers.get("x-csrf-token", "")
    if not want or not got or want != got:
        raise UnauthorizedError(reason="csrf mismatch")


async def _require_admin(req: Request) -> None:
    _csrf_or_401(req)
    s = _settings(req)
    r = get_redis()
    await get_admin_session(r, s, req)


@router.post("/login", response_model=OkResponse)
async def login(req: Request, body: AdminLoginRequest) -> JSONResponse:
    s = _settings(req)
    r = get_redis()
    sess = await authenticate_admin_token(r, s, body.token)
    csrf = new_csrf_token()
    resp = JSONResponse(OkResponse().model_dump())
    set_session_cookies(resp, s, sess.sid, csrf)
    return resp


@router.post("/logout", response_model=OkResponse)
async def logout(req: Request) -> JSONResponse:
    await _require_admin(req)
    s = _settings(req)
    r = get_redis()
    resp = JSONResponse(OkResponse().model_dump())
    await clear_session(r, s, req, resp)
    return resp


@router.get("/stats", response_model=dict)
async def stats(req: Request) -> dict:
    await _require_admin(req)
    now = datetime.now(timezone.utc)
    return {
        "ok": True,
        "now": now.isoformat(),
    }


@router.exception_handler(AppError)  # type: ignore[misc]
async def app_error_handler(_: Request, exc: AppError) -> JSONResponse:
    return JSONResponse(
        status_code=exc.http_status,
        content={"ok": False, "error": {"code": exc.code, "reason": exc.reason, "request_id": "n/a"}},
    )

END_FILE

BEGIN_FILE: src/app/routes/webhooks.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

import ipaddress
import time
from typing import Any

from fastapi import APIRouter, Request
from starlette.responses import JSONResponse

from app.errors import AppError, BadRequestError, RateLimitedError
from app.infra.rate_limit import TokenBucketLimiter
from app.infra.redis import get_redis
from app.schemas import OkResponse, WebhookEvent
from app.security.webhook_sig import verify_webhook_signature
from app.services.plans import PlanCache
from app.services.subscriptions import grant_subscription_atomic
from app.settings import Settings
from app.db import session_scope

router = APIRouter(prefix="/v1/webhooks", tags=["webhooks"])

_plan_cache = PlanCache(ttl_sec=60.0)


def _parse_trusted_proxy_nets(settings: Settings) -> list[ipaddress._BaseNetwork]:
    raw = settings.TRUSTED_PROXY_NETS.strip()
    if not raw:
        return []
    nets: list[ipaddress._BaseNetwork] = []
    for part in [p.strip() for p in raw.split(",") if p.strip()]:
        nets.append(ipaddress.ip_network(part, strict=False))
    return nets


def _peer_is_trusted(peer: str, nets: list[ipaddress._BaseNetwork]) -> bool:
    try:
        ip = ipaddress.ip_address(peer)
    except ValueError:
        return False
    return any(ip in n for n in nets)


def _valid_ip(s: str) -> str | None:
    try:
        ipaddress.ip_address(s)
        return s
    except ValueError:
        return None


def _client_ip(request: Request, settings: Settings) -> str:
    peer = request.client.host if request.client else ""
    if not peer:
        return "unknown"

    nets = _parse_trusted_proxy_nets(settings)
    if not nets:
        return peer

    if not _peer_is_trusted(peer, nets):
        return peer

    xff = request.headers.get("x-forwarded-for", "")
    if not xff:
        return peer

    parts = [p.strip() for p in xff.split(",") if p.strip()]
    parts_valid = [p for p in ( _valid_ip(p) for p in parts ) if p is not None]
    if not parts_valid:
        return peer

    hops = int(settings.TRUSTED_PROXY_HOPS)
    if hops <= 0:
        return peer
    if len(parts_valid) <= hops:
        return peer

    cand = parts_valid[-hops - 1]
    return cand


def _settings(req: Request) -> Settings:
    return req.app.state.settings  # type: ignore[no-any-return]


@router.post("/{provider}", response_model=OkResponse)
async def webhook(provider: str, req: Request) -> JSONResponse:
    s = _settings(req)
    prov = provider.strip().lower()
    if prov not in s.allowed_providers_set():
        raise BadRequestError(reason="provider not allowed")

    body = await req.body()
    headers = {k: v for (k, v) in req.headers.items()}

    r = get_redis()
    await verify_webhook_signature(r, s, headers, body)

    ip = _client_ip(req, s)
    ip_hash = TokenBucketLimiter.hash_ip(ip)
    limiter = TokenBucketLimiter(
        redis=r,
        key_prefix=s.REDIS_KEY_PREFIX,
        rate=s.WEBHOOK_RL_RATE,
        burst=s.WEBHOOK_RL_BURST,
    )
    key = f"wh:rl:{prov}:{ip_hash}"
    ok = await limiter.allow(key, now_ms=int(time.time() * 1000))
    if not ok:
        raise RateLimitedError(reason="rate limited")

    try:
        data: Any = __import__("json").loads(body.decode("utf-8"))
        evt = WebhookEvent.model_validate(data)
    except Exception as e:
        raise BadRequestError(reason="invalid payload") from e

    async for sess in session_scope():
        res = await grant_subscription_atomic(
            sess,
            _plan_cache,
            provider=prov,
            provider_payment_id=evt.provider_payment_id,
            tg_user_id=evt.tg_user_id,
            plan_code=evt.plan_code,
            currency=evt.currency,
            amount=evt.amount,
        )
        return JSONResponse(OkResponse().model_dump() | {"subscription_id": res.subscription_id})

    raise RuntimeError("unreachable")


@router.exception_handler(AppError)  # type: ignore[misc]
async def app_error_handler(req: Request, exc: AppError) -> JSONResponse:
    rid = req.headers.get("x-request-id", "unknown")
    return JSONResponse(
        status_code=exc.http_status,
        content={"ok": False, "error": {"code": exc.code, "reason": exc.reason, "request_id": rid}},
    )

END_FILE

BEGIN_FILE: src/app/main.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

from contextlib import asynccontextmanager

from fastapi import FastAPI, Request
from starlette.responses import JSONResponse

from app.errors import AppError
from app.db import close_engine, create_engine
from app.infra.redis import close_redis, create_redis
from app.logging import setup_logging
from app.middleware import BodySizeLimitMiddleware, SecurityHeadersMiddleware
from app.routes.admin import router as admin_router
from app.routes.health import router as health_router
from app.routes.webhooks import router as webhooks_router
from app.settings import Settings
from app.schemas import ErrorInfo, ErrorResponse


@asynccontextmanager
async def lifespan(app: FastAPI):
    setup_logging()
    settings = Settings()
    app.state.settings = settings

    create_engine(settings)
    await create_redis(settings)
    yield
    await close_redis()
    await close_engine()


app = FastAPI(lifespan=lifespan)
app.add_middleware(BodySizeLimitMiddleware, max_bytes=Settings().MAX_BODY_BYTES)  # safe: env read only
app.add_middleware(SecurityHeadersMiddleware, app_env=Settings().APP_ENV)

app.include_router(health_router)
app.include_router(webhooks_router)
app.include_router(admin_router)


@app.exception_handler(AppError)  # type: ignore[misc]
async def app_error_handler(req: Request, exc: AppError) -> JSONResponse:
    rid = req.headers.get("x-request-id", "unknown")
    payload = ErrorResponse(error=ErrorInfo(code=exc.code, reason=exc.reason, request_id=rid)).model_dump()
    return JSONResponse(status_code=exc.http_status, content=payload)


@app.get("/v1", include_in_schema=False)
async def root() -> dict[str, object]:
    return {"ok": True}

END_FILE

BEGIN_FILE: tests/init.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.

END_FILE

BEGIN_FILE: tests/conftest.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

import os

import pytest


def pytest_configure(config: pytest.Config) -> None:
    config.addinivalue_line("markers", "integration: requires live postgres+redis via env")


@pytest.fixture(scope="session")
def integration_enabled() -> bool:
    return os.getenv("INTEGRATION", "").strip() == "1"

END_FILE

BEGIN_FILE: tests/unit/init.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.

END_FILE

BEGIN_FILE: tests/unit/test_admin_sessions.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

import pytest
from redis.asyncio import Redis

from app.security.admin import authenticate_admin_token
from app.settings import Settings


@pytest.mark.asyncio
async def test_admin_session_cookie_is_opaque(monkeypatch) -> None:
    s = Settings(
        DATABASE_URL="postgresql+asyncpg://x:x@localhost:5432/x",
        REDIS_URL="redis://localhost:6379/0",
        ADMIN_TOKEN="a" * 32,
        ADMIN_TOKEN_FINGERPRINT_SECRET="b" * 32,
        WEBHOOK_SIGNATURE_SECRETS="c" * 32,
    )
    r = Redis.from_url("redis://localhost:6379/15", decode_responses=False)
    try:
        await r.flushdb()
        sess = await authenticate_admin_token(r, s, "a" * 32)
        assert len(sess.sid) == 64
        assert sess.sid != "a" * 32
    finally:
        await r.aclose()

END_FILE

BEGIN_FILE: tests/unit/test_circuit_breaker.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

import pytest

from app.infra.resilience import CircuitBreaker


@pytest.mark.asyncio
async def test_circuit_breaker_opens_and_recovers() -> None:
    cb = CircuitBreaker(failure_threshold=2, recovery_timeout=0.01)
    calls = {"n": 0}

    async def boom():
        calls["n"] += 1
        raise RuntimeError("x")

    with pytest.raises(RuntimeError):
        await cb.call(boom)
    with pytest.raises(RuntimeError):
        await cb.call(boom)

    with pytest.raises(RuntimeError):
        await cb.call(boom)

    import asyncio

    await asyncio.sleep(0.02)

    async def ok():
        return 1

    assert await cb.call(ok) == 1

END_FILE

BEGIN_FILE: tests/unit/test_trusted_proxy_ip.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

from types import SimpleNamespace

from app.routes.webhooks import _client_ip
from app.settings import Settings


class DummyReq:
    def __init__(self, peer: str, xff: str) -> None:
        self.client = SimpleNamespace(host=peer)
        self.headers = {"x-forwarded-for": xff} if xff else {}


def test_client_ip_ignores_xff_without_trusted_nets() -> None:
    s = Settings(
        DATABASE_URL="postgresql+asyncpg://x:x@localhost:5432/x",
        REDIS_URL="redis://localhost:6379/0",
        ADMIN_TOKEN="a" * 32,
        ADMIN_TOKEN_FINGERPRINT_SECRET="b" * 32,
        WEBHOOK_SIGNATURE_SECRETS="c" * 32,
        TRUSTED_PROXY_NETS="",
        TRUSTED_PROXY_HOPS=1,
    )
    req = DummyReq(peer="8.8.8.8", xff="1.2.3.4")
    assert _client_ip(req, s) == "8.8.8.8"


def test_client_ip_uses_xff_when_peer_trusted_and_hops_configured() -> None:
    s = Settings(
        DATABASE_URL="postgresql+asyncpg://x:x@localhost:5432/x",
        REDIS_URL="redis://localhost:6379/0",
        ADMIN_TOKEN="a" * 32,
        ADMIN_TOKEN_FINGERPRINT_SECRET="b" * 32,
        WEBHOOK_SIGNATURE_SECRETS="c" * 32,
        TRUSTED_PROXY_NETS="10.0.0.0/8",
        TRUSTED_PROXY_HOPS=1,
    )
    req = DummyReq(peer="10.2.3.4", xff="1.1.1.1, 10.2.3.4")
    assert _client_ip(req, s) == "1.1.1.1"


def test_client_ip_rejects_invalid_xff_ips() -> None:
    s = Settings(
        DATABASE_URL="postgresql+asyncpg://x:x@localhost:5432/x",
        REDIS_URL="redis://localhost:6379/0",
        ADMIN_TOKEN="a" * 32,
        ADMIN_TOKEN_FINGERPRINT_SECRET="b" * 32,
        WEBHOOK_SIGNATURE_SECRETS="c" * 32,
        TRUSTED_PROXY_NETS="10.0.0.0/8",
        TRUSTED_PROXY_HOPS=1,
    )
    req = DummyReq(peer="10.2.3.4", xff="not-an-ip, also-bad, 10.2.3.4")
    assert _client_ip(req, s) == "10.2.3.4"

END_FILE

BEGIN_FILE: tests/integration/init.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.

END_FILE

BEGIN_FILE: tests/integration/test_webhook_secret_rotation.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

import hashlib
import hmac
import json
import time

import pytest
from redis.asyncio import Redis

from app.security.webhook_sig import verify_webhook_signature
from app.settings import Settings


def _sign(secret: str, ts: str, nonce: str, body: bytes) -> str:
    mac = hmac.new(secret.encode("utf-8"), digestmod=hashlib.sha256)
    mac.update(ts.encode("utf-8"))
    mac.update(b":")
    mac.update(nonce.encode("utf-8"))
    mac.update(b":")
    mac.update(body)
    return mac.hexdigest()


@pytest.mark.asyncio
@pytest.mark.integration
async def test_signature_rotation_accepts_old_and_new(integration_enabled: bool) -> None:
    if not integration_enabled:
        pytest.skip("INTEGRATION=1 required")

    s = Settings(
        DATABASE_URL="postgresql+asyncpg://x:x@localhost:5432/x",
        REDIS_URL="redis://localhost:6379/0",
        ADMIN_TOKEN="a" * 32,
        ADMIN_TOKEN_FINGERPRINT_SECRET="b" * 32,
        WEBHOOK_SIGNATURE_SECRETS=("o" * 32) + "," + ("n" * 32),
    )

    r = Redis.from_url(s.REDIS_URL, decode_responses=False)
    try:
        await r.flushdb()
        body = json.dumps(
            {
                "schema_version": 1,
                "provider_payment_id": "p1",
                "tg_user_id": 1,
                "plan_code": "pro",
                "currency": "USD",
                "amount": 100,
            }
        ).encode("utf-8")
        ts = str(int(time.time()))
        nonce = "nonce1"
        headers_old = {
            "x-apex-signature": "v1=" + _sign("o" * 32, ts, nonce, body),
            "x-apex-timestamp": ts,
            "x-apex-nonce": nonce,
        }
        await verify_webhook_signature(r, s, headers_old, body)

        nonce2 = "nonce2"
        headers_new = {
            "x-apex-signature": "v1=" + _sign("n" * 32, ts, nonce2, body),
            "x-apex-timestamp": ts,
            "x-apex-nonce": nonce2,
        }
        await verify_webhook_signature(r, s, headers_new, body)
    finally:
        await r.aclose()

END_FILE

BEGIN_FILE: tests/integration/test_body_size_limit_413.py

# Copyright (c) 2026 ApexAI Official. All rights reserved.
from __future__ import annotations

import pytest
from httpx import AsyncClient

from app.main import app


@pytest.mark.asyncio
async def test_body_size_limit_returns_structured_413(monkeypatch) -> None:
    # Force a small limit to keep the test cheap/deterministic.
    monkeypatch.setenv("MAX_BODY_BYTES", "1024")
    client = AsyncClient(app=app, base_url="http://test")
    try:
        payload = b"x" * 2048
        r = await client.post("/v1/webhooks/stripe", content=payload, headers={"content-length": str(len(payload))})
        assert r.status_code == 413
        data = r.json()
        assert data["ok"] is False
        assert data["error"]["code"] == "payload_too_large"
    finally:
        await client.aclose()

END_FILE

END_REPO

MicroBin by Dániel Szabó and the FOSS Community. Let's keep the Web compact, accessible and humane!