How to extend your MCP server with database access

Connecting your MCP server to a real database transforms it from a simple demo into a production-ready AI backend. In this guide, you’ll see exactly how to integrate PostgreSQL into your MCP server and expose it safely to an LLM.
Key takeaways
- You can connect PostgreSQL to your MCP server with asyncpg
- Resources allow LLMs to fetch live structured data
- Tools allow LLMs to insert, update, or delete records securely
- Input validation and transaction management are essential for production use
- Environment configuration keeps your credentials secure
Why connect a database to MCP
Without database access, your LLM is blind to your real application data. By connecting it, you can:
- Let AI answer questions based on real users, orders, tickets, and more
- Automate actions like creating entries or updating records
- Build intelligent internal agents without needing separate APIs
- Enable context-aware AI responses using your application state
- Create AI-powered analytics over your business data
This is the first step toward turning a model into a real application assistant that provides genuine business value.
What you need before starting
- A running PostgreSQL database (v12+)
- A Python MCP server (basic implementation)
- Python 3.10+ with async support
- The asyncpg library for database access
- The mcp-server package (official Python SDK)
- The python-dotenv library for environment configuration
- Basic knowledge of SQL and async Python
Architecture overview
The architecture involves four components:
- LLM client: Claude or another LLM that communicates via the MCP protocol
- MCP server: your Python server exposing resources and tools
- Connection pool: manages database connections efficiently
- PostgreSQL: the underlying database storing your application data
This setup follows a clean separation of concerns: resources provide read-only access for queries, tools enable controlled write operations, connection pooling optimizes performance, and environment configuration keeps credentials secure.
Step 1: Install and configure database dependencies
First, install the required packages (Pydantic is used later for input validation, and its email extra provides EmailStr):
pip install asyncpg python-dotenv mcp-server "pydantic[email]"
Create a project structure:
mcp-db-server/
├── .env                 # Environment variables (never commit to git)
├── requirements.txt     # Dependencies
├── server.py            # Main server file
├── database.py          # Database connection module
├── resources/           # Database resources
│   ├── __init__.py
│   └── users.py         # User-related resources
└── tools/               # Database tools
    ├── __init__.py
    └── users.py         # User-related tools
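For reference, a requirements.txt covering the packages used throughout this guide might look like the following; versions are left unpinned here, so pin them as appropriate for your project (pytest and pytest-asyncio are only needed for the test suite shown later):

asyncpg
python-dotenv
mcp-server
pydantic[email]
pytest
pytest-asyncio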
Step 2: Set up environment configuration
Create a .env file for your database credentials:
DB_USER=your_db_user
DB_PASSWORD=your_db_password
DB_NAME=your_db_name
DB_HOST=localhost
DB_PORT=5432
Never commit this file to version control. Add it to .gitignore:
# .gitignore
.env
__pycache__/
*.py[cod]
*$py.class
Create a database.py file to load these environment variables:
import os

import asyncpg
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Database configuration
DB_CONFIG = {
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "database": os.getenv("DB_NAME"),
    "host": os.getenv("DB_HOST", "localhost"),
    "port": int(os.getenv("DB_PORT", "5432")),
}
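The rest of this guide reads and writes a users table. If you don't already have one, here is a minimal sketch of a compatible schema; it mirrors the temporary table used later in the testing section, and creating it from Python is just one option:

import asyncio

import asyncpg

from database import DB_CONFIG

USERS_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS users (
    id SERIAL PRIMARY KEY,
    username TEXT UNIQUE NOT NULL,
    email TEXT UNIQUE NOT NULL,
    department TEXT NOT NULL DEFAULT 'General',
    role TEXT NOT NULL DEFAULT 'User',
    active BOOLEAN NOT NULL DEFAULT TRUE,
    created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
"""

async def create_users_table():
    # Open a one-off connection with the same settings as the server
    conn = await asyncpg.connect(**DB_CONFIG)
    try:
        await conn.execute(USERS_TABLE_SQL)
    finally:
        await conn.close()

if __name__ == "__main__":
    asyncio.run(create_users_table())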
Step 3: Create database connection pooling
Extend your database.py file to include connection pooling:
import os
import logging

import asyncpg
from dotenv import load_dotenv

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_database")

# Load environment variables
load_dotenv()

# Database configuration
DB_CONFIG = {
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "database": os.getenv("DB_NAME"),
    "host": os.getenv("DB_HOST", "localhost"),
    "port": int(os.getenv("DB_PORT", "5432")),
}

# Global pool variable
db_pool = None

async def init_db():
    """Initialize the database connection pool."""
    global db_pool
    try:
        db_pool = await asyncpg.create_pool(
            **DB_CONFIG,
            min_size=1,
            max_size=10,
            command_timeout=60,
            timeout=10,  # Timeout (seconds) for establishing new connections
        )
        logger.info("Database connection pool established")

        # Test the connection
        async with db_pool.acquire() as connection:
            version = await connection.fetchval("SELECT version();")
            logger.info(f"Connected to PostgreSQL: {version}")

        return db_pool
    except Exception as e:
        logger.error(f"Failed to create database pool: {e}")
        raise

async def close_db():
    """Close the database connection pool."""
    global db_pool
    if db_pool:
        await db_pool.close()
        logger.info("Database connection pool closed")
This gives you:
- A properly configured connection pool
- Environment variable usage for security
- Logging for monitoring
- Pool size optimization
- Connection timeout handling
- An explicit close function for clean shutdowns
Step 4: Expose a read-only resource
Create resources/users.py to expose user data:
from typing import List, Dict, Any, Optional
import logging

# Import the module rather than db_pool itself, so we always see the pool
# that init_db() assigns at startup
import database

logger = logging.getLogger("mcp_database.resources.users")

async def fetch_recent_users(limit: int = 20) -> List[Dict[str, Any]] | Dict[str, str]:
    """
    Fetch the most recent users from the database.

    Args:
        limit: Maximum number of users to return (default: 20)

    Returns:
        List of user objects, or an error object on failure
    """
    try:
        if not database.db_pool:
            logger.error("Database pool not initialized")
            return {"error": "Database connection not available"}

        async with database.db_pool.acquire() as connection:
            # Use a parameterized query for safety
            query = """
                SELECT id, username, email, created_at
                FROM users
                ORDER BY created_at DESC
                LIMIT $1;
            """
            rows = await connection.fetch(query, limit)

            # Convert to dictionaries and handle datetime serialization
            users = []
            for row in rows:
                user = dict(row)
                # Convert datetime to ISO format string for JSON serialization
                if "created_at" in user and user["created_at"]:
                    user["created_at"] = user["created_at"].isoformat()
                users.append(user)

            logger.info(f"Fetched {len(users)} recent users")
            return users
    except Exception as e:
        logger.error(f"Error fetching recent users: {e}")
        return {"error": f"Database error: {str(e)}"}
Now, update your server.py to register this resource:
import asyncio
import logging

from mcp_server import MCPServer

from database import init_db, close_db
from resources.users import fetch_recent_users

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_server")

# Create MCP server
server = MCPServer()

# Register the resource
@server.resource(
    name="recent_users",
    description="Fetch the most recent users from the database."
)
async def recent_users_resource():
    return await fetch_recent_users(limit=20)

async def main():
    try:
        # Initialize the database
        await init_db()

        # Start the server
        logger.info("Starting MCP server...")
        await server.start()
    except Exception as e:
        logger.error(f"Server error: {e}")
    finally:
        # Close database connections on shutdown
        await close_db()

if __name__ == "__main__":
    asyncio.run(main())
Step 5: Implement advanced querying capabilities
Create a resource that allows more flexible querying with parameters:
async def fetch_users_by_criteria(
    department: Optional[str] = None,
    role: Optional[str] = None,
    active: Optional[bool] = True,
    limit: int = 20
) -> List[Dict[str, Any]] | Dict[str, str]:
    """
    Fetch users matching specific criteria.

    Args:
        department: Filter by department (optional)
        role: Filter by role (optional)
        active: Filter by active status (default: True)
        limit: Maximum results to return (default: 20)

    Returns:
        List of matching user objects, or an error object on failure
    """
    try:
        if not database.db_pool:
            logger.error("Database pool not initialized")
            return {"error": "Database connection not available"}

        # Build dynamic query
        conditions = ["active = $1"]
        params = [active]
        param_count = 1

        if department:
            param_count += 1
            conditions.append(f"department = ${param_count}")
            params.append(department)

        if role:
            param_count += 1
            conditions.append(f"role = ${param_count}")
            params.append(role)

        # Build the final query (only literal column names go into the SQL;
        # all values stay parameterized)
        query = f"""
            SELECT id, username, email, department, role, created_at
            FROM users
            WHERE {' AND '.join(conditions)}
            ORDER BY created_at DESC
            LIMIT ${param_count + 1};
        """
        params.append(limit)

        async with database.db_pool.acquire() as connection:
            rows = await connection.fetch(query, *params)

            # Convert to dictionaries and handle datetime serialization
            users = []
            for row in rows:
                user = dict(row)
                if "created_at" in user and user["created_at"]:
                    user["created_at"] = user["created_at"].isoformat()
                users.append(user)

            logger.info(f"Fetched {len(users)} users matching criteria")
            return users
    except Exception as e:
        logger.error(f"Error fetching users by criteria: {e}")
        return {"error": f"Database error: {str(e)}"}
Register this as a parameterized resource:
@server.resource(
    name="users_by_criteria",
    description="Fetch users matching specific criteria like department or role."
)
async def users_by_criteria_resource(data: dict):
    return await fetch_users_by_criteria(
        department=data.get("department"),
        role=data.get("role"),
        active=data.get("active", True),
        limit=data.get("limit", 20)
    )
This allows the LLM to request specific subsets of users based on business needs.
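For example, a request for recently added marketing managers could carry a payload like this; the field values are purely illustrative, and the handler above maps each key onto the corresponding query filter:

{
    "department": "Marketing",
    "role": "Manager",
    "active": True,
    "limit": 10
}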
Step 6: Create a secure tool to insert new records
Create tools/users.py for write operations:
from typing import Dict, Any, Optional
import logging
import re

from pydantic import BaseModel, EmailStr, Field, validator

# Import the module rather than db_pool itself, so we always see the pool
# that init_db() assigns at startup
import database

logger = logging.getLogger("mcp_database.tools.users")

class CreateUserRequest(BaseModel):
    """Validation model for user creation requests."""
    username: str = Field(..., min_length=3, max_length=50)
    email: EmailStr
    department: Optional[str] = "General"
    role: Optional[str] = "User"

    @validator('username')
    def username_alphanumeric(cls, v):
        if not re.match(r'^[a-zA-Z0-9_]+$', v):
            raise ValueError('Username must contain only letters, digits, and underscores')
        return v

async def create_user(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Create a new user in the database.

    Args:
        data: User data containing username, email, etc.

    Returns:
        Response with status and user info
    """
    try:
        # Validate the input data with Pydantic
        user_data = CreateUserRequest(**data)

        if not database.db_pool:
            logger.error("Database pool not initialized")
            return {
                "status": "error",
                "message": "Database connection not available"
            }

        async with database.db_pool.acquire() as connection:
            # Check if user already exists
            existing_user = await connection.fetchrow(
                "SELECT id FROM users WHERE username = $1 OR email = $2",
                user_data.username,
                user_data.email
            )

            if existing_user:
                return {
                    "status": "error",
                    "message": "User with this username or email already exists"
                }

            # Insert the new user
            query = """
                INSERT INTO users (username, email, department, role)
                VALUES ($1, $2, $3, $4)
                RETURNING id;
            """
            user_id = await connection.fetchval(
                query,
                user_data.username,
                user_data.email,
                user_data.department,
                user_data.role
            )

            logger.info(f"Created new user: {user_data.username} (ID: {user_id})")
            return {
                "status": "success",
                "message": f"User {user_data.username} created successfully",
                "user_id": user_id
            }
    except Exception as e:
        logger.error(f"Error creating user: {e}")
        return {
            "status": "error",
            "message": f"Failed to create user: {str(e)}"
        }
Register this tool in server.py:
from tools.users import create_user

# ... existing code ...

# Register the tool
@server.tool(
    name="create_user",
    description="Create a new user in the database."
)
async def create_user_tool(data: dict):
    return await create_user(data)
Key security features added:
- Pydantic validation with clear constraints
- Email validation using EmailStr
- Username format validation with a regex
- Duplicate checking before insertion
- Clear error messages
- Comprehensive logging
- Parameterized queries to prevent SQL injection
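As a quick sanity check, you can call create_user directly and watch Pydantic reject malformed input before any SQL runs; this is a minimal sketch, and the payload values are made up:

import asyncio

from tools.users import create_user

async def demo():
    # The username contains a space, so validation fails and no query is issued
    result = await create_user({
        "username": "bad name",
        "email": "someone@example.com",
    })
    print(result)  # {'status': 'error', 'message': 'Failed to create user: ...'}

if __name__ == "__main__":
    asyncio.run(demo())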
Step 7: Transaction management
For operations that require multiple database changes, use transactions:
async def transfer_user_to_department(
    user_id: int,
    new_department: str
) -> Dict[str, Any]:
    """
    Transfer a user to a new department, recording the change in the audit log.

    Args:
        user_id: ID of the user to transfer
        new_department: Name of the target department

    Returns:
        Status of the operation
    """
    try:
        if not database.db_pool:
            return {"error": "Database connection not available"}

        async with database.db_pool.acquire() as connection:
            # Start a transaction
            async with connection.transaction():
                # Get current department
                current_dept = await connection.fetchval(
                    "SELECT department FROM users WHERE id = $1",
                    user_id
                )

                if not current_dept:
                    return {"error": "User not found"}

                # Update the user's department
                await connection.execute(
                    "UPDATE users SET department = $1 WHERE id = $2",
                    new_department,
                    user_id
                )

                # Record the change in the audit log
                await connection.execute(
                    """
                    INSERT INTO user_audit_log
                    (user_id, field_changed, old_value, new_value)
                    VALUES ($1, $2, $3, $4)
                    """,
                    user_id,
                    "department",
                    current_dept,
                    new_department
                )

            logger.info(f"Transferred user {user_id} from {current_dept} to {new_department}")
            return {
                "status": "success",
                "message": f"User transferred from {current_dept} to {new_department}"
            }
    except Exception as e:
        logger.error(f"Error transferring user: {e}")
        return {"error": f"Transfer failed: {str(e)}"}
Register this as a tool:
@server.tool(
    name="transfer_user",
    description="Transfer a user to a new department."
)
async def transfer_user_tool(data: dict):
    user_id = data.get("user_id")
    new_department = data.get("new_department")

    if not user_id or not new_department:
        return {"error": "Missing user_id or new_department"}

    return await transfer_user_to_department(user_id, new_department)
This ensures that both operations (update user + add audit log) succeed or fail together.
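The example assumes a user_audit_log table already exists. Here is a minimal sketch of a compatible schema, inferred from the INSERT above; the id and changed_at columns are assumptions you can adapt:

import asyncio

import asyncpg

from database import DB_CONFIG

AUDIT_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS user_audit_log (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL REFERENCES users(id),
    field_changed TEXT NOT NULL,
    old_value TEXT,
    new_value TEXT,
    changed_at TIMESTAMP NOT NULL DEFAULT NOW()
);
"""

async def create_audit_table():
    # One-off connection reusing the server's configuration
    conn = await asyncpg.connect(**DB_CONFIG)
    try:
        await conn.execute(AUDIT_TABLE_SQL)
    finally:
        await conn.close()

if __name__ == "__main__":
    asyncio.run(create_audit_table())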
Step 8: Full server code example
Here’s a complete example bringing everything together:
import asyncio
import logging

from mcp_server import MCPServer

from database import init_db, close_db
from resources.users import fetch_recent_users, fetch_users_by_criteria
from tools.users import create_user, transfer_user_to_department

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_server")

# Create MCP server
server = MCPServer(
    name="DatabaseMCPServer",
    version="1.0.0",
    description="MCP server with PostgreSQL database access"
)

# Register resources
@server.resource(
    name="recent_users",
    description="Fetch the most recent users from the database."
)
async def recent_users_resource():
    return await fetch_recent_users(limit=20)

@server.resource(
    name="users_by_criteria",
    description="Fetch users matching specific criteria like department or role."
)
async def users_by_criteria_resource(data: dict):
    return await fetch_users_by_criteria(
        department=data.get("department"),
        role=data.get("role"),
        active=data.get("active", True),
        limit=data.get("limit", 20)
    )

# Register tools
@server.tool(
    name="create_user",
    description="Create a new user in the database."
)
async def create_user_tool(data: dict):
    return await create_user(data)

@server.tool(
    name="transfer_user",
    description="Transfer a user to a new department."
)
async def transfer_user_tool(data: dict):
    user_id = data.get("user_id")
    new_department = data.get("new_department")

    if not user_id or not new_department:
        return {"error": "Missing user_id or new_department"}

    return await transfer_user_to_department(user_id, new_department)

async def main():
    try:
        # Initialize the database
        await init_db()

        # Start the server
        logger.info("Starting MCP server...")
        await server.start()
    except Exception as e:
        logger.error(f"Server error: {e}")
    finally:
        # Close database connections on shutdown
        await close_db()

if __name__ == "__main__":
    asyncio.run(main())
Security considerations
When connecting your MCP server to a database, follow these security best practices:
- Use a dedicated database user with limited permissions:

  CREATE USER mcp_user WITH PASSWORD 'secure_password';
  GRANT SELECT, INSERT ON users TO mcp_user;  -- Grant only what's necessary

- Validate all inputs using Pydantic models with strict validation rules.
- Use parameterized queries exclusively to prevent SQL injection.
- Implement authentication for your MCP server:

  @server.middleware
  async def auth_middleware(request, next_handler):
      # verify_token is a placeholder for your own token-checking helper
      token = request.headers.get("Authorization")
      if not token or not verify_token(token):
          return {"error": "Unauthorized"}
      return await next_handler(request)
- Implement rate limiting to prevent abuse:

  # Simple in-memory rate limiter
  import time

  request_counts = {}

  @server.middleware
  async def rate_limit_middleware(request, next_handler):
      global request_counts  # the function rebinds the dict, so declare it global
      client_id = request.client.host
      current_time = time.time()

      # Drop entries older than 60 seconds
      request_counts = {
          k: v for k, v in request_counts.items()
          if v["timestamp"] > current_time - 60
      }

      if client_id in request_counts:
          if request_counts[client_id]["count"] >= 100:  # 100 requests per minute
              return {"error": "Rate limit exceeded"}
          request_counts[client_id]["count"] += 1
      else:
          request_counts[client_id] = {"count": 1, "timestamp": current_time}

      return await next_handler(request)
- Implement request logging for auditing:

  @server.middleware
  async def logging_middleware(request, next_handler):
      logger.info(f"Request: {request.method} {request.path} from {request.client.host}")
      response = await next_handler(request)
      return response

- Set up database query timeouts to prevent long-running queries from affecting server performance (see the sketch below).
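asyncpg supports per-query timeouts on top of the pool-level command_timeout configured earlier. A minimal sketch, with an arbitrary 5-second limit:

import database

async def fetch_with_timeout():
    async with database.db_pool.acquire() as connection:
        # Cancel the query if it takes longer than 5 seconds
        return await connection.fetch(
            "SELECT id, username FROM users ORDER BY created_at DESC LIMIT 100;",
            timeout=5,
        )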
Performance optimization
- Connection pooling is already implemented, but tune the parameters based on your workload:

  db_pool = await asyncpg.create_pool(
      **DB_CONFIG,
      min_size=5,    # Set higher for high-load scenarios
      max_size=20,   # Adjust based on your database server capacity
      statement_cache_size=100,  # Cache prepared statements
      max_inactive_connection_lifetime=300  # Seconds before recycling idle connections
  )
- Create database indexes for frequently queried fields:

  CREATE INDEX idx_users_department ON users(department);
  CREATE INDEX idx_users_created_at ON users(created_at DESC);
- Implement result caching for frequently accessed, rarely changing data:

  from datetime import datetime, timedelta

  # Cache that expires after 5 minutes
  cache_time = None
  cached_result = None

  async def fetch_departments_with_caching():
      global cache_time, cached_result

      # Check if cache is valid
      current_time = datetime.now()
      if cache_time and cached_result and current_time - cache_time < timedelta(minutes=5):
          return cached_result

      # Cache miss - fetch from database
      async with db_pool.acquire() as connection:
          result = await connection.fetch("SELECT * FROM departments")

      # Update cache
      cache_time = current_time
      cached_result = [dict(row) for row in result]

      return cached_result
- Use JSONB for complex data that doesn't need to be queried extensively:

  CREATE TABLE user_preferences (
      user_id INTEGER PRIMARY KEY,
      preferences JSONB NOT NULL
  );
- Pagination for large result sets:

  async def fetch_paginated_users(page: int = 1, page_size: int = 20):
      offset = (page - 1) * page_size

      async with db_pool.acquire() as connection:
          # Get total count
          total = await connection.fetchval("SELECT COUNT(*) FROM users")

          # Get paginated results
          rows = await connection.fetch(
              "SELECT * FROM users ORDER BY created_at DESC LIMIT $1 OFFSET $2",
              page_size,
              offset
          )

          return {
              "users": [dict(row) for row in rows],
              "pagination": {
                  "total": total,
                  "page": page,
                  "page_size": page_size,
                  "pages": (total + page_size - 1) // page_size
              }
          }
Testing database interactions
Create a tests directory with a test file:
# tests/test_database.py
import os

import asyncpg
import pytest
from dotenv import load_dotenv

# Load test environment variables
load_dotenv(".env.test")

# Testing database configuration
TEST_DB_CONFIG = {
    "user": os.getenv("TEST_DB_USER"),
    "password": os.getenv("TEST_DB_PASSWORD"),
    "database": os.getenv("TEST_DB_NAME"),
    "host": os.getenv("TEST_DB_HOST", "localhost"),
    "port": int(os.getenv("TEST_DB_PORT", "5432")),
}

@pytest.fixture
async def db_pool():
    """Create a test database pool."""
    # A single connection ensures the temporary table created below is
    # visible to every acquire() during the test
    pool = await asyncpg.create_pool(**TEST_DB_CONFIG, min_size=1, max_size=1)
    yield pool
    await pool.close()

@pytest.fixture
async def setup_test_data(db_pool):
    """Set up test data before tests and clean up after."""
    async with db_pool.acquire() as conn:
        # Create test tables
        await conn.execute("""
            CREATE TEMPORARY TABLE users (
                id SERIAL PRIMARY KEY,
                username TEXT UNIQUE NOT NULL,
                email TEXT UNIQUE NOT NULL,
                department TEXT NOT NULL DEFAULT 'General',
                role TEXT NOT NULL DEFAULT 'User',
                active BOOLEAN NOT NULL DEFAULT TRUE,
                created_at TIMESTAMP NOT NULL DEFAULT NOW()
            )
        """)

        # Insert test users
        await conn.executemany(
            """
            INSERT INTO users (username, email, department, role)
            VALUES ($1, $2, $3, $4)
            """,
            [
                ("test1", "test1@example.com", "Engineering", "Developer"),
                ("test2", "test2@example.com", "Marketing", "Manager"),
                ("test3", "test3@example.com", "Sales", "Representative")
            ]
        )

    yield

    # Cleanup happens automatically: the temporary table disappears
    # when the pool's connection is closed

@pytest.mark.asyncio
async def test_fetch_recent_users(db_pool, setup_test_data):
    """Test that fetching recent users works correctly."""
    from resources.users import fetch_recent_users

    # Monkeypatch db_pool so application code uses the test pool
    import database
    database.db_pool = db_pool

    # Execute the function
    result = await fetch_recent_users(limit=10)

    # Assertions
    assert isinstance(result, list)
    assert len(result) == 3
    assert result[0]["username"] in ["test1", "test2", "test3"]
    assert "email" in result[0]
    assert "department" in result[0]

@pytest.mark.asyncio
async def test_create_user(db_pool, setup_test_data):
    """Test that creating a user works correctly."""
    from tools.users import create_user

    # Monkeypatch db_pool so application code uses the test pool
    import database
    database.db_pool = db_pool

    # Test data
    user_data = {
        "username": "newuser",
        "email": "newuser@example.com",
        "department": "Finance",
        "role": "Analyst"
    }

    # Execute the function
    result = await create_user(user_data)

    # Assertions
    assert result["status"] == "success"

    # Verify in database
    async with db_pool.acquire() as conn:
        user = await conn.fetchrow(
            "SELECT * FROM users WHERE username = $1",
            "newuser"
        )
        assert user is not None
        assert user["email"] == "newuser@example.com"
        assert user["department"] == "Finance"
Run the tests with pytest -xvs tests/. The async tests and fixtures rely on the pytest-asyncio plugin, so install it alongside pytest.
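One way to configure the plugin so the async tests and fixtures above are collected without extra decorators is a small pytest.ini; asyncio_mode = auto is a pytest-asyncio setting, so adjust it if you prefer strict mode:

# pytest.ini
[pytest]
asyncio_mode = auto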
Deployment considerations
- Use environment-specific configuration files such as .env.development, .env.staging, and .env.production (see the loading sketch after this list).
- Set up database migrations for schema changes:

  pip install alembic
  alembic init migrations
- Deploy with Docker for consistency:

  FROM python:3.10-slim
  WORKDIR /app
  COPY requirements.txt .
  RUN pip install --no-cache-dir -r requirements.txt
  COPY . .
  CMD ["python", "server.py"]
- Set up health checks:

  @server.route("/health")
  async def health_check():
      try:
          async with db_pool.acquire() as connection:
              await connection.fetchval("SELECT 1")
          return {"status": "healthy", "database": "connected"}
      except Exception as e:
          return {"status": "unhealthy", "error": str(e)}
- Monitor query performance:

  -- In PostgreSQL
  CREATE EXTENSION pg_stat_statements;

  -- To analyze slow queries
  SELECT query, calls, total_time, mean_time
  FROM pg_stat_statements
  ORDER BY mean_time DESC
  LIMIT 10;
- Set up database backups:

  # Daily backup script
  pg_dump -U user -d database -F c -f /backups/db_$(date +%Y%m%d).dump
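To tie the environment-specific configuration files from the first point back to the code, one simple approach is to pick the dotenv file based on a deployment variable; APP_ENV is an assumed variable name here:

import os

from dotenv import load_dotenv

# APP_ENV is assumed to be set to development, staging, or production by your deployment
env_name = os.getenv("APP_ENV", "development")
load_dotenv(f".env.{env_name}")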
Conclusion
Connecting your MCP server to a PostgreSQL database transforms it from a simple demo into a production-ready AI backend. With careful attention to security, performance, and reliability, you can safely expose your database to language models, enabling powerful AI-driven workflows.
Key principles to remember:
- Resources for read operations
- Tools for write operations
- Validation for all inputs
- Transactions for consistency
- Connection pooling for performance
- Environment variables for security
- Structured error handling for reliability
By following these practices, you enable LLMs to work alongside your existing database applications in a safe, controlled way, unlocking new possibilities for AI-assisted workflows and data analysis. In the next guide, we'll show how to connect your existing API backend via MCP without rewriting business logic.
FAQs
Can I connect my MCP server to more than one database?
Yes. You can create multiple resource and tool handlers, each connecting to a different database or schema.
What happens if a database query fails?
You should add try/except handling to return a clear error to the LLM instead of crashing.
How do I keep the LLM from running arbitrary queries?
Expose only selected read or write operations via resources and tools. Never give the LLM unrestricted database access.