Python Lambda Functions: Patterns for Production
Battle-tested patterns for writing Python Lambda functions that are fast, observable, and maintainable in production environments.
Why Python on Lambda?
Python is the second most popular Lambda runtime (after Node.js), and for good reason: fast cold starts, rich ecosystem for data processing, and concise syntax. But writing production-grade Lambda functions in Python requires more discipline than a quick script.
Here are the patterns I use on every project.
Project Structure
src/
├── handlers/
│ ├── __init__.py
│ ├── process_order.py
│ └── send_notification.py
├── services/
│ ├── __init__.py
│ ├── order_service.py
│ └── notification_service.py
├── models/
│ └── order.py
├── utils/
│ ├── __init__.py
│ └── logger.py
└── requirements.txt
Each handler is a thin entry point that delegates to services. This keeps business logic testable without mocking Lambda internals.
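To make the split concrete, here is a minimal sketch of that division (names like `complete_order` are hypothetical, chosen to match the layout above):

```python
def complete_order(order_id: str) -> dict:
    # services/order_service.py -- pure business logic, no Lambda types,
    # so it can be unit-tested with a plain function call.
    return {"order_id": order_id, "status": "completed"}


def handler(event: dict, context) -> dict:
    # handlers/process_order.py -- unpack the event, delegate, wrap the response.
    order_id = event["detail"]["orderId"]
    result = complete_order(order_id)
    return {"statusCode": 200, "body": result}
```

The handler never contains a business rule; it only translates between the Lambda event shape and plain function arguments.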
Pattern 1: Structured Logging with AWS Lambda Powertools
from aws_lambda_powertools import Logger, Tracer, Metrics
from aws_lambda_powertools.metrics import MetricUnit
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger(service="order-service")
tracer = Tracer()
metrics = Metrics(namespace="OrderService")

@logger.inject_lambda_context(log_event=True)
@tracer.capture_lambda_handler
@metrics.log_metrics(capture_cold_start_metric=True)
def handler(event: dict, context: LambdaContext) -> dict:
    order_id = event["detail"]["orderId"]
    logger.info("Processing order", extra={"order_id": order_id})

    try:
        result = process_order(order_id)
        metrics.add_metric(name="OrderProcessed", unit=MetricUnit.Count, value=1)
        return {"statusCode": 200, "body": result}
    except Exception:
        logger.exception("Failed to process order")
        metrics.add_metric(name="OrderFailed", unit=MetricUnit.Count, value=1)
        raise
AWS Lambda Powertools gives you structured JSON logs, X-Ray tracing, and custom CloudWatch metrics with decorators. No manual setup.
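The underlying win is simply one JSON object per log line, which CloudWatch Logs Insights can query by field. If you cannot take on the Powertools dependency, a minimal stdlib sketch of the same idea (field names here are illustrative, not the Powertools schema):

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as a single JSON line for structured log queries."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "level": record.levelname,
            "message": record.getMessage(),
            "service": "order-service",
        }
        # Merge structured fields the caller attached via extra={"context": ...}.
        entry.update(getattr(record, "context", {}))
        return json.dumps(entry)


logger = logging.getLogger("order-service")
stream = logging.StreamHandler()
stream.setFormatter(JsonFormatter())
logger.addHandler(stream)
logger.setLevel(logging.INFO)

logger.info("Processing order", extra={"context": {"order_id": "o-123"}})
```

Powertools does this (plus correlation IDs, cold-start flags, and sampling) without the boilerplate.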
Pattern 2: Input Validation with Pydantic
Don't trust event payloads. Validate them:
from datetime import datetime
from typing import Optional

from pydantic import BaseModel, ValidationError, field_validator

class OrderEvent(BaseModel):
    order_id: str
    customer_id: str
    amount: float
    currency: str = "USD"
    created_at: Optional[datetime] = None

    @field_validator("amount")
    @classmethod
    def amount_must_be_positive(cls, v: float) -> float:
        if v <= 0:
            raise ValueError("Amount must be positive")
        return v

def handler(event: dict, context) -> dict:
    try:
        order = OrderEvent(**event["detail"])
    except ValidationError as e:
        logger.error("Invalid event payload", extra={"error": str(e)})
        return {"statusCode": 400, "body": str(e)}
    return process_order(order)
Pattern 3: Connection Reuse
Initialize clients outside the handler to reuse connections across invocations:
import json
import os

import boto3

# Initialized once per container (warm start reuse)
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["TABLE_NAME"])
sqs = boto3.client("sqs")

def handler(event, context):
    # Uses existing connections
    table.put_item(Item={"PK": "order#123", "data": "..."})
    sqs.send_message(
        QueueUrl=os.environ["QUEUE_URL"],
        MessageBody=json.dumps({"action": "notify"}),
    )
This avoids creating new boto3 sessions on every invocation. For PostgreSQL, use a module-level connection with reconnect logic:
import os

import psycopg2

_conn = None

def get_connection():
    global _conn
    if _conn is None or _conn.closed:
        _conn = psycopg2.connect(
            host=os.environ["DB_HOST"],
            dbname=os.environ["DB_NAME"],
            user=os.environ["DB_USER"],
            password=os.environ["DB_PASSWORD"],
        )
    return _conn
Pattern 4: Graceful Error Handling with DLQ
import json

from aws_lambda_powertools.utilities.batch import (
    BatchProcessor,
    EventType,
    process_partial_response,
)

processor = BatchProcessor(event_type=EventType.SQS)

@tracer.capture_method
def record_handler(record):
    payload = json.loads(record.body)
    order = OrderEvent(**payload)
    process_order(order)

def handler(event, context):
    return process_partial_response(
        event=event,
        record_handler=record_handler,
        processor=processor,
        context=context,
    )
process_partial_response from Powertools handles partial batch failures automatically: failed records are reported back to SQS for retry (and eventually land in the DLQ after maxReceiveCount attempts), while successful ones are deleted from the queue. Note that the event source mapping must have ReportBatchItemFailures enabled for the partial response to take effect; without it, any raised exception retries the whole batch.
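What the function ultimately returns to Lambda is the partial-batch contract: a `batchItemFailures` list naming the SQS message IDs to retry. A dependency-free sketch of building that response by hand (the `order_id` check is an illustrative business rule):

```python
import json


def handle_batch(event: dict) -> dict:
    """Process each SQS record; report only the failures back to Lambda."""
    failures = []
    for record in event["Records"]:
        try:
            payload = json.loads(record["body"])
            if "order_id" not in payload:  # illustrative business rule
                raise ValueError("missing order_id")
        except Exception:
            # Lambda re-queues only the records listed here.
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}


event = {
    "Records": [
        {"messageId": "m1", "body": json.dumps({"order_id": "o-1"})},
        {"messageId": "m2", "body": "not json"},
    ]
}
print(handle_batch(event))  # {'batchItemFailures': [{'itemIdentifier': 'm2'}]}
```

Powertools builds exactly this response shape for you, plus per-record error capture and tracing.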
Pattern 5: Environment Configuration
from functools import lru_cache

from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_prefix="")

    table_name: str
    queue_url: str
    log_level: str = "INFO"
    environment: str = "development"

@lru_cache
def get_settings() -> Settings:
    return Settings()
Using lru_cache ensures settings are loaded once per container. Pydantic validates that all required environment variables are set at startup, not when a request hits.
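The once-per-container behavior is nothing magic: lru_cache memoizes a zero-argument function, so the environment is read and validated on the first call only. A stdlib-only sketch of the same mechanics (a plain dict stands in for the Settings model):

```python
import os
from functools import lru_cache


@lru_cache
def get_settings() -> dict:
    # Reading (and validating) the environment happens exactly once per
    # container; every later call returns the cached object.
    table = os.environ["TABLE_NAME"]  # a missing variable fails fast, right here
    return {
        "table_name": table,
        "log_level": os.environ.get("LOG_LEVEL", "INFO"),
    }


os.environ["TABLE_NAME"] = "orders"
first = get_settings()

os.environ["TABLE_NAME"] = "changed-later"
second = get_settings()  # cached: the env change is not picked up
assert first is second
assert second["table_name"] == "orders"
```

In tests you can call `get_settings.cache_clear()` to force a reload with patched environment variables.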
Testing
from unittest.mock import patch

from handlers.process_order import handler
from services.order_service import process_order

def test_process_order_success():
    with patch("services.order_service.table") as mock_table:
        mock_table.get_item.return_value = {
            "Item": {"PK": "order#123", "status": "pending"}
        }
        result = process_order("order#123")
        assert result["status"] == "completed"
        mock_table.update_item.assert_called_once()

def test_handler_invalid_event():
    event = {"detail": {"order_id": "123"}}  # Missing required fields
    result = handler(event, None)
    assert result["statusCode"] == 400
Deployment Tips
- Use Lambda layers for shared dependencies (aws-lambda-powertools, psycopg2-binary)
- Set POWERTOOLS_SERVICE_NAME and LOG_LEVEL as environment variables
- Pin dependency versions in requirements.txt
- Set reserved concurrency to protect downstream services from Lambda auto-scaling
Conclusion
Production Python Lambdas are not Jupyter notebooks. Structure your code, validate inputs, reuse connections, handle failures gracefully, and make everything observable. These patterns have saved me countless hours of debugging in production.