so. many. changes. (sorry)

Micha R. Albert 2025-07-17 11:54:48 -04:00
parent d445a13646
commit 4a4d5fe4dd
Signed by: mra
SSH key fingerprint: SHA256:vjiZInsq3FRnDJk1YYWFhC/N62SAmVmY5H5wvViHhdg
20 changed files with 1692 additions and 161 deletions

View file

@ -2,7 +2,7 @@
.git
.gitignore
# Python
# Python cache and build artifacts
__pycache__
*.pyc
*.pyo
@ -16,6 +16,8 @@ pip-delete-this-directory.txt
.coverage.*
.pytest_cache
htmlcov
*.egg-info/
.eggs/
# Virtual environments
venv/
@ -43,16 +45,26 @@ Thumbs.db
*.db
*.sqlite
*.sqlite3
.env.local
.env.development
.env.test
.env
.env.*
node_modules/
dist/
build/
dump.rdb
# Documentation (keep Dockerfile.dev for development builds)
# Documentation and extra Dockerfiles
DOCKER.md
docs/
Dockerfile.slim
Dockerfile.distroless
Dockerfile.ultra-minimal
Dockerfile.micro
Dockerfile.nano
Dockerfile.minimal
docker-compose*.yml
test_*.py
tests/
DEVELOPMENT.md
# Logs
*.log
@ -61,3 +73,36 @@ logs/
# Temporary files
tmp/
temp/
# Development tools
.mypy_cache/
.ruff_cache/
.black
.isort.cfg
mypy.ini
setup.cfg
tox.ini
.pre-commit-config.yaml
.flake8
.bandit
.safety
.hypothesis/
.nox/
# Additional exclusions for ultra-minimal images
*.orig
*.rej
*.bak
*.backup
*.tmp
*.temp
\#*\#
/.emacs.desktop
/.emacs.desktop.lock
*.elc
auto-save-list
tramp
.\#*
*.sublime-project
*.sublime-workspace
*.code-workspace

View file

@ -4,6 +4,7 @@
SLACK_CLIENT_ID=your_slack_client_id_here
SLACK_CLIENT_SECRET=your_slack_client_secret_here
SLACK_SIGNING_SECRET=your_slack_signing_secret_here
SLACK_BOLT_IS_BROKEN_SO_THIS_CANT_BE_CALLED_CLIENT_SECRET=your_slack_client_secret_here
# Airtable Configuration
AIRTABLE_PAT=your_airtable_personal_access_token_here
@ -12,7 +13,6 @@ AIRTABLE_SUBMISSIONS_TABLE=your_submissions_table_id_here
AIRTABLE_USERS_TABLE=your_users_table_id_here
AIRTABLE_SESSIONS_TABLE=your_sessions_table_id_here
AIRTABLE_ITEMS_TABLE=your_items_table_id_here
AIRTABLE_ITEM_ADDONS_TABLE=your_item_addons_table_id_here
AIRTABLE_ITEM_INSTANCES_TABLE=your_item_instances_table_id_here
# Application Settings

View file

@ -1,47 +1,69 @@
# Use Python 3.13 slim image
FROM python:3.13-slim
# Ultra-minimal Docker image using distroless
FROM python:3.13-slim AS builder
# Set working directory
WORKDIR /app
# Install system dependencies and build tools
RUN apt-get update && apt-get install -y \
curl \
# Install build dependencies and Python
RUN apt-get update && apt-get install -y --no-install-recommends \
python3-venv \
gcc \
g++ \
build-essential \
python3-dev \
libc6-dev \
zlib1g \
&& rm -rf /var/lib/apt/lists/*
# Install Hatch
RUN pip install --no-cache-dir hatch
WORKDIR /app
# Copy project files
# Create virtual environment in /usr/local
RUN python3 -m venv /usr/local
ENV PATH="/usr/local/bin:$PATH"
# Install hatch and generate requirements
RUN pip install --no-cache-dir hatch
COPY pyproject.toml ./
COPY LICENSE ./
COPY README.md ./
RUN hatch dep show requirements > requirements.txt
RUN pip install --no-cache-dir --compile -r requirements.txt
# Copy source and build
COPY src/ ./src/
COPY templates/ ./templates/
COPY LICENSE README.md ./
RUN hatch build -t wheel
RUN pip install --no-cache-dir --compile dist/*.whl
# Install project and dependencies using Hatch
RUN hatch build -t wheel && \
pip install --no-cache-dir dist/*.whl && \
rm -rf dist/ build/
# Clean up build artifacts and unnecessary files
RUN find /usr/local -name "*.pyc" -delete && \
find /usr/local -name "__pycache__" -type d -exec rm -rf {} + && \
find /usr/local -name "*.pyo" -delete && \
find /usr/local -name "tests" -type d -exec rm -rf {} + && \
find /usr/local -name "test" -type d -exec rm -rf {} + && \
find /usr/local -name "*.egg-info" -type d -exec rm -rf {} + && \
find /usr/local -name "*.dist-info" -type d -exec rm -rf {} + && \
rm -rf /usr/local/share/man /usr/local/share/doc
# Create non-root user for security
RUN useradd --create-home --shell /bin/bash app \
&& chown -R app:app /app
USER app
RUN rm -rf /usr/local/lib/python3.13/site-packages/pip* && \
rm -rf /usr/local/lib/python3.13/site-packages/virtualenv* && \
rm -rf /usr/local/lib/python3.13/site-packages/hatch* && \
rm -rf /usr/local/lib/python3.13/site-packages/hatchling
# Set environment variable to indicate container environment
ENV DOCKER_CONTAINER=1
RUN rm -rf /usr/local/bin/uv
# Use distroless as the runtime image
FROM gcr.io/distroless/cc-debian12:nonroot
# Copy Python interpreter and the package from the builder stage
COPY --from=builder /usr/local /usr/local
COPY --from=builder /usr/lib/x86_64-linux-gnu/libz.so.1.2.13 /usr/lib/x86_64-linux-gnu/libz.so.1
COPY --from=builder /usr/lib/x86_64-linux-gnu/libsqlite3.so.0.8.6 /usr/lib/x86_64-linux-gnu/libsqlite3.so.0
COPY --from=builder /app/templates /app/templates/
# Set environment variables
ENV PATH="/usr/local/bin:$PATH"
ENV PYTHONPATH="/usr/local/lib/python3.13/site-packages"
ENV PYTHONUNBUFFERED=1
# Run as non-root
USER nonroot
# Expose port
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/ || exit 1
EXPOSE 80
# Run the application
CMD ["uvicorn", "random_access.main:app", "--host", "0.0.0.0", "--port", "8000"]
ENTRYPOINT ["/usr/local/bin/python", "-m", "uvicorn", "random_access.main:app", "--host", "0.0.0.0", "--port", "80"]

View file

@ -2,28 +2,24 @@ services:
api:
build: .
ports:
- "8000:8000"
- "8000:80"
environment:
# Redis configuration
- REDIS_HOST=valkey
- REDIS_PORT=6379
# Docker environment flag
- DOCKER_CONTAINER=1
# Override environment for production
- ENVIRONMENT=production
env_file: .env
depends_on:
- valkey
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/"]
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/')"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
valkey:
image: valkey/valkey:7-alpine
image: valkey/valkey:8-alpine
restart: unless-stopped
healthcheck:
test: ["CMD", "valkey-cli", "ping"]

View file

@ -10,7 +10,6 @@ keywords = ["fastapi", "api", "authentication", "redis", "airtable", "async"]
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.12",
@ -22,18 +21,14 @@ classifiers = [
dependencies = [
"fastapi~=0.115.12",
"uvicorn[standard]~=0.34.2",
"click~=8.2.1",
"argon2-cffi~=23.1.0",
"tortoise-orm[accel]~=0.25.0",
"slack-bolt~=1.23.0",
"python-dotenv==1.1.0",
"aiohttp~=3.12.11",
"pyairtable~=3.1.1",
"python-jose[cryptography]~=3.5.0",
"valkey[libvalkey]~=6.1.0",
"slowapi~=0.1.9",
"aiocache[redis]~=0.12.3",
"pydantic-settings~=2.10.1"
"pydantic-settings~=2.10.1",
"jinja2~=3.1.6",
"python-multipart~=0.0.20",
]
requires-python = ">=3.12"
@ -64,9 +59,6 @@ docs = [
"mkdocs-material>=9.0.0",
]
[project.scripts]
random-access-server = "random_access.cli:cli"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

View file

@ -8,7 +8,8 @@ from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from pyairtable.formulas import match
from random_access.settings import settings
from database import get_session_by_token_cached
from settings import settings
# Create HTTPBearer security scheme
security = HTTPBearer(
@ -25,8 +26,6 @@ def hash_token(token: str) -> str:
async def get_session_by_token(token: str, sessions_table) -> dict | None:
"""Get a session by its hashed token (now using cached version)."""
from random_access.database import get_session_by_token_cached
return await get_session_by_token_cached(token, sessions_table)

View file

@ -1,18 +0,0 @@
import click
import uvicorn
from random_access.main import app
@click.group()
def cli():
"""Random Access Server CLI."""
pass
@cli.command()
@click.option("--host", default="127.0.0.1", help="Host to bind to.")
@click.option("--port", default=8000, help="Port to bind to.")
def run(host, port):
"""Run the FastAPI app."""
uvicorn.run(app, host=host, port=port)

View file

@ -12,7 +12,7 @@ from aiocache.serializers import PickleSerializer
from pyairtable import Api as AirtableApi
from pyairtable.formulas import match
from random_access.settings import settings
from settings import settings
logger = logging.getLogger("uvicorn.error")
@ -22,8 +22,8 @@ write_queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue()
# Expected table schemas for validation
EXPECTED_SCHEMAS = {
"users": {
"required_fields": ["Slack ID", "Display Name", "Email"],
"optional_fields": ["Last Login", "Created"],
"required_fields": ["Slack ID", "First Name", "Last Name", "Email"],
"optional_fields": ["Last Login", "Registered At"],
},
"sessions": {
"required_fields": ["Token"],
@ -41,10 +41,6 @@ EXPECTED_SCHEMAS = {
"required_fields": ["ID", "User", "Item"],
"optional_fields": ["Acquired"],
},
"item_addons": {
"required_fields": ["Name"],
"optional_fields": ["Description", "Item"],
},
}
@ -56,7 +52,6 @@ def get_airtable_base():
def get_table(base, name: str):
"""Get a specific Airtable table."""
table = getattr(settings, f"airtable_{name}_table")
print(table)
return base.table(table)
@ -287,6 +282,117 @@ async def get_item_by_id(item_id: str, items_table) -> dict | None:
return None
# SLACK COMMAND FUNCTIONS (Short TTL for real-time feel)
@cached(
ttl=10, # 10 seconds for Slack commands
cache=Cache.REDIS, # type: ignore
serializer=PickleSerializer(),
endpoint=settings.redis_host,
port=settings.redis_port,
namespace="slack_commands",
key_builder=lambda f, *args, **kwargs: _generate_cache_key(
f.__name__, *args, **kwargs
),
)
async def get_all_games(submissions_table) -> list[dict]:
"""Get all games from submissions table (cached for Slack)."""
logger.info("Fetching all games from Airtable for Slack command")
try:
games = submissions_table.all()
return [dict(game) for game in games]
except Exception as e:
logger.error(f"Error fetching games for Slack: {e}")
return []
@cached(
ttl=10, # 10 seconds for Slack commands
cache=Cache.REDIS, # type: ignore
serializer=PickleSerializer(),
endpoint=settings.redis_host,
port=settings.redis_port,
namespace="slack_commands",
key_builder=lambda f, *args, **kwargs: _generate_cache_key(
f.__name__, *args, **kwargs
),
)
async def get_user_by_slack_id(slack_user_id: str, users_table) -> dict | None:
"""Get user by Slack ID (cached for Slack commands)."""
logger.info(f"Fetching user by Slack ID {slack_user_id} for Slack command")
try:
user = users_table.first(formula=match({"Slack ID": slack_user_id}))
return dict(user) if user else None
except Exception as e:
logger.error(f"Error fetching user by Slack ID: {e}")
return None
@cached(
ttl=10, # 10 seconds for Slack commands
cache=Cache.REDIS, # type: ignore
serializer=PickleSerializer(),
endpoint=settings.redis_host,
port=settings.redis_port,
namespace="slack_commands",
key_builder=lambda f, *args, **kwargs: _generate_cache_key(
f.__name__, *args, **kwargs
),
)
async def get_user_sessions(user_id: str, sessions_table) -> list[dict]:
"""Get all sessions for a user (cached for Slack commands)."""
logger.info(f"Fetching sessions for user {user_id} for Slack command")
try:
all_sessions = sessions_table.all()
user_sessions = []
for session in all_sessions:
user_field = session.get("fields", {}).get("User", [])
if user_field and user_id in user_field:
user_sessions.append(dict(session))
return user_sessions
except Exception as e:
logger.error(f"Error fetching user sessions: {e}")
return []
@cached(
ttl=10, # 10 seconds for Slack commands
cache=Cache.REDIS, # type: ignore
serializer=PickleSerializer(),
endpoint=settings.redis_host,
port=settings.redis_port,
namespace="slack_commands",
key_builder=lambda f, *args, **kwargs: _generate_cache_key(
f.__name__, *args, **kwargs
),
)
async def get_detailed_user_items_for_slack(
user_id: str, item_instances_table, items_table
) -> list[dict]:
"""Get detailed user items with item info for Slack commands (cached)."""
logger.info(f"Fetching detailed user items for user {user_id} for Slack command")
try:
# Get user's item instances
user_instances = await get_user_items(user_id, item_instances_table)
# Get detailed item info for each instance
detailed_items = []
for instance in user_instances:
item_ids = instance.get("fields", {}).get("Item", [])
if item_ids:
item_id = item_ids[0]
item = await get_item_by_id(item_id, items_table)
if item:
detailed_items.append({"instance": instance, "item": item})
return detailed_items
except Exception as e:
logger.error(f"Error fetching detailed user items for Slack: {e}")
return []
# WRITE OPERATIONS (Queued)
@ -332,6 +438,14 @@ async def add_item_to_user(item_id: str, user_id: str, item_instances_table):
logger.error(f"Error adding item {item_id} to user {user_id}: {e}")
async def create_user(fields: dict, users_table):
"""Create a new user in Airtable (immediate write for registration)."""
# User creation needs immediate response for registration flow, so we do it synchronously
user = users_table.create(fields=fields)
logger.info(f"Created user: {user['id']}")
return user
async def update_user_last_login(user_id: str, users_table):
"""Update user's last login timestamp (queued write)."""
fields = {"Last Login": datetime.datetime.now().isoformat()}
@ -416,3 +530,61 @@ async def airtable_write_worker():
def get_write_queue() -> asyncio.Queue:
"""Get the global write queue."""
return write_queue
async def check_display_name_exists(display_name: str, users_table) -> bool:
"""Check if a display name already exists in the users table."""
logger.info(f"Checking if display name '{display_name}' exists")
try:
# Search for existing user with the same display name
existing_user = users_table.first(formula=match({"Display Name": display_name}))
return existing_user is not None
except Exception as e:
logger.error(f"Error checking display name: {e}")
# Return True to be safe - assume it exists if we can't check
return True
async def cleanup_expired_sessions_worker(sessions_table):
"""Background worker to clean up expired sessions."""
# Import here to avoid circular imports (auth imports database)
from auth_utils import is_session_expired
# Run cleanup immediately at startup
logger.info("Starting initial expired session cleanup at startup")
await _cleanup_expired_sessions(sessions_table, is_session_expired)
# Then run cleanup every hour
while True:
try:
await asyncio.sleep(3600) # Wait 1 hour
logger.info("Starting scheduled expired session cleanup")
await _cleanup_expired_sessions(sessions_table, is_session_expired)
except Exception as e:
logger.error(f"Error in cleanup_expired_sessions_worker: {e}")
await asyncio.sleep(300) # Wait 5 minutes before retrying on error
async def _cleanup_expired_sessions(sessions_table, is_session_expired_func):
"""Internal function to perform the actual session cleanup."""
try:
# Get all sessions
sessions = sessions_table.all()
expired_sessions = []
for session in sessions:
if is_session_expired_func(session):
expired_sessions.append(session['id'])
# Delete expired sessions
if expired_sessions:
logger.info(f"Deleting {len(expired_sessions)} expired sessions")
for session_id in expired_sessions:
await queue_airtable_write("delete", sessions_table, record_id=session_id)
else:
logger.info("No expired sessions found")
except Exception as e:
logger.error(f"Error during session cleanup: {e}")
raise
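The @cached read helpers above all reference a _generate_cache_key helper that this diff does not show. Purely as an illustration of the aiocache key_builder contract (hypothetical, not the repo's actual implementation), such a builder typically reduces the call site to a stable string:

import hashlib

def example_cache_key(func_name: str, *args, **kwargs) -> str:
    # Build a stable, short Redis key from the function name and its arguments.
    raw = f"{func_name}:{args!r}:{sorted(kwargs.items())!r}"
    return hashlib.sha256(raw.encode()).hexdigest()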

View file

@ -0,0 +1,208 @@
"""
Comprehensive logging configuration for the Random Access API.
Integrates with uvicorn's existing logging system and adds enhanced features.
Log Level Strategy:
- ERROR: Server errors (5xx), exceptions, critical failures
- WARNING: Client errors (4xx), slow requests (>1s), rate limits, security issues
- INFO: Application lifecycle (startup/shutdown), important events
- DEBUG: Request/response details, database operations, detailed flow
The default is INFO level with standard format for clean, focused logging.
Use VERBOSE_LOGGING=true and LOG_LEVEL=DEBUG for detailed troubleshooting.
"""
import json
import logging
import logging.handlers
from pathlib import Path
from typing import Any, Dict
from settings import settings
class JSONFormatter(logging.Formatter):
"""JSON formatter for structured logging."""
def format(self, record: logging.LogRecord) -> str:
"""Format log record as JSON."""
log_entry = {
"timestamp": self.formatTime(record),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno,
}
# Add exception info if present
if record.exc_info:
log_entry["exception"] = self.formatException(record.exc_info)
# Add extra fields from the record
for key, value in record.__dict__.items():
if key not in {
"name", "msg", "args", "levelname", "levelno", "pathname", "filename",
"module", "lineno", "funcName", "created", "msecs", "relativeCreated",
"thread", "threadName", "processName", "process", "getMessage",
"exc_info", "exc_text", "stack_info"
}:
log_entry[key] = value
return json.dumps(log_entry, default=str)
class VerboseFormatter(logging.Formatter):
"""Verbose formatter with detailed context information."""
def format(self, record: logging.LogRecord) -> str:
"""Format log record with verbose details."""
# Standard message
message = super().format(record)
# Add verbose details if enabled
if settings.verbose_logging:
details = [
f"[{record.name}]",
f"PID:{record.process}",
f"Thread:{record.thread}",
f"{record.pathname}:{record.lineno}",
]
# Add any extra context from the record
extras = []
for key, value in record.__dict__.items():
if key.startswith('ctx_'): # Custom context fields
extras.append(f"{key[4:]}={value}")
if extras:
details.extend(extras)
verbose_info = " | ".join(details)
message = f"{message}\n 📍 {verbose_info}"
return message
def setup_logging() -> None:
"""Configure enhanced logging that integrates with uvicorn's existing system."""
# Get log level
log_level = getattr(logging, settings.log_level.upper(), logging.INFO)
# Get the uvicorn error logger that the app already uses
uvicorn_logger = logging.getLogger("uvicorn.error")
uvicorn_logger.setLevel(log_level)
# Configure uvicorn access logger but don't interfere with its formatter
uvicorn_access_logger = logging.getLogger("uvicorn.access")
if settings.verbose_logging:
uvicorn_access_logger.setLevel(logging.INFO)
else:
uvicorn_access_logger.setLevel(logging.WARNING)
# Create our own access logger for custom logging
custom_access_logger = logging.getLogger("random_access.access")
custom_access_logger.setLevel(log_level)
# Add file handler to uvicorn loggers if specified
if settings.log_file:
log_path = Path(settings.log_file)
log_path.parent.mkdir(parents=True, exist_ok=True)
# Choose formatter based on format setting
if settings.log_format.lower() == "json":
formatter = JSONFormatter()
elif settings.log_format.lower() == "verbose":
formatter = VerboseFormatter(
fmt="%(asctime)s | %(levelname)-8s | %(name)-20s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
else: # standard
formatter = logging.Formatter(
fmt="%(asctime)s | %(levelname)-8s | %(name)-20s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
file_handler = logging.handlers.RotatingFileHandler(
filename=log_path,
maxBytes=settings.log_max_size,
backupCount=settings.log_backup_count,
encoding="utf-8"
)
file_handler.setLevel(log_level)
file_handler.setFormatter(formatter)
# Add file handler to uvicorn and custom loggers
uvicorn_logger.addHandler(file_handler)
custom_access_logger.addHandler(file_handler)
# Configure specific third-party loggers
configure_third_party_loggers()
# Log startup information using uvicorn logger
uvicorn_logger.info("Enhanced logging configured", extra={
"ctx_level": settings.log_level,
"ctx_format": settings.log_format,
"ctx_verbose": settings.verbose_logging,
"ctx_file": settings.log_file or "console only"
})
def configure_third_party_loggers() -> None:
"""Configure logging levels for third-party libraries."""
# Set appropriate levels for third-party loggers
third_party_levels = {
"fastapi": logging.INFO,
"slowapi": logging.WARNING,
"aiohttp": logging.WARNING,
"urllib3": logging.WARNING,
"slack_bolt": logging.INFO if settings.verbose_logging else logging.WARNING,
"pyairtable": logging.INFO if settings.verbose_logging else logging.WARNING,
}
for logger_name, level in third_party_levels.items():
logging.getLogger(logger_name).setLevel(level)
def get_logger(name: str = "uvicorn.error") -> logging.Logger:
"""Get a logger - defaults to uvicorn.error for consistency with existing code."""
return logging.getLogger(name)
def log_request_context(request_id: str, **context: Any) -> Dict[str, Any]:
"""Create context dict for request logging."""
return {
"ctx_request_id": request_id,
**{f"ctx_{k}": v for k, v in context.items()}
}
def log_database_context(table: str, operation: str, **context: Any) -> Dict[str, Any]:
"""Create context dict for database operation logging."""
return {
"ctx_table": table,
"ctx_operation": operation,
**{f"ctx_{k}": v for k, v in context.items()}
}
def log_auth_context(user_id: str | None = None, session_id: str | None = None, **context: Any) -> Dict[str, Any]:
"""Create context dict for authentication logging."""
ctx = {}
if user_id:
ctx["ctx_user_id"] = user_id
if session_id:
ctx["ctx_session_id"] = session_id
ctx.update({f"ctx_{k}": v for k, v in context.items()})
return ctx
def log_performance_context(operation: str, duration_ms: float, **context: Any) -> Dict[str, Any]:
"""Create context dict for performance logging."""
return {
"ctx_operation": operation,
"ctx_duration_ms": round(duration_ms, 2),
**{f"ctx_{k}": v for k, v in context.items()}
}
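The ctx_-prefixed helpers above feed the extra fields that JSONFormatter and VerboseFormatter pick up. A short usage sketch (not part of this commit; the record_id value is a placeholder):

from logging_config import get_logger, log_database_context, log_performance_context

logger = get_logger()
logger.info(
    "Fetched user record",
    extra=log_database_context("users", "read", record_id="rec_placeholder"),
)
logger.warning(
    "Slow Airtable call",
    extra=log_performance_context("users.read", 1530.4),
)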

View file

@ -2,57 +2,71 @@
import asyncio
import logging
import time
import uuid
from collections import namedtuple
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from fastapi import FastAPI, HTTPException, Request, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.responses import JSONResponse
from slack_bolt.adapter.fastapi.async_handler import AsyncSlackRequestHandler
from slowapi import Limiter
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
from random_access.database import (
from database import (
airtable_write_worker,
cleanup_expired_sessions_worker,
get_airtable_base,
get_table,
validate_all_schemas,
)
from random_access.routes.auth import create_auth_router
from random_access.routes.items import create_items_router, create_user_items_router
from random_access.routes.system import create_system_router
from random_access.routes.users import create_users_router
from random_access.security import SecurityHeaders, get_client_ip
from random_access.settings import settings
from random_access.slack_integration import create_slack_app, setup_slack_handlers
from logging_config import setup_logging, get_logger, log_request_context, log_performance_context
from routes.auth import create_auth_router
from routes.items import create_items_router, create_user_items_router
from routes.system import create_system_router
from routes.users import create_users_router
from security import SecurityHeaders, get_client_ip
from settings import settings
from slack_integration import create_slack_app, setup_slack_handlers
# Setup enhanced logging that integrates with uvicorn
setup_logging()
Result = namedtuple("Result", "content, status")
logger = logging.getLogger("uvicorn.error")
logger = get_logger() # This will return uvicorn.error logger
access_logger = get_logger("random_access.access") # Use our custom access logger
# Initialize rate limiter
limiter = Limiter(key_func=get_client_ip)
# Initialize Airtable
logger.debug("Initializing Airtable connection", extra=log_request_context("startup"))
at_base = get_airtable_base()
SUBMISSIONS = get_table(at_base, "submissions")
USERS = get_table(at_base, "users")
SESSIONS = get_table(at_base, "sessions")
ITEMS = get_table(at_base, "items")
ITEM_ADDONS = get_table(at_base, "item_addons")
ITEM_INSTANCES = get_table(at_base, "item_instances")
logger.debug("Airtable tables initialized", extra=log_request_context("startup"))
logger.debug("Setting up Slack integration", extra=log_request_context("startup"))
slack = create_slack_app()
setup_slack_handlers(slack)
setup_slack_handlers(slack, SESSIONS, USERS, SUBMISSIONS, ITEMS, ITEM_INSTANCES)
slack_handler = AsyncSlackRequestHandler(slack)
logger.debug("Slack integration ready", extra=log_request_context("startup"))
@asynccontextmanager
async def lifespan(app: FastAPI):
async def lifespan(_: FastAPI):
"""Application lifespan manager."""
logger.info("🚀 Random Access API starting up", extra=log_request_context("startup", environment=settings.environment))
# Validate table schemas on startup
logger.info("Validating Airtable schemas...")
logger.debug("Validating Airtable schemas...")
schema_valid = await validate_all_schemas()
if not schema_valid:
@ -65,8 +79,18 @@ async def lifespan(app: FastAPI):
)
# Start the write worker
logger.debug("Starting Airtable write worker...")
asyncio.create_task(airtable_write_worker())
# Start the session cleanup worker
logger.debug("Starting session cleanup worker...")
asyncio.create_task(cleanup_expired_sessions_worker(SESSIONS))
logger.info("✅ Random Access API ready", extra=log_request_context("startup", status="ready"))
yield
logger.info("🛑 Random Access API shutting down", extra=log_request_context("shutdown"))
app = FastAPI(
@ -76,6 +100,192 @@ app = FastAPI(
description="API for Random Access game integration",
)
# Add trusted host middleware if in production
if settings.is_production:
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=["*"], # Configure with your actual domain(s) in production
)
# Add proxy middleware to handle X-Forwarded-* headers
@app.middleware("http")
async def proxy_headers_middleware(request: Request, call_next):
"""Handle proxy headers for correct IP, scheme, and host detection."""
if settings.is_production or settings.trust_proxy_headers:
# Trust proxy headers in production or when explicitly enabled
forwarded_proto = request.headers.get("X-Forwarded-Proto")
forwarded_host = request.headers.get("X-Forwarded-Host")
if forwarded_proto:
# Update the URL scheme if behind a proxy
request.scope["scheme"] = forwarded_proto
if forwarded_host:
# Update the host header if behind a proxy
request.scope["headers"] = [
(name, value) if name != b"host" else (name, forwarded_host.encode())
for name, value in request.scope["headers"]
]
response = await call_next(request)
return response
# Request logging and performance middleware
@app.middleware("http")
async def request_logging_middleware(request: Request, call_next):
"""Log requests with performance metrics and context."""
request_id = str(uuid.uuid4())[:8]
client_ip = get_client_ip(request)
start_time = time.time()
# Log request start only in verbose/debug mode
if settings.verbose_logging or settings.log_level.upper() == "DEBUG":
access_logger.debug(
f"Request started: {request.method} {request.url.path}",
extra=log_request_context(
request_id,
method=request.method,
path=request.url.path,
client_ip=client_ip,
user_agent=request.headers.get("user-agent", "")
)
)
# Store request ID in request state for use in handlers
request.state.request_id = request_id
try:
response = await call_next(request)
duration_ms = (time.time() - start_time) * 1000
# Log request completion with appropriate levels
if response.status_code >= 500:
# Server errors - always log as ERROR
access_logger.error(
f"Request failed: {request.method} {request.url.path} - {response.status_code}",
extra=log_performance_context(
f"{request.method} {request.url.path}",
duration_ms,
request_id=request_id,
status_code=response.status_code,
client_ip=client_ip
)
)
elif response.status_code == 404:
# Not found - log as DEBUG (usually bots, missing favicon, etc.)
access_logger.debug(
f"Request not found: {request.method} {request.url.path} - {response.status_code}",
extra=log_performance_context(
f"{request.method} {request.url.path}",
duration_ms,
request_id=request_id,
status_code=response.status_code,
client_ip=client_ip
)
)
elif response.status_code == 429:
# Rate limited - don't log here, already handled by rate limit handler
pass
elif response.status_code >= 400:
# Other client errors - log as WARNING
access_logger.warning(
f"Request failed: {request.method} {request.url.path} - {response.status_code}",
extra=log_performance_context(
f"{request.method} {request.url.path}",
duration_ms,
request_id=request_id,
status_code=response.status_code,
client_ip=client_ip
)
)
elif duration_ms > 1000:
# Slow requests (>1s) - log as WARNING
access_logger.warning(
f"Slow request: {request.method} {request.url.path} - {response.status_code} ({duration_ms:.0f}ms)",
extra=log_performance_context(
f"{request.method} {request.url.path}",
duration_ms,
request_id=request_id,
status_code=response.status_code,
client_ip=client_ip
)
)
elif settings.verbose_logging or settings.log_level.upper() == "DEBUG":
# Normal requests - only log in debug/verbose mode
access_logger.debug(
f"Request completed: {request.method} {request.url.path} - {response.status_code} ({duration_ms:.0f}ms)",
extra=log_performance_context(
f"{request.method} {request.url.path}",
duration_ms,
request_id=request_id,
status_code=response.status_code,
client_ip=client_ip
)
)
# Add request ID to response headers for tracing
response.headers["X-Request-ID"] = request_id
return response
except Exception as e:
duration_ms = (time.time() - start_time) * 1000
# Don't log tracebacks for expected HTTP exceptions
is_http_exception = isinstance(e, (HTTPException, RateLimitExceeded))
should_include_traceback = (
not is_http_exception and
(settings.verbose_logging or settings.log_level.upper() == "DEBUG")
)
# Use appropriate log level based on exception type
if isinstance(e, RateLimitExceeded):
# Rate limit - already handled by exception handler, just log basic info
logger.debug(
f"Rate limit triggered: {request.method} {request.url.path}",
extra=log_request_context(
request_id,
method=request.method,
path=request.url.path,
client_ip=client_ip,
duration_ms=duration_ms
)
)
elif isinstance(e, HTTPException):
# HTTP exceptions - log based on status code
log_level = logger.warning if e.status_code < 500 else logger.error
log_level(
f"HTTP exception: {request.method} {request.url.path} - {e.status_code}: {e.detail}",
extra=log_request_context(
request_id,
method=request.method,
path=request.url.path,
client_ip=client_ip,
error=e.detail,
status_code=e.status_code,
duration_ms=duration_ms
),
exc_info=should_include_traceback
)
else:
# Unexpected exceptions - always log as error
logger.error(
f"Request exception: {request.method} {request.url.path} - {str(e)}",
extra=log_request_context(
request_id,
method=request.method,
path=request.url.path,
client_ip=client_ip,
error=str(e),
duration_ms=duration_ms
),
exc_info=should_include_traceback
)
raise
# Security middleware
app.state.limiter = limiter
@ -84,6 +294,20 @@ app.state.limiter = limiter
@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
"""Custom rate limit exceeded handler."""
client_ip = get_client_ip(request)
request_id = getattr(request.state, 'request_id', 'unknown')
logger.warning(
f"Rate limit exceeded for {request.method} {request.url.path}",
extra=log_request_context(
request_id,
client_ip=client_ip,
method=request.method,
path=request.url.path,
rate_limit_detail=exc.detail
)
)
return JSONResponse(
status_code=429,
content={"detail": f"Rate limit exceeded: {exc.detail}"},
@ -92,17 +316,17 @@ async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
getattr(exc, "limit", settings.rate_limit_requests)
),
"Retry-After": "60", # Default retry after 60 seconds
"X-Request-ID": request_id,
},
)
app.add_middleware(SlowAPIMiddleware)
# CORS middleware - allows all origins for game compatibility but with secure settings
app.add_middleware(
CORSMiddleware,
allow_origins=settings.origins_list, # ["*"] for development, specific domains for production
allow_credentials=False, # Don't allow credentials with wildcards for security
allow_origins=settings.origins_list,
allow_credentials=False,
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
allow_headers=["Authorization", "Content-Type", "X-Requested-With"],
expose_headers=["X-RateLimit-Limit", "X-RateLimit-Remaining"],
@ -135,8 +359,21 @@ async def limit_request_size(request: Request, call_next):
if content_length:
content_length = int(content_length)
if content_length > settings.max_request_size:
from fastapi import HTTPException, status
client_ip = get_client_ip(request)
request_id = getattr(request.state, 'request_id', 'unknown')
logger.warning(
f"Request too large: {content_length} bytes > {settings.max_request_size} bytes",
extra=log_request_context(
request_id,
client_ip=client_ip,
method=request.method,
path=request.url.path,
content_length=content_length,
max_allowed=settings.max_request_size
)
)
raise HTTPException(
status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
detail=f"Request too large. Maximum size: {settings.max_request_size} bytes",
@ -150,8 +387,8 @@ async def limit_request_size(request: Request, call_next):
routers = [
create_auth_router(SESSIONS, USERS, SUBMISSIONS, slack),
create_users_router(SESSIONS, USERS),
create_items_router(SESSIONS, USERS, ITEMS, ITEM_ADDONS, ITEM_INSTANCES),
create_user_items_router(SESSIONS, USERS, ITEMS, ITEM_ADDONS, ITEM_INSTANCES),
create_items_router(SESSIONS, USERS, ITEMS, ITEM_INSTANCES),
create_user_items_router(SESSIONS, USERS, ITEMS, ITEM_INSTANCES),
create_system_router(slack_handler),
]
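Every response now carries the X-Request-ID header added by the request-logging middleware, so clients can quote it when reporting problems. A quick client-side check (a sketch; assumes the API is reachable on localhost:8000):

import urllib.request

with urllib.request.urlopen("http://localhost:8000/") as resp:
    # The request ID printed here matches the one in the server-side logs.
    print(resp.status, resp.headers.get("X-Request-ID"))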

View file

@ -1,34 +1,88 @@
"""API routes for authentication endpoints."""
import datetime
import hashlib
import hmac
import re
from pathlib import Path
from typing import Literal
from urllib.parse import urlencode
from fastapi import APIRouter, HTTPException, Query, Request, status
from fastapi import APIRouter, Form, HTTPException, Query, Request, status
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, Field
from slowapi import Limiter
from random_access.auth import get_session_by_token, hash_token
from random_access.database import (
from auth_utils import (
decode_oidc_state,
get_session_by_token,
hash_token,
is_session_expired,
)
from database import (
check_display_name_exists,
create_session,
create_user,
get_game_record,
get_user_record,
update_user_and_session,
)
from random_access.security import (
from security import (
create_safe_error_response,
generate_secure_token,
get_client_ip,
validate_airtable_id,
)
from random_access.settings import settings
from random_access.slack_integration import get_slack_user_id
from settings import settings
from slack_integration import get_slack_user_id
# Rate limiter for auth endpoints
limiter = Limiter(key_func=get_client_ip)
# Templates for registration flow
templates = Jinja2Templates(directory="templates")
def _create_signed_slack_id(slack_user_id: str) -> str:
"""Create a cryptographically signed Slack ID to prevent tampering."""
# Create HMAC signature using the slack user ID and our secret
signature = hmac.new(
settings.slack_signing_secret.encode(),
slack_user_id.encode(),
hashlib.sha256,
).hexdigest()
# Return format: slackId.signature
return f"{slack_user_id}.{signature}"
def _verify_signed_slack_id(signed_slack_id: str) -> str | None:
"""Verify a signed Slack ID and return the actual Slack ID if valid."""
try:
# Split the signed ID
parts = signed_slack_id.split(".")
if len(parts) != 2:
return None
slack_user_id, provided_signature = parts
# Recreate the expected signature
expected_signature = hmac.new(
settings.slack_signing_secret.encode(),
slack_user_id.encode(),
hashlib.sha256,
).hexdigest()
# Use constant-time comparison to prevent timing attacks
if hmac.compare_digest(expected_signature, provided_signature):
return slack_user_id
return None
except Exception:
return None
# Pydantic models for OpenAPI documentation
class AuthStatusResponse(BaseModel):
@ -79,7 +133,12 @@ Provide a `game_id` query parameter to get a token that can be used with the `/a
hashed_token = hash_token(secure_token)
session = await create_session(
{"Game": [validated_game_id], "Token": hashed_token}, sessions_table
{
"Game": [validated_game_id],
"Token": hashed_token,
"Created": datetime.datetime.now().isoformat()
},
sessions_table
)
# Create game hash for state validation
@ -181,8 +240,6 @@ This endpoint is called automatically by Slack after user authorization. It comp
detail="Missing code or state in callback",
)
from random_access.auth import decode_oidc_state
game_id, token, session_rec_id = await decode_oidc_state(state, sessions_table)
user_id = await get_slack_user_id(code, slack_app)
user_rec = await get_user_record(user_id, users_table)
@ -244,13 +301,20 @@ Returns `{"status": "ok"}` for valid authenticated sessions, `{"status": "error"
try:
session = await get_session_by_token(token.split(".")[0], sessions_table)
print(session)
# Check if session exists, has a game, and has a user (complete auth)
if (
session
and session["fields"].get("Game")
and session["fields"].get("User")
):
# Check if session has expired
if is_session_expired(session):
return JSONResponse(
{"status": "error", "message": "Session has expired"},
status_code=status.HTTP_401_UNAUTHORIZED,
)
return JSONResponse({"status": "ok"})
except Exception:
# Safe error handling - don't leak internal errors
@ -261,4 +325,301 @@ Returns `{"status": "ok"}` for valid authenticated sessions, `{"status": "error"
status_code=status.HTTP_400_BAD_REQUEST,
)
@router.get(
"/register",
response_class=HTMLResponse,
summary="Registration page",
description="Display registration form with optional Slack authentication",
responses={
200: {"description": "Registration form displayed"},
302: {"description": "Redirect to Slack OAuth for authentication"},
429: {"description": "Rate limit exceeded"},
},
)
@limiter.limit(f"{settings.rate_limit_requests}/minute")
async def register_page(
request: Request,
slackId: str = Query(None, description="Signed Slack user ID (only from OAuth callback)"),
error: str = Query(None, description="Error message to display"),
):
"""Display registration form, optionally with Slack authentication."""
verified_slack_id = None
if slackId:
# Verify the cryptographic signature of the Slack ID
# This prevents users from tampering with the Slack ID parameter
verified_slack_id = _verify_signed_slack_id(slackId)
if not verified_slack_id:
# SECURITY: Invalid or tampered Slack ID - deny access
# This prevents impersonation attempts where someone tries to
# register with a different Slack ID than what OAuth provided
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid or tampered authentication token. Access denied.",
)
if verified_slack_id:
# Valid Slack ID from OAuth - show form with Slack connection
return templates.TemplateResponse(
"register.html",
{
"request": request,
"slack_user_id": verified_slack_id,
"signed_slack_id": slackId, # Pass the full signed version for form submission
"error": error,
},
)
else:
# No Slack ID - show form without Slack integration
return templates.TemplateResponse(
"register.html",
{
"request": request,
"slack_user_id": None,
"error": error,
},
)
@router.get(
"/register/callback",
summary="Handle Slack OAuth callback for registration",
description="Handle OAuth callback from Slack during registration",
responses={
200: {"description": "Registration success page displayed"},
302: {"description": "Redirect to registration form with Slack user ID"},
400: {"description": "Missing or invalid authorization code"},
429: {"description": "Rate limit exceeded"},
},
)
@limiter.limit(f"{settings.rate_limit_requests}/minute")
async def register_callback(
request: Request,
code: str = Query(..., description="Authorization code provided by Slack"),
state: str = Query(..., description="State parameter"),
):
"""Handle Slack OAuth callback for registration."""
if not code or state != "registration":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid registration callback",
)
try:
# Get Slack user ID
redirect_uri = f"{settings.app_base_url}/auth/register/callback"
slack_user_id = await get_slack_user_id(code, slack_app, redirect_uri)
# Check if user already exists
try:
existing_user = await get_user_record(slack_user_id, users_table)
if existing_user:
# User already exists, show registration complete
user_fields = existing_user.get("fields", {})
return templates.TemplateResponse(
"register_success.html",
{
"request": request,
"name": user_fields.get("Name", "Unknown"),
"display_name": user_fields.get("Display Name", "Unknown"),
"email": user_fields.get("Email", "Unknown"),
"slack_user_id": slack_user_id,
},
)
except ValueError:
# User doesn't exist, continue with registration
pass
# Redirect to registration form with signed Slack user ID
signed_slack_id = _create_signed_slack_id(slack_user_id)
return RedirectResponse(
url=f"/auth/register?slackId={signed_slack_id}",
status_code=status.HTTP_302_FOUND
)
except Exception as e:
return RedirectResponse(
url=f"/auth/register?error={str(e)}",
status_code=status.HTTP_302_FOUND
)
@router.get(
"/register/slack",
response_class=RedirectResponse,
summary="Start Slack OAuth for registration",
description="Initiate Slack OAuth flow for user registration",
responses={
302: {"description": "Redirect to Slack OAuth authorization page"},
429: {"description": "Rate limit exceeded"},
},
)
@limiter.limit(f"{settings.rate_limit_requests}/minute")
async def register_slack_start(request: Request):
"""Start Slack OAuth flow for registration."""
params = {
"response_type": "code",
"scope": "openid profile email",
"client_id": settings.slack_client_id,
"state": "registration",
"redirect_uri": settings.app_base_url + "/auth/register/callback",
}
return RedirectResponse(
"https://slack.com/openid/connect/authorize/?" + urlencode(params)
)
@router.post(
"/register",
response_class=HTMLResponse,
summary="Complete registration",
description="Process registration form submission",
responses={
200: {"description": "Registration successful"},
400: {"description": "Registration failed - validation errors"},
429: {"description": "Rate limit exceeded"},
},
)
@limiter.limit(f"{settings.rate_limit_requests}/minute")
async def register_complete(
request: Request,
name: str = Form(..., description="Full name"),
display_name: str = Form(..., description="Display name"),
email: str = Form(..., description="Email address"),
slack_user_id: str = Form(None, description="Slack user ID"),
):
"""Complete user registration."""
# Validate and extract actual Slack ID if provided
actual_slack_id = None
if slack_user_id:
# Verify the signature - slack_user_id should be signed from OAuth
if '.' in slack_user_id and len(slack_user_id.split('.')) == 2:
# This looks like a signed slack ID, verify it
actual_slack_id = _verify_signed_slack_id(slack_user_id)
if not actual_slack_id:
# SECURITY: Invalid signature - this indicates tampering
# Someone is trying to submit a form with a forged Slack ID
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid or tampered authentication. Access denied.",
)
else:
# Plain slack ID (shouldn't happen in normal flow)
# This could be an attempt to bypass our signing mechanism
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid authentication format. Access denied.",
)
# Validate inputs
errors = []
# Name validation
if not name or len(name.strip()) < 2:
errors.append("Name must be at least 2 characters long")
# Display name validation
if not display_name or len(display_name.strip()) < 2:
errors.append("Display name must be at least 2 characters long")
elif len(display_name.strip()) > 50:
errors.append("Display name must be less than 50 characters")
# Email validation
email_regex = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not email or not re.match(email_regex, email):
errors.append("Please enter a valid email address")
# Check for auth requirements
if actual_slack_id:
# Slack authentication path - validate that this Slack ID is legitimate
# We should only trust Slack IDs that came through our OAuth flow
# For additional security, check if this Slack ID already exists
try:
existing_slack_user = await get_user_record(actual_slack_id, users_table)
if existing_slack_user:
errors.append("This Slack account is already registered.")
auth_valid = False
else:
auth_valid = True
except ValueError:
# No existing user with this Slack ID, which is good
auth_valid = True
else:
# Email authentication path (dev mode)
if email != "not-a-real-email@example.com":
errors.append("Email verification not yet implemented. Use not-a-real-email@example.com for testing")
auth_valid = False
else:
auth_valid = True
# Check if display name already exists
if not errors:
display_name_exists = await check_display_name_exists(display_name.strip(), users_table)
if display_name_exists:
errors.append("Display name already taken. Please choose a different one.")
# If there are errors, show the form again
if errors:
# Need to recreate signed slack ID for the form
signed_slack_id_for_form = None
if actual_slack_id:
signed_slack_id_for_form = _create_signed_slack_id(actual_slack_id)
return templates.TemplateResponse(
"register.html",
{
"request": request,
"slack_user_id": actual_slack_id,
"signed_slack_id": signed_slack_id_for_form,
"name": name,
"display_name": display_name,
"email": email,
"error": "; ".join(errors),
},
)
# Create user record
try:
user_fields = {
"Name": name.strip(),
"Display Name": display_name.strip(),
"Email": email.strip(),
"Registered At": datetime.datetime.now().isoformat(),
}
# Add Slack ID if provided
if actual_slack_id:
user_fields["Slack ID"] = actual_slack_id
# Create user
user = await create_user(user_fields, users_table)
# Show success page
return templates.TemplateResponse(
"register_success.html",
{
"request": request,
"name": name.strip(),
"display_name": display_name.strip(),
"email": email.strip(),
"slack_user_id": actual_slack_id,
},
)
except Exception as e:
# Need to recreate signed slack ID for the form
signed_slack_id_for_form = None
if actual_slack_id:
signed_slack_id_for_form = _create_signed_slack_id(actual_slack_id)
return templates.TemplateResponse(
"register.html",
{
"request": request,
"slack_user_id": actual_slack_id,
"signed_slack_id": signed_slack_id_for_form,
"name": name,
"display_name": display_name,
"email": email,
"error": f"Registration failed: {str(e)}",
},
)
return router
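The signing helpers at the top of this file are the trust boundary for the registration flow: the Slack ID only travels between the OAuth callback and the form submission with an HMAC attached. A round-trip sanity check (a sketch, not a test shipped in this commit; assumes slack_signing_secret is configured):

signed = _create_signed_slack_id("U1234567890")
assert _verify_signed_slack_id(signed) == "U1234567890"
# Reusing the signature with a different ID must fail verification.
assert _verify_signed_slack_id("U0000000000." + signed.split(".")[1]) is None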

View file

@ -8,22 +8,22 @@ from fastapi.security import HTTPAuthorizationCredentials
from pydantic import BaseModel, Field, field_validator
from slowapi import Limiter
from random_access.auth import extract_and_validate_auth, get_auth_credentials
from random_access.database import (
from auth_utils import extract_and_validate_auth, get_auth_credentials
from database import (
add_item_to_user,
get_all_items,
get_item_by_id,
)
from random_access.database import get_user_items as get_user_items_cached
from random_access.database import (
from database import get_user_items as get_user_items_cached
from database import (
invalidate_user_items_cache,
)
from random_access.security import (
from security import (
create_safe_error_response,
get_client_ip,
validate_airtable_id,
)
from random_access.settings import settings
from settings import settings
# Rate limiter for item endpoints
limiter = Limiter(key_func=get_client_ip)
@ -44,7 +44,7 @@ class ItemResponse(BaseModel):
class UserItemResponse(BaseModel):
"""Response model for user's item (simplified flat structure)."""
"""Response model for user's item."""
item_id: str = Field(..., description="Unique identifier for the item")
name: str | None = Field(None, description="Display name of the item")
@ -122,7 +122,7 @@ class DetailedItemResponse(BaseModel):
def create_items_router(
sessions_table, users_table, items_table, item_addons_table, item_instances_table
sessions_table, users_table, items_table, item_instances_table
) -> APIRouter:
"""Create and configure the items router."""
router = APIRouter(prefix="/items", tags=["items"])
@ -365,7 +365,7 @@ Returns all available information about the item including name, type, level, ra
def create_user_items_router(
sessions_table, users_table, items_table, item_addons_table, item_instances_table
sessions_table, users_table, items_table, item_instances_table
) -> APIRouter:
"""Create router for user-specific item endpoints."""
router = APIRouter(prefix="/users/me", tags=["user-items"])
@ -451,9 +451,14 @@ Returns the user's complete inventory with detailed information for each item in
}
)
# Combine first and last name for display
first_name = user["fields"].get("First Name", "")
last_name = user["fields"].get("Last Name", "")
user_name = f"{first_name} {last_name}".strip() or None
return {
"user_id": user["id"],
"user_name": user["fields"].get("Display Name"),
"user_name": user_name,
"total_items": len(user_items),
"items": user_items,
}

View file

@ -5,8 +5,8 @@ from pydantic import BaseModel, Field
from slack_bolt.adapter.fastapi.async_handler import AsyncSlackRequestHandler
from slowapi import Limiter
from random_access.security import get_client_ip
from random_access.settings import settings
from security import get_client_ip
from settings import settings
# Rate limiter for system endpoints
limiter = Limiter(key_func=get_client_ip)

View file

@ -1,14 +1,13 @@
"""API routes for user endpoints."""
"""API routes for user management endpoints."""
from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials
from pydantic import BaseModel, Field
from slowapi import Limiter
from random_access.auth import extract_and_validate_auth, get_auth_credentials
from random_access.database import get_user_record
from random_access.security import get_client_ip
from random_access.settings import settings
from auth_utils import extract_and_validate_auth, get_auth_credentials
from security import get_client_ip
from settings import settings
# Rate limiter for user endpoints
limiter = Limiter(key_func=get_client_ip)
@ -18,7 +17,15 @@ class UserResponse(BaseModel):
"""Response model for user data."""
id: str = Field(..., description="Unique identifier for the user")
display_name: str | None = Field(None, description="User's display name")
first_name: str | None = Field(None, description="User's first name")
last_name: str | None = Field(None, description="User's last name")
slack_id: str | None = Field(None, description="User's Slack ID")
email: str | None = Field(None, description="User's email address")
created: str | None = Field(
None, description="ISO timestamp when the user account was created"
)
first_name: str | None = Field(None, description="User's first name")
last_name: str | None = Field(None, description="User's last name")
slack_id: str | None = Field(None, description="User's Slack ID")
email: str | None = Field(None, description="User's email address")
created: str | None = Field(
@ -36,7 +43,7 @@ def create_users_router(sessions_table, users_table) -> APIRouter:
summary="Get authenticated user's profile information",
description="""Get profile information for the authenticated user.
Returns the user's ID, display name, Slack ID, email address, and account creation date.
Returns the user's ID, first name, last name, Slack ID, email address, and account creation date.
**Authentication:** Bearer token required""",
responses={
@ -46,9 +53,10 @@ Returns the user's ID, display name, Slack ID, email address, and account creati
"application/json": {
"example": {
"id": "usr456",
"display_name": "PlayerOne",
"first_name": "Jane",
"last_name": "Doe",
"slack_id": "U1234567890",
"email": "player@example.com",
"email": "jane.doe@example.com",
"created": "2025-01-01T12:00:00Z",
}
}
@ -76,10 +84,11 @@ Returns the user's ID, display name, Slack ID, email address, and account creati
return {
"id": user["id"],
"display_name": user["fields"].get("Display Name"),
"first_name": user["fields"].get("First Name"),
"last_name": user["fields"].get("Last Name"),
"slack_id": user["fields"].get("Slack ID"),
"email": user["fields"].get("Email"),
"created": user["fields"].get("Created"),
"created": user["fields"].get("Registered At"),
}
return router

View file

@ -6,7 +6,7 @@ from typing import Any
from fastapi import HTTPException, Request, status
from random_access.settings import settings
from settings import settings
# Validation patterns
AIRTABLE_ID_PATTERN = re.compile(r"^rec[A-Za-z0-9]{14}$")
@ -87,15 +87,26 @@ def sanitize_airtable_formula_input(value: str) -> str:
def get_client_ip(request: Request) -> str:
"""Get client IP address, considering proxy headers."""
# Check for forwarded IP (common in production behind load balancers)
forwarded_for = request.headers.get("X-Forwarded-For")
if forwarded_for:
# Take the first IP in the chain
return forwarded_for.split(",")[0].strip()
# In production behind a trusted proxy, prefer proxy headers
if settings.is_production or settings.trust_proxy_headers:
# Check for forwarded IP (common in production behind load balancers)
forwarded_for = request.headers.get("X-Forwarded-For")
if forwarded_for:
# Take the first IP in the chain (the original client)
client_ip = forwarded_for.split(",")[0].strip()
# Basic validation to ensure it looks like an IP
if client_ip and not client_ip.startswith("unknown"):
return client_ip
real_ip = request.headers.get("X-Real-IP")
if real_ip:
return real_ip
# Check for real IP (nginx alternative)
real_ip = request.headers.get("X-Real-IP")
if real_ip and not real_ip.startswith("unknown"):
return real_ip
# Check for CloudFlare connecting IP
cf_connecting_ip = request.headers.get("CF-Connecting-IP")
if cf_connecting_ip:
return cf_connecting_ip
# Fallback to direct connection IP
return request.client.host if request.client else "unknown"
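A worked example of the precedence implemented above, assuming trust_proxy_headers is enabled: given X-Forwarded-For: 203.0.113.7, 10.0.0.2, 10.0.0.1, the first entry (the original client) wins; only if no proxy header is usable does the direct socket address apply.

header = "203.0.113.7, 10.0.0.2, 10.0.0.1"
# First hop in the chain is the original client address.
assert header.split(",")[0].strip() == "203.0.113.7"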

View file

@ -29,7 +29,7 @@ class Settings(BaseSettings):
airtable_base: str
slack_signing_secret: str
slack_client_id: str
slack_client_secret: str
slack_bolt_is_broken_so_this_cant_be_called_client_secret: str
slack_bot_token: str
app_base_url: str
game_id_salt: str
@ -38,7 +38,6 @@ class Settings(BaseSettings):
airtable_users_table: str
airtable_items_table: str
airtable_sessions_table: str
airtable_item_addons_table: str
airtable_item_instances_table: str
# Security settings
@ -46,10 +45,19 @@ class Settings(BaseSettings):
max_request_size: int = 1048576 # 1MB default
rate_limit_requests: int = 20 # requests per minute per IP
allowed_origins: str = "*" # Comma-separated list or "*" for development
trust_proxy_headers: bool = False # Whether to trust X-Forwarded-For headers
# Session security
session_ttl_hours: int = 24 # Session expires after 24 hours
# Logging settings
log_level: str = "INFO" # DEBUG, INFO, WARNING, ERROR, CRITICAL
log_format: str = "standard" # standard, json, verbose
log_file: str = "" # Empty string means no file logging, only console
log_max_size: int = 10485760 # 10MB default for log rotation
log_backup_count: int = 5 # Number of backup log files to keep
verbose_logging: bool = False # Enable verbose debug logging
# Redis/Valkey settings - prioritize explicit env vars, fall back to container detection
redis_host: str = environ.get("REDIS_HOST") or (
"valkey" if environ.get("DOCKER_CONTAINER") else "localhost"

View file

@ -1,12 +1,57 @@
"""Slack integration handlers and utilities."""
import logging
import re
from datetime import datetime
from logging import Logger
from zoneinfo import ZoneInfo
from slack_bolt.async_app import AsyncAck, AsyncApp, AsyncRespond, AsyncSay
from slack_bolt.response import BoltResponse
from random_access.settings import settings
from database import (
get_all_games,
get_detailed_user_items_for_slack,
get_game_record,
get_item_by_id,
get_user_by_slack_id,
get_user_sessions,
)
from settings import settings
def get_ordinal_suffix(day: int) -> str:
"""Get the ordinal suffix for a day (1st, 2nd, 3rd, 4th, etc.)."""
if 10 <= day % 100 <= 20:
return "th"
else:
return {1: "st", 2: "nd", 3: "rd"}.get(day % 10, "th")
def format_datetime_pretty(iso_datetime: str) -> str:
"""Format an ISO datetime string to a pretty UTC format."""
try:
# Parse the ISO datetime (handles both Z and +00:00 formats)
if iso_datetime.endswith("Z"):
dt = datetime.fromisoformat(iso_datetime.replace("Z", "+00:00"))
else:
dt = datetime.fromisoformat(iso_datetime)
# Convert to UTC
utc_dt = dt.astimezone(ZoneInfo("UTC"))
# Format the date
day = utc_dt.day
month = utc_dt.strftime("%B") # Full month name
year = utc_dt.year
time_str = utc_dt.strftime("%H:%M") # 24-hour format
# Get ordinal suffix
ordinal = get_ordinal_suffix(day)
return f"{month} {day}{ordinal}, {year} at {time_str} UTC"
except Exception:
return iso_datetime # Return original if parsing fails
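A quick usage sketch for the two helpers above; the timestamps are invented examples and the expected output is shown in comments:

# Illustrative calls only.
print(get_ordinal_suffix(3))                            # "rd"
print(get_ordinal_suffix(13))                           # "th" (teens always take "th")
print(format_datetime_pretty("2025-07-17T15:54:48Z"))   # "July 17th, 2025 at 15:54 UTC"
print(format_datetime_pretty("not-a-timestamp"))        # returns the input unchanged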
def create_slack_app() -> AsyncApp:
@@ -16,13 +61,14 @@ def create_slack_app() -> AsyncApp:
)
async def get_slack_user_id(code: str, slack_app: AsyncApp) -> str:
async def get_slack_user_id(code: str, slack_app: AsyncApp, redirect_uri: str | None = None) -> str:
"""Get Slack user ID from OAuth code."""
redirect_uri = f"{settings.app_base_url}/auth/callback"
if redirect_uri is None:
redirect_uri = f"{settings.app_base_url}/auth/callback"
token_resp = await slack_app.client.openid_connect_token(
client_id=settings.slack_client_id,
client_secret=settings.slack_client_secret,
client_secret=settings.slack_bolt_is_broken_so_this_cant_be_called_client_secret,
code=code,
redirect_uri=redirect_uri,
)
@@ -38,7 +84,7 @@ async def get_slack_user_id(code: str, slack_app: AsyncApp) -> str:
return str(slack_user_id)
def setup_slack_handlers(slack_app: AsyncApp):
def setup_slack_handlers(slack_app: AsyncApp, sessions_table=None, users_table=None, submissions_table=None, items_table=None, item_instances_table=None):
"""Set up Slack event handlers."""
@slack_app.event("app_mention") # pyright:ignore[reportUnknownMemberType]
@@ -51,8 +97,448 @@ def setup_slack_handlers(slack_app: AsyncApp):
pass
@slack_app.command("/random-access") # pyright:ignore[reportUnknownMemberType]
async def handle_command(ack: AsyncAck, body: BoltResponse, respond: AsyncRespond):
async def handle_command(ack: AsyncAck, body: BoltResponse, respond: AsyncRespond, command):
await ack()
subcommand = dict(body).get("text", "").strip() # type: ignore
# Note: Removed debug print for security - use proper logging in production
await respond("hewowo")
# Get the user ID from the command
user_id: str = command["user_id"]
text: str = command["text"].strip()
full_command = f"/random-access {text}".strip()
# Create attribution line
attribution = f"<@{user_id}> used `{full_command}`\n\n"
# Parse the command
if not text or text.lower() in ["help", "commands", "h", "?", "about"]:
await respond(
attribution +
"Available commands:\n"
"- `/random-access [help|commands|info|h|?|about]`: Show this help message\n"
"- `/random-access register`: Get registration link\n"
"- `/random-access info`: Get your in-game status\n"
"- `/random-access info [item|i] <item_name>`: Get details about a specific item\n"
"- `/random-access info [game|g] <game_id>`: Get details about a specific game\n"
"- `/random-access list items`: List your items\n"
"- `/random-access list games`: List available games\n"
)
return
# Parse command parts
parts = text.split() # Don't convert to lowercase to preserve case-sensitive IDs
parts_lower = [part.lower() for part in parts] # Separate lowercase version for command parsing
try:
if parts_lower[0] == "register":
await handle_register_command(respond, user_id, attribution)
elif parts_lower[0] == "info":
if len(parts) == 1:
await handle_info_command(respond, user_id, users_table, sessions_table, item_instances_table, items_table, attribution)
elif len(parts) >= 3 and parts_lower[1] in ["item", "i"]:
item_id = parts[2] # Use original case for ID
await handle_item_info_command(respond, item_id, items_table, attribution)
elif len(parts) >= 3 and parts_lower[1] in ["game", "g"]:
game_id = parts[2] # Use original case for ID
await handle_game_info_command(respond, game_id, submissions_table, attribution)
else:
await respond(attribution + "Invalid info command. Use `/random-access help` for usage.")
elif len(parts) >= 2 and parts_lower[0] == "list":
if parts_lower[1] == "items":
await handle_list_items_command(respond, user_id, users_table, item_instances_table, items_table, attribution)
elif parts_lower[1] == "games":
await handle_list_games_command(respond, submissions_table, attribution)
else:
await respond(attribution + "Invalid list command. Use `/random-access help` for usage.")
else:
await respond(attribution + "Unknown command. Type `/random-access help` for available commands.")
except Exception as e:
logging.error(f"Error handling Slack command: {e}")
await respond(attribution + "Sorry, there was an error processing your command. Please try again later.")
async def handle_register_command(respond: AsyncRespond, user_id: str, attribution: str):
"""Handle the register command."""
register_url = f"{settings.app_base_url}/auth/register?slack_user_id={user_id}"
await respond(
attribution +
f"🔗 Click here to register and connect your Slack account to Random Access:\n{register_url}\n\n"
"After registering, you'll be able to view your cross-game items and stats!"
)
async def handle_info_command(respond: AsyncRespond, slack_user_id: str, users_table, sessions_table, item_instances_table, items_table, attribution: str):
"""Handle the info command (user status)."""
try:
# Get user info
user = await get_user_by_slack_id(slack_user_id, users_table)
if not user:
await respond(
attribution +
"❌ You're not registered yet! Use `/random-access register` to get started."
)
return
# Get user stats
user_fields = user.get("fields", {})
display_name = user_fields.get("Display Name", "Unknown")
email = user_fields.get("Email", "Not set")
last_login_raw = user_fields.get("Last Login", "Never")
# Format last login date
if last_login_raw and last_login_raw != "Never":
last_login = format_datetime_pretty(last_login_raw)
else:
last_login = "Never"
# Get session count
sessions = await get_user_sessions(user["id"], sessions_table)
session_count = len(sessions)
# Get item count
detailed_items = await get_detailed_user_items_for_slack(user["id"], item_instances_table, items_table)
item_count = len(detailed_items)
# Count by rarity
rarity_counts = {}
for item_data in detailed_items:
item = item_data.get("item", {})
rarity = item.get("fields", {}).get("Rarity", "Unknown")
# Convert rarity to string and handle numeric rarities
if isinstance(rarity, (int, float)):
rarity_key = f"Level {rarity}"
else:
rarity_key = str(rarity) if rarity else "Unknown"
rarity_counts[rarity_key] = rarity_counts.get(rarity_key, 0) + 1
# Format rarity breakdown
rarity_text = ""
if rarity_counts:
rarity_text = "\n" + "\n".join([f"{rarity}: {count}" for rarity, count in rarity_counts.items()])
await respond(
attribution +
f"👤 *{display_name}*\n"
f"📧 Email: {email}\n"
f"🕐 Last Login: {last_login}\n"
f"🎮 Game Sessions: {session_count}\n"
f"🎒 Total Items: {item_count}{rarity_text}\n\n"
f"Use `/random-access list items` to see your full inventory!"
)
except Exception as e:
logging.error(f"Error in info command: {e}")
await respond(attribution + "❌ Error retrieving your information. Please try again later.")
async def handle_item_info_command(respond: AsyncRespond, item_id: str, items_table, attribution: str):
"""Handle the item info command using Slack Block Kit with embedded images."""
try:
# Validate item ID format (basic check)
if not item_id.startswith("rec") or len(item_id) < 10:
await respond({
"text": attribution + "❌ Invalid item identifier format."
})
return
item = await get_item_by_id(item_id, items_table)
if not item:
await respond({
"text": attribution + "❌ Item not found."
})
return
fields = item.get("fields", {})
name = fields.get("Name", "Unknown Item")
item_type = fields.get("Type", "Unknown")
level = fields.get("Level", "Unknown")
rarity = fields.get("Rarity", "Unknown")
description = fields.get("Description", "No description available")
game_name = fields.get("Game Name (from Games)", ["Unknown Game"])
image_attachments = fields.get("Image", [])
# Handle game name (it's a list from lookup)
if isinstance(game_name, list) and len(game_name) > 0:
game_name = game_name[0]
elif not isinstance(game_name, str):
game_name = "Unknown Game"
# Get rarity emoji and display text
rarity_display = rarity
if isinstance(rarity, (int, float)):
rarity_display = f"Level {rarity}"
rarity_emoji = "" # Default for numeric rarities
else:
rarity_emoji = {
"common": "", "uncommon": "🟢", "rare": "🔵",
"epic": "🟣", "legendary": "🟡", "mythic": "🔴"
}.get(str(rarity).lower(), "")
# Create blocks
blocks = []
# Attribution block
if attribution.strip():
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": attribution.strip()
}
})
blocks.append({"type": "divider"})
# Item header with image (if available)
header_text = f"{rarity_emoji} *{name}*"
# Get image URL if available
image_url = None
if image_attachments and isinstance(image_attachments, list) and len(image_attachments) > 0:
# Get the first image attachment
first_image = image_attachments[0]
if isinstance(first_image, dict) and "url" in first_image:
image_url = first_image["url"]
if image_url:
# Header with image
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": header_text
},
"accessory": {
"type": "image",
"image_url": image_url,
"alt_text": f"Image of {name}"
}
})
else:
# Header without image
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": header_text
}
})
# Item details in fields
fields_block = {
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": f"*Type:*\n{item_type}"
},
{
"type": "mrkdwn",
"text": f"*Level:*\n{level}"
},
{
"type": "mrkdwn",
"text": f"*Rarity:*\n{rarity_display}"
},
{
"type": "mrkdwn",
"text": f"*Game:*\n{game_name}"
}
]
}
blocks.append(fields_block)
# Description
if description and description != "No description available":
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*Description:*\n{description}"
}
})
# Also render the image full-size as its own image block (the section accessory above is only a small preview)
if image_url:
blocks.append({
"type": "image",
"image_url": image_url,
"alt_text": f"Full image of {name}",
"title": {
"type": "plain_text",
"text": name
}
})
await respond({
"text": f"{attribution.strip()}\n{rarity_emoji} {name}", # Fallback text
"blocks": blocks
})
except Exception as e:
logging.error(f"Error in item info command: {e}")
await respond({
"text": attribution + "❌ Error retrieving item information. Please try again later."
})
async def handle_game_info_command(respond: AsyncRespond, game_id: str, submissions_table, attribution: str):
"""Handle the game info command."""
try:
# Validate game ID format
if not game_id.startswith("rec") or len(game_id) < 10:
await respond(attribution + "❌ Invalid game ID format. Game IDs should start with 'rec' and be at least 10 characters long.")
return
game = await get_game_record(game_id, submissions_table)
if not game:
await respond(attribution + f"❌ Game with ID `{game_id}` not found.")
return
fields = game.get("fields", {})
name = fields.get("Game Name", "Unknown Game")
description = fields.get("Description", "No description available")
created_raw = fields.get("Created", "Unknown")
# Format created date
if created_raw and created_raw != "Unknown":
created = format_datetime_pretty(created_raw)
else:
created = "Unknown"
await respond(
attribution +
f"🎮 *{name}*\n"
f"📝 {description}\n"
f"📅 Created: {created}\n"
f"🆔 ID: `{game_id}`"
)
except Exception as e:
logging.error(f"Error in game info command: {e}")
await respond(attribution + "❌ Error retrieving game information. Please try again later.")
async def handle_list_items_command(respond: AsyncRespond, slack_user_id: str, users_table, item_instances_table, items_table, attribution: str):
"""Handle the list items command."""
try:
# Get user
user = await get_user_by_slack_id(slack_user_id, users_table)
if not user:
await respond(
attribution +
"❌ You're not registered yet! Use `/random-access register` to get started."
)
return
# Get user's items
detailed_items = await get_detailed_user_items_for_slack(user["id"], item_instances_table, items_table)
if not detailed_items:
await respond(
attribution +
"🎒 Your inventory is empty!\n"
"Start playing games that use Random Access to earn items across the ecosystem."
)
return
# Group by game
games = {}
for item_data in detailed_items:
item = item_data.get("item", {})
item_fields = item.get("fields", {})
game_name = item_fields.get("Game Name (from Games)", ["Unknown Game"])
if isinstance(game_name, list) and len(game_name) > 0:
game_name = game_name[0]
elif not isinstance(game_name, str):
game_name = "Unknown Game"
if game_name not in games:
games[game_name] = []
name = item_fields.get("Name", "Unknown Item")
rarity = item_fields.get("Rarity", "common")
item_type = item_fields.get("Type", "item")
item_id = item.get("id", "unknown")
# Handle numeric rarity values
if isinstance(rarity, (int, float)):
rarity_display = f"Level {rarity}"
rarity_emoji = "" # Default for numeric rarities
else:
rarity_display = str(rarity).lower()
# Get rarity emoji
rarity_emoji = {
"common": "", "uncommon": "🟢", "rare": "🔵",
"epic": "🟣", "legendary": "🟡", "mythic": "🔴"
}.get(rarity_display, "")
games[game_name].append(f"{rarity_emoji} {name} ({item_type})")
# Format response
response = attribution + f"🎒 *Your Inventory* ({len(detailed_items)} items)\n\n"
for game_name, items in games.items():
response += f"🎮 *{game_name}* ({len(items)} items):\n"
for item in items[:10]: # Limit to 10 items per game to avoid message limits
response += f"{item}\n"
if len(items) > 10:
response += f" • ... and {len(items) - 10} more\n"
response += "\n"
# Truncate if too long (Slack has message limits)
if len(response) > 2900: # Leave room for footer
response = response[:2900] + "...\n\n"
response += "💡 Use `/random-access info item <item_name>` for detailed item info"
await respond(response)
except Exception as e:
logging.error(f"Error in list items command: {e}")
await respond(attribution + "❌ Error retrieving your items. Please try again later.")
async def handle_list_games_command(respond: AsyncRespond, submissions_table, attribution: str):
"""Handle the list games command."""
try:
games = await get_all_games(submissions_table)
if not games:
await respond(
attribution +
"🎮 No games are currently available in the Random Access ecosystem.\n"
"Check back later as more games join the network!"
)
return
response = attribution + f"🎮 *Available Games* ({len(games)} total)\n\n"
for game in games[:20]: # Limit to avoid message size limits
fields = game.get("fields", {})
name = fields.get("Game Name", "Unknown Game")
description = fields.get("Description", "")
game_id = game.get("id", "")
# Truncate description if too long
if description and len(description) > 100:
description = description[:97] + "..."
response += f"• *{name}*"
if description:
response += f"\n {description}"
response += f"\n ID: `{game_id}`\n\n"
if len(games) > 20:
response += f"... and {len(games) - 20} more games!\n\n"
response += "💡 Use `/random-access info game <game_id>` for detailed game info"
await respond(response)
except Exception as e:
logging.error(f"Error in list games command: {e}")
await respond(attribution + "❌ Error retrieving games list. Please try again later.")

templates/register.html Normal file
View file

View file

View file

@@ -4,8 +4,6 @@ Test script to verify security features are working correctly.
"""
import asyncio
import json
import time
import aiohttp