How to extend your MCP server with database access

Connecting your MCP server to a real database transforms it from a simple demo into a production-ready AI backend. In this guide, you’ll see exactly how to integrate PostgreSQL into your MCP server and expose it safely to an LLM.

Key takeaways

  • You can connect PostgreSQL to your MCP server with asyncpg
  • Resources allow LLMs to fetch live structured data
  • Tools allow LLMs to insert, update, or delete records securely
  • Input validation and transaction management are essential for production use
  • Environment configuration keeps your credentials secure

Why connect a database to MCP

Without database access, your LLM is blind to your real application data. By connecting it, you can:

  • Let AI answer questions based on real users, orders, tickets, and other records
  • Automate actions like creating entries or updating records
  • Build intelligent internal agents without needing separate APIs
  • Enable context-aware AI responses using your application state
  • Create AI-powered analytics over your business data

This is the first step toward turning a model into a real application assistant that provides genuine business value.

What you need before starting

  • A running PostgreSQL database (v12+)
  • A Python MCP server (basic implementation)
  • Python 3.10+ with async support
  • The asyncpg library for database access
  • The mcp-server package (this guide assumes an MCPServer-style Python SDK)
  • Python-dotenv for environment configuration
  • Basic knowledge of SQL and async Python

Architecture overview

The architecture involves four components:

  • LLM Client: Claude or another LLM that communicates via the MCP protocol
  • MCP Server: Your Python server exposing resources and tools
  • Connection Pool: Manages database connections efficiently
  • PostgreSQL: The underlying database storing your application data

This setup follows a clean separation of concerns: resources provide read-only access for queries, tools enable controlled write operations, connection pooling optimizes performance, and environment configuration keeps credentials secure.

Step 1: Install and configure database dependencies

First, install the required packages:

pip install asyncpg python-dotenv mcp-server

Create a project structure:

mcp-db-server/
├── .env                  # Environment variables (never commit to git)
├── requirements.txt      # Dependencies
├── server.py             # Main server file
├── database.py           # Database connection module
├── resources/            # Database resources
│   ├── __init__.py
│   └── users.py          # User-related resources
└── tools/                # Database tools
    ├── __init__.py
    └── users.py          # User-related tools
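
A minimal requirements.txt for this layout might look like the following (unpinned for brevity; pin exact versions for production). Note the pydantic email extra, which the EmailStr validation used later depends on:

# requirements.txt
asyncpg
python-dotenv
mcp-server
pydantic[email]   # the email extra provides EmailStr validation used later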

Step 2: Set up environment configuration

Create a .env file for your database credentials:

DB_USER=your_db_user
DB_PASSWORD=your_db_password
DB_NAME=your_db_name
DB_HOST=localhost
DB_PORT=5432

Never commit this file to version control. Add it to .gitignore:

# .gitignore
.env
__pycache__/
*.py[cod]
*$py.class

Create a database.py file to load these environment variables:

import os
import asyncpg
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Database configuration
DB_CONFIG = {
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "database": os.getenv("DB_NAME"),
    "host": os.getenv("DB_HOST", "localhost"),
    "port": int(os.getenv("DB_PORT", "5432")),
}

Step 3: Create database connection pooling

Extend your database.py file to include connection pooling:

import os
import asyncpg
import logging
from dotenv import load_dotenv

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_database")

# Load environment variables
load_dotenv()

# Database configuration
DB_CONFIG = {
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "database": os.getenv("DB_NAME"),
    "host": os.getenv("DB_HOST", "localhost"),
    "port": int(os.getenv("DB_PORT", "5432")),
}

# Global pool variable
db_pool = None

async def init_db():
    """Initialize the database connection pool."""
    global db_pool
    try:
        db_pool = await asyncpg.create_pool(
            **DB_CONFIG,
            min_size=1,
            max_size=10,
            command_timeout=60,
            timeout=10,  # Timeout for establishing each connection (passed to connect)
        )
        logger.info("Database connection pool established")
        # Test the connection
        async with db_pool.acquire() as connection:
            version = await connection.fetchval("SELECT version();")
            logger.info(f"Connected to PostgreSQL: {version}")
        return db_pool
    except Exception as e:
        logger.error(f"Failed to create database pool: {e}")
        raise

async def close_db():
    """Close the database connection pool."""
    global db_pool
    if db_pool:
        await db_pool.close()
        logger.info("Database connection pool closed")

This gives you:

  • A properly configured connection pool
  • Environment variable usage for security
  • Logging for monitoring
  • Pool size optimization
  • Connection timeout handling
  • An explicit close function for clean shutdowns
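
To smoke-test the module on its own (assuming your .env is populated), you can run something like:

import asyncio
from database import init_db, close_db

async def smoke_test():
    pool = await init_db()
    async with pool.acquire() as conn:
        # Round-trip one trivial query through the pool
        print(await conn.fetchval("SELECT NOW();"))
    await close_db()

asyncio.run(smoke_test())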

Step 4: Expose a read-only resource

Create resources/users.py to expose user data:

from typing import Any, Dict, List, Optional, Union
import logging

# Import the module (not the pool object) so we always read the pool
# that init_db() assigns at startup; a bare `from database import
# db_pool` would freeze the value at import time (None).
import database

logger = logging.getLogger("mcp_database.resources.users")

async def fetch_recent_users(limit: int = 20) -> Union[List[Dict[str, Any]], Dict[str, str]]:
    """
    Fetch the most recent users from the database.
    
    Args:
        limit: Maximum number of users to return (default: 20)
        
    Returns:
        List of user objects, or an error dict on failure
    """
    try:
        if not database.db_pool:
            logger.error("Database pool not initialized")
            return {"error": "Database connection not available"}

        async with database.db_pool.acquire() as connection:
            # Use a parameterized query for safety
            query = """
                SELECT id, username, email, created_at 
                FROM users 
                ORDER BY created_at DESC 
                LIMIT $1;
            """
            rows = await connection.fetch(query, limit)
            
            # Convert to dictionaries and handle datetime serialization
            users = []
            for row in rows:
                user = dict(row)
                # Convert datetime to ISO format string for JSON serialization
                if "created_at" in user and user["created_at"]:
                    user["created_at"] = user["created_at"].isoformat()
                users.append(user)
                
            logger.info(f"Fetched {len(users)} recent users")
            return users
    except Exception as e:
        logger.error(f"Error fetching recent users: {e}")
        return {"error": f"Database error: {str(e)}"}

Now, update your server.py to register this resource:

import asyncio
import logging
from mcp_server import MCPServer
from database import init_db, close_db
from resources.users import fetch_recent_users

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_server")

# Create MCP server
server = MCPServer()

# Register the resource
@server.resource(
    name="recent_users", 
    description="Fetch the most recent users from the database."
)
async def recent_users_resource():
    return await fetch_recent_users(limit=20)

async def main():
    try:
        # Initialize the database
        await init_db()
        
        # Start the server
        logger.info("Starting MCP server...")
        await server.start()
    except Exception as e:
        logger.error(f"Server error: {e}")
    finally:
        # Close database connections on shutdown
        await close_db()

if __name__ == "__main__":
    asyncio.run(main())

Step 5: Implement advanced querying capabilities

Create a resource that allows more flexible querying with parameters:

async def fetch_users_by_criteria(
    department: Optional[str] = None,
    role: Optional[str] = None,
    active: Optional[bool] = True,
    limit: int = 20
) -> Union[List[Dict[str, Any]], Dict[str, str]]:
    """
    Fetch users matching specific criteria.
    
    Args:
        department: Filter by department (optional)
        role: Filter by role (optional)
        active: Filter by active status (default: True)
        limit: Maximum results to return (default: 20)
        
    Returns:
        List of matching user objects, or an error dict on failure
    """
    try:
        if not database.db_pool:
            logger.error("Database pool not initialized")
            return {"error": "Database connection not available"}

        # Build the WHERE clause dynamically, numbering placeholders as we
        # go. Only fixed column names are interpolated into the SQL; every
        # value travels as a bound parameter.
        conditions = []
        params = []

        if active is not None:
            params.append(active)
            conditions.append(f"active = ${len(params)}")

        if department:
            params.append(department)
            conditions.append(f"department = ${len(params)}")

        if role:
            params.append(role)
            conditions.append(f"role = ${len(params)}")

        where_clause = " AND ".join(conditions) if conditions else "TRUE"

        params.append(limit)
        query = f"""
            SELECT id, username, email, department, role, created_at 
            FROM users 
            WHERE {where_clause}
            ORDER BY created_at DESC 
            LIMIT ${len(params)};
        """

        async with database.db_pool.acquire() as connection:
            rows = await connection.fetch(query, *params)
            
            # Convert to dictionaries and handle datetime serialization
            users = []
            for row in rows:
                user = dict(row)
                if "created_at" in user and user["created_at"]:
                    user["created_at"] = user["created_at"].isoformat()
                users.append(user)
                
            logger.info(f"Fetched {len(users)} users matching criteria")
            return users
    except Exception as e:
        logger.error(f"Error fetching users by criteria: {e}")
        return {"error": f"Database error: {str(e)}"}

Register this as a parameterized resource:

@server.resource(
    name="users_by_criteria", 
    description="Fetch users matching specific criteria like department or role."
)
async def users_by_criteria_resource(data: dict):
    return await fetch_users_by_criteria(
        department=data.get("department"),
        role=data.get("role"),
        active=data.get("active", True),
        limit=data.get("limit", 20)
    )

This allows the LLM to request specific subsets of users based on business needs.
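
For instance, a request for active engineering managers might carry a payload like this (illustrative values; the keys mirror the handler above):

# Hypothetical payload sent by the LLM client to the users_by_criteria resource
payload = {
    "department": "Engineering",
    "role": "Manager",
    "active": True,
    "limit": 5,
}
result = await users_by_criteria_resource(payload)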

Step 6: Create a secure tool to insert new records

Create tools/users.py for write operations:

from typing import Dict, Any, Optional
# @validator is the Pydantic v1 API; on Pydantic v2, use @field_validator.
# EmailStr also requires the email-validator package ("pydantic[email]").
from pydantic import BaseModel, EmailStr, Field, validator
import logging
import re

# Import the module (not the pool) so we always read the pool that
# init_db() assigns at startup
import database

logger = logging.getLogger("mcp_database.tools.users")

class CreateUserRequest(BaseModel):
    """Validation model for user creation requests."""
    username: str = Field(..., min_length=3, max_length=50)
    email: EmailStr
    department: Optional[str] = "General"
    role: Optional[str] = "User"
    
    @validator('username')
    def username_alphanumeric(cls, v):
        if not re.match(r'^[a-zA-Z0-9_]+$', v):
            raise ValueError('Username may only contain letters, digits, and underscores')
        return v

async def create_user(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Create a new user in the database.
    
    Args:
        data: User data containing username, email, etc.
        
    Returns:
        Response with status and user info
    """
    try:
        # Validate the input data with Pydantic
        user_data = CreateUserRequest(**data)
        
        if not database.db_pool:
            logger.error("Database pool not initialized")
            return {
                "status": "error",
                "message": "Database connection not available"
            }
        
        async with database.db_pool.acquire() as connection:
            # Check first for a friendly error; the UNIQUE constraints on
            # the table remain the real guarantee against race conditions
            existing_user = await connection.fetchrow(
                "SELECT id FROM users WHERE username = $1 OR email = $2",
                user_data.username,
                user_data.email
            )
            
            if existing_user:
                return {
                    "status": "error",
                    "message": "User with this username or email already exists"
                }
            
            # Insert the new user
            query = """
                INSERT INTO users (username, email, department, role) 
                VALUES ($1, $2, $3, $4)
                RETURNING id;
            """
            
            user_id = await connection.fetchval(
                query,
                user_data.username,
                user_data.email,
                user_data.department,
                user_data.role
            )
            
            logger.info(f"Created new user: {user_data.username} (ID: {user_id})")
            
            return {
                "status": "success",
                "message": f"User {user_data.username} created successfully",
                "user_id": user_id
            }
    except Exception as e:
        logger.error(f"Error creating user: {e}")
        return {
            "status": "error",
            "message": f"Failed to create user: {str(e)}"
        }

Register this tool in server.py:

from tools.users import create_user

# ... existing code ...

# Register the tool
@server.tool(
    name="create_user",
    description="Create a new user in the database."
)
async def create_user_tool(data: dict):
    return await create_user(data)

Key security features added:

  • Pydantic validation with clear constraints
  • Email validation using EmailStr
  • Username format validation with regex
  • Duplicate checking before insertion
  • Clear error messages
  • Comprehensive logging
  • Parameterized queries to prevent SQL injection
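
As a quick sanity check, invalid input never reaches SQL; the Pydantic model rejects it first (an illustrative call):

# Username too short and email malformed: validation fails inside
# create_user, which returns a structured error without touching the DB
result = await create_user({"username": "ab", "email": "not-an-email"})
assert result["status"] == "error"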

Step 7: Transaction management

For operations that require multiple database changes, use transactions:

async def transfer_user_to_department(
    user_id: int,
    new_department: str
) -> Dict[str, Any]:
    """
    Transfer a user to a new department, recording the change in the audit log.
    
    Args:
        user_id: ID of the user to transfer
        new_department: Name of the target department
        
    Returns:
        Status of the operation
    """
    try:
        if not database.db_pool:
            return {"error": "Database connection not available"}

        async with database.db_pool.acquire() as connection:
            # Start a transaction
            async with connection.transaction():
                # Get current department
                current_dept = await connection.fetchval(
                    "SELECT department FROM users WHERE id = $1",
                    user_id
                )
                
                # fetchval returns None when no row matched the user_id
                if current_dept is None:
                    return {"error": "User not found"}
                
                # Update the user's department
                await connection.execute(
                    "UPDATE users SET department = $1 WHERE id = $2",
                    new_department,
                    user_id
                )
                
                # Record the change in audit log
                await connection.execute(
                    """
                    INSERT INTO user_audit_log 
                    (user_id, field_changed, old_value, new_value) 
                    VALUES ($1, $2, $3, $4)
                    """,
                    user_id,
                    "department",
                    current_dept,
                    new_department
                )
                
                logger.info(f"Transferred user {user_id} from {current_dept} to {new_department}")
                
                return {
                    "status": "success",
                    "message": f"User transferred from {current_dept} to {new_department}"
                }
    except Exception as e:
        logger.error(f"Error transferring user: {e}")
        return {"error": f"Transfer failed: {str(e)}"}

Register this as a tool:

@server.tool(
    name="transfer_user",
    description="Transfer a user to a new department."
)
async def transfer_user_tool(data: dict):
    user_id = data.get("user_id")
    new_department = data.get("new_department")
    
    if user_id is None or not new_department:
        return {"error": "Missing user_id or new_department"}
        
    return await transfer_user_to_department(user_id, new_department)

This ensures that both operations (update user + add audit log) succeed or fail together.

Step 8: Full server code example

Here’s a complete example bringing everything together:

import asyncio
import logging
from mcp_server import MCPServer
from database import init_db, close_db
from resources.users import fetch_recent_users, fetch_users_by_criteria
from tools.users import create_user, transfer_user_to_department

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_server")

# Create MCP server
server = MCPServer(
    name="DatabaseMCPServer",
    version="1.0.0",
    description="MCP server with PostgreSQL database access"
)

# Register resources
@server.resource(
    name="recent_users", 
    description="Fetch the most recent users from the database."
)
async def recent_users_resource():
    return await fetch_recent_users(limit=20)

@server.resource(
    name="users_by_criteria", 
    description="Fetch users matching specific criteria like department or role."
)
async def users_by_criteria_resource(data: dict):
    return await fetch_users_by_criteria(
        department=data.get("department"),
        role=data.get("role"),
        active=data.get("active", True),
        limit=data.get("limit", 20)
    )

# Register tools
@server.tool(
    name="create_user",
    description="Create a new user in the database."
)
async def create_user_tool(data: dict):
    return await create_user(data)

@server.tool(
    name="transfer_user",
    description="Transfer a user to a new department."
)
async def transfer_user_tool(data: dict):
    user_id = data.get("user_id")
    new_department = data.get("new_department")
    
    if user_id is None or not new_department:
        return {"error": "Missing user_id or new_department"}
        
    return await transfer_user_to_department(user_id, new_department)

async def main():
    try:
        # Initialize the database
        await init_db()
        
        # Start the server
        logger.info("Starting MCP server...")
        await server.start()
    except Exception as e:
        logger.error(f"Server error: {e}")
    finally:
        # Close database connections on shutdown
        await close_db()

if __name__ == "__main__":
    asyncio.run(main())

Security considerations

When connecting your MCP server to a database, follow these security best practices:

  1. Use a dedicated database user with limited permissions:

    CREATE USER mcp_user WITH PASSWORD 'secure_password';
    GRANT SELECT, INSERT, UPDATE ON users TO mcp_user;
    GRANT USAGE ON SEQUENCE users_id_seq TO mcp_user;  -- needed for the SERIAL id
    GRANT INSERT ON user_audit_log TO mcp_user;
    -- Grant only what's necessary
    
  2. Validate all inputs using Pydantic models with strict validation rules.

  3. Use parameterized queries exclusively to prevent SQL injection.
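
    For example (a minimal contrast, assuming connection is an acquired asyncpg connection):

    # UNSAFE: building SQL by string interpolation invites injection
    query = f"SELECT * FROM users WHERE username = '{username}'"

    # SAFE: asyncpg sends the value as a bound parameter
    rows = await connection.fetch(
        "SELECT * FROM users WHERE username = $1",
        username,
    )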

  4. Implement authentication for your MCP server:

    @server.middleware
    async def auth_middleware(request, next_handler):
        token = request.headers.get("Authorization")
        if not token or not verify_token(token):  # verify_token: your own token check
            return {"error": "Unauthorized"}
        return await next_handler(request)
    
  5. Implement rate limiting to prevent abuse:

    import time

    # Simple in-memory rate limiter (per-process only; use Redis or
    # similar for multi-instance deployments)
    request_counts = {}

    @server.middleware
    async def rate_limit_middleware(request, next_handler):
        global request_counts  # rebound below, so declare it global
        client_id = request.client.host
        current_time = time.time()

        # Drop entries older than the 60-second window
        request_counts = {k: v for k, v in request_counts.items()
                          if v["timestamp"] > current_time - 60}

        if client_id in request_counts:
            if request_counts[client_id]["count"] > 100:  # 100 requests per minute
                return {"error": "Rate limit exceeded"}
            request_counts[client_id]["count"] += 1
        else:
            request_counts[client_id] = {"count": 1, "timestamp": current_time}

        return await next_handler(request)
    
  6. Implement request logging for auditing:

    @server.middleware
    async def logging_middleware(request, next_handler):
        logger.info(f"Request: {request.method} {request.path} from {request.client.host}")
        response = await next_handler(request)
        return response
    
  7. Set up database query timeouts to prevent long-running queries from affecting server performance.
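
    asyncpg supports this both at the pool level (the command_timeout passed to create_pool earlier) and per query; a minimal sketch:

    # Abort this query if it runs longer than 5 seconds
    rows = await connection.fetch(
        "SELECT * FROM users WHERE department = $1",
        department,
        timeout=5,
    )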

Performance optimization

  1. Connection pooling is already implemented, but tune the parameters based on your workload:

    db_pool = await asyncpg.create_pool(
        **DB_CONFIG,
        min_size=5,       # Set higher for high-load scenarios
        max_size=20,      # Adjust based on your database server capacity
        statement_cache_size=100,  # Cache prepared statements
        max_inactive_connection_lifetime=300  # Seconds before recycling idle connections
    )
    
  2. Create database indexes for frequently queried fields:

    CREATE INDEX idx_users_department ON users(department);
    CREATE INDEX idx_users_created_at ON users(created_at DESC);
    
  3. Implement result caching for frequently accessed, rarely changing data:

    from datetime import datetime, timedelta

    # Module-level cache that expires after 5 minutes
    cache_time = None
    cached_result = None
    
    async def fetch_departments_with_caching():
        global cache_time, cached_result
        
        # Check if cache is valid
        current_time = datetime.now()
        if cache_time and cached_result and current_time - cache_time < timedelta(minutes=5):
            return cached_result
            
        # Cache miss - fetch from database
        async with db_pool.acquire() as connection:
            result = await connection.fetch("SELECT * FROM departments")
            
        # Update cache
        cache_time = current_time
        cached_result = [dict(row) for row in result]
        
        return cached_result
    
  4. Use JSONB for complex data that doesn’t need to be queried extensively:

    CREATE TABLE user_preferences (
        user_id INTEGER PRIMARY KEY,
        preferences JSONB NOT NULL
    );
    
  5. Pagination for large result sets:

    async def fetch_paginated_users(page: int = 1, page_size: int = 20):
        offset = (page - 1) * page_size
        
        async with db_pool.acquire() as connection:
            # Get total count
            total = await connection.fetchval("SELECT COUNT(*) FROM users")
            
            # Get paginated results
            rows = await connection.fetch(
                "SELECT * FROM users ORDER BY created_at DESC LIMIT $1 OFFSET $2",
                page_size, offset
            )
            
            return {
                "users": [dict(row) for row in rows],
                "pagination": {
                    "total": total,
                    "page": page,
                    "page_size": page_size,
                    "pages": (total + page_size - 1) // page_size
                }
            }
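
    You can register it like the other resources (a sketch using this guide's MCPServer API):

    @server.resource(
        name="paginated_users",
        description="Fetch users one page at a time."
    )
    async def paginated_users_resource(data: dict):
        return await fetch_paginated_users(
            page=data.get("page", 1),
            page_size=data.get("page_size", 20)
        )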
    

Testing database interactions

Create a tests directory with a test file:

# tests/test_database.py
# Requires pytest and pytest-asyncio (pip install pytest pytest-asyncio);
# set asyncio_mode = auto in pytest.ini so the async fixtures below work.
import pytest
import os
from dotenv import load_dotenv
import asyncpg

# Load test environment variables
load_dotenv(".env.test")

# Testing database configuration
TEST_DB_CONFIG = {
    "user": os.getenv("TEST_DB_USER"),
    "password": os.getenv("TEST_DB_PASSWORD"),
    "database": os.getenv("TEST_DB_NAME"),
    "host": os.getenv("TEST_DB_HOST", "localhost"),
    "port": int(os.getenv("TEST_DB_PORT", "5432")),
}

@pytest.fixture
async def db_pool():
    """Create a test database pool."""
    pool = await asyncpg.create_pool(**TEST_DB_CONFIG)
    yield pool
    await pool.close()

@pytest.fixture
async def setup_test_data(db_pool):
    """Set up test data before tests and clean up after."""
    async with db_pool.acquire() as conn:
        # Use a regular table: a TEMPORARY table would only be visible on
        # the connection that created it, not on other pool connections.
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                username TEXT UNIQUE NOT NULL,
                email TEXT UNIQUE NOT NULL,
                department TEXT NOT NULL DEFAULT 'General',
                role TEXT NOT NULL DEFAULT 'User',
                active BOOLEAN NOT NULL DEFAULT TRUE,
                created_at TIMESTAMP NOT NULL DEFAULT NOW()
            )
        """)

        # Insert test users
        await conn.executemany(
            """
            INSERT INTO users (username, email, department, role) 
            VALUES ($1, $2, $3, $4)
            """,
            [
                ("test1", "test1@example.com", "Engineering", "Developer"),
                ("test2", "test2@example.com", "Marketing", "Manager"),
                ("test3", "test3@example.com", "Sales", "Representative")
            ]
        )

    yield

    # Drop the table so each test run starts clean
    async with db_pool.acquire() as conn:
        await conn.execute("DROP TABLE users")

@pytest.mark.asyncio
async def test_fetch_recent_users(db_pool, setup_test_data):
    """Test that fetching recent users works correctly."""
    from resources.users import fetch_recent_users
    
    # Monkeypatch db_pool
    import database
    database.db_pool = db_pool
    
    # Execute the function
    result = await fetch_recent_users(limit=10)
    
    # Assertions
    assert isinstance(result, list)
    assert len(result) == 3
    assert result[0]["username"] in ["test1", "test2", "test3"]
    assert "email" in result[0]
    assert "department" in result[0]

@pytest.mark.asyncio
async def test_create_user(db_pool, setup_test_data):
    """Test that creating a user works correctly."""
    from tools.users import create_user
    
    # Monkeypatch db_pool
    import database
    database.db_pool = db_pool
    
    # Test data
    user_data = {
        "username": "newuser",
        "email": "newuser@example.com",
        "department": "Finance",
        "role": "Analyst"
    }
    
    # Execute the function
    result = await create_user(user_data)
    
    # Assertions
    assert result["status"] == "success"
    
    # Verify in database
    async with db_pool.acquire() as conn:
        user = await conn.fetchrow(
            "SELECT * FROM users WHERE username = $1",
            "newuser"
        )
        
    assert user is not None
    assert user["email"] == "newuser@example.com"
    assert user["department"] == "Finance"

Run tests with: pytest -xvs tests/

Deployment considerations

  1. Use environment-specific configuration files: .env.development, .env.staging, .env.production
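
    A minimal way to switch between them, assuming your deployment platform sets an APP_ENV variable:

    import os
    from dotenv import load_dotenv

    # Loads .env.development, .env.staging, or .env.production
    env = os.getenv("APP_ENV", "development")
    load_dotenv(f".env.{env}")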

  2. Set up database migrations for schema changes:

    pip install alembic
    alembic init migrations
    
  3. Deploy with Docker for consistency:

    FROM python:3.10-slim
    
    WORKDIR /app
    
    COPY requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt
    
    COPY . .
    
    CMD ["python", "server.py"]
    
  4. Set up health checks:

    @server.route("/health")
    async def health_check():
        try:
            async with database.db_pool.acquire() as connection:
                await connection.fetchval("SELECT 1")
            return {"status": "healthy", "database": "connected"}
        except Exception as e:
            return {"status": "unhealthy", "error": str(e)}
    
  5. Monitor query performance:

    -- In PostgreSQL
    CREATE EXTENSION pg_stat_statements;
    
    -- To analyze slow queries (on PostgreSQL 13+, the columns are
    -- total_exec_time and mean_exec_time rather than total_time / mean_time)
    SELECT query, calls, total_time, mean_time
    FROM pg_stat_statements
    ORDER BY mean_time DESC
    LIMIT 10;
    
  6. Set up database backups:

    # Daily backup script
    pg_dump -U user -d database -F c -f /backups/db_$(date +%Y%m%d).dump
    

Conclusion

Connecting your MCP server to a PostgreSQL database transforms it from a simple demo into a production-ready AI backend. With careful attention to security, performance, and reliability, you can safely expose your database to language models, enabling powerful AI-driven workflows.

Key principles to remember:

  • Resources for read operations
  • Tools for write operations
  • Validation for all inputs
  • Transactions for consistency
  • Connection pooling for performance
  • Environment variables for security
  • Structured error handling for reliability

By following these practices, you enable LLMs to work alongside your existing database applications in a safe, controlled way, unlocking new possibilities for AI-assisted workflows and data analysis. In the next guide, we’ll show how to connect your existing API backend via MCP without rewriting business logic.

FAQs

Can my MCP server connect to more than one database?

Yes. You can create multiple resource and tool handlers, each connecting to a different database or schema.

What happens if the database is unreachable at runtime?

You should add try/except handling to return a clear error to the LLM instead of crashing.

How do I keep the LLM from running arbitrary queries?

Expose only selected read or write operations via resources and tools. Never give the LLM unrestricted database access.
