Claude
Skills
Sign in
Back

psycopg

Included with Lifetime
$97 forever

PostgreSQL adapter for Python - customer support tech enablement for database operations, query optimization, and data management

databasepostgresqldatabasepsycopg3asyncconnection-poolingdata-migrationcustomer-supportbackend

What this skill does


# Psycopg Skill - Customer Support Tech Enablement

## Overview

This skill provides comprehensive guidance on using **psycopg3** (and psycopg2 where applicable) for PostgreSQL database operations in customer support contexts. It covers everything from basic connections to advanced features like connection pooling, async operations, bulk data transfers, and performance optimization.

## Target Audience

- Customer Support Engineers working with backend systems
- Technical Support teams handling data migrations
- Support teams analyzing customer data and usage patterns
- Backend developers building support tools and dashboards
- QA engineers writing integration tests for PostgreSQL-backed applications

## Core Competencies

When using this skill, you will be able to:

1. **Establish robust database connections** with proper error handling and retry logic
2. **Implement connection pooling** for high-performance multi-threaded applications
3. **Execute async database operations** for non-blocking I/O in modern Python applications
4. **Perform bulk data operations** efficiently using COPY and batch insert techniques
5. **Write secure parameterized queries** to prevent SQL injection
6. **Handle transactions** with proper commit/rollback semantics
7. **Optimize query performance** using prepared statements and binary protocols
8. **Work with JSON/JSONB data** for flexible customer data storage
9. **Implement retry patterns** for resilient database connections
10. **Test database code** effectively using pytest and fixtures

## Installation and Setup

### Installing Psycopg3

For new projects, always use psycopg3:

```bash
# Basic installation (pure Python implementation)
pip install psycopg

# With binary package for better performance
pip install "psycopg[binary]"

# With connection pool support
pip install "psycopg[pool]"

# Complete installation with all extras
pip install "psycopg[binary,pool]"

# For C implementation (fastest, requires build tools)
pip install "psycopg[c]"
```

### Installing Psycopg2 (Legacy Projects)

If you're maintaining legacy code:

```bash
# Binary package (recommended for most cases)
pip install psycopg2-binary

# Source package (requires PostgreSQL dev libraries)
pip install psycopg2
```

### Environment Setup for Customer Support

Create a `.env` file for database credentials:

```bash
# Production support database (read-only)
SUPPORT_DB_HOST=support-db.company.com
SUPPORT_DB_PORT=5432
SUPPORT_DB_NAME=support_analytics
SUPPORT_DB_USER=support_readonly
SUPPORT_DB_PASSWORD=secure_password_here

# Development/testing database
DEV_DB_HOST=localhost
DEV_DB_PORT=5432
DEV_DB_NAME=support_dev
DEV_DB_USER=dev_user
DEV_DB_PASSWORD=dev_password
```

Load with python-dotenv:

```python
from dotenv import load_dotenv
import os

load_dotenv()

DB_CONFIG = {
    'host': os.getenv('SUPPORT_DB_HOST'),
    'port': os.getenv('SUPPORT_DB_PORT'),
    'dbname': os.getenv('SUPPORT_DB_NAME'),
    'user': os.getenv('SUPPORT_DB_USER'),
    'password': os.getenv('SUPPORT_DB_PASSWORD'),
}
```

## Connection Management

### Basic Connection (Psycopg3)

```python
import psycopg

# Using connection string
conn = psycopg.connect(
    "postgresql://user:password@localhost:5432/support_db"
)

# Using parameters (recommended for flexibility)
conn = psycopg.connect(
    host="localhost",
    port=5432,
    dbname="support_db",
    user="support_user",
    password="secure_password",
    autocommit=False,  # Explicit transaction management
    connect_timeout=10  # Timeout in seconds
)

# Always use context manager for automatic cleanup
with psycopg.connect(**DB_CONFIG) as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT version()")
        print(cur.fetchone()[0])
# Connection automatically closed
```

### Connection with Retry Logic

For resilient customer support applications:

```python
import psycopg
import time
import logging
from typing import Optional

logger = logging.getLogger(__name__)

def connect_with_retry(
    max_attempts: int = 5,
    retry_delay: float = 2.0,
    backoff_factor: float = 2.0,
    **conn_params
) -> Optional[psycopg.Connection]:
    """
    Establish database connection with exponential backoff retry.

    Args:
        max_attempts: Maximum number of connection attempts
        retry_delay: Initial delay between retries (seconds)
        backoff_factor: Multiplier for exponential backoff
        **conn_params: Connection parameters for psycopg.connect()

    Returns:
        Connection object or None if all attempts fail
    """
    attempt = 0
    current_delay = retry_delay

    while attempt < max_attempts:
        try:
            conn = psycopg.connect(**conn_params)
            logger.info(f"Database connection established on attempt {attempt + 1}")
            return conn
        except psycopg.OperationalError as e:
            attempt += 1
            if attempt >= max_attempts:
                logger.error(f"Failed to connect after {max_attempts} attempts: {e}")
                raise

            logger.warning(
                f"Connection attempt {attempt} failed: {e}. "
                f"Retrying in {current_delay:.1f}s..."
            )
            time.sleep(current_delay)
            current_delay *= backoff_factor

    return None

# Usage in customer support tools
try:
    conn = connect_with_retry(
        host="support-db.company.com",
        dbname="support_analytics",
        user="support_user",
        password=os.getenv("DB_PASSWORD"),
        max_attempts=5
    )
    with conn:
        # Perform operations
        pass
except psycopg.OperationalError:
    logger.critical("Database unavailable - alerting on-call team")
    # Send alert to monitoring system
```

## Connection Pooling

### Synchronous Connection Pool

Essential for high-performance support tools handling multiple requests:

```python
from psycopg_pool import ConnectionPool
import logging

logging.basicConfig(level=logging.INFO)
logging.getLogger("psycopg.pool").setLevel(logging.INFO)

# Create a global connection pool
pool = ConnectionPool(
    conninfo="postgresql://user:password@localhost/support_db",
    min_size=2,          # Minimum connections to maintain
    max_size=10,         # Maximum connections allowed
    max_idle=300,        # Close idle connections after 5 minutes
    max_lifetime=3600,   # Recycle connections after 1 hour
    timeout=30,          # Wait up to 30s for a connection
    check=ConnectionPool.check_connection,  # Validate connections
    open=True            # Open pool immediately
)

# Wait for minimum connections to be established
pool.wait()

# Use in application code
def get_customer_tickets(customer_id: int):
    """Fetch all tickets for a customer."""
    with pool.connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT ticket_id, subject, status, created_at "
                "FROM tickets WHERE customer_id = %s "
                "ORDER BY created_at DESC",
                (customer_id,)
            )
            return cur.fetchall()

# Monitor pool health
def check_pool_health():
    """Get pool statistics for monitoring."""
    stats = pool.get_stats()
    return {
        'pool_size': stats.get('pool_size', 0),
        'available': stats.get('pool_available', 0),
        'waiting': stats.get('requests_waiting', 0),
        'requests_total': stats.get('requests_num', 0),
        'requests_queued': stats.get('requests_queued', 0),
        'connection_errors': stats.get('connections_errors', 0)
    }

# Cleanup on application shutdown
import atexit
atexit.register(pool.close)
```

### Asynchronous Connection Pool

For async FastAPI/Starlette support applications:

```python
from psycopg_pool import AsyncConnectionPool
from contextlib import asynccontextmanager
from fastapi import FastAPI
import logging

logger = logging.getLogger(__name__)

# Create async pool
async_pool = AsyncConnectionPool(
    conninfo="postgresql://user:password@

Related in database