feat: implement PostgreSQL NOTIFY triggers for real-time queue updates and...
Merge Request
Overview
This MR implements PostgreSQL NOTIFY triggers for real-time queue updates and optimizes database connection handling and session management. It addresses persistent database connection pool exhaustion and session leakage issues by fine-tuning SQLAlchemy pool parameters, ensuring deterministic session closure in middleware, eliminating redundant engine initializations, and consolidating database sessions during WebSocket broadcasts.
What does this MR do and why?
feat: implement PostgreSQL NOTIFY triggers for real-time queue updates and optimize session management in auth and WebSocket layers.
The motivation behind this change was to resolve frequent database connection pool exhaustion under load. Previous implementations leaked sessions in middleware, instantiated duplicate database engines, and opened multiple concurrent connections during WebSocket broadcasts. This caused the application to crash or stall during high-concurrency scenarios. This approach optimizes the SQLAlchemy configuration and unifies connection strategies across the app to resolve these issues.
Changes Made
-
app/core/database.py: Fine-tuned SQLAlchemy pool configurations (pool_size=10,max_overflow=20,pool_use_lifo=True,pool_recycle=1800,pool_timeout=5) for optimal connection reuse, efficient expiry of stale connections, and fail-fast behavior. -
app/core/database_triggers.py: Refactored to utilize the centralized SQLAlchemyengineinstead of instantiating a new one, eliminating redundant connection pools. -
app/main.py: Added database trigger initialization into the FastAPI startup event with graceful error handling to support pg_notify events. -
app/middleware/auth_middleware.py: Replaced the generator-basedget_db()with directSessionLocal()usage, wrapped in atry...finally: db.close()block to guarantee session closure and prevent session leakage. -
app/websockets/socketio_queue_manager.py:- Extracted shared query logic into a
_build_queue_datahelper method. - Refactored
broadcast_queue_updateto open exactly ONE database session shared across all rooms, drastically reducing DB pool pressure (from 3 concurrent connections down to 1 per broadcast).
- Extracted shared query logic into a
Technical Details
- Root Cause: Inefficient database session handling in the auth middleware and SocketIO broadcaster, combined with sub-optimal engine pool settings, led to stale connections, resource leaks, and eventual pool exhaustion.
-
Fix Details:
- We enforced explicit session lifecycle management (
finally: db.close()) in the auth middleware. - Reduced the number of connections required for WebSocket broadcasts by sharing a single DB session across all rooms.
- We adjusted pool parameters to allow burst traffic to quickly close connections (
max_overflow=20),pool_timeout=5to fail fast with aQueuePoolerror instead of stalling, andpool_use_lifo=Trueto prioritize hot connections and allow idle connections to expire naturally viapool_recycle.
- We enforced explicit session lifecycle management (
Type of Change
-
🐛 Bug fix (non-breaking change that fixes an issue) -
✨ New feature (non-breaking change that adds functionality) -
💥 Breaking change (fix or feature that would cause existing functionality to change) -
📝 Documentation update -
♻ ️ Refactor (no functional changes) -
⚡ Performance improvement -
🧪 Test update -
🔧 Configuration change -
🚨 Security fix -
🗑 ️ Deprecation (removing deprecated code)
Related Issues / References
- Resolves database connection pool exhaustion issues.
Screenshots or Screen Recordings
N/A
How to Validate Locally
- Start the application locally (
npm run devor equivalent python server command). - Monitor database connections using PostgreSQL
pg_stat_activity. Verify that the number of active connections remains stable and does not leak over time, even after hitting endpoints that utilize theAuthMiddleware. - Verify that queue updates are successfully broadcasted to connected WebSocket clients across all rooms without errors.
- Try to emit
queue_updateevents via the WebSocket manager and confirm it opens only 1 database session for the entire broadcast instead of 3.
Testing Done
-
API endpoint tests passing -
Local manual testing of websocket functionality
Test Cases Covered:
| Scenario | Expected Result | Status |
|---|---|---|
| Rapid websocket broadcast events emitted | DB connections stay within pool bounds | |
| Concurrent API requests with AuthMiddleware | DB sessions are properly closed and recycled | |
| App restart / cold start | DB triggers initialize gracefully via startup event |
Test Commands Run:
pytest
Code Quality Checklist
Code Standards
-
Code follows project conventions (naming, structure, formatting) -
No debug statements or commented-out code left (unless necessary and intended) -
No unused imports, variables, or functions -
No duplicate code (DRY principle followed) -
Type hints are properly defined (no Anyunless justified and no mypy type check errors) -
Ruff checks pass
Python & FastAPI Best Practices
-
Functions follow single-responsibility principle -
Async/await used correctly (no blocking calls in async functions) -
Dependency injection used appropriately -
Pydantic models used for request/response validation -
SQLAlchemy queries are optimized (no N+1 queries) -
Error handling is comprehensive (try/except with proper logging)
API Design
-
RESTful conventions followed -
Proper HTTP status codes returned -
Input validation implemented -
Authentication/authorization enforced -
Role Base access control used for user restriction -
API documentation (docstrings) updated
Database & Migrations
-
Database migrations created (if schema changed) -
Database migrations version is pointing to the latest version (and version name follows project conventions) -
Migrations are reversible (migrations contain downgrade scripts) -
Indexes added for frequently queried fields -
No raw SQL queries (using SQLAlchemy ORM) -
Data integrity constraints maintained
Security
-
No sensitive data logged (passwords, tokens, PII) -
SQL injection prevention verified (ORM used) -
Input sanitization implemented -
Authentication tokens handled securely -
CORS settings appropriate
Error Handling
-
Errors are caught and handled gracefully -
User-friendly error messages returned -
Errors are logged appropriately -
HTTP error responses follow API standards
Documentation
-
Code comments explain complex logic (not what, but why)
Known Limitations / Technical Debt
- If the DB is temporarily unreachable during the FastAPI startup event, the triggers may not initialize properly, which will suppress live pg_notify events but won't crash the API.
Additional Notes
- Reviewers should focus on the connection pool configurations in
app/core/database.pyand the shared session implementation insocketio_queue_manager.pyas they have the largest performance impact on the system.
MR Acceptance Checklist
Quality & Correctness
-
Code works as intended and solves the stated problem -
No bugs introduced (existing functionality not broken) -
Edge cases handled appropriately
Maintainability
-
Code is readable and well-organized -
Code is testable and well-tested -
Follows project patterns and conventions
Acceptance Review
-
Reviewed by at least 1 teammate -
Reviewed by product owner