Serverless Architecture Patterns: Building Scalable Applications with AWS Lambda
Master serverless architecture patterns and build highly scalable, cost-effective applications using AWS Lambda and associated services.
When I started working with serverless architectures about six years ago, most clients approached Lambda functions as simple, isolated pieces of code that handled basic tasks. Today, I'm helping organizations build entire application ecosystems using sophisticated serverless patterns that rival traditional architectures in complexity while delivering superior scalability and cost efficiency.
The shift hasn't been subtle. In recent client engagements, I've seen serverless applications handling millions of requests daily, processing terabytes of streaming data, and powering complex business workflows that previously required dedicated server farms. What changed wasn't just the technology - it was our understanding of how to architect serverless systems for production scale.
This guide walks through the serverless patterns I've implemented across different client environments, from fintech startups processing payment transactions to enterprise retailers managing inventory systems. You'll see the actual code, infrastructure templates, and architectural decisions that work in production, along with the mistakes I've watched teams make and how to avoid them.
Understanding Serverless Architecture in Practice
Most developers think of serverless as "functions that run without servers," which misses the bigger picture. Serverless architecture is about building applications using managed services that automatically scale, charge per use, and remove infrastructure management overhead.
In my consulting work, I've found that successful serverless implementations share three characteristics:
Event-driven design - Every component responds to events rather than maintaining persistent connections. A user uploads an image, triggering a chain of functions that resize, optimize, and store the image while updating multiple databases and sending notifications.
Managed service integration - Instead of running databases, message queues, or caching layers on EC2 instances, serverless applications use DynamoDB, SQS, and ElastiCache. The operational overhead disappears, and scaling becomes automatic.
Stateless execution - Functions don't maintain state between invocations. This seems limiting until you realize it enables instant scaling and eliminates an entire class of distributed system problems.
The clients who struggle with serverless are usually trying to port existing architectures directly to Lambda. The ones who succeed redesign their applications around these principles from the ground up.
Core AWS Services for Serverless Applications
Before diving into specific patterns, let's establish the AWS service ecosystem that powers modern serverless applications.
AWS Lambda forms the compute foundation, but it's the integration with other services that creates powerful architectures. Lambda functions can be triggered by over 20 different event sources, from HTTP requests through API Gateway to stream records from Kinesis.
API Gateway handles HTTP requests, authentication, rate limiting, and request transformation. It's not just a proxy - it's a full-featured API management service that can validate requests, transform responses, and implement complex routing logic.
DynamoDB provides millisecond-latency NoSQL storage with automatic scaling. The key insight I share with clients is that DynamoDB isn't just a database replacement - it's a different way of thinking about data storage that eliminates most scaling and performance concerns.
Event services like SNS, SQS, and EventBridge create the messaging backbone. These services enable loose coupling between components and provide reliability guarantees that would be complex to implement in traditional architectures.
Storage and streaming services including S3 and Kinesis handle everything from static assets to real-time data processing. The integration between these services and Lambda creates processing pipelines that scale automatically based on data volume.
The magic happens when these services work together. A single user action might trigger an API Gateway request, execute a Lambda function, update DynamoDB records, publish SNS messages, and start Kinesis data processing - all with automatic scaling and pay-per-use pricing.
Event-Driven Processing Pattern
The event-driven processing pattern has become the backbone of most serverless applications I design. Rather than traditional request-response architectures, event-driven systems use asynchronous messaging to decouple components and improve resilience.
Understanding the Pattern
Event-driven architecture works by having services publish events when something interesting happens, and other services subscribe to those events to trigger their own processing. This creates a chain of autonomous services that can operate independently and scale based on their specific workload.
In a recent e-commerce client project, we replaced a monolithic order processing system with an event-driven serverless architecture. When a customer places an order, the system generates multiple events: order created, payment processed, inventory updated, shipping initiated, and customer notified. Each event triggers different Lambda functions that handle specific business logic.
The benefits become apparent under load. During Black Friday traffic, the payment processing functions scaled to handle thousands of concurrent transactions while the slower shipping functions processed events from a queue at their own pace. No single bottleneck could bring down the entire system.
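To make the queue-paced half of that pattern concrete, here is a minimal sketch of an SQS-triggered shipping consumer. The queue wiring, the initiate_shipment helper, and the order event shape are illustrative assumptions, and the partial-batch failure response only takes effect if ReportBatchItemFailures is enabled on the event source mapping.

import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    """
    Drain shipping events from an SQS queue at the consumer's own pace.
    Failed messages are reported individually so the rest of the batch
    is still deleted from the queue.
    """
    failures = []
    for record in event['Records']:
        try:
            order_event = json.loads(record['body'])
            initiate_shipment(order_event)
        except Exception:
            logger.exception("Failed to process message %s", record['messageId'])
            failures.append({'itemIdentifier': record['messageId']})
    return {'batchItemFailures': failures}

def initiate_shipment(order_event):
    # Placeholder for carrier integration, label generation, etc.
    logger.info("Initiating shipment for order %s", order_event.get('order_id'))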
Implementation Strategy
The implementation starts with identifying the events your system needs to handle and the actions each event should trigger. I typically map these out with clients before writing any code, because the event design determines the entire architecture.
For our user registration example, we'll implement a complete workflow: user registration triggers account creation, welcome email, profile setup, and analytics tracking. Each step happens asynchronously through event publishing.
Here's the API Gateway Lambda function that starts the registration process:
import json
import boto3
import uuid
from datetime import datetime
import logging
# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Initialize AWS clients
dynamodb = boto3.resource('dynamodb')
sns = boto3.client('sns')
def lambda_handler(event, context):
"""
Handle user registration requests from API Gateway
"""
try:
# Parse request body
request_body = json.loads(event['body'])
# Validate required fields
required_fields = ['email', 'password', 'first_name', 'last_name']
for field in required_fields:
if field not in request_body:
return {
'statusCode': 400,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
},
'body': json.dumps({
'error': f'Missing required field: {field}'
})
}
# Generate user ID and timestamps
user_id = str(uuid.uuid4())
timestamp = datetime.utcnow().isoformat()
# Store user in DynamoDB
users_table = dynamodb.Table('Users')
user_data = {
'user_id': user_id,
'email': request_body['email'],
'first_name': request_body['first_name'],
'last_name': request_body['last_name'],
'status': 'pending',
'created_at': timestamp,
'updated_at': timestamp
}
# Store password hash (illustration only - in production use a salted algorithm such as bcrypt or Argon2)
import hashlib
password_hash = hashlib.sha256(request_body['password'].encode()).hexdigest()
user_data['password_hash'] = password_hash
users_table.put_item(Item=user_data)
# Publish user registration event
event_data = {
'event_type': 'user_registered',
'user_id': user_id,
'email': request_body['email'],
'first_name': request_body['first_name'],
'last_name': request_body['last_name'],
'timestamp': timestamp
}
sns_topic_arn = 'arn:aws:sns:us-east-1:123456789012:user-events'
sns.publish(
TopicArn=sns_topic_arn,
Subject='User Registered',
Message=json.dumps(event_data),
MessageAttributes={
'event_type': {
'DataType': 'String',
'StringValue': 'user_registered'
}
}
)
logger.info(f"User registered successfully: {user_id}")
return {
'statusCode': 201,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
},
'body': json.dumps({
'message': 'User registered successfully',
'user_id': user_id
})
}
except Exception as e:
logger.error(f"Error registering user: {str(e)}")
return {
'statusCode': 500,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
},
'body': json.dumps({
'error': 'Internal server error'
})
}
This registration function demonstrates several key patterns I use across client projects. First, it handles the immediate business requirement (storing user data) synchronously, then publishes events for downstream processing. This ensures the user gets a quick response while background tasks happen asynchronously.
The SNS event includes message attributes that enable message filtering. Different Lambda functions can subscribe to the same topic but only process events they care about.
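As a sketch of how that filtering is wired up, the snippet below subscribes a function to the topic with a filter policy so SNS only delivers user_registered events to it. The ARNs are placeholders, and the target function also needs a resource-based permission allowing SNS to invoke it.

import json
import boto3

sns = boto3.client('sns')

# Deliver only messages whose 'event_type' attribute is 'user_registered'
sns.subscribe(
    TopicArn='arn:aws:sns:us-east-1:123456789012:user-events',
    Protocol='lambda',
    Endpoint='arn:aws:lambda:us-east-1:123456789012:function:welcome-email',
    Attributes={
        'FilterPolicy': json.dumps({'event_type': ['user_registered']})
    }
)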
Event Processing Functions
Once the registration event is published, multiple functions can process it independently. Here's the welcome email function that subscribes to user registration events:
import json
import boto3
import logging
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
logger = logging.getLogger()
logger.setLevel(logging.INFO)
ses = boto3.client('ses')
def lambda_handler(event, context):
"""
Process user registration events and send welcome emails
"""
try:
# Parse SNS message
for record in event['Records']:
sns_message = json.loads(record['Sns']['Message'])
# Only process user registration events
if sns_message['event_type'] != 'user_registered':
continue
# Extract user data
user_id = sns_message['user_id']
email = sns_message['email']
first_name = sns_message['first_name']
# Create welcome email
subject = f"Welcome to our platform, {first_name}!"
html_body = f"""
<html>
<body>
<h2>Welcome {first_name}!</h2>
<p>Thank you for joining our platform. We're excited to have you aboard.</p>
<p>Your account has been created and you can now:</p>
<ul>
<li>Complete your profile setup</li>
<li>Explore our features</li>
<li>Connect with other users</li>
</ul>
<p>If you have any questions, don't hesitate to contact our support team.</p>
<p>Best regards,<br>The Team</p>
</body>
</html>
"""
text_body = f"""
Welcome {first_name}!
Thank you for joining our platform. We're excited to have you aboard.
Your account has been created and you can now:
- Complete your profile setup
- Explore our features
- Connect with other users
If you have any questions, don't hesitate to contact our support team.
Best regards,
The Team
"""
# Send email using SES
response = ses.send_email(
Source='[email protected]',
Destination={
'ToAddresses': [email]
},
Message={
'Subject': {
'Data': subject,
'Charset': 'UTF-8'
},
'Body': {
'Text': {
'Data': text_body,
'Charset': 'UTF-8'
},
'Html': {
'Data': html_body,
'Charset': 'UTF-8'
}
}
}
)
logger.info(f"Welcome email sent to {email} for user {user_id}")
# Track email sent event
dynamodb = boto3.resource('dynamodb')
email_log_table = dynamodb.Table('EmailLog')
email_log_table.put_item(Item={
'email': email,
'user_id': user_id,
'email_type': 'welcome',
'status': 'sent',
'sent_at': sns_message['timestamp'],
'ses_message_id': response['MessageId']
})
except Exception as e:
logger.error(f"Error sending welcome email: {str(e)}")
# In production, you might want to send to a dead letter queue
raise
This email function demonstrates how event-driven functions should be designed. It's focused on a single responsibility, includes proper error handling, and tracks its actions for auditing. If the email service is temporarily unavailable, SNS can retry the function automatically.
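If you do want the hand-rolled dead letter path mentioned in the comment above, a minimal sketch looks like the helper below; the queue URL is hypothetical, and in many cases configuring an on-failure destination or DLQ on the function itself is the simpler option.

import json
import boto3

sqs = boto3.client('sqs')

# Hypothetical queue URL; in practice this comes from configuration
DEAD_LETTER_QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/welcome-email-dlq'

def send_to_dead_letter_queue(sns_message, error):
    """Park a failed event so it can be inspected and replayed later."""
    sqs.send_message(
        QueueUrl=DEAD_LETTER_QUEUE_URL,
        MessageBody=json.dumps({
            'original_event': sns_message,
            'error': str(error)
        })
    )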
Image Processing Pipeline
Event-driven patterns really shine in media processing workflows. Here's an image processing function that handles user profile photo uploads:
import json
import boto3
import logging
from PIL import Image
import io
import uuid
from datetime import datetime
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3 = boto3.client('s3')
sns = boto3.client('sns')
def lambda_handler(event, context):
"""
Process uploaded images: resize, optimize, and create thumbnails
"""
try:
# Parse S3 event
for record in event['Records']:
# Extract bucket and object key from S3 event
bucket = record['s3']['bucket']['name']
key = record['s3']['object']['key']
# Skip non-image files
if not key.lower().endswith(('.jpg', '.jpeg', '.png', '.gif')):
logger.info(f"Skipping non-image file: {key}")
continue
# Download image from S3
response = s3.get_object(Bucket=bucket, Key=key)
image_data = response['Body'].read()
# Process image with PIL
original_image = Image.open(io.BytesIO(image_data))
# Create multiple sizes
sizes = {
'thumbnail': (150, 150),
'medium': (400, 400),
'large': (800, 800)
}
processed_images = {}
for size_name, dimensions in sizes.items():
# Resize image maintaining aspect ratio
resized_image = original_image.copy()
resized_image.thumbnail(dimensions, Image.Resampling.LANCZOS)
# Convert to RGB if necessary (for JPEG)
if resized_image.mode != 'RGB':
resized_image = resized_image.convert('RGB')
# Save to bytes buffer
buffer = io.BytesIO()
resized_image.save(buffer, format='JPEG', quality=85, optimize=True)
buffer.seek(0)
# Generate new key for processed image
base_name = key.rsplit('.', 1)[0]
processed_key = f"{base_name}_{size_name}.jpg"
# Upload processed image to S3
s3.put_object(
Bucket=bucket,
Key=processed_key,
Body=buffer.getvalue(),
ContentType='image/jpeg',
CacheControl='max-age=31536000' # 1 year cache
)
processed_images[size_name] = {
'key': processed_key,
'size': len(buffer.getvalue()),
'dimensions': resized_image.size
}
logger.info(f"Created {size_name} version: {processed_key}")
# Extract user ID from key (assuming format: uploads/user_id/filename)
key_parts = key.split('/')
if len(key_parts) >= 2 and key_parts[0] == 'uploads':
user_id = key_parts[1]
# Update user record with image URLs
dynamodb = boto3.resource('dynamodb')
users_table = dynamodb.Table('Users')
users_table.update_item(
Key={'user_id': user_id},
UpdateExpression='SET profile_images = :images, updated_at = :timestamp',
ExpressionAttributeValues={
':images': processed_images,
':timestamp': datetime.utcnow().isoformat()
}
)
# Publish image processing complete event
event_data = {
'event_type': 'image_processed',
'user_id': user_id,
'original_key': key,
'processed_images': processed_images,
'timestamp': datetime.utcnow().isoformat()
}
sns.publish(
TopicArn='arn:aws:sns:us-east-1:123456789012:user-events',
Subject='Image Processing Complete',
Message=json.dumps(event_data),
MessageAttributes={
'event_type': {
'DataType': 'String',
'StringValue': 'image_processed'
}
}
)
return {'statusCode': 200}
except Exception as e:
logger.error(f"Error processing image: {str(e)}")
raise
This image processing function showcases how serverless functions can handle computationally intensive tasks. The function automatically scales based on the number of images being uploaded, and you only pay for the processing time used.
API Gateway with Lambda Backend Pattern
The API Gateway with Lambda backend pattern has become the foundation for most web applications I help clients build. This pattern provides a fully managed REST API with automatic scaling, built-in authentication, and comprehensive request/response handling.
Architectural Overview
API Gateway serves as the front door to your serverless application. It handles HTTP routing, request validation, authentication, rate limiting, and response transformation. Behind the gateway, Lambda functions implement your business logic, typically connecting to DynamoDB for data storage and other AWS services for additional functionality.
The key advantage of this pattern is that it eliminates the need to manage web servers, load balancers, and reverse proxies. API Gateway handles millions of concurrent requests while automatically scaling your Lambda functions to match demand.
In a recent client project for a content management system, we replaced a traditional LAMP stack with API Gateway and Lambda functions. The new architecture handled traffic spikes during viral content without any infrastructure changes, and the monthly hosting costs dropped by 70%.
Complete REST API Implementation
Here's a comprehensive Lambda function that handles CRUD operations for a blog API:
import json
import boto3
import uuid
from datetime import datetime
import logging
import base64
from boto3.dynamodb.conditions import Key, Attr
logger = logging.getLogger()
logger.setLevel(logging.INFO)
dynamodb = boto3.resource('dynamodb')
posts_table = dynamodb.Table('BlogPosts')
def lambda_handler(event, context):
"""
Handle REST API requests for blog posts
Supports GET, POST, PUT, DELETE operations
"""
try:
http_method = event['httpMethod']
path = event['resource']  # resource template, e.g. '/posts/{post_id}'; event['path'] holds the concrete URL
path_parameters = event.get('pathParameters') or {}
query_parameters = event.get('queryStringParameters') or {}
body = event.get('body')
# Parse request body if present
request_data = {}
if body:
try:
request_data = json.loads(body)
except json.JSONDecodeError:
return create_response(400, {'error': 'Invalid JSON in request body'})
# Route to appropriate handler based on HTTP method and path
if http_method == 'GET' and path == '/posts':
return get_posts(query_parameters)
elif http_method == 'GET' and path == '/posts/{post_id}':
return get_post(path_parameters.get('post_id'))
elif http_method == 'POST' and path == '/posts':
return create_post(request_data)
elif http_method == 'PUT' and path == '/posts/{post_id}':
return update_post(path_parameters.get('post_id'), request_data)
elif http_method == 'DELETE' and path == '/posts/{post_id}':
return delete_post(path_parameters.get('post_id'))
else:
return create_response(404, {'error': 'Not found'})
except Exception as e:
logger.error(f"Unhandled error: {str(e)}")
return create_response(500, {'error': 'Internal server error'})
def get_posts(query_params):
"""
Get list of blog posts with pagination and filtering
"""
try:
# Default pagination parameters
limit = int(query_params.get('limit', 10))
limit = min(limit, 100) # Maximum 100 items per request
# Scan parameters
scan_kwargs = {
'Limit': limit,
'FilterExpression': Attr('status').eq('published')
}
# Handle pagination with exclusive start key
if 'next_token' in query_params:
try:
import base64
start_key = json.loads(base64.b64decode(query_params['next_token']))
scan_kwargs['ExclusiveStartKey'] = start_key
except Exception:
return create_response(400, {'error': 'Invalid pagination token'})
# Add category filter if specified
if 'category' in query_params:
category_filter = Attr('category').eq(query_params['category'])
if 'FilterExpression' in scan_kwargs:
scan_kwargs['FilterExpression'] = scan_kwargs['FilterExpression'] & category_filter
else:
scan_kwargs['FilterExpression'] = category_filter
# Execute scan
response = posts_table.scan(**scan_kwargs)
# Prepare response data
posts = response.get('Items', [])
# Sort posts by creation date (newest first)
posts.sort(key=lambda x: x.get('created_at', ''), reverse=True)
# Prepare pagination info
result = {
'posts': posts,
'count': len(posts)
}
# Include next token if there are more items
if 'LastEvaluatedKey' in response:
next_token = base64.b64encode(
json.dumps(response['LastEvaluatedKey']).encode()
).decode()
result['next_token'] = next_token
return create_response(200, result)
except Exception as e:
logger.error(f"Error getting posts: {str(e)}")
return create_response(500, {'error': 'Failed to retrieve posts'})
def get_post(post_id):
"""
Get a single blog post by ID
"""
if not post_id:
return create_response(400, {'error': 'Post ID is required'})
try:
response = posts_table.get_item(Key={'post_id': post_id})
if 'Item' not in response:
return create_response(404, {'error': 'Post not found'})
post = response['Item']
# Only return published posts or if user is authenticated as author
# (In production, you'd check authentication here)
if post.get('status') != 'published':
return create_response(404, {'error': 'Post not found'})
# Increment view count
posts_table.update_item(
Key={'post_id': post_id},
UpdateExpression='ADD view_count :inc',
ExpressionAttributeValues={':inc': 1}
)
return create_response(200, post)
except Exception as e:
logger.error(f"Error getting post {post_id}: {str(e)}")
return create_response(500, {'error': 'Failed to retrieve post'})
def create_post(data):
"""
Create a new blog post
"""
# Validate required fields
required_fields = ['title', 'content', 'author_id']
for field in required_fields:
if field not in data or not data[field].strip():
return create_response(400, {'error': f'Missing required field: {field}'})
try:
# Generate post ID and timestamps
post_id = str(uuid.uuid4())
timestamp = datetime.utcnow().isoformat()
# Create post object
post = {
'post_id': post_id,
'title': data['title'].strip(),
'content': data['content'],
'author_id': data['author_id'],
'category': data.get('category', 'general'),
'tags': data.get('tags', []),
'status': data.get('status', 'draft'),
'created_at': timestamp,
'updated_at': timestamp,
'view_count': 0,
'comment_count': 0
}
# Add optional fields
if 'excerpt' in data:
post['excerpt'] = data['excerpt']
if 'featured_image' in data:
post['featured_image'] = data['featured_image']
# Save to DynamoDB
posts_table.put_item(Item=post)
logger.info(f"Created post: {post_id}")
return create_response(201, post)
except Exception as e:
logger.error(f"Error creating post: {str(e)}")
return create_response(500, {'error': 'Failed to create post'})
def update_post(post_id, data):
"""
Update an existing blog post
"""
if not post_id:
return create_response(400, {'error': 'Post ID is required'})
try:
# Check if post exists
response = posts_table.get_item(Key={'post_id': post_id})
if 'Item' not in response:
return create_response(404, {'error': 'Post not found'})
# Build update expression
update_expression = 'SET updated_at = :timestamp'
expression_values = {':timestamp': datetime.utcnow().isoformat()}
# Update allowed fields
updateable_fields = ['title', 'content', 'category', 'tags', 'status', 'excerpt', 'featured_image']
for field in updateable_fields:
if field in data:
update_expression += f', {field} = :{field}'
expression_values[f':{field}'] = data[field]
# Update the post
response = posts_table.update_item(
Key={'post_id': post_id},
UpdateExpression=update_expression,
ExpressionAttributeValues=expression_values,
ReturnValues='ALL_NEW'
)
logger.info(f"Updated post: {post_id}")
return create_response(200, response['Attributes'])
except Exception as e:
logger.error(f"Error updating post {post_id}: {str(e)}")
return create_response(500, {'error': 'Failed to update post'})
def delete_post(post_id):
"""
Delete a blog post
"""
if not post_id:
return create_response(400, {'error': 'Post ID is required'})
try:
# Check if post exists
response = posts_table.get_item(Key={'post_id': post_id})
if 'Item' not in response:
return create_response(404, {'error': 'Post not found'})
# Delete the post
posts_table.delete_item(Key={'post_id': post_id})
logger.info(f"Deleted post: {post_id}")
return create_response(200, {'message': 'Post deleted successfully'})
except Exception as e:
logger.error(f"Error deleting post {post_id}: {str(e)}")
return create_response(500, {'error': 'Failed to delete post'})
def create_response(status_code, body):
"""
Create a properly formatted API Gateway response
"""
return {
'statusCode': status_code,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET,POST,PUT,DELETE,OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type,Authorization'
},
'body': json.dumps(body, default=str) # Handle datetime serialization
}
This implementation demonstrates several important patterns I use across client projects. The function handles multiple HTTP methods in a single Lambda, includes comprehensive error handling, implements pagination for list endpoints, and includes proper CORS headers for web applications.
Authentication and Authorization
Most production APIs need authentication. Here's a custom authorizer Lambda function that validates JWT tokens:
import json
import jwt
import boto3
import logging
from datetime import datetime
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
"""
Custom authorizer for API Gateway
Validates JWT tokens and returns IAM policy
"""
try:
# Extract token from Authorization header
token = event['authorizationToken']
if not token.startswith('Bearer '):
raise Exception('Invalid token format')
token = token[7:] # Remove 'Bearer ' prefix
# Validate JWT token
# In production, fetch the secret from AWS Secrets Manager
secret = get_jwt_secret()
try:
payload = jwt.decode(token, secret, algorithms=['HS256'])
except jwt.ExpiredSignatureError:
raise Exception('Token expired')
except jwt.InvalidTokenError:
raise Exception('Invalid token')
# Extract user information
user_id = payload.get('user_id')
email = payload.get('email')
roles = payload.get('roles', [])
if not user_id:
raise Exception('Invalid token payload')
# Build IAM policy
policy = generate_policy(user_id, 'Allow', event['methodArn'])
# Add user context
policy['context'] = {
'user_id': user_id,
'email': email,
'roles': ','.join(roles)
}
return policy
except Exception as e:
logger.error(f"Authorization failed: {str(e)}")
# Return deny policy
return generate_policy('user', 'Deny', event['methodArn'])
def get_jwt_secret():
"""
Retrieve JWT secret from AWS Secrets Manager
"""
secrets_client = boto3.client('secretsmanager')
try:
response = secrets_client.get_secret_value(SecretId='jwt-secret')
return response['SecretString']
except Exception as e:
logger.error(f"Failed to retrieve JWT secret: {str(e)}")
raise
def generate_policy(principal_id, effect, resource):
"""
Generate IAM policy for API Gateway
"""
return {
'principalId': principal_id,
'policyDocument': {
'Version': '2012-10-17',
'Statement': [
{
'Action': 'execute-api:Invoke',
'Effect': effect,
'Resource': resource
}
]
}
}
This authorizer function validates JWT tokens and provides user context to downstream Lambda functions. API Gateway caches the authorization decision for a configurable period, reducing the number of authorizer invocations.
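For completeness, here is a sketch of how a login endpoint might issue a token this authorizer would accept. It assumes the same HS256 secret and the PyJWT library used above; the one-hour expiry and claim names are illustrative choices.

import jwt
from datetime import datetime, timedelta

def issue_token(user_id, email, roles, secret):
    """Create an HS256 token compatible with the custom authorizer."""
    payload = {
        'user_id': user_id,
        'email': email,
        'roles': roles,
        'exp': datetime.utcnow() + timedelta(hours=1)  # checked by jwt.decode
    }
    return jwt.encode(payload, secret, algorithm='HS256')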
Request Validation and Transformation
API Gateway can validate requests before they reach your Lambda functions, reducing processing overhead and improving security. Here's how to implement comprehensive request validation:
import json
import boto3
import logging
from jsonschema import validate, ValidationError
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Define request schemas
POST_SCHEMA = {
"type": "object",
"properties": {
"title": {
"type": "string",
"minLength": 1,
"maxLength": 200
},
"content": {
"type": "string",
"minLength": 10,
"maxLength": 50000
},
"author_id": {
"type": "string",
"pattern": "^[a-f0-9-]{36}$"
},
"category": {
"type": "string",
"enum": ["technology", "business", "lifestyle", "health", "education"]
},
"tags": {
"type": "array",
"items": {"type": "string"},
"maxItems": 10
},
"status": {
"type": "string",
"enum": ["draft", "published", "archived"]
}
},
"required": ["title", "content", "author_id"],
"additionalProperties": False
}
def lambda_handler(event, context):
"""
Lambda function with built-in request validation
"""
try:
http_method = event['httpMethod']
# Validate request body for POST/PUT requests
if http_method in ['POST', 'PUT'] and event.get('body'):
try:
request_data = json.loads(event['body'])
validate(instance=request_data, schema=POST_SCHEMA)
except json.JSONDecodeError:
return create_error_response(400, 'INVALID_JSON', 'Request body must be valid JSON')
except ValidationError as e:
return create_error_response(400, 'VALIDATION_ERROR', str(e))
# Continue with business logic
# ... rest of the handler code
except Exception as e:
logger.error(f"Unhandled error: {str(e)}")
return create_error_response(500, 'INTERNAL_ERROR', 'An unexpected error occurred')
def create_error_response(status_code, error_code, message):
"""
Create standardized error response
"""
return {
'statusCode': status_code,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
},
'body': json.dumps({
'error': {
'code': error_code,
'message': message
}
})
}
This validation approach ensures data quality before processing and provides clear error messages to API consumers. The schema-based validation is particularly useful for complex nested objects.
Stream Processing with Kinesis Pattern
Stream processing has become essential for modern applications that need to analyze data in real-time. The Kinesis stream processing pattern enables you to build pipelines that can handle millions of events per hour while maintaining low latency and high availability.
Understanding Stream Processing
Stream processing differs from batch processing by processing data as it arrives rather than waiting for complete datasets. This enables real-time analytics, immediate alerting, and responsive user experiences based on live data.
In a recent client project for a logistics company, we built a stream processing pipeline that tracks package locations in real-time. GPS updates from delivery trucks flow through Kinesis streams to Lambda functions that update package statuses, calculate estimated delivery times, and trigger notifications to customers.
The key advantage of serverless stream processing is automatic scaling. As data volume increases, Lambda automatically scales the number of concurrent function executions to match the incoming stream rate. During peak delivery hours, the system might process thousands of location updates per second, while scaling down to handle just a few updates during quiet periods.
Kinesis Stream Architecture
The typical architecture includes Kinesis Data Streams for ingestion, Lambda functions for processing, and various destination services like DynamoDB, S3, or other streams for processed data. DynamoDB streams can also trigger Lambda functions when database records change, creating reactive data processing pipelines.
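As a sketch of the DynamoDB Streams side, the handler below reacts to record changes; the table it watches and the follow-up actions are illustrative assumptions. Note that the stream images arrive in DynamoDB's attribute-value format rather than plain JSON.

import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    """React to item changes delivered by a DynamoDB stream."""
    for record in event['Records']:
        event_name = record['eventName']  # INSERT, MODIFY, or REMOVE
        keys = record['dynamodb']['Keys']
        if event_name == 'INSERT':
            new_image = record['dynamodb'].get('NewImage', {})
            # e.g. push the new item to a search index (hypothetical step)
            logger.info("Item created: %s", keys)
        elif event_name == 'MODIFY':
            logger.info("Item updated: %s", keys)
        elif event_name == 'REMOVE':
            logger.info("Item deleted: %s", keys)
    return {'statusCode': 200}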
Here's a Lambda function that processes real-time user activity events from a Kinesis stream:
import json
import boto3
import logging
from datetime import datetime, timedelta
import base64
from decimal import Decimal
import uuid
from boto3.dynamodb.conditions import Key, Attr  # ensures boto3.dynamodb.conditions is loaded for the query filters below
logger = logging.getLogger()
logger.setLevel(logging.INFO)
dynamodb = boto3.resource('dynamodb')
s3 = boto3.client('s3')
sns = boto3.client('sns')
# DynamoDB tables
user_activity_table = dynamodb.Table('UserActivity')
user_stats_table = dynamodb.Table('UserStats')
anomaly_alerts_table = dynamodb.Table('AnomalyAlerts')
def lambda_handler(event, context):
"""
Process user activity events from Kinesis stream
Performs real-time analytics and anomaly detection
"""
try:
processed_records = 0
anomalies_detected = 0
for record in event['Records']:
# Decode Kinesis record
payload = base64.b64decode(record['kinesis']['data'])
activity_data = json.loads(payload)
# Process individual activity event
result = process_activity_event(activity_data, record['kinesis'])
if result['processed']:
processed_records += 1
if result['anomaly_detected']:
anomalies_detected += 1
logger.info(f"Processed {processed_records} records, detected {anomalies_detected} anomalies")
return {
'batchItemFailures': [], # Return empty for successful processing
'processed_count': processed_records,
'anomaly_count': anomalies_detected
}
except Exception as e:
logger.error(f"Error processing Kinesis records: {str(e)}")
raise
def process_activity_event(activity_data, kinesis_metadata):
"""
Process individual activity event with analytics and anomaly detection
"""
try:
# Extract event data
user_id = activity_data['user_id']
event_type = activity_data['event_type']
timestamp = activity_data['timestamp']
session_id = activity_data.get('session_id')
properties = activity_data.get('properties', {})
# Store raw activity data
activity_item = {
'user_id': user_id,
'timestamp': timestamp,
'event_type': event_type,
'session_id': session_id,
'properties': properties,
'partition_key': kinesis_metadata['partitionKey'],
'sequence_number': kinesis_metadata['sequenceNumber']
}
user_activity_table.put_item(Item=activity_item)
# Update user statistics
update_user_stats(user_id, event_type, timestamp, properties)
# Check for anomalies
anomaly_detected = check_for_anomalies(user_id, event_type, timestamp, properties)
# Archive data to S3 for long-term storage (batch writes)
archive_to_s3(activity_data, timestamp)
return {
'processed': True,
'anomaly_detected': anomaly_detected
}
except Exception as e:
logger.error(f"Error processing activity event: {str(e)}")
return {
'processed': False,
'anomaly_detected': False
}
def update_user_stats(user_id, event_type, timestamp, properties):
"""
Update real-time user statistics
"""
try:
# Calculate time-based keys for different aggregation windows
event_time = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
hour_key = event_time.strftime('%Y-%m-%d-%H')
day_key = event_time.strftime('%Y-%m-%d')
# Update hourly stats
user_stats_table.update_item(
Key={
'user_id': user_id,
'time_window': f"hour:{hour_key}"
},
UpdateExpression='''
ADD event_count :one,
total_events :one
SET last_activity = :timestamp,
updated_at = :now
''',
ExpressionAttributeValues={
':one': 1,
':timestamp': timestamp,
':now': datetime.utcnow().isoformat()
},
ReturnValues='NONE'
)
# Update daily stats
user_stats_table.update_item(
Key={
'user_id': user_id,
'time_window': f"day:{day_key}"
},
UpdateExpression='''
ADD event_count :one,
total_events :one
SET last_activity = :timestamp,
updated_at = :now
''',
ExpressionAttributeValues={
':one': 1,
':timestamp': timestamp,
':now': datetime.utcnow().isoformat()
},
ReturnValues='NONE'
)
# Update event-type specific counters
if event_type in ['page_view', 'click', 'purchase', 'signup']:
user_stats_table.update_item(
Key={
'user_id': user_id,
'time_window': f"day:{day_key}:events"
},
UpdateExpression=f'ADD {event_type}_count :one SET updated_at = :now',
ExpressionAttributeValues={
':one': 1,
':now': datetime.utcnow().isoformat()
},
ReturnValues='NONE'
)
except Exception as e:
logger.error(f"Error updating user stats for {user_id}: {str(e)}")
def check_for_anomalies(user_id, event_type, timestamp, properties):
"""
Detect anomalous user behavior patterns
"""
try:
anomaly_detected = False
event_time = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
# Check for rapid-fire events (potential bot activity)
if event_type in ['page_view', 'click']:
recent_events = get_recent_event_count(user_id, event_type, minutes=5)
if recent_events > 50: # More than 50 events in 5 minutes
alert_data = {
'user_id': user_id,
'anomaly_type': 'rapid_fire_events',
'event_type': event_type,
'event_count': recent_events,
'time_window': '5_minutes',
'timestamp': timestamp,
'severity': 'high'
}
send_anomaly_alert(alert_data)
anomaly_detected = True
# Check for unusual purchase amounts
if event_type == 'purchase':
purchase_amount = Decimal(str(properties.get('amount', 0)))
if purchase_amount > Decimal('10000'): # Purchases over $10,000
alert_data = {
'user_id': user_id,
'anomaly_type': 'high_value_purchase',
'amount': float(purchase_amount),
'timestamp': timestamp,
'severity': 'medium'
}
send_anomaly_alert(alert_data)
anomaly_detected = True
# Check for geographic anomalies (if location data available)
if 'location' in properties:
location_anomaly = check_location_anomaly(user_id, properties['location'])
if location_anomaly:
alert_data = {
'user_id': user_id,
'anomaly_type': 'location_anomaly',
'current_location': properties['location'],
'timestamp': timestamp,
'severity': 'medium'
}
send_anomaly_alert(alert_data)
anomaly_detected = True
return anomaly_detected
except Exception as e:
logger.error(f"Error checking anomalies for {user_id}: {str(e)}")
return False
def get_recent_event_count(user_id, event_type, minutes=5):
"""
Count events for a user within the specified time window
"""
try:
# Query recent activity
cutoff_time = datetime.utcnow() - timedelta(minutes=minutes)
cutoff_timestamp = cutoff_time.isoformat()
response = user_activity_table.query(
KeyConditionExpression=boto3.dynamodb.conditions.Key('user_id').eq(user_id),
FilterExpression=boto3.dynamodb.conditions.Attr('timestamp').gte(cutoff_timestamp) &
boto3.dynamodb.conditions.Attr('event_type').eq(event_type),
ScanIndexForward=False, # Most recent first
Limit=100 # Reasonable limit for counting
)
return response['Count']
except Exception as e:
logger.error(f"Error counting recent events: {str(e)}")
return 0
def check_location_anomaly(user_id, current_location):
"""
Check if current location is unusual for the user
"""
try:
# Get user's recent locations (last 30 days)
cutoff_time = datetime.utcnow() - timedelta(days=30)
cutoff_timestamp = cutoff_time.isoformat()
response = user_activity_table.query(
KeyConditionExpression=boto3.dynamodb.conditions.Key('user_id').eq(user_id),
FilterExpression=boto3.dynamodb.conditions.Attr('timestamp').gte(cutoff_timestamp) &
boto3.dynamodb.conditions.Attr('properties.location').exists(),
ProjectionExpression='properties.location',
Limit=100
)
# Extract unique locations
recent_locations = set()
for item in response['Items']:
location = item.get('properties', {}).get('location', {})
if 'country' in location:
recent_locations.add(location['country'])
# Check if current location is new
current_country = current_location.get('country')
if current_country and current_country not in recent_locations and len(recent_locations) > 0:
return True
return False
except Exception as e:
logger.error(f"Error checking location anomaly: {str(e)}")
return False
def send_anomaly_alert(alert_data):
"""
Send anomaly alert notification
"""
try:
# Store alert in database
alert_item = {
**alert_data,
'alert_id': str(uuid.uuid4()),
'created_at': datetime.utcnow().isoformat(),
'status': 'new'
}
anomaly_alerts_table.put_item(Item=alert_item)
# Send SNS notification for high severity alerts
if alert_data['severity'] == 'high':
message = {
'alert_type': 'user_anomaly',
'data': alert_data
}
sns.publish(
TopicArn='arn:aws:sns:us-east-1:123456789012:security-alerts',
Subject=f"Anomaly Detected: {alert_data['anomaly_type']}",
Message=json.dumps(message),
MessageAttributes={
'alert_type': {
'DataType': 'String',
'StringValue': 'user_anomaly'
},
'severity': {
'DataType': 'String',
'StringValue': alert_data['severity']
}
}
)
logger.info(f"Anomaly alert sent for user {alert_data['user_id']}")
except Exception as e:
logger.error(f"Error sending anomaly alert: {str(e)}")
def archive_to_s3(activity_data, timestamp):
"""
Archive activity data to S3 for long-term storage
This would typically be done in batches for efficiency
"""
try:
# Create S3 key based on timestamp for partitioning
event_time = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
s3_key = f"user-activity/{event_time.strftime('%Y/%m/%d/%H')}/{activity_data['user_id']}-{int(event_time.timestamp())}.json"
# Upload to S3 (in production, you might batch these writes)
s3.put_object(
Bucket='user-activity-archive',
Key=s3_key,
Body=json.dumps(activity_data),
ContentType='application/json'
)
except Exception as e:
logger.error(f"Error archiving to S3: {str(e)}")
This stream processing function demonstrates real-time analytics, anomaly detection, and data archiving. The function processes each event immediately while maintaining statistics and detecting unusual patterns that might indicate security issues or system problems.
Batch Processing with Kinesis Analytics
For more complex analytics that require windowed aggregations, you can combine Lambda with Kinesis Analytics. Here's a function that processes analytics results:
import json
import boto3
import logging
from datetime import datetime
from decimal import Decimal
import base64
logger = logging.getLogger()
logger.setLevel(logging.INFO)
dynamodb = boto3.resource('dynamodb')
cloudwatch = boto3.client('cloudwatch')
# DynamoDB table for storing analytics results
analytics_table = dynamodb.Table('AnalyticsResults')
def lambda_handler(event, context):
"""
Process aggregated analytics results from Kinesis Analytics
"""
try:
for record in event['Records']:
# Parse analytics result
payload = base64.b64decode(record['kinesis']['data'])
analytics_data = json.loads(payload)
# Process different types of analytics results
if analytics_data['result_type'] == 'hourly_user_stats':
process_hourly_stats(analytics_data)
elif analytics_data['result_type'] == 'conversion_funnel':
process_conversion_funnel(analytics_data)
elif analytics_data['result_type'] == 'real_time_dashboard':
update_real_time_dashboard(analytics_data)
return {'statusCode': 200}
except Exception as e:
logger.error(f"Error processing analytics results: {str(e)}")
raise
def process_hourly_stats(data):
"""
Store hourly user statistics
"""
try:
# Store in DynamoDB for querying
analytics_table.put_item(
Item={
'result_id': f"hourly_stats:{data['window_start']}",
'result_type': 'hourly_user_stats',
'window_start': data['window_start'],
'window_end': data['window_end'],
'metrics': {
'total_users': data['total_users'],
'active_sessions': data['active_sessions'],
'page_views': data['page_views'],
'unique_visitors': data['unique_visitors'],
'conversion_rate': Decimal(str(data['conversion_rate']))
},
'created_at': datetime.utcnow().isoformat()
}
)
# Send metrics to CloudWatch
cloudwatch.put_metric_data(
Namespace='UserAnalytics/Hourly',
MetricData=[
{
'MetricName': 'ActiveUsers',
'Value': data['total_users'],
'Unit': 'Count',
'Timestamp': datetime.fromisoformat(data['window_start'].replace('Z', '+00:00'))
},
{
'MetricName': 'PageViews',
'Value': data['page_views'],
'Unit': 'Count',
'Timestamp': datetime.fromisoformat(data['window_start'].replace('Z', '+00:00'))
},
{
'MetricName': 'ConversionRate',
'Value': data['conversion_rate'],
'Unit': 'Percent',
'Timestamp': datetime.fromisoformat(data['window_start'].replace('Z', '+00:00'))
}
]
)
except Exception as e:
logger.error(f"Error processing hourly stats: {str(e)}")
def update_real_time_dashboard(data):
"""
Update real-time dashboard data
"""
try:
# Store current metrics for dashboard
dashboard_item = {
'dashboard_id': 'real_time_metrics',
'updated_at': datetime.utcnow().isoformat(),
'current_metrics': {
'concurrent_users': data['concurrent_users'],
'requests_per_minute': data['requests_per_minute'],
'error_rate': Decimal(str(data['error_rate'])),
'avg_response_time': Decimal(str(data['avg_response_time']))
},
'ttl': int(datetime.utcnow().timestamp()) + 300 # 5 minute TTL
}
analytics_table.put_item(Item=dashboard_item)
# Update CloudWatch dashboard metrics
cloudwatch.put_metric_data(
Namespace='RealTimeDashboard',
MetricData=[
{
'MetricName': 'ConcurrentUsers',
'Value': data['concurrent_users'],
'Unit': 'Count'
},
{
'MetricName': 'RequestsPerMinute',
'Value': data['requests_per_minute'],
'Unit': 'Count/Minute'
},
{
'MetricName': 'ErrorRate',
'Value': data['error_rate'],
'Unit': 'Percent'
}
]
)
except Exception as e:
logger.error(f"Error updating real-time dashboard: {str(e)}")
Infrastructure as Code Implementation
Infrastructure as Code (IaC) has become essential for serverless applications. Managing dozens of Lambda functions, API Gateway endpoints, and supporting resources manually quickly becomes unmaintainable. AWS SAM (Serverless Application Model) provides a simplified way to define serverless infrastructure.
Complete SAM Template
Here's a comprehensive SAM template that defines the entire serverless application infrastructure:
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: 'Serverless Blog Platform - Complete Infrastructure'
Parameters:
Environment:
Type: String
Default: dev
AllowedValues: [dev, staging, prod]
Description: Environment name
ApiDomainName:
Type: String
Default: api.yourblog.com
Description: Custom domain name for API
CertificateArn:
Type: String
Description: SSL certificate ARN for custom domain
Globals:
Function:
Runtime: python3.9
Timeout: 30
MemorySize: 256
Environment:
Variables:
ENVIRONMENT: !Ref Environment
BLOG_POSTS_TABLE: !Ref BlogPostsTable
USER_ACTIVITY_TABLE: !Ref UserActivityTable
USER_STATS_TABLE: !Ref UserStatsTable
ANALYTICS_TABLE: !Ref AnalyticsTable
USER_EVENTS_TOPIC: !Ref UserEventsTopic
SECURITY_ALERTS_TOPIC: !Ref SecurityAlertsTopic
IMAGE_BUCKET: !Ref ImageBucket
ACTIVITY_STREAM: !Ref UserActivityStream
Api:
Cors:
AllowMethods: "'GET,POST,PUT,DELETE,OPTIONS'"
AllowHeaders: "'Content-Type,Authorization'"
AllowOrigin: "'*'"
Auth:
DefaultAuthorizer: CustomAuthorizer
Authorizers:
CustomAuthorizer:
FunctionArn: !GetAtt AuthorizerFunction.Arn
Resources:
# API Gateway
BlogApi:
Type: AWS::Serverless::Api
Properties:
Name: !Sub 'blog-api-${Environment}'
StageName: !Ref Environment
Domain:
DomainName: !Ref ApiDomainName
CertificateArn: !Ref CertificateArn
Route53:
HostedZoneId: Z1D633PJN98FT9 # Replace with your hosted zone
MethodSettings:
- ResourcePath: '/*'
HttpMethod: '*'
ThrottlingBurstLimit: 1000
ThrottlingRateLimit: 500
CachingEnabled: true
CacheTtlInSeconds: 300
AccessLogSetting:
DestinationArn: !GetAtt ApiLogGroup.Arn
Format: '$context.requestId $context.requestTime $context.httpMethod $context.resourcePath $context.status $context.responseLength $context.responseLatency'
# Lambda Functions
AuthorizerFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: !Sub 'blog-authorizer-${Environment}'
CodeUri: src/authorizer/
Handler: app.lambda_handler
Environment:
Variables:
JWT_SECRET_ARN: !Ref JwtSecret
Policies:
- Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- secretsmanager:GetSecretValue
Resource: !Ref JwtSecret
BlogApiFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: !Sub 'blog-api-${Environment}'
CodeUri: src/blog_api/
Handler: app.lambda_handler
ReservedConcurrentExecutions: 100
Events:
GetPosts:
Type: Api
Properties:
RestApiId: !Ref BlogApi
Path: /posts
Method: get
Auth:
Authorizer: NONE
GetPost:
Type: Api
Properties:
RestApiId: !Ref BlogApi
Path: /posts/{post_id}
Method: get
Auth:
Authorizer: NONE
CreatePost:
Type: Api
Properties:
RestApiId: !Ref BlogApi
Path: /posts
Method: post
UpdatePost:
Type: Api
Properties:
RestApiId: !Ref BlogApi
Path: /posts/{post_id}
Method: put
DeletePost:
Type: Api
Properties:
RestApiId: !Ref BlogApi
Path: /posts/{post_id}
Method: delete
Policies:
- DynamoDBCrudPolicy:
TableName: !Ref BlogPostsTable
UserRegistrationFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: !Sub 'user-registration-${Environment}'
CodeUri: src/user_registration/
Handler: app.lambda_handler
Events:
RegisterUser:
Type: Api
Properties:
RestApiId: !Ref BlogApi
Path: /users/register
Method: post
Auth:
Authorizer: NONE
Policies:
- DynamoDBCrudPolicy:
TableName: !Ref UsersTable
- SNSPublishMessagePolicy:
TopicName: !GetAtt UserEventsTopic.TopicName
WelcomeEmailFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: !Sub 'welcome-email-${Environment}'
CodeUri: src/welcome_email/
Handler: app.lambda_handler
Events:
UserRegistered:
Type: SNS
Properties:
Topic: !Ref UserEventsTopic
FilterPolicy:
event_type:
- user_registered
Policies:
- SESCrudPolicy:
IdentityName: !Sub '*.${ApiDomainName}'
- DynamoDBCrudPolicy:
TableName: !Ref EmailLogTable
ImageProcessingFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: !Sub 'image-processing-${Environment}'
CodeUri: src/image_processing/
Handler: app.lambda_handler
Timeout: 300
MemorySize: 1024
Layers:
- !Ref PillowLayer
Events:
ImageUploaded:
Type: S3
Properties:
Bucket: !Ref ImageBucket
Events: s3:ObjectCreated:*
Filter:
S3Key:
Rules:
- Name: prefix
Value: uploads/
- Name: suffix
Value: .jpg
Policies:
- S3CrudPolicy:
BucketName: !Ref ImageBucket
- SNSPublishMessagePolicy:
TopicName: !GetAtt UserEventsTopic.TopicName
- DynamoDBCrudPolicy:
TableName: !Ref UsersTable
ActivityProcessingFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: !Sub 'activity-processing-${Environment}'
CodeUri: src/activity_processing/
Handler: app.lambda_handler
ReservedConcurrentExecutions: 50
Events:
ActivityStream:
Type: Kinesis
Properties:
Stream: !GetAtt UserActivityStream.Arn
StartingPosition: LATEST
BatchSize: 100
MaximumBatchingWindowInSeconds: 5
ParallelizationFactor: 4
Policies:
- KinesisStreamReadPolicy:
StreamName: !Ref UserActivityStream
- DynamoDBCrudPolicy:
TableName: !Ref UserActivityTable
- DynamoDBCrudPolicy:
TableName: !Ref UserStatsTable
- DynamoDBCrudPolicy:
TableName: !Ref AnomalyAlertsTable
- S3CrudPolicy:
BucketName: !Ref ArchiveBucket
- SNSPublishMessagePolicy:
TopicName: !GetAtt SecurityAlertsTopic.TopicName
# DynamoDB Tables
BlogPostsTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Sub 'BlogPosts-${Environment}'
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: post_id
AttributeType: S
- AttributeName: author_id
AttributeType: S
- AttributeName: status
AttributeType: S
- AttributeName: created_at
AttributeType: S
KeySchema:
- AttributeName: post_id
KeyType: HASH
GlobalSecondaryIndexes:
- IndexName: AuthorIndex
KeySchema:
- AttributeName: author_id
KeyType: HASH
- AttributeName: created_at
KeyType: RANGE
Projection:
ProjectionType: ALL
- IndexName: StatusIndex
KeySchema:
- AttributeName: status
KeyType: HASH
- AttributeName: created_at
KeyType: RANGE
Projection:
ProjectionType: ALL
StreamSpecification:
StreamViewType: NEW_AND_OLD_IMAGES
PointInTimeRecoverySpecification:
PointInTimeRecoveryEnabled: true
SSESpecification:
SSEEnabled: true
Tags:
- Key: Environment
Value: !Ref Environment
UsersTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Sub 'Users-${Environment}'
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: user_id
AttributeType: S
- AttributeName: email
AttributeType: S
KeySchema:
- AttributeName: user_id
KeyType: HASH
GlobalSecondaryIndexes:
- IndexName: EmailIndex
KeySchema:
- AttributeName: email
KeyType: HASH
Projection:
ProjectionType: ALL
PointInTimeRecoverySpecification:
PointInTimeRecoveryEnabled: true
SSESpecification:
SSEEnabled: true
UserActivityTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Sub 'UserActivity-${Environment}'
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: user_id
AttributeType: S
- AttributeName: timestamp
AttributeType: S
KeySchema:
- AttributeName: user_id
KeyType: HASH
- AttributeName: timestamp
KeyType: RANGE
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
StreamSpecification:
StreamViewType: NEW_AND_OLD_IMAGES
UserStatsTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Sub 'UserStats-${Environment}'
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: user_id
AttributeType: S
- AttributeName: time_window
AttributeType: S
KeySchema:
- AttributeName: user_id
KeyType: HASH
- AttributeName: time_window
KeyType: RANGE
# S3 Buckets
ImageBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub 'blog-images-${Environment}-${AWS::AccountId}'
CorsConfiguration:
CorsRules:
- AllowedHeaders: ['*']
AllowedMethods: [GET, PUT, POST, DELETE, HEAD]
AllowedOrigins: ['*']
MaxAge: 3600
LifecycleConfiguration:
Rules:
- Status: Enabled
Transitions:
- StorageClass: STANDARD_IA
TransitionInDays: 30
- StorageClass: GLACIER
TransitionInDays: 90
VersioningConfiguration:
Status: Enabled
PublicAccessBlockConfiguration:
BlockPublicAcls: false
BlockPublicPolicy: false
IgnorePublicAcls: false
RestrictPublicBuckets: false
ArchiveBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub 'user-activity-archive-${Environment}-${AWS::AccountId}'
LifecycleConfiguration:
Rules:
- Status: Enabled
Transitions:
- StorageClass: STANDARD_IA
TransitionInDays: 30
- StorageClass: GLACIER
TransitionInDays: 90
- StorageClass: DEEP_ARCHIVE
TransitionInDays: 365
# Kinesis Stream
UserActivityStream:
Type: AWS::Kinesis::Stream
Properties:
Name: !Sub 'user-activity-${Environment}'
ShardCount: 2
StreamEncryption:
EncryptionType: KMS
KeyId: alias/aws/kinesis
Tags:
- Key: Environment
Value: !Ref Environment
# SNS Topics
UserEventsTopic:
Type: AWS::SNS::Topic
Properties:
TopicName: !Sub 'user-events-${Environment}'
DisplayName: 'User Events Topic'
KmsMasterKeyId: alias/aws/sns
SecurityAlertsTopic:
Type: AWS::SNS::Topic
Properties:
TopicName: !Sub 'security-alerts-${Environment}'
DisplayName: 'Security Alerts Topic'
KmsMasterKeyId: alias/aws/sns
# Secrets Manager
JwtSecret:
Type: AWS::SecretsManager::Secret
Properties:
Name: !Sub 'jwt-secret-${Environment}'
Description: 'JWT signing secret'
GenerateSecretString:
SecretStringTemplate: '{}'
GenerateStringKey: 'secret'
PasswordLength: 64
ExcludeCharacters: '"@/\'
# Lambda Layer
PillowLayer:
Type: AWS::Serverless::LayerVersion
Properties:
LayerName: !Sub 'pillow-layer-${Environment}'
Description: 'PIL/Pillow library for image processing'
ContentUri: layers/pillow/
CompatibleRuntimes:
- python3.9
RetentionPolicy: Retain
# CloudWatch Log Groups
ApiLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub '/aws/apigateway/blog-api-${Environment}'
RetentionInDays: 14
# CloudWatch Alarms
HighErrorRateAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: !Sub 'blog-api-high-error-rate-${Environment}'
AlarmDescription: 'High error rate on blog API'
MetricName: 4XXError
Namespace: AWS/ApiGateway
Statistic: Sum
Period: 300
EvaluationPeriods: 2
Threshold: 50
ComparisonOperator: GreaterThanThreshold
Dimensions:
- Name: ApiName
Value: !Sub 'blog-api-${Environment}'
AlarmActions:
- !Ref SecurityAlertsTopic
HighLatencyAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: !Sub 'blog-api-high-latency-${Environment}'
AlarmDescription: 'High latency on blog API'
MetricName: Latency
Namespace: AWS/ApiGateway
Statistic: Average
Period: 300
EvaluationPeriods: 3
Threshold: 5000
ComparisonOperator: GreaterThanThreshold
Dimensions:
- Name: ApiName
Value: !Sub 'blog-api-${Environment}'
Outputs:
ApiUrl:
Description: 'Blog API URL'
Value: !Sub 'https://${BlogApi}.execute-api.${AWS::Region}.amazonaws.com/${Environment}'
Export:
Name: !Sub '${AWS::StackName}-ApiUrl'
CustomDomainUrl:
Description: 'Custom domain URL'
Value: !Sub 'https://${ApiDomainName}'
Export:
Name: !Sub '${AWS::StackName}-CustomDomainUrl'
ImageBucketName:
Description: 'S3 bucket for images'
Value: !Ref ImageBucket
Export:
Name: !Sub '${AWS::StackName}-ImageBucket'
UserActivityStreamName:
Description: 'Kinesis stream for user activity'
Value: !Ref UserActivityStream
Export:
Name: !Sub '${AWS::StackName}-ActivityStream'
This SAM template defines a complete serverless application infrastructure including API Gateway, Lambda functions, DynamoDB tables, S3 buckets, Kinesis streams, SNS topics, and monitoring resources. The template uses parameters for environment-specific configuration and includes proper security settings.
Deployment Scripts
Here's a deployment script that handles the complete application lifecycle:
#!/bin/bash
# deploy.sh - Complete serverless application deployment script
set -e
# Configuration
ENVIRONMENT=${1:-dev}
REGION=${2:-us-east-1}
STACK_NAME="serverless-blog-${ENVIRONMENT}"
S3_BUCKET="serverless-blog-deployments-${ENVIRONMENT}"
DOMAIN_NAME="api-${ENVIRONMENT}.yourblog.com"
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
echo -e "${GREEN}Starting deployment of ${STACK_NAME} to ${REGION}${NC}"
# Check if AWS CLI is configured
if ! aws sts get-caller-identity > /dev/null 2>&1; then
echo -e "${RED}Error: AWS CLI is not configured${NC}"
exit 1
fi
# Create deployment bucket if it doesn't exist
echo -e "${YELLOW}Checking deployment bucket...${NC}"
if ! aws s3 ls "s3://${S3_BUCKET}" > /dev/null 2>&1; then
echo "Creating deployment bucket: ${S3_BUCKET}"
aws s3 mb "s3://${S3_BUCKET}" --region ${REGION}
aws s3api put-bucket-versioning \
--bucket ${S3_BUCKET} \
--versioning-configuration Status=Enabled
aws s3api put-bucket-encryption \
--bucket ${S3_BUCKET} \
--server-side-encryption-configuration '{
"Rules": [
{
"ApplyServerSideEncryptionByDefault": {
"SSEAlgorithm": "AES256"
}
}
]
}'
fi
# Install Python dependencies for Lambda functions
echo -e "${YELLOW}Installing Python dependencies...${NC}"
find src/ -name requirements.txt | while read requirements_file; do
function_dir=$(dirname $requirements_file)
echo "Installing dependencies for ${function_dir}"
pip install -r $requirements_file -t $function_dir
done
# Build Pillow layer
echo -e "${YELLOW}Building Pillow layer...${NC}"
if [ ! -d "layers/pillow/python" ]; then
mkdir -p layers/pillow/python
pip install Pillow -t layers/pillow/python
fi
# Validate SAM template
echo -e "${YELLOW}Validating SAM template...${NC}"
sam validate --template-file template.yaml
# Build SAM application
echo -e "${YELLOW}Building SAM application...${NC}"
sam build --template-file template.yaml --build-dir .aws-sam/build
# Get SSL certificate ARN (assumes certificate exists)
echo -e "${YELLOW}Getting SSL certificate ARN...${NC}"
CERT_ARN=$(aws acm list-certificates \
--region ${REGION} \
--query "CertificatesSummaryList[?DomainName=='*.yourblog.com'].CertificateArn" \
--output text)
if [ -z "$CERT_ARN" ]; then
echo -e "${RED}Error: SSL certificate not found for *.yourblog.com${NC}"
echo "Please create an SSL certificate in ACM first"
exit 1
fi
# Deploy the application
echo -e "${YELLOW}Deploying application...${NC}"
sam deploy \
--template-file .aws-sam/build/template.yaml \
--stack-name ${STACK_NAME} \
--s3-bucket ${S3_BUCKET} \
--s3-prefix ${STACK_NAME} \
--region ${REGION} \
--capabilities CAPABILITY_IAM \
--parameter-overrides \
Environment=${ENVIRONMENT} \
ApiDomainName=${DOMAIN_NAME} \
CertificateArn=${CERT_ARN} \
--tags \
Environment=${ENVIRONMENT} \
Project=ServerlessBlog
# Get stack outputs
echo -e "${YELLOW}Getting stack outputs...${NC}"
API_URL=$(aws cloudformation describe-stacks \
--stack-name ${STACK_NAME} \
--region ${REGION} \
--query 'Stacks[0].Outputs[?OutputKey==`ApiUrl`].OutputValue' \
--output text)
CUSTOM_DOMAIN_URL=$(aws cloudformation describe-stacks \
--stack-name ${STACK_NAME} \
--region ${REGION} \
--query 'Stacks[0].Outputs[?OutputKey==`CustomDomainUrl`].OutputValue' \
--output text)
IMAGE_BUCKET=$(aws cloudformation describe-stacks \
--stack-name ${STACK_NAME} \
--region ${REGION} \
--query 'Stacks[0].Outputs[?OutputKey==`ImageBucketName`].OutputValue' \
--output text)
# Run smoke tests
echo -e "${YELLOW}Running smoke tests...${NC}"
./scripts/smoke-test.sh ${API_URL}
# Display deployment summary
echo -e "${GREEN}Deployment completed successfully!${NC}"
echo -e "Stack Name: ${STACK_NAME}"
echo -e "API URL: ${API_URL}"
echo -e "Custom Domain: ${CUSTOM_DOMAIN_URL}"
echo -e "Image Bucket: ${IMAGE_BUCKET}"
# Save deployment info
cat > deployment-info.json << EOF
{
"stack_name": "${STACK_NAME}",
"environment": "${ENVIRONMENT}",
"region": "${REGION}",
"api_url": "${API_URL}",
"custom_domain_url": "${CUSTOM_DOMAIN_URL}",
"image_bucket": "${IMAGE_BUCKET}",
"deployed_at": "$(date -u +"%Y-%m-%dT%H:%M:%SZ")"
}
EOF
echo -e "${GREEN}Deployment information saved to deployment-info.json${NC}"
This deployment script handles the complete application lifecycle including dependency installation, layer building, validation, deployment, and smoke testing. It also saves deployment information for reference.
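The script calls scripts/smoke-test.sh, which isn't reproduced here. As a sketch of the kind of check it runs, here's a minimal Python equivalent that hits the GET /posts endpoint and verifies the response shape; the endpoint path matches the API code shown later, and the script name is hypothetical:
# smoke_test.py - minimal post-deployment check (sketch, not the actual scripts/smoke-test.sh)
import json
import sys
import urllib.request

def smoke_test(api_url: str) -> None:
    # Hit the public listing endpoint and fail loudly on anything unexpected
    with urllib.request.urlopen(f"{api_url.rstrip('/')}/posts", timeout=10) as resp:
        assert resp.status == 200, f"Unexpected status: {resp.status}"
        body = json.loads(resp.read())
        assert "posts" in body, "Response missing 'posts' key"
    print("Smoke test passed")

if __name__ == "__main__":
    smoke_test(sys.argv[1])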
Performance Optimization Strategies
Performance optimization in serverless applications requires a different approach than traditional server-based applications. You're optimizing for cold start times, memory usage, and execution duration rather than server capacity and persistent connections.
Cold Start Optimization
Cold starts happen when Lambda creates a new execution environment for your function. This occurs when scaling up, after periods of inactivity, or when deploying new code. In my experience with high-traffic client applications, cold starts can add 100ms to 3 seconds of latency depending on the runtime and function complexity.
Here's an optimized Lambda function that minimizes cold start impact:
import json
import boto3
from boto3.dynamodb.conditions import Attr
import logging
from typing import Dict, Any, Optional
import os
from datetime import datetime
# Initialize clients outside handler (reused across invocations)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Connection pooling and reuse
dynamodb = None
s3_client = None
def get_dynamodb():
"""Lazy initialization of DynamoDB resource"""
global dynamodb
if dynamodb is None:
dynamodb = boto3.resource('dynamodb')
return dynamodb
def get_s3_client():
"""Lazy initialization of S3 client"""
global s3_client
if s3_client is None:
s3_client = boto3.client('s3')
return s3_client
# Pre-load environment variables
POSTS_TABLE_NAME = os.environ.get('BLOG_POSTS_TABLE', 'BlogPosts-dev')
CACHE_TTL = int(os.environ.get('CACHE_TTL', '300'))
# In-memory cache for frequently accessed data
function_cache = {}
def lambda_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
"""
Optimized Lambda handler with connection reuse and caching
"""
try:
# Extract request info
http_method = event['httpMethod']
path = event['path']
# Route to specific handlers
if http_method == 'GET' and path == '/posts':
return handle_get_posts(event)
elif http_method == 'GET' and '/posts/' in path:
post_id = event['pathParameters']['post_id']
return handle_get_post(post_id)
else:
return create_response(404, {'error': 'Not found'})
except Exception as e:
logger.error(f"Handler error: {str(e)}")
return create_response(500, {'error': 'Internal server error'})
def handle_get_posts(event: Dict[str, Any]) -> Dict[str, Any]:
"""
Handle GET /posts with caching and optimized queries
"""
try:
query_params = event.get('queryStringParameters') or {}
cache_key = f"posts_{hash(str(sorted(query_params.items())))}"
# Check in-memory cache first
cached_result = get_from_cache(cache_key)
if cached_result:
logger.info("Serving from cache")
return create_response(200, cached_result)
# Query DynamoDB with optimized parameters
dynamodb = get_dynamodb()
table = dynamodb.Table(POSTS_TABLE_NAME)
# Use eventually consistent reads for better performance. Note that Limit caps the
# items examined before the filter runs, so fewer than 'limit' posts may come back.
response = table.scan(
FilterExpression=Attr('status').eq('published'),
ProjectionExpression='post_id, title, excerpt, author_id, created_at, category',
Limit=int(query_params.get('limit', 20)),
ConsistentRead=False
)
posts = response.get('Items', [])
# Sort and prepare response
posts.sort(key=lambda x: x.get('created_at', ''), reverse=True)
result = {
'posts': posts,
'count': len(posts)
}
# Cache the result
set_cache(cache_key, result, CACHE_TTL)
return create_response(200, result)
except Exception as e:
logger.error(f"Error handling get_posts: {str(e)}")
return create_response(500, {'error': 'Failed to fetch posts'})
def handle_get_post(post_id: str) -> Dict[str, Any]:
"""
Handle GET /posts/{post_id} with caching
"""
try:
cache_key = f"post_{post_id}"
# Check cache first
cached_post = get_from_cache(cache_key)
if cached_post:
return create_response(200, cached_post)
# Query DynamoDB
dynamodb = get_dynamodb()
table = dynamodb.Table(POSTS_TABLE_NAME)
response = table.get_item(
Key={'post_id': post_id},
ConsistentRead=False
)
if 'Item' not in response:
return create_response(404, {'error': 'Post not found'})
post = response['Item']
# Only return published posts
if post.get('status') != 'published':
return create_response(404, {'error': 'Post not found'})
# Cache the result
set_cache(cache_key, post, CACHE_TTL)
# Best-effort view count update; failures are logged but never fail the request
try:
table.update_item(
Key={'post_id': post_id},
UpdateExpression='ADD view_count :inc',
ExpressionAttributeValues={':inc': 1}
)
except Exception as e:
logger.warning(f"Failed to update view count: {str(e)}")
return create_response(200, post)
except Exception as e:
logger.error(f"Error handling get_post: {str(e)}")
return create_response(500, {'error': 'Failed to fetch post'})
def get_from_cache(key: str) -> Optional[Dict[str, Any]]:
"""Get item from in-memory cache with TTL check"""
if key in function_cache:
cached_item = function_cache[key]
if datetime.utcnow().timestamp() < cached_item['expires_at']:
return cached_item['data']
else:
# Remove expired item
del function_cache[key]
return None
def set_cache(key: str, data: Dict[str, Any], ttl: int):
"""Set item in in-memory cache with TTL"""
expires_at = datetime.utcnow().timestamp() + ttl
function_cache[key] = {
'data': data,
'expires_at': expires_at
}
# Clean up cache if it gets too large
if len(function_cache) > 100:
cleanup_cache()
def cleanup_cache():
"""Remove expired items from cache"""
current_time = datetime.utcnow().timestamp()
expired_keys = [
key for key, value in function_cache.items()
if current_time >= value['expires_at']
]
for key in expired_keys:
del function_cache[key]
def create_response(status_code: int, body: Dict[str, Any]) -> Dict[str, Any]:
"""Create API Gateway response with caching headers"""
headers = {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET,POST,PUT,DELETE,OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type,Authorization'
}
# Add cache headers for GET requests
if status_code == 200:
headers['Cache-Control'] = f'max-age={CACHE_TTL}'
headers['ETag'] = f'"{hash(str(body))}"'
return {
'statusCode': status_code,
'headers': headers,
'body': json.dumps(body, default=str)
}
This optimized function demonstrates several performance techniques: global variable initialization, connection reuse, in-memory caching, lazy loading, and efficient DynamoDB queries.
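Before investing in further tuning, it helps to know how often cold starts actually happen for your traffic pattern. A minimal sketch: module scope runs once per execution environment, so a global flag lets each environment log exactly one "cold start" line that you can count with CloudWatch Logs Insights.
# Standalone sketch for measuring cold start frequency and init overhead
import logging
import time

logger = logging.getLogger()
logger.setLevel(logging.INFO)

_COLD_START = True          # True only for the first invocation in this environment
_INIT_TIME = time.time()    # captured while the module is being initialized

def lambda_handler(event, context):
    global _COLD_START
    if _COLD_START:
        _COLD_START = False
        logger.info(f"Cold start: {time.time() - _INIT_TIME:.3f}s from init to first invoke")
    else:
        logger.info("Warm invocation")
    return {"statusCode": 200, "body": "ok"}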
Memory and CPU Optimization
Lambda functions are billed based on memory allocation and execution time. Finding the optimal memory setting often reduces both cost and latency. Here's a function that processes images with optimized memory usage:
import json
import boto3
import logging
import io
import os
from datetime import datetime  # used for the upload timestamp metadata below
from PIL import Image, ImageOps
from concurrent.futures import ThreadPoolExecutor
import threading
from typing import List, Tuple, Dict
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Global S3 client with connection pooling
s3_client = None
s3_lock = threading.Lock()
def get_s3_client():
"""Thread-safe S3 client initialization"""
global s3_client
if s3_client is None:
with s3_lock:
if s3_client is None:
s3_client = boto3.client(
's3',
config=boto3.session.Config(
max_pool_connections=50,
retries={'max_attempts': 3}
)
)
return s3_client
def lambda_handler(event, context):
"""
Process multiple images concurrently with memory optimization
"""
try:
processed_count = 0
errors = []
# Process S3 records concurrently
with ThreadPoolExecutor(max_workers=4) as executor:
futures = []
for record in event['Records']:
bucket = record['s3']['bucket']['name']
key = record['s3']['object']['key']
future = executor.submit(process_single_image, bucket, key)
futures.append(future)
# Collect results
for future in futures:
try:
result = future.result(timeout=30)
if result['success']:
processed_count += 1
else:
errors.append(result['error'])
except Exception as e:
errors.append(str(e))
logger.info(f"Processed {processed_count} images, {len(errors)} errors")
return {
'statusCode': 200,
'processed_count': processed_count,
'error_count': len(errors)
}
except Exception as e:
logger.error(f"Handler error: {str(e)}")
raise
def process_single_image(bucket: str, key: str) -> Dict[str, any]:
"""
Process a single image with memory-efficient techniques
"""
try:
# Skip non-image files
if not key.lower().endswith(('.jpg', '.jpeg', '.png', '.webp')):
return {'success': False, 'error': f'Skipped non-image file: {key}'}
s3 = get_s3_client()
# Download image
response = s3.get_object(Bucket=bucket, Key=key)
image_data = response['Body'].read()
# Process with memory-efficient streaming
original_image = Image.open(io.BytesIO(image_data))
# Optimize image orientation
original_image = ImageOps.exif_transpose(original_image)
# Define target sizes with quality settings
size_configs = [
{'name': 'thumbnail', 'size': (150, 150), 'quality': 85},
{'name': 'medium', 'size': (600, 600), 'quality': 90},
{'name': 'large', 'size': (1200, 1200), 'quality': 95}
]
# Process sizes sequentially to control memory usage
processed_images = {}
base_name = os.path.splitext(key)[0]
for config in size_configs:
processed_image = create_resized_image(
original_image,
config['size'],
config['quality']
)
# Upload to S3
processed_key = f"{base_name}_{config['name']}.jpg"
upload_success = upload_image_to_s3(
s3, bucket, processed_key, processed_image
)
if upload_success:
processed_images[config['name']] = {
'key': processed_key,
'size': len(processed_image.getvalue())
}
# Clear memory immediately after upload
processed_image.close()
# Clear original image from memory
original_image.close()
del image_data
logger.info(f"Successfully processed {key}")
return {
'success': True,
'processed_images': processed_images
}
except Exception as e:
logger.error(f"Error processing {key}: {str(e)}")
return {'success': False, 'error': str(e)}
def create_resized_image(original: Image.Image, target_size: Tuple[int, int], quality: int) -> io.BytesIO:
"""
Create resized image with memory optimization
"""
# Create a copy to avoid modifying original
resized = original.copy()
# Resize maintaining aspect ratio
resized.thumbnail(target_size, Image.Resampling.LANCZOS)
# Convert to RGB if necessary (for JPEG output)
if resized.mode not in ('RGB', 'L'):
resized = resized.convert('RGB')
# Save to memory buffer with optimization
buffer = io.BytesIO()
resized.save(
buffer,
format='JPEG',
quality=quality,
optimize=True,
progressive=True
)
buffer.seek(0)
# Clear resized image from memory
resized.close()
return buffer
def upload_image_to_s3(s3_client, bucket: str, key: str, image_buffer: io.BytesIO) -> bool:
"""
Upload image to S3 with proper metadata
"""
try:
s3_client.put_object(
Bucket=bucket,
Key=key,
Body=image_buffer.getvalue(),
ContentType='image/jpeg',
CacheControl='max-age=31536000', # 1 year
Metadata={
'processed-by': 'lambda-image-processor',
'processed-at': str(int(datetime.utcnow().timestamp()))
}
)
return True
except Exception as e:
logger.error(f"Failed to upload {key}: {str(e)}")
return False
This image processing function uses several memory optimization techniques: streaming I/O, immediate memory cleanup, concurrent processing with controlled parallelism, and efficient image handling.
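The billing math explains why more memory can be cheaper: CPU scales with memory, so a CPU-bound function often finishes enough faster to offset the higher GB-second rate. A rough sketch using the public us-east-1 rates referenced later in this guide; the durations are illustrative, not measured, and in practice you'd benchmark each setting (tools like AWS Lambda Power Tuning automate this):
# Sketch: comparing the monthly cost of two memory settings for the same workload
REQUEST_PRICE = 0.0000002       # per request
GB_SECOND_PRICE = 0.0000166667  # per GB-second

def monthly_cost(memory_mb: int, avg_duration_ms: float, monthly_invocations: int) -> float:
    gb_seconds = (memory_mb / 1024) * (avg_duration_ms / 1000) * monthly_invocations
    return monthly_invocations * REQUEST_PRICE + gb_seconds * GB_SECOND_PRICE

# Assumed numbers: 5M invocations/month, and doubling memory roughly halving a
# CPU-bound duration. The larger setting is ~2x faster for only a few percent more.
print(f"512 MB  @ 800 ms: ${monthly_cost(512, 800, 5_000_000):.2f}/month")
print(f"1024 MB @ 420 ms: ${monthly_cost(1024, 420, 5_000_000):.2f}/month")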
Database Query Optimization
DynamoDB performance depends heavily on proper query patterns and index design. Here's an optimized data access layer:
import boto3
from boto3.dynamodb.conditions import Key, Attr
from typing import Dict, List, Optional, Any
import logging
from decimal import Decimal
import json
from datetime import datetime, timedelta
logger = logging.getLogger()
class OptimizedDynamoDBAccess:
"""
Optimized DynamoDB access patterns with caching and batch operations
"""
def __init__(self, table_name: str):
self.dynamodb = boto3.resource('dynamodb')
self.table = self.dynamodb.Table(table_name)
self.batch_size = 25 # conservative batch size; batch_get_item accepts up to 100 keys per request
def get_posts_paginated(self, limit: int = 20, last_key: Optional[str] = None,
category: Optional[str] = None) -> Dict[str, Any]:
"""
Get posts with efficient pagination using GSI
"""
try:
# Use GSI for category filtering
if category:
query_kwargs = {
'IndexName': 'CategoryIndex',
'KeyConditionExpression': Key('category').eq(category),
'ScanIndexForward': False, # Newest first
'Limit': limit,
'FilterExpression': Attr('status').eq('published')
}
else:
# Use GSI for status filtering
query_kwargs = {
'IndexName': 'StatusIndex',
'KeyConditionExpression': Key('status').eq('published'),
'ScanIndexForward': False, # Newest first
'Limit': limit
}
# Handle pagination
if last_key:
query_kwargs['ExclusiveStartKey'] = json.loads(last_key)
response = self.table.query(**query_kwargs)
items = response.get('Items', [])
# Prepare pagination info
result = {
'items': self._serialize_items(items),
'count': len(items)
}
if 'LastEvaluatedKey' in response:
result['last_key'] = json.dumps(response['LastEvaluatedKey'], default=str)
return result
except Exception as e:
logger.error(f"Error querying posts: {str(e)}")
raise
def batch_get_posts(self, post_ids: List[str]) -> List[Dict[str, Any]]:
"""
Efficiently retrieve multiple posts using batch_get_item
"""
if not post_ids:
return []
try:
# Split into batches (DynamoDB batch_get_item limit is 100)
all_items = []
for i in range(0, len(post_ids), self.batch_size):
batch_ids = post_ids[i:i + self.batch_size]
# Prepare batch request
request_items = {
self.table.table_name: {
'Keys': [{'post_id': post_id} for post_id in batch_ids],
'ProjectionExpression': 'post_id, title, excerpt, author_id, created_at, #status',
'ExpressionAttributeNames': {'#status': 'status'}
}
}
# Execute batch get with retry logic
response = self._batch_get_with_retry(request_items)
items = response.get('Responses', {}).get(self.table.table_name, [])
# Filter published posts only
published_items = [item for item in items if item.get('status') == 'published']
all_items.extend(published_items)
return self._serialize_items(all_items)
except Exception as e:
logger.error(f"Error batch getting posts: {str(e)}")
raise
def _batch_get_with_retry(self, request_items: Dict[str, Any], max_retries: int = 3) -> Dict[str, Any]:
"""
Execute batch_get_item with exponential backoff for unprocessed items
"""
import time
import random
for attempt in range(max_retries):
try:
response = self.dynamodb.batch_get_item(RequestItems=request_items)
# Handle unprocessed keys
unprocessed = response.get('UnprocessedKeys', {})
if unprocessed:
if attempt < max_retries - 1:
# Exponential backoff with jitter
delay = (2 ** attempt) + random.uniform(0, 1)
time.sleep(delay)
request_items = unprocessed
continue
return response
except Exception as e:
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt)
return {}
def update_post_stats(self, post_id: str, increment_views: bool = False,
increment_comments: bool = False) -> None:
"""
Efficiently update post statistics using atomic operations
"""
try:
update_expression = 'SET updated_at = :timestamp'
expression_values = {':timestamp': datetime.utcnow().isoformat()}
if increment_views:
update_expression += ', view_count = if_not_exists(view_count, :zero) + :inc'
expression_values[':zero'] = 0
expression_values[':inc'] = 1
if increment_comments:
update_expression += ', comment_count = if_not_exists(comment_count, :zero) + :inc'
if ':zero' not in expression_values:
expression_values[':zero'] = 0
if ':inc' not in expression_values:
expression_values[':inc'] = 1
self.table.update_item(
Key={'post_id': post_id},
UpdateExpression=update_expression,
ExpressionAttributeValues=expression_values,
ReturnValues='NONE'
)
except Exception as e:
logger.error(f"Error updating post stats: {str(e)}")
# Don't raise - stats updates shouldn't break the main flow
def _serialize_items(self, items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Serialize DynamoDB items for JSON response
"""
serialized = []
for item in items:
# Convert Decimal to float/int for JSON serialization
serialized_item = {}
for key, value in item.items():
if isinstance(value, Decimal):
serialized_item[key] = float(value) if value % 1 else int(value)
else:
serialized_item[key] = value
serialized.append(serialized_item)
return serialized
This optimized DynamoDB access class demonstrates efficient query patterns, batch operations, and proper error handling that I've found essential in production serverless applications.
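For reference, here's a sketch of the table and index layout the class above assumes. The GSI names match the queries (StatusIndex, CategoryIndex); using created_at as the sort key is an assumption that makes the "newest first" ordering via ScanIndexForward work:
# Sketch: assumed BlogPosts table with the two GSIs the access layer queries
import boto3

def create_posts_table(table_name: str = "BlogPosts-dev"):
    dynamodb = boto3.client("dynamodb")
    return dynamodb.create_table(
        TableName=table_name,
        BillingMode="PAY_PER_REQUEST",  # no capacity planning needed
        AttributeDefinitions=[
            {"AttributeName": "post_id", "AttributeType": "S"},
            {"AttributeName": "status", "AttributeType": "S"},
            {"AttributeName": "category", "AttributeType": "S"},
            {"AttributeName": "created_at", "AttributeType": "S"},
        ],
        KeySchema=[{"AttributeName": "post_id", "KeyType": "HASH"}],
        GlobalSecondaryIndexes=[
            {
                "IndexName": "StatusIndex",
                "KeySchema": [
                    {"AttributeName": "status", "KeyType": "HASH"},
                    {"AttributeName": "created_at", "KeyType": "RANGE"},
                ],
                "Projection": {"ProjectionType": "ALL"},
            },
            {
                "IndexName": "CategoryIndex",
                "KeySchema": [
                    {"AttributeName": "category", "KeyType": "HASH"},
                    {"AttributeName": "created_at", "KeyType": "RANGE"},
                ],
                "Projection": {"ProjectionType": "ALL"},
            },
        ],
    )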
Monitoring and Observability
Effective monitoring becomes critical as serverless applications scale. Unlike traditional applications where you monitor servers and databases, serverless monitoring focuses on function performance, error rates, and distributed tracing across services.
AWS X-Ray Integration
X-Ray provides distributed tracing that shows how requests flow through your serverless application. This becomes invaluable when debugging performance issues or understanding complex event-driven workflows.
Here's a Lambda function with comprehensive X-Ray instrumentation:
import json
import boto3
import logging
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
from typing import Dict, Any
import time
from datetime import datetime
# Patch AWS SDK calls for automatic tracing
patch_all()
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Initialize clients (will be automatically traced)
dynamodb = boto3.resource('dynamodb')
s3 = boto3.client('s3')
sns = boto3.client('sns')
@xray_recorder.capture('lambda_handler')
def lambda_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
"""
Lambda handler with comprehensive X-Ray tracing
"""
try:
# Add custom annotations for filtering traces
xray_recorder.put_annotation('environment', 'production')
xray_recorder.put_annotation('function_version', context.function_version)
# Add metadata for additional context
xray_recorder.put_metadata('event_info', {
'source': event.get('source', 'unknown'),
'detail_type': event.get('detail-type', 'unknown'),
'resources': event.get('resources', [])
})
# Process the event
result = process_event(event, context)
# Add result metadata
xray_recorder.put_metadata('result_info', {
'status': result.get('status', 'unknown'),
'records_processed': result.get('records_processed', 0)
})
return result
except Exception as e:
# Add error information to trace
xray_recorder.put_annotation('error', True)
xray_recorder.put_metadata('error_details', {
'error_type': type(e).__name__,
'error_message': str(e)
})
logger.error(f"Handler error: {str(e)}")
raise
@xray_recorder.capture('process_event')
def process_event(event: Dict[str, Any], context) -> Dict[str, Any]:
"""
Process incoming event with detailed tracing
"""
start_time = time.time()
records_processed = 0
try:
# Add subsegment for database operations
with xray_recorder.in_subsegment('database_operations') as subsegment:
subsegment.put_annotation('operation_type', 'batch_process')
# Process records from the event
for record in event.get('Records', []):
process_single_record(record)
records_processed += 1
subsegment.put_metadata('processing_stats', {
'records_count': records_processed,
'processing_time': time.time() - start_time
})
# Send completion notification
send_completion_notification(records_processed)
return {
'status': 'success',
'records_processed': records_processed,
'processing_time': time.time() - start_time
}
except Exception as e:
xray_recorder.put_annotation('processing_error', True)
raise
@xray_recorder.capture('process_single_record')
def process_single_record(record: Dict[str, Any]) -> None:
"""
Process individual record with tracing
"""
try:
record_type = record.get('eventName', 'unknown')
xray_recorder.put_annotation('record_type', record_type)
if record_type == 'ObjectCreated:Put':
process_s3_upload(record)
elif record_type == 'INSERT':
process_dynamodb_insert(record)
else:
logger.warning(f"Unknown record type: {record_type}")
except Exception as e:
xray_recorder.put_annotation('record_error', True)
logger.error(f"Error processing record: {str(e)}")
raise
@xray_recorder.capture('process_s3_upload')
def process_s3_upload(record: Dict[str, Any]) -> None:
"""
Process S3 upload event with detailed tracing
"""
try:
# Extract S3 information
s3_info = record['s3']
bucket = s3_info['bucket']['name']
key = s3_info['object']['key']
# Add trace annotations
xray_recorder.put_annotation('s3_bucket', bucket)
xray_recorder.put_annotation('s3_key', key)
# Process the file
with xray_recorder.in_subsegment('s3_operations') as subsegment:
subsegment.put_annotation('operation', 'get_object')
response = s3.get_object(Bucket=bucket, Key=key)
file_size = response['ContentLength']
subsegment.put_metadata('file_info', {
'size': file_size,
'content_type': response.get('ContentType', 'unknown')
})
# Store processing result
store_processing_result(bucket, key, file_size)
except Exception as e:
xray_recorder.put_annotation('s3_processing_error', True)
raise
@xray_recorder.capture('store_processing_result')
def store_processing_result(bucket: str, key: str, file_size: int) -> None:
"""
Store processing result in DynamoDB
"""
try:
table = dynamodb.Table('ProcessingResults')
with xray_recorder.in_subsegment('dynamodb_put') as subsegment:
subsegment.put_annotation('table_name', 'ProcessingResults')
table.put_item(Item={
'processing_id': f"{bucket}#{key}",
'bucket': bucket,
'key': key,
'file_size': file_size,
'processed_at': datetime.utcnow().isoformat(),
'status': 'completed'
})
subsegment.put_metadata('item_info', {
'processing_id': f"{bucket}#{key}",
'file_size': file_size
})
except Exception as e:
xray_recorder.put_annotation('dynamodb_error', True)
raise
@xray_recorder.capture('send_completion_notification')
def send_completion_notification(records_count: int) -> None:
"""
Send processing completion notification
"""
try:
with xray_recorder.in_subsegment('sns_publish') as subsegment:
subsegment.put_annotation('notification_type', 'processing_complete')
message = {
'event_type': 'processing_completed',
'records_processed': records_count,
'timestamp': datetime.utcnow().isoformat()
}
sns.publish(
TopicArn='arn:aws:sns:us-east-1:123456789012:processing-notifications',
Subject='Processing Completed',
Message=json.dumps(message)
)
subsegment.put_metadata('notification_info', {
'records_count': records_count
})
except Exception as e:
xray_recorder.put_annotation('notification_error', True)
logger.warning(f"Failed to send notification: {str(e)}")
# Don't raise - notification failure shouldn't fail the main process
This X-Ray instrumentation provides detailed visibility into function execution, including custom annotations for filtering traces and metadata for debugging.
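Keep in mind that the SDK only records these segments if active tracing is enabled on the function (in SAM, set Tracing: Active) and the execution role can write to X-Ray. As a sketch, you can also flip tracing on an existing function with boto3; the function name here is a placeholder:
import boto3

lambda_client = boto3.client("lambda")
lambda_client.update_function_configuration(
    FunctionName="blog-api-dev",        # placeholder function name
    TracingConfig={"Mode": "Active"},   # sample and record incoming requests
)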
Custom CloudWatch Metrics
Beyond the default Lambda metrics, custom metrics provide insights into business logic and application performance:
import boto3
import logging
from datetime import datetime
from typing import Dict, List, Any
import json
cloudwatch = boto3.client('cloudwatch')
logger = logging.getLogger()
class MetricsCollector:
"""
Collect and publish custom CloudWatch metrics
"""
def __init__(self, namespace: str = 'ServerlessApp'):
self.namespace = namespace
self.metrics_buffer = []
self.max_buffer_size = 20 # flush in small batches to stay well under PutMetricData request limits
def put_metric(self, metric_name: str, value: float, unit: str = 'Count',
dimensions: Dict[str, str] = None) -> None:
"""
Add metric to buffer for batch publishing
"""
metric_data = {
'MetricName': metric_name,
'Value': value,
'Unit': unit,
'Timestamp': datetime.utcnow()
}
if dimensions:
metric_data['Dimensions'] = [
{'Name': k, 'Value': v} for k, v in dimensions.items()
]
self.metrics_buffer.append(metric_data)
# Auto-flush if buffer is full
if len(self.metrics_buffer) >= self.max_buffer_size:
self.flush_metrics()
def put_business_metric(self, event_type: str, user_id: str = None,
value: float = 1.0) -> None:
"""
Track business-specific metrics
"""
dimensions = {'EventType': event_type}
if user_id:
dimensions['UserType'] = self._classify_user(user_id)
self.put_metric(
metric_name='BusinessEvents',
value=value,
unit='Count',
dimensions=dimensions
)
def put_performance_metric(self, operation: str, duration_ms: float,
success: bool = True) -> None:
"""
Track operation performance metrics
"""
# Track duration
self.put_metric(
metric_name='OperationDuration',
value=duration_ms,
unit='Milliseconds',
dimensions={'Operation': operation}
)
# Track success/failure
self.put_metric(
metric_name='OperationResult',
value=1.0,
unit='Count',
dimensions={
'Operation': operation,
'Result': 'Success' if success else 'Error'
}
)
def put_resource_metric(self, resource_type: str, resource_id: str,
metric_name: str, value: float) -> None:
"""
Track resource-specific metrics
"""
self.put_metric(
metric_name=metric_name,
value=value,
unit='Count',
dimensions={
'ResourceType': resource_type,
'ResourceId': resource_id
}
)
def flush_metrics(self) -> None:
"""
Publish all buffered metrics to CloudWatch
"""
if not self.metrics_buffer:
return
try:
cloudwatch.put_metric_data(
Namespace=self.namespace,
MetricData=self.metrics_buffer
)
logger.info(f"Published {len(self.metrics_buffer)} metrics to CloudWatch")
self.metrics_buffer.clear()
except Exception as e:
logger.error(f"Failed to publish metrics: {str(e)}")
# Clear buffer to prevent memory leaks
self.metrics_buffer.clear()
def _classify_user(self, user_id: str) -> str:
"""
Classify user type for metrics (simplified example)
"""
# In real implementation, you'd look up user data
if user_id.startswith('premium_'):
return 'Premium'
elif user_id.startswith('trial_'):
return 'Trial'
else:
return 'Free'
def __del__(self):
"""
Ensure metrics are flushed when object is destroyed
"""
self.flush_metrics()
# Example usage in Lambda function
def lambda_handler(event, context):
"""
Lambda function with comprehensive metrics collection
"""
metrics = MetricsCollector('BlogPlatform')
start_time = datetime.utcnow()
try:
# Track function invocation
metrics.put_metric('FunctionInvocations', 1.0)
# Process the request
result = process_blog_request(event, metrics)
# Track success
execution_time = (datetime.utcnow() - start_time).total_seconds() * 1000
metrics.put_performance_metric('blog_request', execution_time, True)
# Flush all metrics
metrics.flush_metrics()
return result
except Exception as e:
# Track error
execution_time = (datetime.utcnow() - start_time).total_seconds() * 1000
metrics.put_performance_metric('blog_request', execution_time, False)
metrics.put_metric('FunctionErrors', 1.0)
# Flush metrics before raising
metrics.flush_metrics()
raise
def process_blog_request(event: Dict[str, Any], metrics: MetricsCollector) -> Dict[str, Any]:
"""
Process blog request with metrics tracking
"""
http_method = event['httpMethod']
path = event['path']
# Track API usage
metrics.put_metric(
metric_name='APIRequests',
value=1.0,
dimensions={
'Method': http_method,
'Path': path.split('/')[1] if '/' in path else 'root'
}
)
if http_method == 'GET' and '/posts/' in path:
return handle_get_post(event, metrics)
elif http_method == 'POST' and path == '/posts':
return handle_create_post(event, metrics)
else:
metrics.put_metric('UnsupportedRequests', 1.0)
return {'statusCode': 404, 'body': json.dumps({'error': 'Not found'})}
def handle_get_post(event: Dict[str, Any], metrics: MetricsCollector) -> Dict[str, Any]:
"""
Handle GET request with detailed metrics
"""
post_id = event['pathParameters']['post_id']
# Track post views
metrics.put_business_metric('post_view')
metrics.put_resource_metric('Post', post_id, 'PostViews', 1.0)
# Simulate post retrieval
# In real implementation, you'd fetch from DynamoDB
post = {'id': post_id, 'title': 'Sample Post', 'views': 1}
return {
'statusCode': 200,
'body': json.dumps(post)
}
def handle_create_post(event: Dict[str, Any], metrics: MetricsCollector) -> Dict[str, Any]:
"""
Handle POST request with business metrics
"""
try:
post_data = json.loads(event['body'])
# Track post creation
metrics.put_business_metric('post_created')
# Track content length
content_length = len(post_data.get('content', ''))
metrics.put_metric(
metric_name='PostContentLength',
value=content_length,
unit='Count',
dimensions={'ContentType': 'BlogPost'}
)
# Simulate post creation
post_id = 'new-post-123'
metrics.put_resource_metric('Post', post_id, 'PostsCreated', 1.0)
return {
'statusCode': 201,
'body': json.dumps({'id': post_id, 'status': 'created'})
}
except Exception as e:
metrics.put_metric('PostCreationErrors', 1.0)
raise
Automated Alerting and Dashboards
CloudWatch alarms and dashboards provide proactive monitoring. Here's a SAM template excerpt that sets up a dashboard and the key alarms:
# CloudWatch Dashboard
ApplicationDashboard:
Type: AWS::CloudWatch::Dashboard
Properties:
DashboardName: !Sub 'ServerlessBlog-${Environment}'
DashboardBody: !Sub |
{
"widgets": [
{
"type": "metric",
"x": 0, "y": 0, "width": 12, "height": 6,
"properties": {
"metrics": [
["AWS/Lambda", "Invocations", "FunctionName", "${BlogApiFunction}"],
[".", "Errors", ".", "."],
[".", "Duration", ".", "."],
[".", "Throttles", ".", "."]
],
"period": 300,
"stat": "Sum",
"region": "${AWS::Region}",
"title": "Lambda Function Performance"
}
},
{
"type": "metric",
"x": 0, "y": 6, "width": 12, "height": 6,
"properties": {
"metrics": [
["AWS/ApiGateway", "Count", "ApiName", "blog-api-${Environment}"],
[".", "4XXError", ".", "."],
[".", "5XXError", ".", "."],
[".", "Latency", ".", "."]
],
"period": 300,
"stat": "Sum",
"region": "${AWS::Region}",
"title": "API Gateway Metrics"
}
},
{
"type": "metric",
"x": 0, "y": 12, "width": 12, "height": 6,
"properties": {
"metrics": [
["BlogPlatform", "BusinessEvents", "EventType", "post_view"],
[".", ".", ".", "post_created"],
[".", "APIRequests", "Method", "GET"],
[".", ".", ".", "POST"]
],
"period": 300,
"stat": "Sum",
"region": "${AWS::Region}",
"title": "Business Metrics"
}
}
]
}
# CloudWatch Alarms
HighErrorRateAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: !Sub 'blog-api-high-error-rate-${Environment}'
AlarmDescription: 'High error rate detected'
ComparisonOperator: GreaterThanThreshold
EvaluationPeriods: 2
MetricName: Errors
Namespace: AWS/Lambda
Period: 300
Statistic: Sum
Threshold: 10
ActionsEnabled: true
AlarmActions:
- !Ref AlertTopic
Dimensions:
- Name: FunctionName
Value: !Ref BlogApiFunction
HighLatencyAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: !Sub 'blog-api-high-latency-${Environment}'
AlarmDescription: 'High latency detected'
ComparisonOperator: GreaterThanThreshold
EvaluationPeriods: 3
MetricName: Duration
Namespace: AWS/Lambda
Period: 300
Statistic: Average
Threshold: 5000
ActionsEnabled: true
AlarmActions:
- !Ref AlertTopic
Dimensions:
- Name: FunctionName
Value: !Ref BlogApiFunction
LowBusinessActivityAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: !Sub 'low-business-activity-${Environment}'
AlarmDescription: 'Unusually low business activity'
ComparisonOperator: LessThanThreshold
EvaluationPeriods: 3
MetricName: BusinessEvents
Namespace: BlogPlatform
Period: 900
Statistic: Sum
Threshold: 10
ActionsEnabled: true
AlarmActions:
- !Ref AlertTopic
TreatMissingData: breaching
Cost Optimization Techniques
Cost optimization in serverless applications requires understanding the pricing models and implementing strategies that reduce compute time, memory usage, and service calls. The pay-per-use model means that optimizations directly impact your bill.
Resource Sizing and Lifecycle Management
Right-sizing Lambda functions and implementing proper lifecycle policies can significantly reduce costs. Here's a cost optimization analyzer I've developed for client projects:
import boto3
import json
from datetime import datetime, timedelta
from typing import Dict, List, Tuple
import logging
logger = logging.getLogger()
class ServerlessCostOptimizer:
"""
Analyze and optimize serverless application costs
"""
def __init__(self, region: str = 'us-east-1'):
self.lambda_client = boto3.client('lambda', region_name=region)
self.cloudwatch = boto3.client('cloudwatch', region_name=region)
self.pricing_client = boto3.client('pricing', region_name='us-east-1')
self.s3 = boto3.client('s3', region_name=region)
def analyze_lambda_costs(self, days: int = 30) -> Dict[str, any]:
"""
Analyze Lambda function costs and provide optimization recommendations
"""
try:
# Get all Lambda functions
functions = self._get_all_functions()
cost_analysis = {
'total_functions': len(functions),
'analysis_period_days': days,
'functions': {},
'recommendations': []
}
total_estimated_cost = 0
for function in functions:
function_name = function['FunctionName']
logger.info(f"Analyzing function: {function_name}")
# Get function metrics
metrics = self._get_function_metrics(function_name, days)
# Calculate costs
cost_breakdown = self._calculate_function_costs(function, metrics)
# Generate recommendations
recommendations = self._generate_function_recommendations(function, metrics, cost_breakdown)
cost_analysis['functions'][function_name] = {
'current_config': {
'memory_size': function['MemorySize'],
'timeout': function['Timeout'],
'runtime': function['Runtime']
},
'metrics': metrics,
'cost_breakdown': cost_breakdown,
'recommendations': recommendations
}
total_estimated_cost += cost_breakdown['total_monthly_cost']
cost_analysis['total_estimated_monthly_cost'] = total_estimated_cost
cost_analysis['global_recommendations'] = self._generate_global_recommendations(cost_analysis)
return cost_analysis
except Exception as e:
logger.error(f"Error analyzing Lambda costs: {str(e)}")
raise
def _get_all_functions(self) -> List[Dict[str, any]]:
"""Get all Lambda functions in the account"""
functions = []
paginator = self.lambda_client.get_paginator('list_functions')
for page in paginator.paginate():
functions.extend(page['Functions'])
return functions
def _get_function_metrics(self, function_name: str, days: int) -> Dict[str, any]:
"""Get CloudWatch metrics for a function"""
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=days)
# Get invocation count
invocations = self._get_metric_sum('Invocations', function_name, start_time, end_time)
# Get duration statistics
duration_avg = self._get_metric_average('Duration', function_name, start_time, end_time)
duration_max = self._get_metric_maximum('Duration', function_name, start_time, end_time)
# Get error count
errors = self._get_metric_sum('Errors', function_name, start_time, end_time)
# Get throttles
throttles = self._get_metric_sum('Throttles', function_name, start_time, end_time)
return {
'invocations': invocations,
'avg_duration_ms': duration_avg,
'max_duration_ms': duration_max,
'errors': errors,
'throttles': throttles,
'error_rate': (errors / invocations * 100) if invocations > 0 else 0,
'throttle_rate': (throttles / invocations * 100) if invocations > 0 else 0
}
def _get_metric_sum(self, metric_name: str, function_name: str,
start_time: datetime, end_time: datetime) -> float:
"""Get sum of a CloudWatch metric"""
try:
response = self.cloudwatch.get_metric_statistics(
Namespace='AWS/Lambda',
MetricName=metric_name,
Dimensions=[{'Name': 'FunctionName', 'Value': function_name}],
StartTime=start_time,
EndTime=end_time,
Period=3600, # 1 hour periods
Statistics=['Sum']
)
return sum(point['Sum'] for point in response['Datapoints'])
except Exception as e:
logger.warning(f"Failed to get {metric_name} for {function_name}: {str(e)}")
return 0.0
def _get_metric_average(self, metric_name: str, function_name: str,
start_time: datetime, end_time: datetime) -> float:
"""Get average of a CloudWatch metric"""
try:
response = self.cloudwatch.get_metric_statistics(
Namespace='AWS/Lambda',
MetricName=metric_name,
Dimensions=[{'Name': 'FunctionName', 'Value': function_name}],
StartTime=start_time,
EndTime=end_time,
Period=3600,
Statistics=['Average']
)
if response['Datapoints']:
return sum(point['Average'] for point in response['Datapoints']) / len(response['Datapoints'])
return 0.0
except Exception:
return 0.0
def _get_metric_maximum(self, metric_name: str, function_name: str,
start_time: datetime, end_time: datetime) -> float:
"""Get maximum of a CloudWatch metric"""
try:
response = self.cloudwatch.get_metric_statistics(
Namespace='AWS/Lambda',
MetricName=metric_name,
Dimensions=[{'Name': 'FunctionName', 'Value': function_name}],
StartTime=start_time,
EndTime=end_time,
Period=3600,
Statistics=['Maximum']
)
if response['Datapoints']:
return max(point['Maximum'] for point in response['Datapoints'])
return 0.0
except Exception:
return 0.0
def _calculate_function_costs(self, function: Dict[str, any], metrics: Dict[str, any]) -> Dict[str, any]:
"""Calculate detailed cost breakdown for a function"""
# AWS Lambda pricing (US East 1, as of 2025)
request_price = 0.0000002 # per request
gb_second_price = 0.0000166667 # per GB-second
memory_gb = function['MemorySize'] / 1024
avg_duration_seconds = metrics['avg_duration_ms'] / 1000
monthly_invocations = metrics['invocations'] # metrics window defaults to 30 days, so treat it as a monthly figure
# Calculate costs
request_cost = monthly_invocations * request_price
compute_cost = monthly_invocations * memory_gb * avg_duration_seconds * gb_second_price
total_cost = request_cost + compute_cost
return {
'monthly_invocations': monthly_invocations,
'request_cost': request_cost,
'compute_cost': compute_cost,
'total_monthly_cost': total_cost,
'cost_per_invocation': total_cost / monthly_invocations if monthly_invocations > 0 else 0
}
def _generate_function_recommendations(self, function: Dict[str, any], metrics: Dict[str, any],
cost_breakdown: Dict[str, any]) -> List[Dict[str, any]]:
"""Generate optimization recommendations for a function"""
recommendations = []
current_memory = function['MemorySize']
current_timeout = function['Timeout']
avg_duration = metrics['avg_duration_ms']
max_duration = metrics['max_duration_ms']
# Memory optimization
if avg_duration > 0:
# Test different memory sizes
optimal_memory = self._find_optimal_memory(current_memory, avg_duration, metrics['invocations'])
if optimal_memory != current_memory:
potential_savings = self._calculate_memory_savings(
current_memory, optimal_memory, metrics, cost_breakdown
)
recommendations.append({
'type': 'memory_optimization',
'current_value': current_memory,
'recommended_value': optimal_memory,
'potential_monthly_savings': potential_savings,
'reason': f'Optimal memory size for price/performance ratio'
})
# Timeout optimization
if max_duration > 0 and current_timeout > (max_duration / 1000 * 1.2): # 20% buffer
recommended_timeout = int(max_duration / 1000 * 1.5) # 50% buffer
recommendations.append({
'type': 'timeout_optimization',
'current_value': current_timeout,
'recommended_value': recommended_timeout,
'potential_monthly_savings': 0, # Timeout doesn't affect cost directly
'reason': f'Reduce timeout from {current_timeout}s to {recommended_timeout}s (max duration: {max_duration/1000:.1f}s)'
})
# Provisioned concurrency recommendations
if metrics['invocations'] > 10000: # High-traffic function
pc_cost = self._calculate_provisioned_concurrency_cost(function, metrics)
if pc_cost['net_savings'] > 0:
recommendations.append({
'type': 'provisioned_concurrency',
'current_value': 0,
'recommended_value': pc_cost['recommended_pc'],
'potential_monthly_savings': pc_cost['net_savings'],
'reason': 'High-traffic function would benefit from provisioned concurrency'
})
# Unused function detection
if metrics['invocations'] < 10: # Very low usage
recommendations.append({
'type': 'unused_function',
'current_value': 'active',
'recommended_value': 'consider_removal',
'potential_monthly_savings': cost_breakdown['total_monthly_cost'],
'reason': f'Function has only {metrics["invocations"]} invocations in the analysis period'
})
return recommendations
def _find_optimal_memory(self, current_memory: int, avg_duration: float, invocations: float) -> int:
"""Find optimal memory size for cost/performance"""
memory_options = [128, 256, 512, 1024, 1536, 2048, 3008]
best_memory = current_memory
best_cost = float('inf')
for memory in memory_options:
# Estimate performance improvement (CPU scales with memory)
cpu_factor = memory / current_memory
estimated_duration = avg_duration / min(cpu_factor, 2.0) # Cap at 2x improvement
# Estimate monthly cost at this memory size (invocations already cover ~30 days)
memory_gb = memory / 1024
monthly_cost = invocations * (
0.0000002 + # request cost
memory_gb * (estimated_duration / 1000) * 0.0000166667 # compute cost
)
if monthly_cost < best_cost:
best_cost = monthly_cost
best_memory = memory
return best_memory
def _calculate_memory_savings(self, current_memory: int, optimal_memory: int,
metrics: Dict[str, any], current_costs: Dict[str, any]) -> float:
"""Calculate potential savings from memory optimization"""
cpu_factor = optimal_memory / current_memory
estimated_duration = metrics['avg_duration_ms'] / min(cpu_factor, 2.0)
optimal_costs = self._calculate_function_costs(
{'MemorySize': optimal_memory},
{**metrics, 'avg_duration_ms': estimated_duration}
)
return current_costs['total_monthly_cost'] - optimal_costs['total_monthly_cost']
def _calculate_provisioned_concurrency_cost(self, function: Dict[str, any],
metrics: Dict[str, any]) -> Dict[str, any]:
"""Calculate provisioned concurrency cost/benefit analysis"""
# Simplified calculation - in reality you'd analyze traffic patterns
avg_concurrent_executions = metrics['invocations'] / (30 * 24 * 60) # rough estimate
recommended_pc = max(1, int(avg_concurrent_executions * 1.5))
# PC pricing: $0.0000041667 per GB-second
memory_gb = function['MemorySize'] / 1024
pc_monthly_cost = recommended_pc * memory_gb * 30 * 24 * 3600 * 0.0000041667
# Estimated cold start savings (very rough)
cold_start_reduction_value = metrics['invocations'] * 0.001 # $0.001 per invocation
net_savings = cold_start_reduction_value - pc_monthly_cost
return {
'recommended_pc': recommended_pc,
'monthly_pc_cost': pc_monthly_cost,
'estimated_cold_start_savings': cold_start_reduction_value,
'net_savings': net_savings
}
def _generate_global_recommendations(self, analysis: Dict[str, any]) -> List[str]:
"""Generate account-wide optimization recommendations"""
recommendations = []
total_cost = analysis['total_estimated_monthly_cost']
# Check for unused functions
unused_functions = sum(1 for func_data in analysis['functions'].values()
if any(rec['type'] == 'unused_function' for rec in func_data['recommendations']))
if unused_functions > 0:
recommendations.append(f"Consider removing {unused_functions} unused functions")
# Check for over-provisioned functions
over_provisioned = sum(1 for func_data in analysis['functions'].values()
if any(rec['type'] == 'memory_optimization' and rec['potential_monthly_savings'] > 5
for rec in func_data['recommendations']))
if over_provisioned > 0:
recommendations.append(f"Optimize memory for {over_provisioned} over-provisioned functions")
# Cost threshold recommendations
if total_cost > 1000:
recommendations.append("Consider implementing reserved capacity for high-traffic functions")
if total_cost > 100:
recommendations.append("Implement automated cost monitoring and alerting")
return recommendations
def generate_cost_report(self, analysis: Dict[str, any], output_file: str = None) -> str:
"""Generate a detailed cost optimization report"""
report = []
report.append("# Serverless Cost Optimization Report")
report.append(f"Generated: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')} UTC")
report.append(f"Analysis Period: {analysis['analysis_period_days']} days")
report.append(f"Total Functions: {analysis['total_functions']}")
report.append(f"Estimated Monthly Cost: ${analysis['total_estimated_monthly_cost']:.2f}")
report.append("")
# Summary of potential savings
total_potential_savings = sum(
sum(rec['potential_monthly_savings'] for rec in func_data['recommendations'])
for func_data in analysis['functions'].values()
)
report.append(f"## Cost Optimization Summary")
report.append(f"Total Potential Monthly Savings: ${total_potential_savings:.2f}")
report.append(f"Potential Savings Percentage: {(total_potential_savings / analysis['total_estimated_monthly_cost'] * 100):.1f}%")
report.append("")
# Global recommendations
report.append("## Global Recommendations")
for rec in analysis['global_recommendations']:
report.append(f"- {rec}")
report.append("")
# Function-by-function analysis
report.append("## Function Analysis")
for func_name, func_data in analysis['functions'].items():
report.append(f"### {func_name}")
report.append(f"Current Cost: ${func_data['cost_breakdown']['total_monthly_cost']:.2f}/month")
report.append(f"Invocations: {func_data['metrics']['invocations']:,.0f}")
report.append(f"Avg Duration: {func_data['metrics']['avg_duration_ms']:.0f}ms")
report.append(f"Error Rate: {func_data['metrics']['error_rate']:.2f}%")
report.append("")
if func_data['recommendations']:
report.append("**Recommendations:**")
for rec in func_data['recommendations']:
savings_text = f" (Save ${rec['potential_monthly_savings']:.2f}/month)" if rec['potential_monthly_savings'] > 0 else ""
report.append(f"- {rec['reason']}{savings_text}")
else:
report.append("No optimization recommendations")
report.append("")
report_text = "\n".join(report)
if output_file:
with open(output_file, 'w') as f:
f.write(report_text)
return report_text
# Usage example
if __name__ == "__main__":
optimizer = ServerlessCostOptimizer()
analysis = optimizer.analyze_lambda_costs(days=30)
report = optimizer.generate_cost_report(analysis, 'cost_optimization_report.md')
print("Cost optimization report generated!")
S3 Storage Optimization
S3 storage costs can grow significantly with serverless applications. Here's an automated S3 lifecycle management system:
import boto3
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any
import logging
logger = logging.getLogger()
class S3CostOptimizer:
"""
Optimize S3 storage costs through lifecycle policies and analysis
"""
def __init__(self):
self.s3 = boto3.client('s3')
self.cloudwatch = boto3.client('cloudwatch')
def optimize_bucket_lifecycle(self, bucket_name: str,
data_patterns: Dict[str, any] = None) -> Dict[str, any]:
"""
Create optimized lifecycle policies based on data access patterns
"""
try:
# Analyze existing data if patterns not provided
if not data_patterns:
data_patterns = self._analyze_bucket_access_patterns(bucket_name)
# Generate lifecycle rules
lifecycle_rules = self._generate_lifecycle_rules(data_patterns)
# Apply lifecycle configuration
lifecycle_config = {
'Rules': lifecycle_rules
}
self.s3.put_bucket_lifecycle_configuration(
Bucket=bucket_name,
LifecycleConfiguration=lifecycle_config
)
# Calculate cost savings
savings_estimate = self._estimate_lifecycle_savings(bucket_name, lifecycle_rules)
return {
'bucket': bucket_name,
'rules_applied': len(lifecycle_rules),
'estimated_monthly_savings': savings_estimate,
'lifecycle_rules': lifecycle_rules
}
except Exception as e:
logger.error(f"Error optimizing bucket lifecycle: {str(e)}")
raise
def _analyze_bucket_access_patterns(self, bucket_name: str) -> Dict[str, any]:
"""
Analyze S3 access patterns to inform lifecycle policies
"""
# This would typically analyze CloudTrail logs or S3 access logs
# Simplified example based on object prefixes and ages
try:
paginator = self.s3.get_paginator('list_objects_v2')
patterns = {
'total_objects': 0,
'total_size_gb': 0,
'prefix_patterns': {},
'age_distribution': {'0-30': 0, '30-90': 0, '90-365': 0, '365+': 0}
}
for page in paginator.paginate(Bucket=bucket_name):
for obj in page.get('Contents', []):
patterns['total_objects'] += 1
patterns['total_size_gb'] += obj['Size'] / (1024**3)
# Analyze by prefix
prefix = obj['Key'].split('/')[0] if '/' in obj['Key'] else 'root'
if prefix not in patterns['prefix_patterns']:
patterns['prefix_patterns'][prefix] = {'count': 0, 'size_gb': 0}
patterns['prefix_patterns'][prefix]['count'] += 1
patterns['prefix_patterns'][prefix]['size_gb'] += obj['Size'] / (1024**3)
# Analyze by age
age_days = (datetime.now(obj['LastModified'].tzinfo) - obj['LastModified']).days
if age_days <= 30:
patterns['age_distribution']['0-30'] += 1
elif age_days <= 90:
patterns['age_distribution']['30-90'] += 1
elif age_days <= 365:
patterns['age_distribution']['90-365'] += 1
else:
patterns['age_distribution']['365+'] += 1
return patterns
except Exception as e:
logger.error(f"Error analyzing bucket patterns: {str(e)}")
return {}
def _generate_lifecycle_rules(self, patterns: Dict[str, any]) -> List[Dict[str, any]]:
"""
Generate lifecycle rules based on access patterns
"""
rules = []
# Rule for logs and temporary data
if 'logs' in patterns.get('prefix_patterns', {}):
rules.append({
'ID': 'logs-lifecycle',
'Status': 'Enabled',
'Filter': {'Prefix': 'logs/'},
'Transitions': [
{
'Days': 30,
'StorageClass': 'STANDARD_IA'
},
{
'Days': 90,
'StorageClass': 'GLACIER'
}
],
'Expiration': {'Days': 365}
})
# Rule for user uploads
if 'uploads' in patterns.get('prefix_patterns', {}):
rules.append({
'ID': 'uploads-lifecycle',
'Status': 'Enabled',
'Filter': {'Prefix': 'uploads/'},
'Transitions': [
{
'Days': 90,
'StorageClass': 'STANDARD_IA'
},
{
'Days': 365,
'StorageClass': 'GLACIER'
}
]
})
# Rule for processed images
if 'processed' in patterns.get('prefix_patterns', {}):
rules.append({
'ID': 'processed-images-lifecycle',
'Status': 'Enabled',
'Filter': {'Prefix': 'processed/'},
'Transitions': [
{
'Days': 60,
'StorageClass': 'STANDARD_IA'
}
]
})
# Global rule for multipart uploads cleanup
rules.append({
'ID': 'multipart-cleanup',
'Status': 'Enabled',
'Filter': {},
'AbortIncompleteMultipartUpload': {'DaysAfterInitiation': 7}
})
# Global rule for old versions (if versioning enabled)
rules.append({
'ID': 'old-versions-cleanup',
'Status': 'Enabled',
'Filter': {},
'NoncurrentVersionTransitions': [
{
'NoncurrentDays': 30,
'StorageClass': 'STANDARD_IA'
},
{
'NoncurrentDays': 90,
'StorageClass': 'GLACIER'
}
],
'NoncurrentVersionExpiration': {'NoncurrentDays': 365}
})
return rules
def _estimate_lifecycle_savings(self, bucket_name: str, rules: List[Dict[str, any]]) -> float:
"""
Estimate monthly savings from lifecycle policies
"""
# Simplified cost calculation
# Standard: $0.023/GB/month
# Standard-IA: $0.0125/GB/month
# Glacier: $0.004/GB/month
try:
# Get bucket size
response = self.cloudwatch.get_metric_statistics(
Namespace='AWS/S3',
MetricName='BucketSizeBytes',
Dimensions=[
{'Name': 'BucketName', 'Value': bucket_name},
{'Name': 'StorageType', 'Value': 'StandardStorage'}
],
StartTime=datetime.utcnow() - timedelta(days=1),
EndTime=datetime.utcnow(),
Period=86400,
Statistics=['Average']
)
if response['Datapoints']:
size_bytes = response['Datapoints'][0]['Average']
size_gb = size_bytes / (1024**3)
# Estimate savings based on transitions
current_cost = size_gb * 0.023 # All in Standard
# Rough estimate: 50% transitions to IA after 30-90 days, 25% to Glacier
estimated_new_cost = (
size_gb * 0.25 * 0.023 + # 25% stays in Standard
size_gb * 0.50 * 0.0125 + # 50% moves to Standard-IA
size_gb * 0.25 * 0.004 # 25% moves to Glacier
)
return max(0, current_cost - estimated_new_cost)
return 0.0
except Exception as e:
logger.warning(f"Could not estimate savings: {str(e)}")
return 0.0
def setup_intelligent_tiering(self, bucket_name: str, prefixes: List[str] = None) -> Dict[str, any]:
"""
Set up S3 Intelligent Tiering for automated cost optimization
"""
try:
if not prefixes:
prefixes = [''] # Apply to entire bucket
configurations = []
for i, prefix in enumerate(prefixes):
config_id = f"intelligent-tiering-{i}"
config = {
'Id': config_id,
'Status': 'Enabled',
# The API expects a list of tierings; ARCHIVE_ACCESS requires at least 90 days
'Tierings': [
{
'Days': 90,
'AccessTier': 'ARCHIVE_ACCESS'
}
]
}
if prefix:
config['Filter'] = {'Prefix': prefix}
# Apply intelligent tiering configuration
self.s3.put_bucket_intelligent_tiering_configuration(
Bucket=bucket_name,
Id=config_id,
IntelligentTieringConfiguration=config
)
configurations.append(config)
return {
'bucket': bucket_name,
'configurations': configurations,
'estimated_savings': '10-20% on infrequently accessed objects'
}
except Exception as e:
logger.error(f"Error setting up intelligent tiering: {str(e)}")
raise
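As with the Lambda cost analyzer, I typically run this from a local script or a scheduled maintenance function. A usage sketch, with a placeholder bucket name and the uploads/ prefix assumed from the lifecycle rules above:
# Usage sketch for S3CostOptimizer (bucket name is a placeholder)
if __name__ == "__main__":
    s3_optimizer = S3CostOptimizer()
    lifecycle_result = s3_optimizer.optimize_bucket_lifecycle("serverless-blog-images-dev")
    print(json.dumps(lifecycle_result, indent=2, default=str))
    tiering_result = s3_optimizer.setup_intelligent_tiering(
        "serverless-blog-images-dev", prefixes=["uploads/"]
    )
    print(json.dumps(tiering_result, indent=2, default=str))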
Security Best Practices
Security in serverless applications requires a layered approach covering authentication, authorization, data protection, and secure coding practices. The shared responsibility model means AWS handles infrastructure security while you're responsible for application-level security.
JWT Authentication and Authorization
Here's a comprehensive authentication system I've implemented for multiple client projects:
import json
import jwt
import boto3
import hashlib
import secrets
import logging
from datetime import datetime, timedelta
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
logger = logging.getLogger()
@dataclass
class User:
user_id: str
email: str
password_hash: str
roles: List[str]
created_at: str
last_login: Optional[str] = None
is_active: bool = True
failed_login_attempts: int = 0
locked_until: Optional[str] = None
class SecureAuthenticationService:
"""
Comprehensive authentication and authorization service
"""
def __init__(self):
self.dynamodb = boto3.resource('dynamodb')
self.secrets_manager = boto3.client('secretsmanager')
self.users_table = self.dynamodb.Table('Users')
self.sessions_table = self.dynamodb.Table('UserSessions')
# Security configuration
self.max_failed_attempts = 5
self.lockout_duration_minutes = 30
self.session_timeout_hours = 24
self.password_min_length = 12
def register_user(self, email: str, password: str, first_name: str,
last_name: str) -> Dict[str, Any]:
"""
Register new user with security validations
"""
try:
# Validate input
validation_result = self._validate_registration_input(email, password)
if not validation_result['valid']:
return {
'success': False,
'error': validation_result['error'],
'error_code': 'VALIDATION_ERROR'
}
# Check if user already exists
if self._user_exists(email):
return {
'success': False,
'error': 'User with this email already exists',
'error_code': 'USER_EXISTS'
}
# Hash password securely
password_hash = self._hash_password(password)
# Create user record
user_id = secrets.token_urlsafe(16)
user = User(
user_id=user_id,
email=email.lower().strip(),
password_hash=password_hash,
roles=['user'],
created_at=datetime.utcnow().isoformat()
)
# Store user
self.users_table.put_item(Item=user.__dict__)
logger.info(f"User registered successfully: {user_id}")
return {
'success': True,
'user_id': user_id,
'message': 'User registered successfully'
}
except Exception as e:
logger.error(f"Registration error: {str(e)}")
return {
'success': False,
'error': 'Registration failed',
'error_code': 'INTERNAL_ERROR'
}
def authenticate_user(self, email: str, password: str,
client_ip: str = None) -> Dict[str, Any]:
"""
Authenticate user with security measures
"""
try:
email = email.lower().strip()
# Get user record
user = self._get_user_by_email(email)
if not user:
# Prevent timing attacks
self._hash_password('dummy_password')
return {
'success': False,
'error': 'Invalid credentials',
'error_code': 'INVALID_CREDENTIALS'
}
# Check if account is locked
if self._is_account_locked(user):
return {
'success': False,
'error': 'Account temporarily locked due to failed login attempts',
'error_code': 'ACCOUNT_LOCKED'
}
# Check if account is active
if not user.get('is_active', True):
return {
'success': False,
'error': 'Account is deactivated',
'error_code': 'ACCOUNT_DEACTIVATED'
}
# Verify password
if not self._verify_password(password, user['password_hash']):
self._handle_failed_login(user['user_id'])
return {
'success': False,
'error': 'Invalid credentials',
'error_code': 'INVALID_CREDENTIALS'
}
# Successful authentication
self._handle_successful_login(user['user_id'])
# Create the session first so its ID can be embedded in the tokens
session_id = self._create_session(user['user_id'], client_ip)
# Generate JWT tokens tied to this session
token_data = self._generate_jwt_token(user, session_id=session_id)
return {
'success': True,
'access_token': token_data['access_token'],
'refresh_token': token_data['refresh_token'],
'expires_in': token_data['expires_in'],
'session_id': session_id,
'user': {
'user_id': user['user_id'],
'email': user['email'],
'roles': user.get('roles', ['user'])
}
}
except Exception as e:
logger.error(f"Authentication error: {str(e)}")
return {
'success': False,
'error': 'Authentication failed',
'error_code': 'INTERNAL_ERROR'
}
def validate_token(self, token: str) -> Dict[str, Any]:
"""
Validate JWT token and return user information
"""
try:
# Get JWT secret
jwt_secret = self._get_jwt_secret()
# Decode and validate token
payload = jwt.decode(token, jwt_secret, algorithms=['HS256'])
# Check if session is still valid
session_valid = self._is_session_valid(payload.get('session_id'))
if not session_valid:
return {
'valid': False,
'error': 'Session expired or invalid',
'error_code': 'SESSION_INVALID'
}
# Get current user data
user = self._get_user_by_id(payload['user_id'])
if not user or not user.get('is_active', True):
return {
'valid': False,
'error': 'User not found or deactivated',
'error_code': 'USER_INVALID'
}
return {
'valid': True,
'user_id': payload['user_id'],
'email': payload['email'],
'roles': user.get('roles', ['user']),
'session_id': payload.get('session_id')
}
except jwt.ExpiredSignatureError:
return {
'valid': False,
'error': 'Token expired',
'error_code': 'TOKEN_EXPIRED'
}
except jwt.InvalidTokenError:
return {
'valid': False,
'error': 'Invalid token',
'error_code': 'TOKEN_INVALID'
}
except Exception as e:
logger.error(f"Token validation error: {str(e)}")
return {
'valid': False,
'error': 'Token validation failed',
'error_code': 'INTERNAL_ERROR'
}
def refresh_token(self, refresh_token: str) -> Dict[str, Any]:
"""
Refresh access token using refresh token
"""
try:
jwt_secret = self._get_jwt_secret()
# Decode refresh token
payload = jwt.decode(refresh_token, jwt_secret, algorithms=['HS256'])
if payload.get('type') != 'refresh':
return {
'success': False,
'error': 'Invalid refresh token',
'error_code': 'INVALID_REFRESH_TOKEN'
}
# Get user and validate session
user = self._get_user_by_id(payload['user_id'])
if not user or not self._is_session_valid(payload.get('session_id')):
return {
'success': False,
'error': 'Invalid session',
'error_code': 'SESSION_INVALID'
}
# Generate new access token
token_data = self._generate_jwt_token(user, session_id=payload.get('session_id'))
return {
'success': True,
'access_token': token_data['access_token'],
'expires_in': token_data['expires_in']
}
except jwt.ExpiredSignatureError:
return {
'success': False,
'error': 'Refresh token expired',
'error_code': 'REFRESH_TOKEN_EXPIRED'
}
except Exception as e:
logger.error(f"Token refresh error: {str(e)}")
return {
'success': False,
'error': 'Token refresh failed',
'error_code': 'INTERNAL_ERROR'
}
def logout_user(self, session_id: str) -> Dict[str, Any]:
"""
Logout user and invalidate session
"""
try:
# Invalidate session
self.sessions_table.update_item(
Key={'session_id': session_id},
UpdateExpression='SET is_active = :false, logged_out_at = :timestamp',
ExpressionAttributeValues={
':false': False,
':timestamp': datetime.utcnow().isoformat()
}
)
return {'success': True, 'message': 'Logged out successfully'}
except Exception as e:
logger.error(f"Logout error: {str(e)}")
return {
'success': False,
'error': 'Logout failed',
'error_code': 'INTERNAL_ERROR'
}
def change_password(self, user_id: str, current_password: str,
new_password: str) -> Dict[str, Any]:
"""
Change user password with validation
"""
try:
# Get user
user = self._get_user_by_id(user_id)
if not user:
return {
'success': False,
'error': 'User not found',
'error_code': 'USER_NOT_FOUND'
}
# Verify current password
if not self._verify_password(current_password, user['password_hash']):
return {
'success': False,
'error': 'Current password is incorrect',
'error_code': 'INVALID_CURRENT_PASSWORD'
}
# Validate new password
validation = self._validate_password(new_password)
if not validation['valid']:
return {
'success': False,
'error': validation['error'],
'error_code': 'WEAK_PASSWORD'
}
# Hash new password
new_password_hash = self._hash_password(new_password)
# Update password
self.users_table.update_item(
Key={'user_id': user_id},
UpdateExpression='SET password_hash = :hash, updated_at = :timestamp',
ExpressionAttributeValues={
':hash': new_password_hash,
':timestamp': datetime.utcnow().isoformat()
}
)
# Invalidate all sessions for this user
self._invalidate_user_sessions(user_id)
logger.info(f"Password changed for user: {user_id}")
return {
'success': True,
'message': 'Password changed successfully. Please log in again.'
}
except Exception as e:
logger.error(f"Password change error: {str(e)}")
return {
'success': False,
'error': 'Password change failed',
'error_code': 'INTERNAL_ERROR'
}
def _validate_registration_input(self, email: str, password: str) -> Dict[str, Any]:
"""Validate registration input"""
import re
# Email validation
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, email):
return {'valid': False, 'error': 'Invalid email format'}
# Password validation
password_validation = self._validate_password(password)
if not password_validation['valid']:
return password_validation
return {'valid': True}
def _validate_password(self, password: str) -> Dict[str, Any]:
"""Validate password strength"""
import re
if len(password) < self.password_min_length:
return {
'valid': False,
'error': f'Password must be at least {self.password_min_length} characters long'
}
# Check for complexity requirements
requirements = {
'uppercase': r'[A-Z]',
'lowercase': r'[a-z]',
'digit': r'\d',
'special': r'[!@#$%^&*(),.?":{}|<>]'
}
missing_requirements = []
for req_name, pattern in requirements.items():
if not re.search(pattern, password):
missing_requirements.append(req_name)
if missing_requirements:
return {
'valid': False,
'error': f'Password must contain: {", ".join(missing_requirements)}'
}
return {'valid': True}
def _hash_password(self, password: str) -> str:
"""Hash password using PBKDF2 with salt"""
import hashlib
import secrets
salt = secrets.token_hex(16)
hashed = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000)
return f"{salt}:{hashed.hex()}"
def _verify_password(self, password: str, stored_hash: str) -> bool:
"""Verify password against stored hash"""
import hashlib
try:
salt, hashed = stored_hash.split(':')
new_hash = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000)
return new_hash.hex() == hashed
except Exception:
return False
def _get_jwt_secret(self) -> str:
"""Get JWT secret from AWS Secrets Manager"""
try:
response = self.secrets_manager.get_secret_value(SecretId='jwt-secret')
secret_data = json.loads(response['SecretString'])
return secret_data['secret']
except Exception as e:
logger.error(f"Failed to get JWT secret: {str(e)}")
raise
def _generate_jwt_token(self, user: Dict[str, Any], session_id: str = None) -> Dict[str, Any]:
"""Generate JWT access and refresh tokens"""
jwt_secret = self._get_jwt_secret()
now = datetime.utcnow()
# Access token (short-lived)
access_payload = {
'user_id': user['user_id'],
'email': user['email'],
'roles': user.get('roles', ['user']),
'session_id': session_id,
'type': 'access',
'iat': now,
'exp': now + timedelta(hours=1)
}
# Refresh token (long-lived)
refresh_payload = {
'user_id': user['user_id'],
'session_id': session_id,
'type': 'refresh',
'iat': now,
'exp': now + timedelta(days=30)
}
access_token = jwt.encode(access_payload, jwt_secret, algorithm='HS256')
refresh_token = jwt.encode(refresh_payload, jwt_secret, algorithm='HS256')
return {
'access_token': access_token,
'refresh_token': refresh_token,
'expires_in': 3600 # 1 hour
}
def _user_exists(self, email: str) -> bool:
"""Check if user exists by email"""
try:
response = self.users_table.query(
IndexName='EmailIndex',
KeyConditionExpression=boto3.dynamodb.conditions.Key('email').eq(email)
)
return len(response['Items']) > 0
except Exception:
return False
def _get_user_by_email(self, email: str) -> Optional[Dict[str, Any]]:
"""Get user record by email"""
try:
response = self.users_table.query(
IndexName='EmailIndex',
KeyConditionExpression=boto3.dynamodb.conditions.Key('email').eq(email)
)
return response['Items'][0] if response['Items'] else None
except Exception:
return None
def _get_user_by_id(self, user_id: str) -> Optional[Dict[str, Any]]:
"""Get user record by ID"""
try:
response = self.users_table.get_item(Key={'user_id': user_id})
return response.get('Item')
except Exception:
return None
def _is_account_locked(self, user: Dict[str, Any]) -> bool:
"""Check if account is locked due to failed attempts"""
locked_until = user.get('locked_until')
if not locked_until:
return False
try:
lock_time = datetime.fromisoformat(locked_until)
return datetime.utcnow() < lock_time
except Exception:
return False
def _handle_failed_login(self, user_id: str) -> None:
"""Handle failed login attempt"""
try:
update_expression = 'ADD failed_login_attempts :inc SET updated_at = :timestamp'
expression_values = {
':inc': 1,
':timestamp': datetime.utcnow().isoformat()
}
# Check current failed attempts
user = self._get_user_by_id(user_id)
if user:
current_attempts = user.get('failed_login_attempts', 0) + 1
# Lock account if too many failed attempts
if current_attempts >= self.max_failed_attempts:
lock_until = datetime.utcnow() + timedelta(minutes=self.lockout_duration_minutes)
update_expression += ', locked_until = :lock_until'
expression_values[':lock_until'] = lock_until.isoformat()
self.users_table.update_item(
Key={'user_id': user_id},
UpdateExpression=update_expression,
ExpressionAttributeValues=expression_values
)
except Exception as e:
logger.error(f"Error handling failed login: {str(e)}")
def _handle_successful_login(self, user_id: str) -> None:
"""Handle successful login"""
try:
self.users_table.update_item(
Key={'user_id': user_id},
UpdateExpression='''
SET last_login = :timestamp,
failed_login_attempts = :zero,
updated_at = :timestamp
REMOVE locked_until
''',
ExpressionAttributeValues={
':timestamp': datetime.utcnow().isoformat(),
':zero': 0
}
)
except Exception as e:
logger.error(f"Error handling successful login: {str(e)}")
def _create_session(self, user_id: str, client_ip: str = None) -> str:
"""Create user session record"""
try:
session_id = secrets.token_urlsafe(32)
session_data = {
'session_id': session_id,
'user_id': user_id,
'created_at': datetime.utcnow().isoformat(),
'last_activity': datetime.utcnow().isoformat(),
'is_active': True,
'client_ip': client_ip,
'expires_at': (datetime.utcnow() + timedelta(hours=self.session_timeout_hours)).isoformat()
}
self.sessions_table.put_item(Item=session_data)
return session_id
except Exception as e:
logger.error(f"Error creating session: {str(e)}")
raise  # A token referencing an unstored session would always fail validation later
def _is_session_valid(self, session_id: str) -> bool:
"""Check if session is valid and active"""
if not session_id:
return False
try:
response = self.sessions_table.get_item(Key={'session_id': session_id})
session = response.get('Item')
if not session or not session.get('is_active', False):
return False
# Check expiration
expires_at = datetime.fromisoformat(session['expires_at'])
if datetime.utcnow() > expires_at:
# Expire the session
self.sessions_table.update_item(
Key={'session_id': session_id},
UpdateExpression='SET is_active = :false',
ExpressionAttributeValues={':false': False}
)
return False
# Update last activity
self.sessions_table.update_item(
Key={'session_id': session_id},
UpdateExpression='SET last_activity = :timestamp',
ExpressionAttributeValues={':timestamp': datetime.utcnow().isoformat()}
)
return True
except Exception as e:
logger.error(f"Error validating session: {str(e)}")
return False
def _invalidate_user_sessions(self, user_id: str) -> None:
"""Invalidate all sessions for a user"""
try:
# Query all sessions for the user
response = self.sessions_table.query(
IndexName='UserIdIndex',
KeyConditionExpression=boto3.dynamodb.conditions.Key('user_id').eq(user_id),
FilterExpression=boto3.dynamodb.conditions.Attr('is_active').eq(True)
)
# Invalidate each session
for session in response['Items']:
self.sessions_table.update_item(
Key={'session_id': session['session_id']},
UpdateExpression='SET is_active = :false, logged_out_at = :timestamp',
ExpressionAttributeValues={
':false': False,
':timestamp': datetime.utcnow().isoformat()
}
)
except Exception as e:
logger.error(f"Error invalidating user sessions: {str(e)}")
Input Validation and Sanitization
Proper input validation prevents injection attacks and data corruption. Here's a comprehensive validation framework:
import re
import html
import json
import logging
from typing import Dict, Any, List, Union, Optional
from dataclasses import dataclass
logger = logging.getLogger()
@dataclass
class ValidationRule:
field_name: str
required: bool = False
data_type: str = 'string'
min_length: Optional[int] = None
max_length: Optional[int] = None
min_value: Optional[Union[int, float]] = None
max_value: Optional[Union[int, float]] = None
pattern: Optional[str] = None
allowed_values: Optional[List[Any]] = None
custom_validator: Optional[callable] = None
class InputValidator:
"""
Comprehensive input validation and sanitization
"""
def __init__(self):
# Common regex patterns
self.patterns = {
'email': r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
'phone': r'^\+?1?-?\.?\s?\(?([0-9]{3})\)?[-.\s]?([0-9]{3})[-.\s]?([0-9]{4})$',
'uuid': r'^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$',
'url': r'^https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)$',
'slug': r'^[a-z0-9]+(?:-[a-z0-9]+)*$',
'username': r'^[a-zA-Z0-9_]{3,20}$'
}
def validate_data(self, data: Dict[str, Any], rules: List[ValidationRule]) -> Dict[str, Any]:
"""
Validate data against defined rules
"""
try:
errors = {}
validated_data = {}
# Create rules lookup
rules_map = {rule.field_name: rule for rule in rules}
# Check for required fields
for rule in rules:
if rule.required and rule.field_name not in data:
errors[rule.field_name] = f"Field '{rule.field_name}' is required"
# Validate each field in data
for field_name, value in data.items():
if field_name not in rules_map:
continue # Skip unknown fields
rule = rules_map[field_name]
# Skip validation for None values if field is not required
if value is None and not rule.required:
continue
field_errors = self._validate_field(value, rule)
if field_errors:
errors[field_name] = field_errors
else:
# Sanitize and add to validated data
validated_data[field_name] = self._sanitize_value(value, rule)
return {
'valid': len(errors) == 0,
'errors': errors,
'data': validated_data
}
except Exception as e:
logger.error(f"Validation error: {str(e)}")
return {
'valid': False,
'errors': {'general': 'Validation failed'},
'data': {}
}
def _validate_field(self, value: Any, rule: ValidationRule) -> Optional[str]:
"""Validate individual field against rule"""
# Type validation
if rule.data_type == 'string' and not isinstance(value, str):
return f"Must be a string"
elif rule.data_type == 'integer' and not isinstance(value, int):
return f"Must be an integer"
elif rule.data_type == 'float' and not isinstance(value, (int, float)):
return f"Must be a number"
elif rule.data_type == 'boolean' and not isinstance(value, bool):
return f"Must be a boolean"
elif rule.data_type == 'array' and not isinstance(value, list):
return f"Must be an array"
elif rule.data_type == 'object' and not isinstance(value, dict):
return f"Must be an object"
# String-specific validations
if rule.data_type == 'string' and isinstance(value, str):
# Length validation
if rule.min_length is not None and len(value) < rule.min_length:
return f"Must be at least {rule.min_length} characters long"
if rule.max_length is not None and len(value) > rule.max_length:
return f"Must be no more than {rule.max_length} characters long"
# Pattern validation
if rule.pattern:
pattern = self.patterns.get(rule.pattern, rule.pattern)
if not re.match(pattern, value):
return f"Invalid format for {rule.field_name}"
# Numeric validations
if rule.data_type in ['integer', 'float'] and isinstance(value, (int, float)):
if rule.min_value is not None and value < rule.min_value:
return f"Must be at least {rule.min_value}"
if rule.max_value is not None and value > rule.max_value:
return f"Must be no more than {rule.max_value}"
# Array validations
if rule.data_type == 'array' and isinstance(value, list):
if rule.min_length is not None and len(value) < rule.min_length:
return f"Must have at least {rule.min_length} items"
if rule.max_length is not None and len(value) > rule.max_length:
return f"Must have no more than {rule.max_length} items"
# Allowed values validation
if rule.allowed_values and value not in rule.allowed_values:
return f"Must be one of: {', '.join(map(str, rule.allowed_values))}"
# Custom validation
if rule.custom_validator:
try:
custom_result = rule.custom_validator(value)
if custom_result is not True:
return custom_result if isinstance(custom_result, str) else "Invalid value"
except Exception as e:
return f"Validation error: {str(e)}"
return None
def _sanitize_value(self, value: Any, rule: ValidationRule) -> Any:
"""Sanitize value based on type and rule"""
if rule.data_type == 'string' and isinstance(value, str):
# HTML escape for security
value = html.escape(value.strip())
# Additional sanitization based on field type
if rule.pattern == 'email':
value = value.lower()
elif rule.pattern == 'slug':
value = re.sub(r'[^a-z0-9-]', '', value.lower())
elif rule.data_type == 'array' and isinstance(value, list):
# Sanitize each item in the array
value = [self._sanitize_array_item(item) for item in value]
elif rule.data_type == 'object' and isinstance(value, dict):
# Sanitize object values
value = {k: self._sanitize_object_value(v) for k, v in value.items()}
return value
def _sanitize_array_item(self, item: Any) -> Any:
"""Sanitize individual array items"""
if isinstance(item, str):
return html.escape(item.strip())
return item
def _sanitize_object_value(self, value: Any) -> Any:
"""Sanitize object values"""
if isinstance(value, str):
return html.escape(value.strip())
elif isinstance(value, list):
return [self._sanitize_array_item(item) for item in value]
elif isinstance(value, dict):
return {k: self._sanitize_object_value(v) for k, v in value.items()}
return value
# Example usage in Lambda function
def lambda_handler(event, context):
"""
Lambda function with comprehensive input validation
"""
validator = InputValidator()
try:
# Parse request body
if event.get('body'):
try:
request_data = json.loads(event['body'])
except json.JSONDecodeError:
return {
'statusCode': 400,
'body': json.dumps({'error': 'Invalid JSON in request body'})
}
else:
request_data = {}
# Define validation rules based on endpoint
http_method = event['httpMethod']
path = event['path']
if path == '/posts' and http_method == 'POST':
validation_rules = [
ValidationRule('title', required=True, max_length=200, min_length=1),
ValidationRule('content', required=True, max_length=50000, min_length=10),
ValidationRule('category', required=True,
allowed_values=['technology', 'business', 'lifestyle', 'health']),
ValidationRule('tags', data_type='array', max_length=10),
ValidationRule('excerpt', max_length=500),
ValidationRule('featured_image', pattern='url'),
ValidationRule('status', allowed_values=['draft', 'published'], required=True),
ValidationRule('publish_at', custom_validator=validate_publish_date)
]
elif path == '/users/register' and http_method == 'POST':
validation_rules = [
ValidationRule('email', required=True, pattern='email', max_length=254),
ValidationRule('password', required=True, min_length=12,
custom_validator=validate_password_strength),
ValidationRule('first_name', required=True, max_length=50, min_length=1),
ValidationRule('last_name', required=True, max_length=50, min_length=1),
ValidationRule('username', pattern='username', max_length=20, min_length=3),
ValidationRule('phone', pattern='phone'),
ValidationRule('terms_accepted', data_type='boolean', required=True)
]
else:
return {
'statusCode': 404,
'body': json.dumps({'error': 'Endpoint not found'})
}
# Validate request data
validation_result = validator.validate_data(request_data, validation_rules)
if not validation_result['valid']:
return {
'statusCode': 400,
'body': json.dumps({
'error': 'Validation failed',
'details': validation_result['errors']
})
}
# Process with validated and sanitized data
sanitized_data = validation_result['data']
# Your business logic here using sanitized_data
result = process_request(sanitized_data, path, http_method)
return {
'statusCode': 200,
'body': json.dumps(result)
}
except Exception as e:
logger.error(f"Request processing error: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps({'error': 'Internal server error'})
}
def validate_password_strength(password: str) -> Union[bool, str]:
"""Custom password strength validator"""
import re
# Check for common weak patterns
if re.search(r'(.)\1{2,}', password):  # Same character repeated 3+ times in a row
    return "Password cannot contain the same character repeated three or more times in a row"
common_patterns = ['123', 'abc', 'qwe', 'password', 'admin']
for pattern in common_patterns:
if pattern in password.lower():
return f"Password cannot contain common pattern: {pattern}"
# Check complexity
requirements = {
'uppercase': r'[A-Z]',
'lowercase': r'[a-z]',
'digit': r'\d',
'special': r'[!@#$%^&*(),.?":{}|<>]'
}
missing = []
for req_name, pattern in requirements.items():
if not re.search(pattern, password):
missing.append(req_name)
if missing:
return f"Password must contain: {', '.join(missing)}"
return True
def validate_publish_date(date_str: str) -> Union[bool, str]:
"""Custom date validator"""
try:
from datetime import datetime, timedelta
# Parse ISO format date
publish_date = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
# Check if date is not in the past (with some tolerance)
now = datetime.utcnow().replace(tzinfo=publish_date.tzinfo)
if publish_date < now - timedelta(minutes=5):
return "Publish date cannot be in the past"
# Check if date is not too far in the future
if publish_date > now + timedelta(days=365):
return "Publish date cannot be more than a year in the future"
return True
except ValueError:
return "Invalid date format. Use ISO 8601 format."
def process_request(data: Dict[str, Any], path: str, method: str) -> Dict[str, Any]:
"""Process validated request data"""
# Your business logic implementation
return {'message': 'Request processed successfully', 'data': data}
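The contract validate_data returns is easiest to see in isolation. A short sketch using the classes defined above:
validator = InputValidator()
rules = [
    ValidationRule('email', required=True, pattern='email', max_length=254),
    ValidationRule('age', data_type='integer', min_value=18, max_value=120)
]

result = validator.validate_data({'email': 'user@example.com', 'age': 34}, rules)
# result -> {'valid': True, 'errors': {}, 'data': {'email': 'user@example.com', 'age': 34}}

bad = validator.validate_data({'email': 'not-an-email', 'age': 12}, rules)
# bad['valid'] is False; bad['errors'] maps each failing field to a message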
Testing Strategy
Testing serverless applications requires a different approach than traditional applications. You need to test Lambda functions in isolation, integration between services, and end-to-end workflows across distributed systems.
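Before looking at individual tests, it's worth centralizing the boilerplate every Lambda test needs. A small conftest.py sketch, assuming a pytest setup; it mirrors the mock context the test classes below build by hand:
# tests/conftest.py (assumed layout) - shared fixtures for Lambda tests
import pytest
from unittest.mock import Mock

@pytest.fixture
def lambda_context():
    """Mock Lambda context with the attributes handlers commonly read."""
    context = Mock()
    context.function_name = 'test-function'
    context.function_version = '1'
    context.invoked_function_arn = 'arn:aws:lambda:us-east-1:123456789012:function:test'
    context.memory_limit_in_mb = 256
    context.get_remaining_time_in_millis = lambda: 30000
    return context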
Unit Testing with Mocks
Here's a comprehensive unit testing framework I use for Lambda functions:
import unittest
from unittest.mock import Mock, patch, MagicMock
import json
import boto3
from moto import mock_dynamodb, mock_s3, mock_sns
import pytest
from datetime import datetime
import sys
import os
# Add src directory to path for imports
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
from blog_api import app # Your Lambda function module
class TestBlogAPI(unittest.TestCase):
"""
Unit tests for blog API Lambda function
"""
def setUp(self):
"""Set up test fixtures"""
self.mock_context = Mock()
self.mock_context.function_name = 'test-function'
self.mock_context.function_version = '1'
self.mock_context.invoked_function_arn = 'arn:aws:lambda:us-east-1:123456789012:function:test'
self.mock_context.memory_limit_in_mb = 256
self.mock_context.get_remaining_time_in_millis = lambda: 30000
# Sample test data
self.sample_post = {
'post_id': 'test-post-123',
'title': 'Test Post Title',
'content': 'This is test content for the blog post.',
'author_id': 'author-123',
'category': 'technology',
'status': 'published',
'created_at': '2025-01-01T00:00:00Z',
'updated_at': '2025-01-01T00:00:00Z'
}
@mock_dynamodb
def test_get_post_success(self):
"""Test successful post retrieval"""
# Create mock DynamoDB table
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb.create_table(
TableName='BlogPosts-test',
KeySchema=[
{'AttributeName': 'post_id', 'KeyType': 'HASH'}
],
AttributeDefinitions=[
{'AttributeName': 'post_id', 'AttributeType': 'S'}
],
BillingMode='PAY_PER_REQUEST'
)
# Insert test data
table.put_item(Item=self.sample_post)
# Create test event
event = {
'httpMethod': 'GET',
'path': '/posts/test-post-123',
'pathParameters': {'post_id': 'test-post-123'},
'queryStringParameters': None,
'body': None,
'headers': {}
}
# Mock environment variables
with patch.dict(os.environ, {'BLOG_POSTS_TABLE': 'BlogPosts-test'}):
# Execute function
response = app.lambda_handler(event, self.mock_context)
# Verify response
self.assertEqual(response['statusCode'], 200)
response_body = json.loads(response['body'])
self.assertEqual(response_body['post_id'], 'test-post-123')
self.assertEqual(response_body['title'], 'Test Post Title')
@mock_dynamodb
def test_get_post_not_found(self):
"""Test post not found scenario"""
# Create empty mock table
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
dynamodb.create_table(
TableName='BlogPosts-test',
KeySchema=[
{'AttributeName': 'post_id', 'KeyType': 'HASH'}
],
AttributeDefinitions=[
{'AttributeName': 'post_id', 'AttributeType': 'S'}
],
BillingMode='PAY_PER_REQUEST'
)
event = {
'httpMethod': 'GET',
'path': '/posts/nonexistent-post',
'pathParameters': {'post_id': 'nonexistent-post'},
'queryStringParameters': None,
'body': None,
'headers': {}
}
with patch.dict(os.environ, {'BLOG_POSTS_TABLE': 'BlogPosts-test'}):
response = app.lambda_handler(event, self.mock_context)
self.assertEqual(response['statusCode'], 404)
response_body = json.loads(response['body'])
self.assertIn('error', response_body)
@mock_dynamodb
def test_create_post_success(self):
"""Test successful post creation"""
# Create mock table
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
dynamodb.create_table(
TableName='BlogPosts-test',
KeySchema=[
{'AttributeName': 'post_id', 'KeyType': 'HASH'}
],
AttributeDefinitions=[
{'AttributeName': 'post_id', 'AttributeType': 'S'}
],
BillingMode='PAY_PER_REQUEST'
)
new_post_data = {
'title': 'New Test Post',
'content': 'This is the content of the new post.',
'author_id': 'author-456',
'category': 'business',
'status': 'draft'
}
event = {
'httpMethod': 'POST',
'path': '/posts',
'pathParameters': None,
'queryStringParameters': None,
'body': json.dumps(new_post_data),
'headers': {'Content-Type': 'application/json'}
}
with patch.dict(os.environ, {'BLOG_POSTS_TABLE': 'BlogPosts-test'}):
response = app.lambda_handler(event, self.mock_context)
self.assertEqual(response['statusCode'], 201)
response_body = json.loads(response['body'])
self.assertIn('post_id', response_body)
self.assertEqual(response_body['title'], 'New Test Post')
def test_create_post_validation_error(self):
"""Test post creation with validation errors"""
invalid_post_data = {
'title': '', # Empty title should fail validation
'content': 'Short', # Too short content
'category': 'invalid_category' # Invalid category
}
event = {
'httpMethod': 'POST',
'path': '/posts',
'pathParameters': None,
'queryStringParameters': None,
'body': json.dumps(invalid_post_data),
'headers': {'Content-Type': 'application/json'}
}
response = app.lambda_handler(event, self.mock_context)
self.assertEqual(response['statusCode'], 400)
response_body = json.loads(response['body'])
self.assertIn('error', response_body)
@mock_dynamodb
def test_get_posts_with_pagination(self):
"""Test posts listing with pagination"""
# Create mock table and data
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb.create_table(
TableName='BlogPosts-test',
KeySchema=[
{'AttributeName': 'post_id', 'KeyType': 'HASH'}
],
AttributeDefinitions=[
{'AttributeName': 'post_id', 'AttributeType': 'S'},
{'AttributeName': 'status', 'AttributeType': 'S'},
{'AttributeName': 'created_at', 'AttributeType': 'S'}
],
GlobalSecondaryIndexes=[
{
'IndexName': 'StatusIndex',
'KeySchema': [
{'AttributeName': 'status', 'KeyType': 'HASH'},
{'AttributeName': 'created_at', 'KeyType': 'RANGE'}
],
'Projection': {'ProjectionType': 'ALL'}
}
],
BillingMode='PAY_PER_REQUEST'
)
# Insert multiple test posts
for i in range(5):
post = {
'post_id': f'post-{i}',
'title': f'Test Post {i}',
'content': f'Content for post {i}',
'status': 'published',
'created_at': f'2025-01-0{i+1}T00:00:00Z'
}
table.put_item(Item=post)
event = {
'httpMethod': 'GET',
'path': '/posts',
'pathParameters': None,
'queryStringParameters': {'limit': '3'},
'body': None,
'headers': {}
}
with patch.dict(os.environ, {'BLOG_POSTS_TABLE': 'BlogPosts-test'}):
response = app.lambda_handler(event, self.mock_context)
self.assertEqual(response['statusCode'], 200)
response_body = json.loads(response['body'])
self.assertIn('posts', response_body)
self.assertLessEqual(len(response_body['posts']), 3)
@patch('boto3.client')
def test_dynamodb_error_handling(self, mock_boto_client):
"""Test DynamoDB error handling"""
# Mock DynamoDB client to raise exception
mock_dynamodb = Mock()
mock_boto_client.return_value = mock_dynamodb
mock_dynamodb.get_item.side_effect = Exception("DynamoDB error")
event = {
'httpMethod': 'GET',
'path': '/posts/test-post',
'pathParameters': {'post_id': 'test-post'},
'queryStringParameters': None,
'body': None,
'headers': {}
}
response = app.lambda_handler(event, self.mock_context)
self.assertEqual(response['statusCode'], 500)
response_body = json.loads(response['body'])
self.assertIn('error', response_body)
def test_invalid_json_handling(self):
"""Test handling of invalid JSON in request body"""
event = {
'httpMethod': 'POST',
'path': '/posts',
'pathParameters': None,
'queryStringParameters': None,
'body': '{"invalid": json}', # Invalid JSON
'headers': {'Content-Type': 'application/json'}
}
response = app.lambda_handler(event, self.mock_context)
self.assertEqual(response['statusCode'], 400)
response_body = json.loads(response['body'])
self.assertIn('error', response_body)
class TestUserRegistration(unittest.TestCase):
"""
Unit tests for user registration function
"""
def setUp(self):
self.mock_context = Mock()
self.mock_context.function_name = 'user-registration-test'
@mock_dynamodb
@mock_sns
def test_user_registration_success(self):
"""Test successful user registration"""
# Create mock DynamoDB table
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb.create_table(
TableName='Users-test',
KeySchema=[
{'AttributeName': 'user_id', 'KeyType': 'HASH'}
],
AttributeDefinitions=[
{'AttributeName': 'user_id', 'AttributeType': 'S'},
{'AttributeName': 'email', 'AttributeType': 'S'}
],
GlobalSecondaryIndexes=[
{
'IndexName': 'EmailIndex',
'KeySchema': [
{'AttributeName': 'email', 'KeyType': 'HASH'}
],
'Projection': {'ProjectionType': 'ALL'}
}
],
BillingMode='PAY_PER_REQUEST'
)
# Create mock SNS topic
sns = boto3.client('sns', region_name='us-east-1')
topic_response = sns.create_topic(Name='user-events-test')
topic_arn = topic_response['TopicArn']
registration_data = {
'email': '[email protected]',
'password': 'Secure*Password123',
'first_name': 'Test',
'last_name': 'User'
}
event = {
'httpMethod': 'POST',
'path': '/users/register',
'pathParameters': None,
'queryStringParameters': None,
'body': json.dumps(registration_data),
'headers': {'Content-Type': 'application/json'}
}
# Mock environment variables
with patch.dict(os.environ, {
'USERS_TABLE': 'Users-test',
'USER_EVENTS_TOPIC': topic_arn
}):
from user_registration import app
response = app.lambda_handler(event, self.mock_context)
# Verify successful registration
self.assertEqual(response['statusCode'], 201)
response_body = json.loads(response['body'])
self.assertIn('user_id', response_body)
self.assertEqual(response_body['message'], 'User registered successfully')
# Verify user was stored in DynamoDB
users = table.scan()['Items']
self.assertEqual(len(users), 1)
self.assertEqual(users[0]['email'], '[email protected]')
# Verify SNS message was published
messages = sns.list_subscriptions_by_topic(TopicArn=topic_arn)
# In a real test, you'd verify the message content
@mock_dynamodb
def test_user_registration_duplicate_email(self):
"""Test registration with duplicate email"""
# Setup table with existing user
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb.create_table(
TableName='Users-test',
KeySchema=[
{'AttributeName': 'user_id', 'KeyType': 'HASH'}
],
AttributeDefinitions=[
{'AttributeName': 'user_id', 'AttributeType': 'S'},
{'AttributeName': 'email', 'AttributeType': 'S'}
],
GlobalSecondaryIndexes=[
{
'IndexName': 'EmailIndex',
'KeySchema': [
{'AttributeName': 'email', 'KeyType': 'HASH'}
],
'Projection': {'ProjectionType': 'ALL'}
}
],
BillingMode='PAY_PER_REQUEST'
)
# Insert existing user
table.put_item(Item={
'user_id': 'existing-user-123',
'email': '[email protected]',
'password_hash': 'existing_hash'
})
registration_data = {
'email': '[email protected]', # Duplicate email
'password': 'NewPassword123!',
'first_name': 'Test',
'last_name': 'User'
}
event = {
'httpMethod': 'POST',
'path': '/users/register',
'body': json.dumps(registration_data),
'headers': {'Content-Type': 'application/json'}
}
with patch.dict(os.environ, {'USERS_TABLE': 'Users-test'}):
from user_registration import app
response = app.lambda_handler(event, self.mock_context)
# Verify rejection
self.assertEqual(response['statusCode'], 409)
response_body = json.loads(response['body'])
self.assertIn('already exists', response_body['error'])
def test_password_validation(self):
"""Test password strength validation"""
weak_passwords = [
'short', # Too short
'nouppercase123!', # No uppercase
'NOLOWERCASE123!', # No lowercase
'NoNumbers!', # No numbers
'NoSpecialChars1' # No special characters
]
for password in weak_passwords:
registration_data = {
'email': '[email protected]',
'password': password,
'first_name': 'Test',
'last_name': 'User'
}
event = {
'httpMethod': 'POST',
'path': '/users/register',
'body': json.dumps(registration_data),
'headers': {'Content-Type': 'application/json'}
}
from user_registration import app
response = app.lambda_handler(event, self.mock_context)
self.assertEqual(response['statusCode'], 400)
response_body = json.loads(response['body'])
self.assertIn('error', response_body)
Integration Testing
Integration tests verify that different components work together correctly. For serverless applications, this means testing the interaction between Lambda functions, databases, and other AWS services:
import os
import json
import time
import boto3
import requests
import pytest
from datetime import datetime
from typing import Dict, Any
class TestServerlessIntegration:
"""
Integration tests for serverless application components
"""
def setup_class(self):
"""Setup for integration tests"""
# These would be real AWS resources in a test environment
self.api_endpoint = os.environ.get('API_ENDPOINT')
self.dynamodb = boto3.resource('dynamodb')
self.s3 = boto3.client('s3')
self.sns = boto3.client('sns')
# Test data cleanup list
self.cleanup_items = []
def teardown_class(self):
"""Cleanup after integration tests"""
# Clean up test data
for item in self.cleanup_items:
try:
if item['type'] == 'dynamodb':
table = self.dynamodb.Table(item['table'])
table.delete_item(Key=item['key'])
elif item['type'] == 's3':
self.s3.delete_object(Bucket=item['bucket'], Key=item['key'])
except Exception as e:
print(f"Cleanup error: {e}")
def test_complete_user_registration_flow(self):
"""Test complete user registration workflow"""
# Generate unique test data
timestamp = str(int(time.time()))
test_email = f"test+{timestamp}@example.com"
registration_data = {
'email': test_email,
'password': 'Secure*Password123',
'first_name': 'Integration',
'last_name': 'Test'
}
# Step 1: Register user
response = requests.post(
f"{self.api_endpoint}/users/register",
json=registration_data,
headers={'Content-Type': 'application/json'}
)
assert response.status_code == 201
registration_response = response.json()
user_id = registration_response['user_id']
# Track for cleanup
self.cleanup_items.append({
'type': 'dynamodb',
'table': 'Users',
'key': {'user_id': user_id}
})
# Step 2: Verify user exists in database
time.sleep(2) # Allow for eventual consistency
users_table = self.dynamodb.Table('Users')
user_record = users_table.get_item(Key={'user_id': user_id})
assert 'Item' in user_record
assert user_record['Item']['email'] == test_email
assert user_record['Item']['status'] == 'active'
# Step 3: Test login with new user
login_data = {
'email': test_email,
'password': 'Secure*Password123'
}
login_response = requests.post(
f"{self.api_endpoint}/auth/login",
json=login_data,
headers={'Content-Type': 'application/json'}
)
assert login_response.status_code == 200
login_result = login_response.json()
assert 'access_token' in login_result
assert 'refresh_token' in login_result
# Step 4: Test authenticated API call
headers = {
'Authorization': f"Bearer {login_result['access_token']}",
'Content-Type': 'application/json'
}
profile_response = requests.get(
f"{self.api_endpoint}/users/{user_id}",
headers=headers
)
assert profile_response.status_code == 200
profile_data = profile_response.json()
assert profile_data['email'] == test_email
def test_image_processing_pipeline(self):
"""Test end-to-end image processing workflow"""
# Generate test image
test_image_data = self.create_test_image()
test_key = f"test-images/integration-test-{int(time.time())}.jpg"
# Step 1: Upload image to S3
self.s3.put_object(
Bucket='test-images-bucket',
Key=test_key,
Body=test_image_data,
ContentType='image/jpeg'
)
# Track for cleanup
self.cleanup_items.append({
'type': 's3',
'bucket': 'test-images-bucket',
'key': test_key
})
# Step 2: Wait for processing (Lambda triggered by S3 event)
time.sleep(10) # Allow time for processing
# Step 3: Verify processed images exist
processed_sizes = ['thumbnail', 'medium', 'large']
base_key = test_key.rsplit('.', 1)[0]
for size in processed_sizes:
processed_key = f"{base_key}_{size}.jpg"
try:
response = self.s3.head_object(
Bucket='test-images-bucket',
Key=processed_key
)
assert response['ContentType'] == 'image/jpeg'
# Track for cleanup
self.cleanup_items.append({
'type': 's3',
'bucket': 'test-images-bucket',
'key': processed_key
})
except self.s3.exceptions.ClientError:  # head_object raises ClientError (404), not NoSuchKey
pytest.fail(f"Processed image not found: {processed_key}")
# Step 4: Verify processing completion event
# This would check SNS notifications or database updates
time.sleep(5) # Allow for event processing
# Check if processing completion was recorded
events_table = self.dynamodb.Table('ProcessingEvents')
processing_record = events_table.query(
IndexName='ImageKeyIndex',
KeyConditionExpression='image_key = :key',
ExpressionAttributeValues={':key': test_key}
)
assert len(processing_record['Items']) > 0
assert processing_record['Items'][0]['status'] == 'completed'
def test_real_time_analytics_flow(self):
"""Test real-time analytics data flow"""
# Generate test activity events
test_user_id = f"test-user-{int(time.time())}"
activity_events = [
{
'user_id': test_user_id,
'event_type': 'page_view',
'timestamp': datetime.utcnow().isoformat() + 'Z',
'properties': {
'page': '/home',
'duration': 1500
}
},
{
'user_id': test_user_id,
'event_type': 'click',
'timestamp': datetime.utcnow().isoformat() + 'Z',
'properties': {
'element': 'nav-menu',
'page': '/home'
}
}
]
# Step 1: Send events to Kinesis stream
kinesis_client = boto3.client('kinesis')
for event in activity_events:
kinesis_client.put_record(
StreamName='user-activity-test',
Data=json.dumps(event),
PartitionKey=test_user_id
)
# Step 2: Wait for stream processing
time.sleep(15) # Allow for Lambda processing
# Step 3: Verify analytics results
analytics_table = self.dynamodb.Table('UserAnalytics')
# Check hourly stats
hour_key = datetime.utcnow().strftime('%Y-%m-%d-%H')
hourly_stats = analytics_table.get_item(
Key={
'user_id': test_user_id,
'time_window': f"hour:{hour_key}"
}
)
assert 'Item' in hourly_stats
assert hourly_stats['Item']['event_count'] >= 2
# Check daily stats
day_key = datetime.utcnow().strftime('%Y-%m-%d')
daily_stats = analytics_table.get_item(
Key={
'user_id': test_user_id,
'time_window': f"day:{day_key}"
}
)
assert 'Item' in daily_stats
assert daily_stats['Item']['total_events'] >= 2
def test_error_handling_and_retry_logic(self):
"""Test error handling and retry mechanisms"""
# Test with intentionally invalid data
invalid_post_data = {
'title': '', # Invalid - empty title
'content': 'x', # Invalid - too short
'author_id': 'invalid-author-id'
}
# Step 1: Submit invalid data
response = requests.post(
f"{self.api_endpoint}/posts",
json=invalid_post_data,
headers={'Content-Type': 'application/json'}
)
# Should receive validation error
assert response.status_code == 400
error_response = response.json()
assert 'error' in error_response
assert 'validation' in error_response['error'].lower()
# Step 2: Test retry logic with temporary failure
# This would simulate network timeouts or service unavailability
# In a real test, you might use chaos engineering tools
# Step 3: Test graceful degradation
# Verify system continues operating when non-critical components fail
def create_test_image(self) -> bytes:
"""Create a simple test image"""
from PIL import Image
import io
# Create a simple test image
image = Image.new('RGB', (800, 600), color='blue')
# Save to bytes
img_byte_array = io.BytesIO()
image.save(img_byte_array, format='JPEG')
return img_byte_array.getvalue()
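Not every integration test needs to go through API Gateway. Invoking the deployed function directly with boto3 exercises the same handler with less setup; a short sketch, where the function name is an assumption about how the stack names its resources:
import json
import boto3

lambda_client = boto3.client('lambda')

def invoke_api_function(event: dict) -> dict:
    """Invoke the deployed API function directly, bypassing API Gateway."""
    response = lambda_client.invoke(
        FunctionName='serverless-blog-dev-MainAPIFunction',  # assumed deployed name
        InvocationType='RequestResponse',
        Payload=json.dumps(event).encode()
    )
    return json.loads(response['Payload'].read())

# Example: fetch a post with the same event shape API Gateway would send
result = invoke_api_function({
    'httpMethod': 'GET',
    'path': '/posts/test-post-123',
    'pathParameters': {'post_id': 'test-post-123'},
    'queryStringParameters': None,
    'body': None,
    'headers': {}
})
assert result['statusCode'] in (200, 404)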
End-to-End Testing
End-to-end tests validate complete user workflows across the entire application stack:
import os
import pytest
import time
import boto3
import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
class TestEndToEndWorkflows:
"""
End-to-end tests for complete user workflows
"""
@pytest.fixture(scope="class")
def browser(self):
"""Setup browser for E2E tests"""
options = webdriver.ChromeOptions()
options.add_argument('--headless') # Run headless for CI/CD
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
driver = webdriver.Chrome(options=options)
driver.implicitly_wait(10)
yield driver
driver.quit()
@pytest.fixture(scope="class")
def test_user_credentials(self):
"""Create test user for E2E tests"""
timestamp = str(int(time.time()))
return {
'email': f"e2e-test+{timestamp}@example.com",
'password': 'E2ETest*Password123',
'first_name': 'E2E',
'last_name': 'Test'
}
def test_complete_blog_workflow(self, browser, test_user_credentials):
"""Test complete blog creation and publication workflow"""
web_app_url = os.environ.get('WEB_APP_URL', 'https://test-blog-app.example.com')
# Step 1: User registration
browser.get(f"{web_app_url}/register")
# Fill registration form
browser.find_element(By.ID, "email").send_keys(test_user_credentials['email'])
browser.find_element(By.ID, "password").send_keys(test_user_credentials['password'])
browser.find_element(By.ID, "firstName").send_keys(test_user_credentials['first_name'])
browser.find_element(By.ID, "lastName").send_keys(test_user_credentials['last_name'])
# Submit registration
browser.find_element(By.ID, "register-button").click()
# Wait for registration success
WebDriverWait(browser, 10).until(
EC.presence_of_element_located((By.CLASS_NAME, "registration-success"))
)
# Step 2: Email verification (simulate)
# In a real E2E test, you might check a test email service
time.sleep(2)
# Step 3: User login
browser.get(f"{web_app_url}/login")
browser.find_element(By.ID, "email").send_keys(test_user_credentials['email'])
browser.find_element(By.ID, "password").send_keys(test_user_credentials['password'])
browser.find_element(By.ID, "login-button").click()
# Wait for login success
WebDriverWait(browser, 10).until(
EC.presence_of_element_located((By.CLASS_NAME, "dashboard"))
)
# Step 4: Create new blog post
browser.find_element(By.ID, "create-post-button").click()
# Fill post form
post_title = f"E2E Test Post {int(time.time())}"
post_content = "This is a test blog post created during end-to-end testing."
browser.find_element(By.ID, "post-title").send_keys(post_title)
browser.find_element(By.ID, "post-content").send_keys(post_content)
browser.find_element(By.ID, "post-category").send_keys("technology")
# Save as draft first
browser.find_element(By.ID, "save-draft-button").click()
# Wait for save confirmation
WebDriverWait(browser, 10).until(
EC.presence_of_element_located((By.CLASS_NAME, "save-success"))
)
# Step 5: Upload featured image
browser.find_element(By.ID, "upload-image-button").click()
# Simulate file upload
file_input = browser.find_element(By.ID, "image-file-input")
# In real test, you'd upload an actual test image file
# file_input.send_keys("/path/to/test/image.jpg")
# Step 6: Publish post
browser.find_element(By.ID, "publish-button").click()
# Confirm publication
WebDriverWait(browser, 10).until(
EC.element_to_be_clickable((By.ID, "confirm-publish"))
).click()
# Wait for publication success
WebDriverWait(browser, 15).until(
EC.presence_of_element_located((By.CLASS_NAME, "publish-success"))
)
# Step 7: Verify post appears in public blog
browser.get(f"{web_app_url}/blog")
# Wait for blog list to load
WebDriverWait(browser, 10).until(
EC.presence_of_element_located((By.CLASS_NAME, "blog-post-list"))
)
# Verify our post appears
post_links = browser.find_elements(By.CLASS_NAME, "post-title-link")
post_titles = [link.text for link in post_links]
assert post_title in post_titles, f"Published post '{post_title}' not found in blog list"
# Step 8: View individual post
target_post_link = None
for link in post_links:
if link.text == post_title:
target_post_link = link
break
assert target_post_link is not None
target_post_link.click()
# Verify post content
WebDriverWait(browser, 10).until(
EC.presence_of_element_located((By.CLASS_NAME, "post-content"))
)
displayed_title = browser.find_element(By.CLASS_NAME, "post-title").text
displayed_content = browser.find_element(By.CLASS_NAME, "post-content").text
assert displayed_title == post_title
assert post_content in displayed_content
def test_image_processing_workflow(self, browser, test_user_credentials):
"""Test image upload and processing workflow"""
web_app_url = os.environ.get('WEB_APP_URL')
# Login first (assuming user exists from previous test)
browser.get(f"{web_app_url}/login")
browser.find_element(By.ID, "email").send_keys(test_user_credentials['email'])
browser.find_element(By.ID, "password").send_keys(test_user_credentials['password'])
browser.find_element(By.ID, "login-button").click()
# Navigate to profile
browser.get(f"{web_app_url}/profile")
# Upload profile image
browser.find_element(By.ID, "upload-avatar-button").click()
# Wait for upload interface
file_input = WebDriverWait(browser, 10).until(
EC.presence_of_element_located((By.ID, "avatar-file-input"))
)
# In real test, upload actual image file
# file_input.send_keys("/path/to/test/avatar.jpg")
# Simulate upload completion
time.sleep(5) # Allow for image processing
# Verify processed images are displayed
WebDriverWait(browser, 30).until(
EC.presence_of_element_located((By.CLASS_NAME, "avatar-thumbnail"))
)
# Check that different image sizes are available
thumbnail = browser.find_element(By.CLASS_NAME, "avatar-thumbnail")
medium_img = browser.find_element(By.CLASS_NAME, "avatar-medium")
assert thumbnail.get_attribute("src") is not None
assert medium_img.get_attribute("src") is not None
assert thumbnail.get_attribute("src") != medium_img.get_attribute("src")
def test_real_time_notifications(self, browser, test_user_credentials):
"""Test real-time notification system"""
web_app_url = os.environ.get('WEB_APP_URL')
# Open two browser windows (simulate two users)
browser2 = webdriver.Chrome()
try:
# Login as first user
browser.get(f"{web_app_url}/login")
browser.find_element(By.ID, "email").send_keys(test_user_credentials['email'])
browser.find_element(By.ID, "password").send_keys(test_user_credentials['password'])
browser.find_element(By.ID, "login-button").click()
# Create second test user and login
second_user_email = f"e2e-test-2+{int(time.time())}@example.com"
# Register second user via API (faster than UI)
api_endpoint = os.environ.get('API_ENDPOINT')
requests.post(f"{api_endpoint}/users/register", json={
'email': second_user_email,
'password': 'SecondUser*Pass123',
'first_name': 'Second',
'last_name': 'User'
})
# Login second user
browser2.get(f"{web_app_url}/login")
browser2.find_element(By.ID, "email").send_keys(second_user_email)
browser2.find_element(By.ID, "password").send_keys('SecondUser*Pass123')
browser2.find_element(By.ID, "login-button").click()
# First user creates a post
browser.get(f"{web_app_url}/create-post")
browser.find_element(By.ID, "post-title").send_keys("Real-time Test Post")
browser.find_element(By.ID, "post-content").send_keys("Testing notifications")
browser.find_element(By.ID, "publish-button").click()
# Second user should receive notification
browser2.get(f"{web_app_url}/dashboard")
# Wait for real-time notification
WebDriverWait(browser2, 30).until(
EC.presence_of_element_located((By.CLASS_NAME, "notification-new-post"))
)
# Verify notification content
notification = browser2.find_element(By.CLASS_NAME, "notification-new-post")
assert "Real-time Test Post" in notification.text
finally:
browser2.quit()
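Browser-driven tests are valuable but slow and brittle in CI, so I pair them with a requests-only smoke test that covers the same register, login, publish, and read path in seconds. A condensed sketch reusing the endpoints exercised above; the password and payload values are placeholders:
import os
import time
import requests

def test_api_smoke_register_login_publish():
    """API-only happy path: register, log in, publish a post, read it back."""
    api = os.environ['API_ENDPOINT']
    email = f"smoke+{int(time.time())}@example.com"

    # Register and log in
    requests.post(f"{api}/users/register", json={
        'email': email, 'password': 'Smoke*Test*Pass123',
        'first_name': 'Smoke', 'last_name': 'Test'
    }).raise_for_status()
    login = requests.post(f"{api}/auth/login", json={
        'email': email, 'password': 'Smoke*Test*Pass123'
    }).json()
    headers = {'Authorization': f"Bearer {login['access_token']}"}

    # Publish a post and read it back
    created = requests.post(f"{api}/posts", headers=headers, json={
        'title': 'Smoke test post', 'content': 'Created by the API smoke test suite.',
        'category': 'technology', 'status': 'published'
    }).json()
    fetched = requests.get(f"{api}/posts/{created['post_id']}", headers=headers).json()
    assert fetched['title'] == 'Smoke test post'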
Deployment and CI/CD
Successful serverless applications require robust deployment pipelines that handle testing, validation, and gradual rollouts. I've developed deployment strategies across multiple client projects that ensure reliable releases while maintaining fast iteration cycles.
Advanced Deployment Strategies
Production serverless deployments need a more sophisticated approach than a single manual deploy command. Here's a complete CI/CD pipeline that handles multiple environments, testing stages, and gradual rollout strategies:
# .github/workflows/serverless-deploy.yml
name: Serverless Application Deployment
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
env:
AWS_REGION: us-east-1
SAM_CLI_TELEMETRY: 0
jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.9', '3.10', '3.11']  # quoted so YAML doesn't read 3.10 as 3.1
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements-dev.txt
- name: Run linting
run: |
flake8 src/ tests/ --max-line-length=100
black --check src/ tests/
isort --check-only src/ tests/
- name: Run security scanning
run: |
bandit -r src/ -f json -o security-report.json
safety check
- name: Run unit tests
run: |
pytest tests/unit/ -v --cov=src --cov-report=xml --cov-report=html
- name: Upload coverage reports
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
- name: Archive test results
uses: actions/upload-artifact@v3
if: always()
with:
name: test-results-${{ matrix.python-version }}
path: |
htmlcov/
security-report.json
validate:
runs-on: ubuntu-latest
needs: test
steps:
- uses: actions/checkout@v3
- name: Setup AWS SAM CLI
uses: aws-actions/setup-sam@v2
- name: Validate SAM template
run: |
sam validate --template-file template.yaml
- name: Run SAM build
run: |
sam build --use-container
- name: Package application
run: |
sam package \
--s3-bucket ${{ secrets.SAM_DEPLOYMENT_BUCKET }} \
--s3-prefix packages \
--output-template-file packaged-template.yaml
- name: Upload packaged template
uses: actions/upload-artifact@v3
with:
name: packaged-template
path: packaged-template.yaml
deploy-dev:
runs-on: ubuntu-latest
needs: [test, validate]
if: github.ref == 'refs/heads/develop'
environment: development
steps:
- uses: actions/checkout@v3
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v2
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: ${{ env.AWS_REGION }}
- name: Download packaged template
uses: actions/download-artifact@v3
with:
name: packaged-template
- name: Deploy to development
run: |
sam deploy \
--template-file packaged-template.yaml \
--stack-name serverless-blog-dev \
--capabilities CAPABILITY_IAM \
--parameter-overrides \
Environment=dev \
ApiDomainName=api-dev.yourblog.com \
CertificateArn=${{ secrets.DEV_CERTIFICATE_ARN }} \
--no-fail-on-empty-changeset \
--tags Environment=dev Project=ServerlessBlog
- name: Run integration tests
run: |
export API_ENDPOINT=$(aws cloudformation describe-stacks \
--stack-name serverless-blog-dev \
--query 'Stacks[0].Outputs[?OutputKey==`ApiUrl`].OutputValue' \
--output text)
pytest tests/integration/ -v --api-endpoint=$API_ENDPOINT
- name: Run smoke tests
run: |
./scripts/smoke-test.sh dev
deploy-staging:
runs-on: ubuntu-latest
needs: [deploy-dev]
if: github.ref == 'refs/heads/main'
environment: staging
steps:
- uses: actions/checkout@v3
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v2
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: ${{ env.AWS_REGION }}
- name: Download packaged template
uses: actions/download-artifact@v3
with:
name: packaged-template
- name: Deploy to staging
run: |
sam deploy \
--template-file packaged-template.yaml \
--stack-name serverless-blog-staging \
--capabilities CAPABILITY_IAM \
--parameter-overrides \
Environment=staging \
ApiDomainName=api-staging.yourblog.com \
CertificateArn=${{ secrets.STAGING_CERTIFICATE_ARN }} \
--no-fail-on-empty-changeset \
--tags Environment=staging Project=ServerlessBlog
- name: Run comprehensive tests
run: |
export API_ENDPOINT=$(aws cloudformation describe-stacks \
--stack-name serverless-blog-staging \
--query 'Stacks[0].Outputs[?OutputKey==`ApiUrl`].OutputValue' \
--output text)
# Run all test suites
pytest tests/ -v --api-endpoint=$API_ENDPOINT --run-e2e
- name: Performance testing
run: |
./scripts/performance-test.sh staging
- name: Security scanning
run: |
./scripts/security-scan.sh staging
deploy-production:
runs-on: ubuntu-latest
needs: [deploy-staging]
if: github.ref == 'refs/heads/main'
environment: production
steps:
- uses: actions/checkout@v3
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v2
with:
aws-access-key-id: ${{ secrets.PROD_AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.PROD_AWS_SECRET_ACCESS_KEY }}
aws-region: ${{ env.AWS_REGION }}
- name: Download packaged template
uses: actions/download-artifact@v3
with:
name: packaged-template
- name: Blue-Green Deployment Setup
run: |
# Create production deployment with traffic shifting
sam deploy \
--template-file packaged-template.yaml \
--stack-name serverless-blog-prod \
--capabilities CAPABILITY_IAM \
--parameter-overrides \
Environment=prod \
ApiDomainName=api.yourblog.com \
CertificateArn=${{ secrets.PROD_CERTIFICATE_ARN }} \
--no-fail-on-empty-changeset \
--tags Environment=prod Project=ServerlessBlog
- name: Gradual Traffic Shifting
run: |
# Implement gradual traffic shifting
./scripts/traffic-shift.sh prod 10 # Start with 10% traffic
sleep 300 # Wait 5 minutes
# Monitor metrics and errors
if ./scripts/check-health.sh prod; then
./scripts/traffic-shift.sh prod 50 # Increase to 50%
sleep 300
if ./scripts/check-health.sh prod; then
./scripts/traffic-shift.sh prod 100 # Full traffic
else
./scripts/rollback.sh prod
exit 1
fi
else
./scripts/rollback.sh prod
exit 1
fi
- name: Post-deployment validation
run: |
./scripts/validate-production.sh
- name: Update monitoring dashboards
run: |
aws cloudwatch put-dashboard \
--dashboard-name "ServerlessBlog-Production" \
--dashboard-body file://monitoring/production-dashboard.json
- name: Notify deployment success
run: |
./scripts/notify-deployment.sh prod success
Blue-Green Deployment Scripts
Here are the supporting scripts the workflow calls for traffic shifting, health checks, and rollback:
#!/bin/bash
# scripts/traffic-shift.sh - Gradual traffic shifting for production deployments
set -e
ENVIRONMENT=$1
TRAFFIC_PERCENT=$2
if [[ -z "$ENVIRONMENT" || -z "$TRAFFIC_PERCENT" ]]; then
echo "Usage: $0 <environment> <traffic_percent>"
exit 1
fi
echo "Shifting ${TRAFFIC_PERCENT}% traffic to new deployment in ${ENVIRONMENT}"
# Get the Lambda function name
FUNCTION_NAME=$(aws cloudformation describe-stacks \
--stack-name "serverless-blog-${ENVIRONMENT}" \
--query 'Stacks[0].Outputs[?OutputKey==`MainAPIFunction`].OutputValue' \
--output text)
# Get current alias configuration
CURRENT_CONFIG=$(aws lambda get-alias \
--function-name "$FUNCTION_NAME" \
--name "LIVE" 2>/dev/null || echo "{}")
if [[ "$CURRENT_CONFIG" == "{}" ]]; then
echo "Creating LIVE alias for first deployment"
aws lambda create-alias \
--function-name "$FUNCTION_NAME" \
--name "LIVE" \
--function-version '$LATEST'
else
# Update alias with weighted routing
LATEST_VERSION=$(aws lambda publish-version \
--function-name "$FUNCTION_NAME" \
--query 'Version' \
--output text)
# Calculate weights
NEW_WEIGHT=$TRAFFIC_PERCENT
OLD_WEIGHT=$((100 - TRAFFIC_PERCENT))
echo "Routing ${NEW_WEIGHT}% to version ${LATEST_VERSION}, ${OLD_WEIGHT}% to previous"
CURRENT_VERSION=$(echo "$CURRENT_CONFIG" | jq -r '.FunctionVersion')
  if [[ "$OLD_WEIGHT" -gt 0 ]]; then
    # Use a leading-zero decimal so the routing config is valid JSON (e.g. 0.90)
    OLD_WEIGHT_FRACTION=$(awk -v w="$OLD_WEIGHT" 'BEGIN {printf "%.2f", w/100}')
    # Point the alias at the new version, keeping the previous version at the remaining weight
    aws lambda update-alias \
      --function-name "$FUNCTION_NAME" \
      --name "LIVE" \
      --function-version "$LATEST_VERSION" \
      --routing-config "{\"AdditionalVersionWeights\":{\"$CURRENT_VERSION\":$OLD_WEIGHT_FRACTION}}"
  else
    # Final cutover: send all traffic to the new version and clear any weighted routing
    aws lambda update-alias \
      --function-name "$FUNCTION_NAME" \
      --name "LIVE" \
      --function-version "$LATEST_VERSION" \
      --routing-config '{"AdditionalVersionWeights":{}}'
  fi
fi
echo "Traffic shifting completed"
#!/bin/bash
# scripts/check-health.sh - Health check for deployment validation
set -e
ENVIRONMENT=$1
THRESHOLD_ERROR_RATE=5.0 # 5% error rate threshold
THRESHOLD_LATENCY=2000 # 2000ms latency threshold
if [[ -z "$ENVIRONMENT" ]]; then
echo "Usage: $0 <environment>"
exit 1
fi
echo "Checking health metrics for ${ENVIRONMENT} environment"
# Get function name
FUNCTION_NAME=$(aws cloudformation describe-stacks \
--stack-name "serverless-blog-${ENVIRONMENT}" \
--query 'Stacks[0].Outputs[?OutputKey==`MainAPIFunction`].OutputValue' \
--output text)
# Check error rate over last 5 minutes
END_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S")
START_TIME=$(date -u -d '5 minutes ago' +"%Y-%m-%dT%H:%M:%S")
ERROR_COUNT=$(aws cloudwatch get-metric-statistics \
--namespace AWS/Lambda \
--metric-name Errors \
--dimensions Name=FunctionName,Value="$FUNCTION_NAME" \
--start-time "$START_TIME" \
--end-time "$END_TIME" \
--period 300 \
--statistics Sum \
--query 'Datapoints[0].Sum' \
--output text)
INVOCATION_COUNT=$(aws cloudwatch get-metric-statistics \
--namespace AWS/Lambda \
--metric-name Invocations \
--dimensions Name=FunctionName,Value="$FUNCTION_NAME" \
--start-time "$START_TIME" \
--end-time "$END_TIME" \
--period 300 \
--statistics Sum \
--query 'Datapoints[0].Sum' \
--output text)
# Handle None values
ERROR_COUNT=${ERROR_COUNT:-0}
INVOCATION_COUNT=${INVOCATION_COUNT:-0}
if [[ "$ERROR_COUNT" == "None" ]]; then
ERROR_COUNT=0
fi
if [[ "$INVOCATION_COUNT" == "None" ]]; then
INVOCATION_COUNT=1 # Avoid division by zero
fi
# Calculate error rate (CloudWatch sums come back as floats, so compare with bc)
ERROR_RATE=0
if (( $(echo "$INVOCATION_COUNT > 0" | bc -l) )); then
  ERROR_RATE=$(echo "scale=2; $ERROR_COUNT * 100 / $INVOCATION_COUNT" | bc -l)
fi
echo "Error rate: ${ERROR_RATE}% (${ERROR_COUNT}/${INVOCATION_COUNT})"
# Check latency
AVG_DURATION=$(aws cloudwatch get-metric-statistics \
--namespace AWS/Lambda \
--metric-name Duration \
--dimensions Name=FunctionName,Value="$FUNCTION_NAME" \
--start-time "$START_TIME" \
--end-time "$END_TIME" \
--period 300 \
--statistics Average \
--query 'Datapoints[0].Average' \
--output text)
AVG_DURATION=${AVG_DURATION:-0}
if [[ "$AVG_DURATION" == "None" ]]; then
AVG_DURATION=0
fi
echo "Average duration: ${AVG_DURATION}ms"
# Evaluate health
HEALTH_OK=true
if (( $(echo "$ERROR_RATE > $THRESHOLD_ERROR_RATE" | bc -l) )); then
echo "ERROR: Error rate ${ERROR_RATE}% exceeds threshold ${THRESHOLD_ERROR_RATE}%"
HEALTH_OK=false
fi
if (( $(echo "$AVG_DURATION > $THRESHOLD_LATENCY" | bc -l) )); then
echo "ERROR: Average duration ${AVG_DURATION}ms exceeds threshold ${THRESHOLD_LATENCY}ms"
HEALTH_OK=false
fi
if [[ "$HEALTH_OK" == "true" ]]; then
echo "Health check PASSED"
exit 0
else
echo "Health check FAILED"
exit 1
fi
#!/bin/bash
# scripts/rollback.sh - Rollback deployment
set -e
ENVIRONMENT=$1
if [[ -z "$ENVIRONMENT" ]]; then
echo "Usage: $0 <environment>"
exit 1
fi
echo "Initiating rollback for ${ENVIRONMENT} environment"
# Get function name
FUNCTION_NAME=$(aws cloudformation describe-stacks \
--stack-name "serverless-blog-${ENVIRONMENT}" \
--query 'Stacks[0].Outputs[?OutputKey==`MainAPIFunction`].OutputValue' \
--output text)
# Get previous stable version
ALIAS_CONFIG=$(aws lambda get-alias \
--function-name "$FUNCTION_NAME" \
--name "LIVE")
# The previous stable version is the one still carrying weighted traffic ("null" if none)
PREVIOUS_VERSION=$(echo "$ALIAS_CONFIG" | jq -r '.RoutingConfig.AdditionalVersionWeights // {} | keys[0]')
if [[ "$PREVIOUS_VERSION" != "null" && -n "$PREVIOUS_VERSION" ]]; then
echo "Rolling back to version: $PREVIOUS_VERSION"
  # Route all traffic back to the previous version and drop the weighted routing
  aws lambda update-alias \
    --function-name "$FUNCTION_NAME" \
    --name "LIVE" \
    --function-version "$PREVIOUS_VERSION" \
    --routing-config '{"AdditionalVersionWeights":{}}'
echo "Rollback completed successfully"
else
echo "No previous version found for rollback"
exit 1
fi
# Notify about rollback
./scripts/notify-deployment.sh "$ENVIRONMENT" rollback
Database Migration Handling
Serverless applications often need to handle database schema changes during deployment:
import boto3
import json
import logging
from typing import Dict, Any, List
from datetime import datetime
logger = logging.getLogger(__name__)
class DatabaseMigrationManager:
"""
Handle database migrations during serverless deployments
"""
def __init__(self, environment: str):
self.environment = environment
self.dynamodb = boto3.resource('dynamodb')
self.migrations_table = f'DatabaseMigrations-{environment}'
# Ensure migrations table exists
self._ensure_migrations_table()
def run_migrations(self) -> Dict[str, Any]:
"""
Run pending database migrations
"""
try:
# Get migration scripts
migration_scripts = self._get_migration_scripts()
# Get executed migrations
executed_migrations = self._get_executed_migrations()
# Find pending migrations
pending_migrations = [
migration for migration in migration_scripts
if migration['version'] not in executed_migrations
]
if not pending_migrations:
logger.info("No pending migrations")
return {'status': 'success', 'migrations_run': 0}
# Sort by version
pending_migrations.sort(key=lambda x: x['version'])
# Execute migrations
results = []
for migration in pending_migrations:
result = self._execute_migration(migration)
results.append(result)
if not result['success']:
logger.error(f"Migration failed: {migration['version']}")
return {
'status': 'failed',
'failed_migration': migration['version'],
'error': result['error']
}
return {
'status': 'success',
'migrations_run': len(results),
'results': results
}
except Exception as e:
logger.error(f"Migration error: {str(e)}")
return {'status': 'error', 'error': str(e)}
    def _ensure_migrations_table(self):
        """Ensure the migrations tracking table exists"""
        client = self.dynamodb.meta.client
        try:
            # A single DescribeTable call tells us whether the table already exists
            client.describe_table(TableName=self.migrations_table)
        except client.exceptions.ResourceNotFoundException:
            # Create the tracking table and wait until it is ACTIVE before
            # any scans or writes are attempted against it
            table = self.dynamodb.create_table(
                TableName=self.migrations_table,
                KeySchema=[
                    {'AttributeName': 'migration_version', 'KeyType': 'HASH'}
                ],
                AttributeDefinitions=[
                    {'AttributeName': 'migration_version', 'AttributeType': 'S'}
                ],
                BillingMode='PAY_PER_REQUEST'
            )
            table.wait_until_exists()
def _get_migration_scripts(self) -> List[Dict[str, Any]]:
"""Get available migration scripts"""
# In practice, these would be loaded from files or S3
return [
{
'version': '001_create_user_profiles_table',
'description': 'Add user profiles table',
'script': self._migration_001_create_user_profiles
},
{
'version': '002_add_user_preferences',
'description': 'Add user preferences column',
'script': self._migration_002_add_user_preferences
},
{
'version': '003_create_analytics_tables',
'description': 'Create analytics tables',
'script': self._migration_003_create_analytics_tables
}
]
def _get_executed_migrations(self) -> set:
"""Get list of executed migrations"""
try:
table = self.dynamodb.Table(self.migrations_table)
response = table.scan()
return {item['migration_version'] for item in response['Items']}
except Exception as e:
logger.warning(f"Could not get executed migrations: {e}")
return set()
def _execute_migration(self, migration: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a single migration"""
try:
logger.info(f"Running migration: {migration['version']}")
# Execute the migration script
migration_result = migration['script']()
if migration_result.get('success', False):
# Record successful migration
migrations_table = self.dynamodb.Table(self.migrations_table)
migrations_table.put_item(Item={
'migration_version': migration['version'],
'description': migration['description'],
'executed_at': datetime.utcnow().isoformat(),
'execution_time_ms': migration_result.get('execution_time', 0),
'environment': self.environment
})
logger.info(f"Migration completed: {migration['version']}")
return {
'success': True,
'version': migration['version'],
'execution_time': migration_result.get('execution_time', 0)
}
else:
return {
'success': False,
'version': migration['version'],
'error': migration_result.get('error', 'Unknown error')
}
except Exception as e:
logger.error(f"Migration execution failed: {e}")
return {
'success': False,
'version': migration['version'],
'error': str(e)
}
def _migration_001_create_user_profiles(self) -> Dict[str, Any]:
"""Create user profiles table"""
try:
start_time = datetime.utcnow()
table_name = f'UserProfiles-{self.environment}'
# Check if table already exists
try:
table = self.dynamodb.Table(table_name)
table.wait_until_exists(WaiterConfig={'Delay': 1, 'MaxAttempts': 1})
logger.info(f"Table {table_name} already exists")
return {'success': True}
except:
pass
# Create the table
self.dynamodb.create_table(
TableName=table_name,
KeySchema=[
{'AttributeName': 'user_id', 'KeyType': 'HASH'}
],
AttributeDefinitions=[
{'AttributeName': 'user_id', 'AttributeType': 'S'}
],
BillingMode='PAY_PER_REQUEST',
Tags=[
{'Key': 'Environment', 'Value': self.environment},
{'Key': 'ManagedBy', 'Value': 'DatabaseMigration'}
]
)
# Wait for table to be ready
table = self.dynamodb.Table(table_name)
table.wait_until_exists()
execution_time = (datetime.utcnow() - start_time).total_seconds() * 1000
return {
'success': True,
'execution_time': execution_time,
'table_created': table_name
}
except Exception as e:
return {'success': False, 'error': str(e)}
def _migration_002_add_user_preferences(self) -> Dict[str, Any]:
"""Add user preferences - DynamoDB is schemaless, so this is a no-op"""
try:
# In DynamoDB, we don't need to alter table structure
# Just document the new attribute usage
logger.info("User preferences attribute documented - no schema change needed")
return {'success': True, 'execution_time': 0}
except Exception as e:
return {'success': False, 'error': str(e)}
def _migration_003_create_analytics_tables(self) -> Dict[str, Any]:
"""Create analytics tables"""
try:
start_time = datetime.utcnow()
tables_to_create = [
{
'name': f'UserAnalytics-{self.environment}',
'key_schema': [
{'AttributeName': 'user_id', 'KeyType': 'HASH'},
{'AttributeName': 'time_window', 'KeyType': 'RANGE'}
],
'attributes': [
{'AttributeName': 'user_id', 'AttributeType': 'S'},
{'AttributeName': 'time_window', 'AttributeType': 'S'}
]
},
{
'name': f'AnomalyAlerts-{self.environment}',
'key_schema': [
{'AttributeName': 'alert_id', 'KeyType': 'HASH'}
],
'attributes': [
{'AttributeName': 'alert_id', 'AttributeType': 'S'},
{'AttributeName': 'user_id', 'AttributeType': 'S'},
{'AttributeName': 'created_at', 'AttributeType': 'S'}
],
'gsi': [
{
'IndexName': 'UserIdIndex',
'KeySchema': [
{'AttributeName': 'user_id', 'KeyType': 'HASH'},
{'AttributeName': 'created_at', 'KeyType': 'RANGE'}
],
'Projection': {'ProjectionType': 'ALL'}
}
]
}
]
created_tables = []
for table_config in tables_to_create:
try:
# Check if table exists
existing_table = self.dynamodb.Table(table_config['name'])
existing_table.wait_until_exists(WaiterConfig={'Delay': 1, 'MaxAttempts': 1})
continue
except:
pass
# Create table
create_params = {
'TableName': table_config['name'],
'KeySchema': table_config['key_schema'],
'AttributeDefinitions': table_config['attributes'],
'BillingMode': 'PAY_PER_REQUEST'
}
# Add GSI if specified
if 'gsi' in table_config:
create_params['GlobalSecondaryIndexes'] = table_config['gsi']
self.dynamodb.create_table(**create_params)
created_tables.append(table_config['name'])
# Wait for all tables to be ready
for table_name in created_tables:
table = self.dynamodb.Table(table_name)
table.wait_until_exists()
execution_time = (datetime.utcnow() - start_time).total_seconds() * 1000
return {
'success': True,
'execution_time': execution_time,
'tables_created': created_tables
}
except Exception as e:
return {'success': False, 'error': str(e)}
# Lambda function for running migrations during deployment
def lambda_handler(event, context):
"""
Lambda function to run database migrations
"""
try:
environment = event.get('environment', 'dev')
migration_manager = DatabaseMigrationManager(environment)
result = migration_manager.run_migrations()
logger.info(f"Migration result: {result}")
return {
'statusCode': 200 if result['status'] == 'success' else 500,
'body': json.dumps(result)
}
except Exception as e:
logger.error(f"Migration handler error: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps({
'status': 'error',
'error': str(e)
})
}
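In the pipeline above, a migration step typically runs right after sam deploy and before traffic shifting. Here is a minimal sketch of driving this handler from a deployment script with boto3 - the function name below is a placeholder I'm using for illustration, not an output of the templates shown earlier:
import json
import boto3

def run_migrations(environment: str) -> dict:
    """Invoke the migration Lambda synchronously and fail the deploy on errors."""
    client = boto3.client('lambda')
    response = client.invoke(
        FunctionName=f'serverless-blog-{environment}-migrations',  # hypothetical function name
        InvocationType='RequestResponse',
        Payload=json.dumps({'environment': environment}).encode('utf-8')
    )
    result = json.loads(response['Payload'].read())
    if result.get('statusCode') != 200:
        raise RuntimeError(f"Database migrations failed: {result.get('body')}")
    return result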
Conclusion: Building Production-Ready Serverless Applications
Having implemented serverless architectures for dozens of clients across industries ranging from fintech to healthcare, I can say the patterns and practices in this guide are battle-tested approaches that work in production at scale.
The serverless journey isn't just about adopting new technology - it's about fundamentally changing how we think about building and operating applications. Success requires mastering event-driven design, embracing managed services, and implementing robust monitoring and deployment practices.
Key Takeaways for Serverless Success
Start with proper architecture patterns. The event-driven processing, API Gateway integration, and stream processing patterns form the foundation of most successful serverless applications. Master these patterns before tackling more complex architectures.
Invest in testing from day one. Serverless applications are distributed by nature, making comprehensive testing essential. Unit tests with proper mocking, integration tests against real services, and end-to-end tests that validate complete workflows all play crucial roles in maintaining quality.
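As one concrete example of the unit-test layer, here is a minimal sketch that mocks DynamoDB with the moto library (version 5 or later); the tiny save_post helper and table name are illustrative stand-ins, not code from earlier in this guide:
import boto3
from moto import mock_aws

def save_post(table, post: dict) -> None:
    """Tiny example handler: persist a blog post record."""
    table.put_item(Item=post)

@mock_aws
def test_save_post_round_trip():
    # Arrange: create the table inside moto's in-memory DynamoDB
    dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
    table = dynamodb.create_table(
        TableName='BlogPosts-test',
        KeySchema=[{'AttributeName': 'post_id', 'KeyType': 'HASH'}],
        AttributeDefinitions=[{'AttributeName': 'post_id', 'AttributeType': 'S'}],
        BillingMode='PAY_PER_REQUEST'
    )

    # Act
    save_post(table, {'post_id': '42', 'title': 'Hello serverless'})

    # Assert: the item round-trips without ever touching real AWS
    item = table.get_item(Key={'post_id': '42'})['Item']
    assert item['title'] == 'Hello serverless'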
Implement Infrastructure as Code religiously. Managing serverless resources manually becomes impossible as applications grow. AWS SAM templates, deployment scripts, and proper CI/CD pipelines aren't optional - they're requirements for production systems.
Monitor everything, but focus on business metrics. Technical metrics like function duration and error rates matter, but business metrics like user conversion rates and feature adoption provide the insights that drive product decisions.
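One lightweight way to put business metrics next to the technical ones is a custom CloudWatch metric emitted from the function itself; the namespace and metric name here are assumptions for illustration:
import boto3

cloudwatch = boto3.client('cloudwatch')

def record_signup(plan: str) -> None:
    """Emit a business-level metric so dashboards track conversions, not just errors."""
    cloudwatch.put_metric_data(
        Namespace='ServerlessBlog/Business',  # hypothetical custom namespace
        MetricData=[{
            'MetricName': 'UserSignups',
            'Dimensions': [{'Name': 'Plan', 'Value': plan}],
            'Value': 1,
            'Unit': 'Count'
        }]
    )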
Optimize for cost from the beginning. The pay-per-use model means optimization directly impacts your bill. Right-size functions, implement proper lifecycle policies, and monitor usage patterns to keep costs under control.
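A rough cost model makes right-sizing discussions concrete. The sketch below uses illustrative unit prices - check current Lambda pricing for your region before relying on the numbers:
def estimate_monthly_cost(invocations: int, avg_duration_ms: float, memory_mb: int,
                          price_per_million_requests: float = 0.20,
                          price_per_gb_second: float = 0.0000166667) -> float:
    """Approximate monthly Lambda spend in USD (illustrative unit prices)."""
    request_cost = invocations / 1_000_000 * price_per_million_requests
    gb_seconds = invocations * (avg_duration_ms / 1000) * (memory_mb / 1024)
    return round(request_cost + gb_seconds * price_per_gb_second, 2)

# 10M requests/month at 120 ms average on 512 MB works out to roughly $12/month
print(estimate_monthly_cost(10_000_000, 120, 512))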
Security requires a layered approach. Authentication, authorization, input validation, and secure coding practices all contribute to a secure serverless application. No single security measure is sufficient.
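For the input-validation layer specifically, a small schema check at the top of the handler rejects malformed payloads before any business logic runs; this sketch assumes the jsonschema package is bundled with the function, and the schema itself is illustrative:
import json
from jsonschema import ValidationError, validate

POST_SCHEMA = {
    'type': 'object',
    'required': ['title', 'body'],
    'properties': {
        'title': {'type': 'string', 'maxLength': 200},
        'body': {'type': 'string', 'maxLength': 50000}
    },
    'additionalProperties': False
}

def create_post_handler(event, context):
    """Reject malformed request bodies before touching any downstream service."""
    try:
        payload = json.loads(event.get('body') or '{}')
        validate(payload, POST_SCHEMA)
    except (json.JSONDecodeError, ValidationError) as exc:
        return {'statusCode': 400, 'body': json.dumps({'error': str(exc)})}
    # ...authorized, validated request handling continues here
    return {'statusCode': 200, 'body': json.dumps({'status': 'created'})}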
Plan for failure. Distributed systems fail in complex ways. Implement retry logic, circuit breakers, dead letter queues, and graceful degradation so that individual failures never cascade into outages.
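Retry logic is the simplest of these to get wrong, so here is a minimal backoff-with-jitter sketch; the attempt count and delays are illustrative defaults, and anything still failing after the last attempt should surface to the caller or a dead letter queue:
import random
import time

def call_with_retries(operation, max_attempts: int = 3, base_delay: float = 0.2):
    """Run a callable with exponential backoff and full jitter between attempts."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts:
                raise  # let the caller or a dead letter queue handle the final failure
            # Exponential backoff with full jitter to avoid retry storms
            time.sleep(random.uniform(0, base_delay * (2 ** attempt)))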
The Future of Serverless
Serverless technology continues evolving rapidly. Recent trends I'm seeing across client projects include increased adoption of container-based functions, better integration with AI/ML services, and more sophisticated observability tools.
The fundamental principles remain constant: design for events, embrace managed services, automate everything, and optimize continuously. Teams that master these principles build applications that scale effortlessly, cost less to operate, and require minimal maintenance.
The patterns and code examples in this guide provide a foundation for building production-ready serverless applications. The real learning happens when you implement these patterns in your own projects, adapt them to your specific requirements, and iterate based on real-world usage.
Serverless architecture isn't just a deployment model - it's a paradigm that enables teams to focus on business logic while AWS handles the infrastructure complexity. When implemented correctly, serverless applications deliver superior scalability, reliability, and cost efficiency compared to traditional architectures.
The journey from traditional to serverless thinking takes time, but the benefits - faster development cycles, lower operational overhead, and automatic scaling - make the investment worthwhile. Start with simple patterns, build expertise gradually, and don't try to solve every problem with serverless from the beginning.
Success in serverless comes from understanding when and how to apply these patterns, not from memorizing every AWS service feature. Focus on solving real business problems with clean, maintainable code, and let the serverless platform handle the rest.