
Python Lambda Functions: Patterns for Production

Dat Nguyen · October 2025 · 11 min read · Python

Battle-tested patterns for writing Python Lambda functions that are fast, observable, and maintainable in production environments.

Python · AWS · Lambda · Serverless · Best Practices

Why Python on Lambda?

Python is the second most popular Lambda runtime (after Node.js), and for good reason: fast cold starts, rich ecosystem for data processing, and concise syntax. But writing production-grade Lambda functions in Python requires more discipline than a quick script.

Here are the patterns I use on every project.

Project Structure

src/
├── handlers/
│   ├── __init__.py
│   ├── process_order.py
│   └── send_notification.py
├── services/
│   ├── __init__.py
│   ├── order_service.py
│   └── notification_service.py
├── models/
│   └── order.py
├── utils/
│   ├── __init__.py
│   └── logger.py
└── requirements.txt

Each handler is a thin entry point that delegates to services. This keeps business logic testable without mocking Lambda internals.
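As a minimal sketch of that split (the service function here is a stand-in for what would live in services/order_service.py):

```python
# handlers/process_order.py -- thin entry point, no business logic.
# The service function below is a stand-in for services/order_service.py.

def process_order_service(order_id: str) -> dict:
    # Business logic lives here, testable without any Lambda machinery
    return {"order_id": order_id, "status": "completed"}

def handler(event: dict, context) -> dict:
    # The handler only parses the event and delegates
    order_id = event["detail"]["orderId"]
    result = process_order_service(order_id)
    return {"statusCode": 200, "body": result}
```

Because the handler does nothing but unpack the event, unit tests can target the service function directly and never need a fake LambdaContext.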

Pattern 1: Structured Logging with AWS Lambda Powertools

from aws_lambda_powertools import Logger, Tracer, Metrics
from aws_lambda_powertools.metrics import MetricUnit
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger(service="order-service")
tracer = Tracer()
metrics = Metrics(namespace="OrderService")

@logger.inject_lambda_context(log_event=True)
@tracer.capture_lambda_handler
@metrics.log_metrics(capture_cold_start_metric=True)
def handler(event: dict, context: LambdaContext) -> dict:
    order_id = event["detail"]["orderId"]
    logger.info("Processing order", extra={"order_id": order_id})

    try:
        result = process_order(order_id)
        metrics.add_metric(name="OrderProcessed", unit=MetricUnit.Count, value=1)
        return {"statusCode": 200, "body": result}
    except Exception:
        logger.exception("Failed to process order")
        metrics.add_metric(name="OrderFailed", unit=MetricUnit.Count, value=1)
        raise

AWS Lambda Powertools gives you structured JSON logs, X-Ray tracing, and custom CloudWatch metrics with decorators. No manual setup.

Pattern 2: Input Validation with Pydantic

Don't trust event payloads. Validate them:

from datetime import datetime
from typing import Optional

from pydantic import BaseModel, ValidationError, field_validator

class OrderEvent(BaseModel):
    order_id: str
    customer_id: str
    amount: float
    currency: str = "USD"
    created_at: Optional[datetime] = None

    @field_validator("amount")
    @classmethod
    def amount_must_be_positive(cls, v: float) -> float:
        if v <= 0:
            raise ValueError("Amount must be positive")
        return v

def handler(event: dict, context) -> dict:
    try:
        order = OrderEvent(**event["detail"])
    except ValidationError as e:
        logger.error("Invalid event payload", extra={"error": str(e)})
        return {"statusCode": 400, "body": str(e)}

    return process_order(order)

Pattern 3: Connection Reuse

Initialize clients outside the handler to reuse connections across invocations:

import json
import os

import boto3

# Initialized once per container (warm start reuse)
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["TABLE_NAME"])
sqs = boto3.client("sqs")

def handler(event, context):
    # Uses existing connections
    table.put_item(Item={"PK": "order#123", "data": "..."})
    sqs.send_message(
        QueueUrl=os.environ["QUEUE_URL"],
        MessageBody=json.dumps({"action": "notify"}),
    )

This avoids creating new boto3 sessions on every invocation. For PostgreSQL, use a module-level connection with reconnect logic:

import os

import psycopg2

_conn = None

def get_connection():
    global _conn
    if _conn is None or _conn.closed:
        _conn = psycopg2.connect(
            host=os.environ["DB_HOST"],
            dbname=os.environ["DB_NAME"],
            user=os.environ["DB_USER"],
            password=os.environ["DB_PASSWORD"],
        )
    return _conn

Pattern 4: Graceful Error Handling with DLQ

import json

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.batch import (
    BatchProcessor,
    EventType,
    process_partial_response,
)

tracer = Tracer()
processor = BatchProcessor(event_type=EventType.SQS)

@tracer.capture_method
def record_handler(record):
    payload = json.loads(record.body)
    order = OrderEvent(**payload)
    process_order(order)

def handler(event, context):
    return process_partial_response(
        event=event,
        record_handler=record_handler,
        processor=processor,
        context=context,
    )

process_partial_response from Powertools handles partial batch failures automatically: failed records are reported back to SQS for redelivery, while successful ones are removed from the queue. Note that this only works if the SQS event source mapping has ReportBatchItemFailures enabled; without it, a single failed record causes Lambda to retry the entire batch.
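Under the hood this relies on the partial-batch response shape that Lambda's SQS integration understands. A hand-rolled equivalent (record fields follow the SQS event format; the handle callback is a placeholder) looks roughly like:

```python
import json

def build_partial_response(records: list[dict], handle) -> dict:
    """Report only the records that failed; SQS re-drives just those."""
    failures = []
    for record in records:
        try:
            handle(json.loads(record["body"]))
        except Exception:
            # itemIdentifier must be the SQS messageId of the failed record
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}
```

An empty batchItemFailures list tells SQS the whole batch succeeded; Powertools builds exactly this structure for you.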

Pattern 5: Environment Configuration

from functools import lru_cache

from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    table_name: str
    queue_url: str
    log_level: str = "INFO"
    environment: str = "development"

    model_config = SettingsConfigDict(env_prefix="")

@lru_cache
def get_settings() -> Settings:
    return Settings()

Using lru_cache ensures settings are loaded once per container. Pydantic validates that all required environment variables are present the first time get_settings() is called, so a misconfigured deployment fails immediately rather than deep inside a request.
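If you'd rather not add a dependency, the same fail-fast, cached-per-container idea can be sketched with only the standard library (variable names here mirror the Settings class above and are illustrative):

```python
import os
from dataclasses import dataclass
from functools import lru_cache

@dataclass(frozen=True)
class Settings:
    table_name: str
    queue_url: str
    log_level: str = "INFO"

@lru_cache
def get_settings() -> Settings:
    # A KeyError here surfaces missing configuration on the first call
    return Settings(
        table_name=os.environ["TABLE_NAME"],
        queue_url=os.environ["QUEUE_URL"],
        log_level=os.environ.get("LOG_LEVEL", "INFO"),
    )
```

You lose Pydantic's type coercion and richer error messages, but the caching and fail-fast behavior are the same.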

Testing

from unittest.mock import patch

# Imports follow the project layout above
from handlers.process_order import handler
from services.order_service import process_order

def test_process_order_success():
    with patch("services.order_service.table") as mock_table:
        mock_table.get_item.return_value = {
            "Item": {"PK": "order#123", "status": "pending"}
        }

        result = process_order("order#123")

        assert result["status"] == "completed"
        mock_table.update_item.assert_called_once()

def test_handler_invalid_event():
    event = {"detail": {"order_id": "123"}}  # Missing required fields
    result = handler(event, None)
    assert result["statusCode"] == 400

Deployment Tips

  • Use Lambda layers for shared dependencies (aws-lambda-powertools, psycopg2-binary)
  • Set POWERTOOLS_SERVICE_NAME and LOG_LEVEL as environment variables
  • Pin dependency versions in requirements.txt
  • Set reserved concurrency to protect downstream services from Lambda auto-scaling

Conclusion

Production Python Lambdas are not Jupyter notebooks. Structure your code, validate inputs, reuse connections, handle failures gracefully, and make everything observable. These patterns have saved me countless hours of debugging in production.