Deploying Production LLMs with AWS Bedrock: A Complete Guide
Learn how to architect, deploy, and scale large language models in production using AWS Bedrock, covering cost optimization, security, and performance best practices.
Architecture Overview
Our production architecture uses AWS Bedrock as a fully managed, serverless inference layer: capacity scales automatically, you pay per token rather than per instance, and encryption, IAM, and VPC controls come built in.
Infrastructure as Code
# terraform/bedrock.tf

resource "aws_bedrockagent_agent" "production_agent" {
  agent_name                  = "production-llm-agent"
  agent_resource_role_arn     = aws_iam_role.bedrock_agent.arn
  foundation_model            = "anthropic.claude-3-sonnet-20240229-v1:0"
  instruction                 = "You are an AI assistant for enterprise applications"
  idle_session_ttl_in_seconds = 600

  tags = {
    Environment = "production"
    ManagedBy   = "terraform"
  }
}

resource "aws_lambda_function" "bedrock_proxy" {
  filename      = "bedrock_proxy.zip"
  function_name = "bedrock-llm-proxy"
  role          = aws_iam_role.lambda_exec.arn
  handler       = "index.handler"
  runtime       = "python3.11"

  environment {
    variables = {
      BEDROCK_MODEL_ID = var.bedrock_model_id
      MAX_TOKENS       = "4096"
      TEMPERATURE      = "0.7"
    }
  }

  timeout     = 300
  memory_size = 1024
}
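The Terraform above points the Lambda at index.handler; a minimal sketch of what that handler might look like is shown below. The event shape (an API Gateway proxy request with a JSON body containing a prompt field) is an assumption for illustration, not part of the original deployment.

# lambda/index.py (illustrative sketch)
import json
import os

import boto3

bedrock = boto3.client("bedrock-runtime")

MODEL_ID = os.environ["BEDROCK_MODEL_ID"]
MAX_TOKENS = int(os.environ.get("MAX_TOKENS", "4096"))
TEMPERATURE = float(os.environ.get("TEMPERATURE", "0.7"))


def handler(event, context):
    """Proxy an incoming API request to Bedrock and return the model's reply."""
    # Assumed event shape: API Gateway proxy integration with a JSON body
    body = json.loads(event.get("body") or "{}")
    prompt = body.get("prompt", "")

    response = bedrock.invoke_model(
        modelId=MODEL_ID,
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": MAX_TOKENS,
            "temperature": TEMPERATURE,
            "messages": [{"role": "user", "content": prompt}],
        }),
    )
    payload = json.loads(response["body"].read())

    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({"completion": payload["content"][0]["text"]}),
    }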
Python Integration
import json
from typing import Dict, Optional

import boto3


class BedrockLLMClient:
    def __init__(self, region_name: str = "us-east-1"):
        self.bedrock = boto3.client(
            service_name='bedrock-runtime',
            region_name=region_name
        )
        self.model_id = "anthropic.claude-3-sonnet-20240229-v1:0"

    def generate_response(
        self,
        prompt: str,
        max_tokens: int = 4096,
        temperature: float = 0.7,
        system_prompt: Optional[str] = None
    ) -> Dict:
        """Generate a response from Claude via Bedrock."""
        messages = [{"role": "user", "content": prompt}]

        body = json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": max_tokens,
            "temperature": temperature,
            "messages": messages,
            "system": system_prompt or "You are a helpful AI assistant."
        })

        try:
            response = self.bedrock.invoke_model(
                modelId=self.model_id,
                body=body
            )
            response_body = json.loads(response['body'].read())
            return {
                "success": True,
                "content": response_body['content'][0]['text'],
                "usage": response_body['usage'],
                "model": self.model_id
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }

    def stream_response(self, prompt: str):
        """Stream a response from Bedrock chunk by chunk."""
        # Streaming is selected by the API call itself; the request body must
        # not include a "stream" flag (Bedrock rejects extraneous keys).
        response = self.bedrock.invoke_model_with_response_stream(
            modelId=self.model_id,
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 4096,
                "messages": [{"role": "user", "content": prompt}]
            })
        )
        for event in response['body']:
            chunk = json.loads(event['chunk']['bytes'])
            if chunk['type'] == 'content_block_delta':
                yield chunk['delta']['text']
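A quick usage sketch of the client above (the prompts are illustrative):

# Example usage of BedrockLLMClient
client = BedrockLLMClient()

result = client.generate_response(
    "Summarize our refund policy in two sentences.",
    max_tokens=200
)
if result["success"]:
    print(result["content"])
    print(result["usage"])  # {'input_tokens': ..., 'output_tokens': ...}

# Streaming: print text as it arrives
for text in client.stream_response("Write a haiku about observability."):
    print(text, end="", flush=True)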
Cost Optimization and Rate Limiting
AWS Bedrock pricing is per-token, making cost optimization critical for production deployments.
Pricing (Claude 3 Sonnet):
- Input tokens: $3 per 1M tokens
- Output tokens: $15 per 1M tokens
- No infrastructure costs (serverless)
Cost Optimization Strategies:

1. Prompt Efficiency:
- Reduce unnecessary context
- Use concise system prompts
- Cache common prefixes

2. Output Length Control:
max_tokens is critical: output tokens cost 5x input.
BAD: Unbounded output → response = bedrock.invoke(prompt, max_tokens=4096)
GOOD: Limit based on use case → response = bedrock.invoke(prompt, max_tokens=500) for summaries

3. Response Caching:
- Cache identical prompts (Redis/DynamoDB)
- Use semantic similarity for near-duplicates
- 90%+ cache hit rates are possible for repetitive workloads

4. Rate Limiting:
- Protect against abuse
- Prevent bill shock
- Implement quotas per user/tenant

Rough monthly cost at scale (assuming ~$3 per 1M tokens; see the estimator sketch below):
- 10K requests/day, 500 tokens avg → $450/month
- 100K requests/day, 1K tokens avg → $9,000/month
- 1M requests/day, 2K tokens avg → $180,000/month
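A minimal sketch of the back-of-the-envelope math behind those figures, using the pricing listed above; the output_fraction parameter is an assumption added for illustration:

# Back-of-the-envelope monthly cost estimator (sketch)
PRICE_PER_1M_INPUT = 3.00    # USD, Claude 3 Sonnet input tokens
PRICE_PER_1M_OUTPUT = 15.00  # USD, Claude 3 Sonnet output tokens


def monthly_cost(requests_per_day: int, avg_tokens: int,
                 output_fraction: float = 0.0) -> float:
    """Estimate monthly spend. output_fraction is the share of tokens billed
    at the (5x more expensive) output rate; 0.0 reproduces the figures above."""
    tokens_per_month = requests_per_day * avg_tokens * 30
    input_tokens = tokens_per_month * (1 - output_fraction)
    output_tokens = tokens_per_month * output_fraction
    return (input_tokens / 1e6) * PRICE_PER_1M_INPUT + \
           (output_tokens / 1e6) * PRICE_PER_1M_OUTPUT


print(monthly_cost(10_000, 500))       # 450.0
print(monthly_cost(100_000, 1_000))    # 9000.0
print(monthly_cost(1_000_000, 2_000))  # 180000.0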
# Cost optimization implementation
import hashlib
import json
import redis
from functools import wraps
from datetime import datetime, timedelta


class BedrockCostOptimizer:
    """Optimize Bedrock costs with caching and rate limiting."""

    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', decode_responses=True)
        self.cost_per_1k_input = 0.003
        self.cost_per_1k_output = 0.015

    def cache_response(self, ttl: int = 3600):
        """Cache responses to reduce costs."""
        def decorator(func):
            @wraps(func)
            def wrapper(user_id: str, prompt: str, *args, **kwargs):
                # Generate cache key from the prompt only
                cache_key = f"bedrock:{hashlib.sha256(prompt.encode()).hexdigest()}"

                # Check cache
                cached = self.redis_client.get(cache_key)
                if cached:
                    print("Cache hit! Saved ~$0.01")
                    return json.loads(cached)

                # Call Bedrock
                response = func(user_id, prompt, *args, **kwargs)

                # Cache result
                self.redis_client.setex(cache_key, ttl, json.dumps(response))
                return response
            return wrapper
        return decorator

    def rate_limit(self, max_requests: int, window_seconds: int):
        """Rate limit to prevent cost overruns."""
        def decorator(func):
            @wraps(func)
            def wrapper(user_id: str, *args, **kwargs):
                key = f"rate_limit:{user_id}:{datetime.now().strftime('%Y%m%d%H%M')}"

                # Increment the per-user counter for the current window
                count = self.redis_client.incr(key)
                self.redis_client.expire(key, window_seconds)

                if count > max_requests:
                    raise Exception(
                        f"Rate limit exceeded: {max_requests} requests per {window_seconds}s"
                    )
                return func(user_id, *args, **kwargs)
            return wrapper
        return decorator

    def calculate_cost(self, input_tokens: int, output_tokens: int) -> float:
        """Calculate cost for a request."""
        input_cost = (input_tokens / 1000) * self.cost_per_1k_input
        output_cost = (output_tokens / 1000) * self.cost_per_1k_output
        return input_cost + output_cost

    def track_usage(self, user_id: str, input_tokens: int, output_tokens: int):
        """Track usage and costs per user."""
        cost = self.calculate_cost(input_tokens, output_tokens)

        # Track daily usage
        date_key = datetime.now().strftime('%Y-%m-%d')
        usage_key = f"usage:{user_id}:{date_key}"

        self.redis_client.hincrby(usage_key, "requests", 1)
        self.redis_client.hincrby(usage_key, "input_tokens", input_tokens)
        self.redis_client.hincrby(usage_key, "output_tokens", output_tokens)
        self.redis_client.hincrbyfloat(usage_key, "cost", cost)
        self.redis_client.expire(usage_key, 86400 * 90)  # 90 days

        return cost


# Usage example
optimizer = BedrockCostOptimizer()


@optimizer.cache_response(ttl=3600)
@optimizer.rate_limit(max_requests=100, window_seconds=60)
def generate_with_optimization(user_id: str, prompt: str):
    """Generate response with caching and rate limiting."""
    client = BedrockLLMClient()
    response = client.generate_response(
        prompt=prompt,
        max_tokens=500  # Limit output to reduce cost
    )

    if response['success']:
        # Track usage
        cost = optimizer.track_usage(
            user_id=user_id,
            input_tokens=response['usage']['input_tokens'],
            output_tokens=response['usage']['output_tokens']
        )
        print(f"Request cost: ${cost:.4f}")

    return response


# Cost monitoring dashboard query
def get_user_costs(user_id: str, days: int = 30):
    """Get user costs over time."""
    costs = []
    for i in range(days):
        date = (datetime.now() - timedelta(days=i)).strftime('%Y-%m-%d')
        usage_key = f"usage:{user_id}:{date}"
        data = optimizer.redis_client.hgetall(usage_key)
        if data:
            costs.append({
                'date': date,
                'requests': int(data.get('requests', 0)),
                'cost': float(data.get('cost', 0))
            })
    return costs


# Alert on high spend
def check_budget_alert(user_id: str, daily_budget: float = 100.0):
    """Alert if daily budget exceeded."""
    today = datetime.now().strftime('%Y-%m-%d')
    usage_key = f"usage:{user_id}:{today}"
    cost_today = float(optimizer.redis_client.hget(usage_key, "cost") or 0)

    if cost_today > daily_budget:
        # Send alert via your notification channel (Slack, email, etc.)
        send_alert(
            f"⚠️ Budget Alert: User {user_id} spent ${cost_today:.2f} today "
            f"(budget: ${daily_budget:.2f})"
        )
Monitoring and Observability
Production LLM deployments require comprehensive monitoring to ensure reliability and catch issues early.
Key Metrics to Track:

Performance:
- Latency (p50, p95, p99)
- Tokens per second
- Time to first token (TTFT)
- Request success rate

Cost:
- Cost per request
- Daily/monthly spend
- Cost by user/tenant
- Token usage trends

Quality and Safety:
- Response quality scores
- User feedback ratings
- Error rates by type
- Prompt injection attempts

Infrastructure:
- Lambda cold starts
- API Gateway errors
- Bedrock throttling
- Downstream service health

Observability Stack:
- Metrics: CloudWatch + Datadog
- Logging: CloudWatch Logs + ELK
- Tracing: AWS X-Ray
- Alerting: PagerDuty + Slack

Alert Thresholds (see the CloudWatch alarm sketch below):
- P99 latency > 10s → Warning
- Success rate < 99% → Critical
- Daily cost > $1,000 → Warning
- Error rate > 1% → Warning
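A sketch of how the "P99 latency > 10s" threshold could be wired up as a CloudWatch alarm, assuming the custom BedrockLLM/Production namespace and Latency metric published later in this section; the SNS topic ARN is a hypothetical placeholder:

# CloudWatch alarm for the "P99 latency > 10s" threshold (sketch)
import boto3

cloudwatch = boto3.client('cloudwatch')

cloudwatch.put_metric_alarm(
    AlarmName='bedrock-llm-p99-latency-warning',
    Namespace='BedrockLLM/Production',   # custom namespace used below
    MetricName='Latency',
    ExtendedStatistic='p99',             # percentile statistic
    Period=300,                          # evaluate over 5-minute windows
    EvaluationPeriods=3,                 # 3 consecutive breaching periods
    Threshold=10000.0,                   # 10s, metric is in milliseconds
    ComparisonOperator='GreaterThanThreshold',
    TreatMissingData='notBreaching',
    AlarmActions=['arn:aws:sns:us-east-1:123456789012:llm-alerts'],  # hypothetical topic
)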
# Comprehensive monitoring implementation
import time
import logging
from datetime import datetime
from functools import wraps
from typing import Dict, Any

from datadog import statsd
from aws_xray_sdk.core import xray_recorder


class BedrockMonitoring:
    """Monitoring and observability for Bedrock deployments."""

    def __init__(self, service_name: str = "bedrock-llm"):
        self.service_name = service_name
        self.logger = logging.getLogger(service_name)

    def track_request(self, func):
        """Decorator to track all request metrics."""
        @wraps(func)  # preserve the signature for frameworks like FastAPI
        def wrapper(*args, **kwargs):
            start_time = time.time()

            # Start an X-Ray subsegment for this operation
            with xray_recorder.in_subsegment(f'{self.service_name}.{func.__name__}'):
                try:
                    result = func(*args, **kwargs)

                    # Track success
                    duration = time.time() - start_time
                    self._log_success(func.__name__, duration, result)
                    return result
                except Exception as e:
                    # Track failure
                    duration = time.time() - start_time
                    self._log_error(func.__name__, duration, str(e))
                    raise
        return wrapper

    def _log_success(self, operation: str, duration: float, result: Dict):
        """Log successful request."""
        # Send metrics to Datadog
        statsd.increment(f'{self.service_name}.requests', tags=[
            f'operation:{operation}',
            'status:success'
        ])
        statsd.histogram(f'{self.service_name}.latency', duration, tags=[
            f'operation:{operation}'
        ])

        if 'usage' in result:
            statsd.histogram(
                f'{self.service_name}.tokens.input',
                result['usage']['input_tokens']
            )
            statsd.histogram(
                f'{self.service_name}.tokens.output',
                result['usage']['output_tokens']
            )

        # Structured logging
        self.logger.info({
            'event': 'bedrock_request_success',
            'operation': operation,
            'duration_seconds': duration,
            'input_tokens': result.get('usage', {}).get('input_tokens'),
            'output_tokens': result.get('usage', {}).get('output_tokens'),
        })

    def _log_error(self, operation: str, duration: float, error: str):
        """Log failed request."""
        statsd.increment(f'{self.service_name}.requests', tags=[
            f'operation:{operation}',
            'status:error'
        ])

        self.logger.error({
            'event': 'bedrock_request_error',
            'operation': operation,
            'duration_seconds': duration,
            'error': error
        })

    def check_health(self) -> Dict[str, Any]:
        """Health check endpoint."""
        try:
            # Test Bedrock connectivity with a tiny request
            client = BedrockLLMClient()
            start = time.time()
            test_response = client.generate_response(
                prompt="Say 'OK' if you're working",
                max_tokens=10
            )
            latency_ms = round((time.time() - start) * 1000)

            if test_response['success']:
                return {
                    'status': 'healthy',
                    'service': self.service_name,
                    'timestamp': datetime.now().isoformat(),
                    'checks': {
                        'bedrock': 'ok',
                        'latency_ms': latency_ms
                    }
                }
            else:
                return {
                    'status': 'unhealthy',
                    'service': self.service_name,
                    'error': test_response.get('error')
                }
        except Exception as e:
            return {
                'status': 'unhealthy',
                'service': self.service_name,
                'error': str(e)
            }
# Usage with FastAPI endpoints
from fastapi import FastAPI

app = FastAPI()
monitoring = BedrockMonitoring()


@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return monitoring.check_health()


@app.post("/generate")
@monitoring.track_request
def generate_text(prompt: str, user_id: str):
    # Declared as a plain def so the synchronous track_request decorator works;
    # FastAPI runs sync endpoints in a threadpool.
    """Generate text with full monitoring."""
    # Add custom trace metadata
    xray_recorder.put_annotation('user_id', user_id)
    xray_recorder.put_metadata('prompt_length', len(prompt))

    client = BedrockLLMClient()
    response = client.generate_response(prompt)
    return response
# CloudWatch Custom Metrics
from datetime import datetime, timedelta

import boto3

cloudwatch = boto3.client('cloudwatch')


def publish_custom_metrics(
    cost: float,
    tokens: int,
    latency: float
):
    """Publish custom metrics to CloudWatch."""
    cloudwatch.put_metric_data(
        Namespace='BedrockLLM/Production',
        MetricData=[
            {
                'MetricName': 'RequestCost',
                'Value': cost,
                'Unit': 'None',
                'Timestamp': datetime.now()
            },
            {
                'MetricName': 'TokensProcessed',
                'Value': tokens,
                'Unit': 'Count',
                'Timestamp': datetime.now()
            },
            {
                # A metric datum takes either Value or StatisticValues, not both
                'MetricName': 'Latency',
                'Unit': 'Milliseconds',
                'Timestamp': datetime.now(),
                'StatisticValues': {
                    'SampleCount': 1,
                    'Sum': latency,
                    'Minimum': latency,
                    'Maximum': latency
                }
            }
        ]
    )


# Alert on anomalies
def check_anomalies():
    """Check for anomalous latency patterns."""
    # Get recent metrics
    response = cloudwatch.get_metric_statistics(
        Namespace='BedrockLLM/Production',
        MetricName='Latency',
        StartTime=datetime.now() - timedelta(hours=1),
        EndTime=datetime.now(),
        Period=300,  # 5 minutes
        Statistics=['Average', 'Maximum']
    )

    for datapoint in response['Datapoints']:
        if datapoint['Maximum'] > 10000:  # 10s threshold
            send_alert(
                f"⚠️ High latency detected: {datapoint['Maximum']}ms "
                f"at {datapoint['Timestamp']}"
            )
Conclusion: Production-Ready LLM Deployment
AWS Bedrock has democratized enterprise LLM deployment, removing infrastructure complexity while maintaining production-grade reliability.
Key Takeaways:
1. Serverless Simplicity: No infrastructure management, automatic scaling, pay-per-use pricing
2. Enterprise Security: Built-in encryption, VPC isolation, IAM integration, audit logging
3. Cost Optimization: Caching reduces costs by 90%+, rate limiting prevents bill shock, usage tracking enables chargeback
4. Monitoring is Critical: Track latency, costs, quality, and system health from day one
Production checklist:
- Cost monitoring and alerts
- Rate limiting per user/tenant
- Response caching strategy
- Error handling and retries
- Security scanning (prompt injection)
- Comprehensive logging

Choose Bedrock when you:
- Need Claude/other foundation models
- Want serverless deployment
- Require enterprise compliance
- Need rapid time-to-market

Consider self-hosting when you:
- Serve very high volume (>10M requests/day)
- Need custom models
- Require latency under 50ms
- Must optimize costs aggressively at scale
AWS Bedrock excels for most enterprise use cases: fast deployment, reliable operation, and predictable costs. For the 5% of applications with extreme requirements, consider self-hosted alternatives.
Next Steps: Start with Bedrock's free tier, build your MVP, then optimize based on real usage patterns.