Controller-Service-Repository Pattern¶
Overview¶
This document describes the three-layer architecture pattern used in v4 API routes. This pattern provides clear separation of concerns, improved testability, and consistent error handling across all domains.
Architecture Layers¶
Controller (routes-new/v4/*.py)¶
Responsibility: HTTP request/response handling
What it does: - Receives HTTP requests - Extracts and validates request parameters - Calls service methods - Translates domain exceptions to HTTP exceptions - Returns HTTP responses
What it does NOT do: - Business logic - Database access - Transaction management
Example:
@post("/register")
async def register_endpoint(data: EmailRegisterRequest, auth_service: AuthService) -> Response:
try:
user, token = await auth_service.register(data)
return Response(user, status_code=HTTP_201_CREATED)
except EmailAlreadyExistsError as e:
raise CustomHTTPException(detail=e.message, status_code=HTTP_400_BAD_REQUEST)
except RateLimitExceededError as e:
raise CustomHTTPException(detail=e.message, status_code=HTTP_429_TOO_MANY_REQUESTS)
Service (services/*_service.py)¶
Responsibility: Business logic and orchestration
What it does: - Implements business rules - Validates input according to domain rules - Orchestrates repository calls - Manages transaction boundaries - Translates repository exceptions to domain exceptions - Can publish messages to RabbitMQ
What it does NOT do: - Direct SQL queries (uses repository) - HTTP-specific logic (status codes, headers)
Example:
async def register(self, data: EmailRegisterRequest) -> tuple[AuthUserResponse, str]:
# Validation
self.validate_email(data.email)
self.validate_password(data.password)
# Check business rule
if await self._repo.check_email_exists(data.email):
raise EmailAlreadyExistsError(data.email)
# Pre-compute
password_hash = self.hash_password(data.password)
token, token_hash = self.generate_token()
# Transaction
async with self._pool.acquire() as conn:
async with conn.transaction():
user_id = await self._repo.generate_next_user_id(conn=conn)
await self._repo.create_core_user(user_id, data.username, conn=conn)
await self._repo.create_email_auth(user_id, data.email, password_hash, conn=conn)
return user, token
Repository (repository/*_repository.py)¶
Responsibility: Data access
What it does: - Executes SQL queries - Accepts optional connection for transaction participation - Catches asyncpg exceptions and raises repository exceptions - Returns data as dicts or primitives
What it does NOT do: - Business logic - Validation - Transaction management (participates but doesn't manage)
Example:
async def create_email_auth(
self,
user_id: int,
email: str,
password_hash: str,
*,
conn: Connection | None = None,
) -> None:
_conn = self._get_connection(conn)
try:
await _conn.execute(
"INSERT INTO users.email_auth (user_id, email, password_hash) VALUES ($1, $2, $3)",
user_id, email, password_hash
)
except asyncpg.UniqueViolationError as e:
constraint = extract_constraint_name(e)
raise UniqueConstraintViolation(constraint, "users.email_auth", str(e))
Exception Flow¶
Exceptions flow upward through layers, being translated at each level:
Database Error (asyncpg)
↓
Repository Exception (UniqueConstraintViolation, ForeignKeyViolation)
↓
Domain Exception (EmailAlreadyExistsError, InvalidCredentialsError)
↓
HTTP Exception (CustomHTTPException with status code)
Repository Exceptions (repository/exceptions.py)¶
Base: RepositoryError
Subtypes:
- UniqueConstraintViolation - Unique constraint violated
- ForeignKeyViolation - Foreign key constraint violated
- CheckConstraintViolation - Check constraint violated
Domain Exceptions (services/exceptions/<domain>.py)¶
Base: DomainError (from utilities/exceptions.py)
Domain-specific bases (e.g., AuthError, MapError)
Domain-specific errors (e.g., EmailAlreadyExistsError, MapNotFoundError)
HTTP Exceptions¶
CustomHTTPException with appropriate status code
Dependency Injection¶
DI flows: State → Repository → Service → Controller
Provider functions:
# Repository
async def provide_auth_repository(state: State) -> AuthRepository:
return AuthRepository(state.db_pool)
# Service
async def provide_auth_service(state: State, auth_repo: AuthRepository) -> AuthService:
return AuthService(state.db_pool, state, auth_repo)
Router configuration:
router = Router(
path="/v4/auth",
route_handlers=[...],
dependencies={
"auth_repo": Provide(provide_auth_repository),
"auth_service": Provide(provide_auth_service),
},
)
Transaction Management¶
Services manage transactions. Repositories participate via optional conn parameter.
Pattern:
# In service
async with self._pool.acquire() as conn:
async with conn.transaction():
await self._repo.method1(param1, conn=conn)
await self._repo.method2(param2, conn=conn)
# If any repo method raises, transaction rolls back
Base Classes¶
BaseRepository¶
class BaseRepository:
def __init__(self, pool: Pool) -> None:
self._pool = pool
def _get_connection(self, conn: Connection | None = None) -> Connection | Pool:
return conn or self._pool
BaseService¶
class BaseService:
def __init__(self, pool: Pool, state: State) -> None:
self._pool = pool
self._state = state
async def publish_message(self, routing_key: str, data: msgspec.Struct, ...) -> JobStatusResponse:
# RabbitMQ publishing logic
Migrating a New Domain¶
1. Create Domain Exceptions¶
services/exceptions/<domain>.py:
from utilities.exceptions import DomainError
class MapError(DomainError):
"""Base for map domain errors."""
class MapNotFoundError(MapError):
def __init__(self, map_id: int):
super().__init__("Map not found.", map_id=map_id)
2. Create Repository¶
repository/<domain>_repository.py:
from .base import BaseRepository
from .exceptions import UniqueConstraintViolation, extract_constraint_name
class MapRepository(BaseRepository):
async def get_map_by_id(self, map_id: int, *, conn: Connection | None = None) -> dict | None:
_conn = self._get_connection(conn)
row = await _conn.fetchrow("SELECT * FROM maps.maps WHERE id = $1", map_id)
return dict(row) if row else None
3. Create Service¶
services/<domain>_service.py:
from .base import BaseService
from repository.<domain>_repository import MapRepository
from .exceptions.<domain> import MapNotFoundError
class MapService(BaseService):
def __init__(self, pool: Pool, state: State, map_repo: MapRepository):
super().__init__(pool, state)
self._map_repo = map_repo
async def get_map(self, map_id: int) -> MapResponse:
map_data = await self._map_repo.get_map_by_id(map_id)
if not map_data:
raise MapNotFoundError(map_id)
return MapResponse(**map_data)
4. Create Routes¶
routes-new/v4/<domain>.py:
from litestar import Router, get
from services.<domain>_service import MapService
from services.exceptions.<domain> import MapNotFoundError
@get("/{map_id:int}")
async def get_map_endpoint(map_id: int, map_service: MapService) -> Response:
try:
map_data = await map_service.get_map(map_id)
return Response(map_data, status_code=HTTP_200_OK)
except MapNotFoundError as e:
raise CustomHTTPException(detail=e.message, status_code=HTTP_404_NOT_FOUND)
router = Router(
path="/v4/maps",
route_handlers=[get_map_endpoint],
dependencies={
"map_repo": Provide(provide_map_repository),
"map_service": Provide(provide_map_service),
},
)
5. Register Routes¶
In app.py, v4 routes are auto-discovered via routes_new.v4.__init__.py.
Testing Strategy¶
Test Location: All v4 tests live in apps/api/tests-v4/ (separate from v3 tests in apps/api/tests/)
Running Tests:
- just test-api - Runs v3 tests only (apps/api/tests/)
- just test-api-v4 - Runs v4 tests only (apps/api/tests-v4/)
Repository Tests (tests-v4/repository/)¶
Test data access with real database:
async def test_create_email_auth(auth_repo):
user_id = await auth_repo.generate_next_user_id()
await auth_repo.create_core_user(user_id, "testuser")
await auth_repo.create_email_auth(user_id, "[email protected]", "hash")
# Verify in database
async with auth_repo._pool.acquire() as conn:
row = await conn.fetchrow("SELECT * FROM users.email_auth WHERE user_id = $1", user_id)
assert row is not None
Service Tests (tests-v4/services/)¶
Mock repository, test business logic:
@pytest.mark.asyncio
async def test_register_validates_email(auth_service, mock_repo):
mock_repo.check_email_exists = AsyncMock(return_value=False)
with pytest.raises(EmailValidationError):
await auth_service.register(EmailRegisterRequest(
email="invalid",
password="ValidPass1!",
username="test"
))
mock_repo.check_email_exists.assert_not_called()
Route Tests (tests-v4/routes/)¶
Test HTTP layer with real service (integration tests):
@pytest.mark.asyncio
async def test_register_endpoint_returns_201(test_client):
response = await test_client.post("/v4/auth/register", json={
"email": "[email protected]",
"username": "testuser",
"password": "Test123!@#"
})
assert response.status_code == 201
Hybrid Domain Example: Change Requests¶
The change_requests domain demonstrates a simple write-enabled domain:
Characteristics: - Mix of reads and writes - NO validation (data pre-validated by bot) - NO events (notifications handled externally) - NO transactions (single-table operations) - Light business logic (permission check with string comparison)
Code Metrics: - Repository: ~150 lines (6 methods) - Service: ~100 lines (6 methods, 1 with logic) - Controller: ~120 lines (6 endpoints) - Tests: ~300 lines total
Business Logic Example:
# Service handles permission check logic
async def check_permission(self, thread_id: int, user_id: int, code: str) -> bool:
creator_mentions = await self._repo.fetch_creator_mentions(thread_id, code)
if not creator_mentions:
return False
return str(user_id) in creator_mentions # String comparison
Write Pattern:
# Repository raises FK violations
async def create_request(...) -> None:
try:
await _conn.execute(query, ...)
except asyncpg.ForeignKeyViolationError as e:
raise ForeignKeyViolationError(...)
# Service passes through (no translation needed, internal API)
async def create_request(self, data: Request) -> None:
await self._repo.create_request(...)
# Controller returns None with 201 status
async def create_endpoint(self, data: Request, service: Service) -> Response[None]:
await service.create_request(data)
return Response(None, status_code=HTTP_201_CREATED)
Migration effort: ~2 hours for 6 endpoints
Read-Only Domain Example: Community¶
The community domain demonstrates the simplest v4 pattern:
Characteristics: - No validation (query parameters only) - No events (no async operations) - No transactions (single queries) - Service is pure pass-through with SDK conversion
Code Metrics: - Repository: 759 lines (12 methods, all SQL) - Service: 150 lines (12 pass-through methods) - Controller: 248 lines (12 endpoint methods) - Tests: ~200 lines total
Pattern:
# Repository: Return dicts
async def fetch_something(self, *, conn: Connection | None = None) -> list[dict]:
_conn = self._get_connection(conn)
rows = await _conn.fetch(query)
return [dict(row) for row in rows]
# Service: Convert to SDK
async def get_something(self) -> list[SomeResponse]:
rows = await self._repo.fetch_something()
return msgspec.convert(rows, list[SomeResponse])
# Controller: Pass through
async def get_something_endpoint(self, service: Service) -> list[SomeResponse]:
return await service.get_something()
Testing: - Repository tests use real database (verify SQL) - Service tests use mocks (verify conversion) - Route tests use real database (integration)
Migration effort: ~4 hours for 12 endpoints
Key Principles¶
- DRY: Share patterns across domains
- YAGNI: Don't add features until needed
- Explicit over implicit: Clear error handling, no magic
- Testability: Each layer independently testable
- Consistency: Same user-facing errors as v3
Constraint Error Mappings¶
Keep constraint mappings in services for reference:
UNIQUE_CONSTRAINT_MESSAGES = {
"email_auth_email_key": "An account with this email already exists.",
}
FK_CONSTRAINT_MESSAGES = {
"sessions_user_id_fkey": "User does not exist.",
}
These ensure v4 returns same error messages as v3.