Your weekly dose of actionable cloud wisdom to start the week right
The Problem
Your AWS architecture has grown organically over years, nobody’s sure if it follows best practices, and manual Well-Architected reviews happen once a year (if at all). You’re missing cost optimization opportunities, overlooking security gaps, and leaving performance improvements on the table because there’s no systematic way to assess your architecture against AWS standards. Meanwhile, your team spends weeks preparing for architecture reviews instead of building features.
The Solution
Automate AWS Well-Architected Framework assessments using AWS Config rules, custom Lambda functions, and infrastructure analysis tools. Create continuous architecture monitoring that identifies improvements, tracks compliance, and provides actionable recommendations without manual intervention.
Essential Well-Architected Automation Techniques:
1. Automated Well-Architected Assessment Framework
# well_architected_automation.py
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import boto3
class WellArchitectedAutomation:
    """Automated AWS Well-Architected Framework assessment helper.

    Combines the Well-Architected Tool, EC2, RDS, S3 and CloudWatch APIs to
    produce cost-optimisation, security and reliability findings plus a
    prioritised remediation plan. All resource scans are best-effort: API
    failures are logged and treated as "no findings" rather than aborting
    the whole assessment.
    """

    def __init__(self, region: str = 'eu-west-1'):
        """Create regional service clients (S3 list/get-bucket calls are global)."""
        self.wellarchitected = boto3.client('wellarchitected', region_name=region)
        self.config = boto3.client('config', region_name=region)
        self.cloudwatch = boto3.client('cloudwatch', region_name=region)
        self.ec2 = boto3.client('ec2', region_name=region)
        self.rds = boto3.client('rds', region_name=region)
        self.s3 = boto3.client('s3')

    def create_workload_assessment(self, workload_name: str, description: str,
                                   environment: str = 'PRODUCTION') -> Optional[str]:
        """
        Create a new Well-Architected workload assessment.

        Returns the new workload id, or None if the API call failed.
        """
        try:
            response = self.wellarchitected.create_workload(
                WorkloadName=workload_name,
                Description=description,
                Environment=environment,
                ReviewOwner='architecture-team@company.com',
                ArchitecturalDesign='https://wiki.company.com/architecture/' + workload_name.lower(),
                # CreateWorkload requires Lenses; attaching the standard lens
                # also creates its lens review, so no separate call is needed
                # (boto3 exposes no "create_lens_review" operation).
                Lenses=['wellarchitected'],
                PillarPriorities=[
                    'costOptimization',
                    'security',
                    'reliability',
                    'performanceEfficiency',  # canonical pillar id (not 'performance')
                    'operationalExcellence',
                    'sustainability'
                ],
                Tags={
                    'Environment': environment,
                    'ReviewType': 'Automated',
                    'CreatedBy': 'WellArchitectedAutomation',
                    'CreatedDate': datetime.now().strftime('%Y-%m-%d')
                }
            )
            workload_id = response['WorkloadId']
            print(f"Created workload assessment: {workload_id}")
            return workload_id
        except Exception as e:
            print(f"Error creating workload: {str(e)}")
            return None

    def analyze_cost_optimization(self) -> Dict[str, Any]:
        """
        Automated cost optimization analysis.

        Returns a pillar summary with per-finding recommendations and rough
        monthly/annual savings estimates (GBP).
        """
        recommendations = []
        potential_savings = 0.0

        # 1. Unattached EBS volumes. Only volume ids are collected, so the
        #    estimate assumes ~30 GB per volume at £0.08/GB-month.
        unused_volumes = self.find_unused_ebs_volumes()
        if unused_volumes:
            volume_cost = len(unused_volumes) * 0.08 * 30
            recommendations.append({
                'pillar': 'Cost Optimization',
                'finding': f"Found {len(unused_volumes)} unused EBS volumes",
                'impact': 'MEDIUM',
                'effort': 'LOW',
                'savings_per_month': volume_cost,
                'recommendation': 'Delete unused EBS volumes or create snapshots if needed',
                'resources': unused_volumes
            })
            potential_savings += volume_cost

        # 2. Elastic IPs that are allocated but not associated (billed while idle).
        unused_eips = self.find_unused_elastic_ips()
        if unused_eips:
            eip_cost = len(unused_eips) * 3.65  # £3.65/month per unused EIP
            recommendations.append({
                'pillar': 'Cost Optimization',
                'finding': f"Found {len(unused_eips)} unused Elastic IPs",
                'impact': 'MEDIUM',
                'effort': 'LOW',
                'savings_per_month': eip_cost,
                'recommendation': 'Release unused Elastic IP addresses',
                'resources': unused_eips
            })
            potential_savings += eip_cost

        # 3. Under-utilised (oversized) EC2 instances, by 7-day avg CPU.
        oversized_instances = self.find_oversized_instances()
        if oversized_instances:
            instance_savings = sum(inst['potential_savings'] for inst in oversized_instances)
            recommendations.append({
                'pillar': 'Cost Optimization',
                'finding': f"Found {len(oversized_instances)} potentially oversized instances",
                'impact': 'HIGH',
                'effort': 'MEDIUM',
                'savings_per_month': instance_savings,
                'recommendation': 'Right-size EC2 instances based on utilization metrics',
                'resources': oversized_instances
            })
            potential_savings += instance_savings

        # 4. Snapshots older than 90 days.
        old_snapshots = self.find_old_snapshots()
        if old_snapshots:
            snapshot_cost = sum(snap['size_gb'] for snap in old_snapshots) * 0.05  # £0.05/GB/month
            recommendations.append({
                'pillar': 'Cost Optimization',
                'finding': f"Found {len(old_snapshots)} snapshots older than 90 days",
                'impact': 'MEDIUM',
                'effort': 'LOW',
                'savings_per_month': snapshot_cost,
                'recommendation': 'Implement snapshot lifecycle policies',
                'resources': old_snapshots
            })
            potential_savings += snapshot_cost

        return {
            'pillar': 'Cost Optimization',
            'total_potential_savings': potential_savings,
            'annual_savings': potential_savings * 12,
            'recommendations': recommendations,
            'assessment_date': datetime.now().isoformat()
        }

    def analyze_security_posture(self) -> Dict[str, Any]:
        """
        Automated security analysis based on Well-Architected principles.

        Returns a pillar summary with HIGH/MEDIUM severity counts and
        per-finding remediation steps.
        """
        security_findings = []

        # 1. Publicly accessible S3 buckets (ACL grants to AllUsers).
        public_buckets = self.find_public_s3_buckets()
        if public_buckets:
            security_findings.append({
                'pillar': 'Security',
                'finding': f"Found {len(public_buckets)} publicly accessible S3 buckets",
                'severity': 'HIGH',
                'risk': 'Data exposure, potential data breach',
                'recommendation': 'Review bucket policies and remove public access unless required',
                'resources': public_buckets,
                'remediation_steps': [
                    'Review bucket contents for sensitive data',
                    'Implement bucket policies with least privilege',
                    'Enable S3 Block Public Access',
                    'Set up CloudTrail logging for bucket access'
                ]
            })

        # 2. Security groups open to the entire internet.
        open_security_groups = self.find_overly_permissive_security_groups()
        if open_security_groups:
            security_findings.append({
                'pillar': 'Security',
                'finding': f"Found {len(open_security_groups)} security groups with 0.0.0.0/0 access",
                'severity': 'HIGH',
                'risk': 'Unauthorized access, potential intrusion',
                'recommendation': 'Implement least privilege access controls',
                'resources': open_security_groups,
                'remediation_steps': [
                    'Review and restrict CIDR ranges',
                    'Use specific IP ranges or security group references',
                    'Implement network ACLs for additional protection',
                    'Enable VPC Flow Logs for monitoring'
                ]
            })

        # 3. Unencrypted EBS volumes.
        unencrypted_volumes = self.find_unencrypted_ebs_volumes()
        if unencrypted_volumes:
            security_findings.append({
                'pillar': 'Security',
                'finding': f"Found {len(unencrypted_volumes)} unencrypted EBS volumes",
                'severity': 'MEDIUM',
                'risk': 'Data exposure if storage media is compromised',
                'recommendation': 'Enable encryption for all EBS volumes',
                'resources': unencrypted_volumes,
                'remediation_steps': [
                    'Create encrypted snapshots of existing volumes',
                    'Replace volumes with encrypted versions',
                    'Enable encryption by default for new volumes',
                    'Use AWS KMS for key management'
                ]
            })

        # 4. RDS instances with weak backup / deletion-protection settings.
        unprotected_databases = self.find_rds_without_backup()
        if unprotected_databases:
            security_findings.append({
                'pillar': 'Security',
                'finding': f"Found {len(unprotected_databases)} RDS instances with inadequate backup",
                'severity': 'MEDIUM',
                'risk': 'Data loss, inability to recover from incidents',
                'recommendation': 'Configure automated backups and point-in-time recovery',
                'resources': unprotected_databases,
                'remediation_steps': [
                    'Enable automated backups with 7+ day retention',
                    'Enable point-in-time recovery',
                    'Configure backup encryption',
                    'Test backup restoration procedures'
                ]
            })

        return {
            'pillar': 'Security',
            'total_findings': len(security_findings),
            'high_severity': len([f for f in security_findings if f['severity'] == 'HIGH']),
            'medium_severity': len([f for f in security_findings if f['severity'] == 'MEDIUM']),
            'findings': security_findings,
            'assessment_date': datetime.now().isoformat()
        }

    def analyze_reliability(self) -> Dict[str, Any]:
        """
        Automated reliability analysis.

        Returns a pillar summary; the underlying single-AZ / health-check
        scans are placeholders in this example (see helper stubs below).
        """
        reliability_findings = []

        # 1. Resources deployed in a single Availability Zone.
        single_az_resources = self.find_single_az_resources()
        if single_az_resources:
            reliability_findings.append({
                'pillar': 'Reliability',
                'finding': f"Found {len(single_az_resources)} resources in single AZ",
                'impact': 'HIGH',
                'recommendation': 'Deploy across multiple Availability Zones',
                'resources': single_az_resources,
                'improvement_steps': [
                    'Use Auto Scaling Groups across multiple AZs',
                    'Deploy RDS with Multi-AZ configuration',
                    'Use Application Load Balancers with multiple AZ targets',
                    'Implement cross-AZ data replication'
                ]
            })

        # 2. Resources without health checks / monitoring.
        resources_without_health_checks = self.find_resources_without_health_checks()
        if resources_without_health_checks:
            reliability_findings.append({
                'pillar': 'Reliability',
                'finding': f"Found {len(resources_without_health_checks)} resources without health checks",
                'impact': 'MEDIUM',
                'recommendation': 'Implement comprehensive health monitoring',
                'resources': resources_without_health_checks,
                'improvement_steps': [
                    'Configure ELB health checks',
                    'Set up CloudWatch alarms',
                    'Implement application-level health endpoints',
                    'Configure automatic failure detection and recovery'
                ]
            })

        return {
            'pillar': 'Reliability',
            'total_findings': len(reliability_findings),
            'critical_issues': len([f for f in reliability_findings if f['impact'] == 'HIGH']),
            'findings': reliability_findings,
            'assessment_date': datetime.now().isoformat()
        }

    def generate_comprehensive_report(self, workload_name: str) -> Dict[str, Any]:
        """
        Generate a comprehensive Well-Architected assessment report.

        Runs all pillar analyses, derives a coarse health score from finding
        counts, and appends a prioritised action plan.
        """
        print(f"Generating Well-Architected assessment for: {workload_name}")

        # Run all assessments.
        cost_analysis = self.analyze_cost_optimization()
        security_analysis = self.analyze_security_posture()
        reliability_analysis = self.analyze_reliability()

        total_findings = (len(cost_analysis['recommendations']) +
                          len(security_analysis['findings']) +
                          len(reliability_analysis['findings']))
        high_priority_issues = (
            len([r for r in cost_analysis['recommendations'] if r['impact'] == 'HIGH']) +
            len([f for f in security_analysis['findings'] if f['severity'] == 'HIGH']) +
            len([f for f in reliability_analysis['findings'] if f['impact'] == 'HIGH'])
        )

        # Overall health score (simplified bucketing, tightest bucket first).
        if high_priority_issues == 0 and total_findings <= 5:
            health_score = "EXCELLENT"
        elif high_priority_issues <= 2 and total_findings <= 10:
            health_score = "GOOD"
        elif high_priority_issues <= 5 and total_findings <= 20:
            health_score = "FAIR"
        else:
            health_score = "NEEDS_IMPROVEMENT"

        return {
            'workload_name': workload_name,
            'assessment_date': datetime.now().isoformat(),
            'overall_health_score': health_score,
            'summary': {
                'total_findings': total_findings,
                'high_priority_issues': high_priority_issues,
                'potential_annual_savings': cost_analysis['annual_savings']
            },
            'pillars': {
                'cost_optimization': cost_analysis,
                'security': security_analysis,
                'reliability': reliability_analysis
            },
            'next_steps': self.generate_action_plan(cost_analysis, security_analysis,
                                                    reliability_analysis)
        }

    def generate_action_plan(self, cost_analysis, security_analysis,
                             reliability_analysis) -> List[Dict]:
        """
        Generate a prioritized action plan based on findings.

        Ordering: high-severity security fixes (1), then low-effort/high-impact
        cost optimisations (2), then critical reliability work (3).
        """
        all_actions = []

        # High-priority security issues first.
        for finding in security_analysis['findings']:
            if finding['severity'] == 'HIGH':
                all_actions.append({
                    'priority': 1,
                    'pillar': 'Security',
                    'action': finding['recommendation'],
                    'effort': 'HIGH',
                    'impact': 'HIGH',
                    'timeline': 'Immediate (1-2 weeks)'
                })

        # Quick-win cost optimisations (high impact, low effort).
        for rec in cost_analysis['recommendations']:
            if rec['impact'] == 'HIGH' and rec['effort'] == 'LOW':
                all_actions.append({
                    'priority': 2,
                    'pillar': 'Cost Optimization',
                    'action': rec['recommendation'],
                    'effort': rec['effort'],
                    'impact': rec['impact'],
                    'savings': f"£{rec['savings_per_month']:.2f}/month",
                    'timeline': 'Short term (2-4 weeks)'
                })

        # Critical reliability issues.
        for finding in reliability_analysis['findings']:
            if finding['impact'] == 'HIGH':
                all_actions.append({
                    'priority': 3,
                    'pillar': 'Reliability',
                    'action': finding['recommendation'],
                    'effort': 'MEDIUM',
                    'impact': finding['impact'],
                    'timeline': 'Medium term (1-2 months)'
                })

        # sorted() is stable, so discovery order is kept within each priority.
        return sorted(all_actions, key=lambda x: x['priority'])

    # --- Helper methods for resource analysis -------------------------------

    def find_unused_ebs_volumes(self) -> List[str]:
        """Return ids of EBS volumes in the 'available' (unattached) state."""
        try:
            response = self.ec2.describe_volumes(
                Filters=[{'Name': 'status', 'Values': ['available']}]
            )
            return [vol['VolumeId'] for vol in response['Volumes']]
        except Exception as e:
            print(f"Warning: could not list EBS volumes: {e}")
            return []

    def find_unused_elastic_ips(self) -> List[str]:
        """Return allocation ids of Elastic IPs not attached to an instance."""
        try:
            response = self.ec2.describe_addresses()
            return [addr['AllocationId'] for addr in response['Addresses']
                    if 'InstanceId' not in addr]
        except Exception as e:
            print(f"Warning: could not list Elastic IPs: {e}")
            return []

    def find_public_s3_buckets(self) -> List[str]:
        """Return names of buckets whose ACL grants access to AllUsers.

        NOTE(review): only ACL grants are inspected here; bucket policies and
        account-level Block Public Access settings are not checked.
        """
        public_buckets = []
        try:
            buckets = self.s3.list_buckets()['Buckets']
            for bucket in buckets:
                try:
                    acl = self.s3.get_bucket_acl(Bucket=bucket['Name'])
                    for grant in acl['Grants']:
                        grantee = grant.get('Grantee', {})
                        if 'URI' in grantee and 'AllUsers' in grantee['URI']:
                            public_buckets.append(bucket['Name'])
                            break  # one public grant is enough per bucket
                except Exception:
                    # Skip buckets we cannot read (other region, access denied).
                    continue
        except Exception as e:
            print(f"Warning: could not list S3 buckets: {e}")
        return public_buckets

    def find_overly_permissive_security_groups(self) -> List[Dict]:
        """Return one entry per ingress rule that allows 0.0.0.0/0."""
        permissive_sgs = []
        try:
            response = self.ec2.describe_security_groups()
            for sg in response['SecurityGroups']:
                for rule in sg['IpPermissions']:
                    for ip_range in rule.get('IpRanges', []):
                        if ip_range.get('CidrIp') == '0.0.0.0/0':
                            permissive_sgs.append({
                                'group_id': sg['GroupId'],
                                'group_name': sg['GroupName'],
                                'port': rule.get('FromPort'),
                                'protocol': rule.get('IpProtocol')
                            })
                            break  # one entry per rule, regardless of ranges
        except Exception as e:
            print(f"Warning: could not list security groups: {e}")
        return permissive_sgs

    def find_unencrypted_ebs_volumes(self) -> List[str]:
        """Return ids of EBS volumes that are not encrypted."""
        try:
            response = self.ec2.describe_volumes(
                Filters=[{'Name': 'encrypted', 'Values': ['false']}]
            )
            return [vol['VolumeId'] for vol in response['Volumes']]
        except Exception as e:
            print(f"Warning: could not list unencrypted volumes: {e}")
            return []

    def find_oversized_instances(self) -> List[Dict]:
        """Find potentially oversized EC2 instances based on CloudWatch metrics.

        Flags running instances whose 7-day average CPU utilisation is below
        20%; 'potential_savings' is a simplified flat estimate per instance.
        """
        oversized = []
        try:
            instances = self.ec2.describe_instances(
                Filters=[{'Name': 'instance-state-name', 'Values': ['running']}]
            )
            # 7-day window, hourly averages; aware datetimes avoid naive/aware mixups.
            end_time = datetime.now(timezone.utc)
            start_time = end_time - timedelta(days=7)
            for reservation in instances['Reservations']:
                for instance in reservation['Instances']:
                    instance_id = instance['InstanceId']
                    instance_type = instance['InstanceType']
                    cpu_metrics = self.cloudwatch.get_metric_statistics(
                        Namespace='AWS/EC2',
                        MetricName='CPUUtilization',
                        Dimensions=[{'Name': 'InstanceId', 'Value': instance_id}],
                        StartTime=start_time,
                        EndTime=end_time,
                        Period=3600,
                        Statistics=['Average']
                    )
                    datapoints = cpu_metrics['Datapoints']
                    if datapoints:
                        avg_cpu = sum(dp['Average'] for dp in datapoints) / len(datapoints)
                        if avg_cpu < 20:  # less than 20% average CPU utilization
                            oversized.append({
                                'instance_id': instance_id,
                                'instance_type': instance_type,
                                'avg_cpu_utilization': round(avg_cpu, 2),
                                'potential_savings': 50  # simplified flat estimate
                            })
        except Exception as e:
            print(f"Warning: could not analyse instance sizing: {e}")
        return oversized

    def find_old_snapshots(self) -> List[Dict]:
        """Find snapshots owned by this account that are older than 90 days."""
        old_snapshots = []
        try:
            cutoff_date = datetime.now(timezone.utc) - timedelta(days=90)
            response = self.ec2.describe_snapshots(OwnerIds=['self'])
            for snapshot in response['Snapshots']:
                # StartTime is timezone-aware, so compare against an aware cutoff.
                if snapshot['StartTime'] < cutoff_date:
                    old_snapshots.append({
                        'snapshot_id': snapshot['SnapshotId'],
                        'start_time': snapshot['StartTime'].isoformat(),
                        'size_gb': snapshot['VolumeSize']
                    })
        except Exception as e:
            print(f"Warning: could not list snapshots: {e}")
        return old_snapshots

    def find_rds_without_backup(self) -> List[Dict]:
        """Find RDS instances with short backup retention or no deletion protection."""
        unprotected_db = []
        try:
            response = self.rds.describe_db_instances()
            for db in response['DBInstances']:
                if (db['BackupRetentionPeriod'] < 7 or
                        not db.get('DeletionProtection', False)):
                    unprotected_db.append({
                        'db_instance_id': db['DBInstanceIdentifier'],
                        'backup_retention': db['BackupRetentionPeriod'],
                        'deletion_protection': db.get('DeletionProtection', False)
                    })
        except Exception as e:
            print(f"Warning: could not list RDS instances: {e}")
        return unprotected_db

    def find_single_az_resources(self) -> List[Dict]:
        """Find resources deployed in a single AZ.

        Placeholder: a real implementation would check RDS, ELB, Auto Scaling
        Groups, etc.
        """
        return []

    def find_resources_without_health_checks(self) -> List[Dict]:
        """Find resources without proper health checks.

        Placeholder: a real implementation would check ELB targets,
        Auto Scaling Groups, etc.
        """
        return []
# Usage example
if __name__ == "__main__":
    automation = WellArchitectedAutomation()

    # Generate the full assessment report for one workload.
    assessment = automation.generate_comprehensive_report("production-web-app")
    summary = assessment['summary']

    # Print the headline numbers.
    print(f"=== Well-Architected Assessment Report ===")
    print(f"Workload: {assessment['workload_name']}")
    print(f"Overall Health Score: {assessment['overall_health_score']}")
    print(f"Total Findings: {summary['total_findings']}")
    print(f"High Priority Issues: {summary['high_priority_issues']}")
    print(f"Potential Annual Savings: £{summary['potential_annual_savings']:.2f}")
    print()

    # Print the five most urgent actions from the plan.
    print("=== Top Priority Actions ===")
    for rank, item in enumerate(assessment['next_steps'][:5], 1):
        print(f"{rank}. {item['action']} ({item['pillar']})")
        print(f" Priority: {item['priority']}, Impact: {item['impact']}, Timeline: {item['timeline']}")
        if 'savings' in item:
            print(f" Savings: {item['savings']}")
        print()
2. AWS Config Rules for Continuous Compliance
{
  "ConfigRuleName": "wa-security-compliance-pack",
  "Description": "Well-Architected Security pillar compliance rules",
  "Source": {
    "Owner": "AWS",
    "SourceIdentifier": "RESTRICTED_INCOMING_TRAFFIC"
  },
  "Scope": {
    "ComplianceResourceTypes": [
      "AWS::EC2::SecurityGroup"
    ]
  },
  "InputParameters": "{\"blockedPort1\": \"22\", \"blockedPort2\": \"3389\", \"blockedPort3\": \"80\", \"blockedPort4\": \"443\"}"
}
#!/bin/bash
# Deploy Well-Architected Config Rules (AWS managed rules, one per pillar check).
echo "=== Deploying Well-Architected Config Rules ==="

# Cost Optimization Rules
# EC2_VOLUME_INUSE_CHECK flags EBS volumes not attached to any instance
# (EBS_OPTIMIZED_INSTANCE checks EBS optimisation, which is unrelated to
# unused volumes and did not match this rule's name).
aws configservice put-config-rule \
  --config-rule '{
    "ConfigRuleName": "wa-unused-ebs-volumes",
    "Source": {
      "Owner": "AWS",
      "SourceIdentifier": "EC2_VOLUME_INUSE_CHECK"
    }
  }'

# EIP_ATTACHED flags Elastic IPs that are allocated but not attached.
aws configservice put-config-rule \
  --config-rule '{
    "ConfigRuleName": "wa-unattached-elastic-ips",
    "Source": {
      "Owner": "AWS",
      "SourceIdentifier": "EIP_ATTACHED"
    }
  }'

# Security Rules
aws configservice put-config-rule \
  --config-rule '{
    "ConfigRuleName": "wa-s3-bucket-public-access-prohibited",
    "Source": {
      "Owner": "AWS",
      "SourceIdentifier": "S3_BUCKET_PUBLIC_ACCESS_PROHIBITED"
    }
  }'

aws configservice put-config-rule \
  --config-rule '{
    "ConfigRuleName": "wa-ebs-snapshot-public-restorable-check",
    "Source": {
      "Owner": "AWS",
      "SourceIdentifier": "EBS_SNAPSHOT_PUBLIC_RESTORABLE_CHECK"
    }
  }'

# Reliability Rules
aws configservice put-config-rule \
  --config-rule '{
    "ConfigRuleName": "wa-rds-multi-az-support",
    "Source": {
      "Owner": "AWS",
      "SourceIdentifier": "RDS_MULTI_AZ_SUPPORT"
    }
  }'

aws configservice put-config-rule \
  --config-rule '{
    "ConfigRuleName": "wa-elb-cross-zone-load-balancing-enabled",
    "Source": {
      "Owner": "AWS",
      "SourceIdentifier": "ELB_CROSS_ZONE_LOAD_BALANCING_ENABLED"
    }
  }'

echo "✅ Well-Architected Config Rules deployed successfully"
3. CloudFormation Template for Automated Review Infrastructure
# well-architected-automation.yaml
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Well-Architected Review Automation Infrastructure'

Parameters:
  NotificationEmail:
    Type: String
    Description: Email address for Well-Architected findings notifications
    Default: architecture-team@company.com

Resources:
  # S3 Bucket for storing assessment reports (versioned, encrypted, 1-year retention)
  AssessmentReportsBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub 'wa-assessment-reports-${AWS::AccountId}'
      VersioningConfiguration:
        Status: Enabled
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      LifecycleConfiguration:
        Rules:
          - Id: DeleteOldReports
            Status: Enabled
            ExpirationInDays: 365

  # IAM Role for Lambda execution (read-only scans + report write + SNS publish)
  WellArchitectedLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: WellArchitectedAssessmentPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - wellarchitected:*
                  - config:DescribeConfigRules
                  - config:GetComplianceDetailsByConfigRule
                  - ec2:Describe*
                  - rds:Describe*
                  - s3:GetBucket*
                  - s3:ListBucket*
                  - s3:ListAllMyBuckets
                  - cloudwatch:GetMetricStatistics
                  - cloudwatch:ListMetrics
                  - sns:Publish
                Resource: '*'
              - Effect: Allow
                Action:
                  - s3:PutObject
                  - s3:GetObject
                # !Ref on a bucket yields its NAME, so the object ARN must be
                # built explicitly; a bare '${AssessmentReportsBucket}/*' is
                # not a valid IAM resource ARN.
                Resource: !Sub 'arn:aws:s3:::${AssessmentReportsBucket}/*'

  # Lambda function for automated assessment
  WellArchitectedAssessmentFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: well-architected-assessment
      Runtime: python3.9
      Handler: index.lambda_handler
      Role: !GetAtt WellArchitectedLambdaRole.Arn
      Timeout: 300
      MemorySize: 512
      Environment:
        Variables:
          REPORTS_BUCKET: !Ref AssessmentReportsBucket
          SNS_TOPIC_ARN: !Ref NotificationTopic
      Code:
        ZipFile: |
          import json
          import boto3
          import os
          from datetime import datetime

          def lambda_handler(event, context):
              # Import the WellArchitectedAutomation class here
              # (In practice, this would be packaged as a deployment artifact)
              reports_bucket = os.environ['REPORTS_BUCKET']
              sns_topic = os.environ['SNS_TOPIC_ARN']
              try:
                  # Initialize automation
                  wa_automation = WellArchitectedAutomation()
                  # Generate report
                  workload_name = event.get('workload_name', 'automated-assessment')
                  report = wa_automation.generate_comprehensive_report(workload_name)
                  # Save report to S3
                  s3 = boto3.client('s3')
                  report_key = f"reports/{workload_name}/{datetime.now().strftime('%Y-%m-%d')}.json"
                  s3.put_object(
                      Bucket=reports_bucket,
                      Key=report_key,
                      Body=json.dumps(report, indent=2),
                      ContentType='application/json'
                  )
                  # Send notification if high-priority issues found
                  if report['summary']['high_priority_issues'] > 0:
                      sns = boto3.client('sns')
                      sns.publish(
                          TopicArn=sns_topic,
                          Subject=f"Well-Architected Alert: {workload_name}",
                          Message=f"High priority issues found: {report['summary']['high_priority_issues']}\n"
                                  f"Potential annual savings: £{report['summary']['potential_annual_savings']:.2f}\n"
                                  f"Report: s3://{reports_bucket}/{report_key}"
                      )
                  return {
                      'statusCode': 200,
                      'body': json.dumps({
                          'message': 'Assessment completed successfully',
                          'report_location': f's3://{reports_bucket}/{report_key}',
                          'health_score': report['overall_health_score']
                      })
                  }
              except Exception as e:
                  print(f"Error: {str(e)}")
                  return {
                      'statusCode': 500,
                      'body': json.dumps({'error': str(e)})
                  }

  # SNS Topic for notifications
  NotificationTopic:
    Type: AWS::SNS::Topic
    Properties:
      TopicName: well-architected-notifications
      DisplayName: Well-Architected Assessment Notifications

  # SNS Subscription
  NotificationSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      Protocol: email
      TopicArn: !Ref NotificationTopic
      Endpoint: !Ref NotificationEmail

  # EventBridge Rule for scheduled assessments
  ScheduledAssessmentRule:
    Type: AWS::Events::Rule
    Properties:
      Name: well-architected-weekly-assessment
      Description: Trigger Well-Architected assessment weekly
      ScheduleExpression: 'cron(0 9 ? * MON *)' # Every Monday at 9 AM
      State: ENABLED
      Targets:
        - Arn: !GetAtt WellArchitectedAssessmentFunction.Arn
          Id: WellArchitectedAssessmentTarget
          Input: |
            {
              "workload_name": "production-infrastructure",
              "assessment_type": "scheduled"
            }

  # Permission for EventBridge to invoke Lambda
  LambdaInvokePermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref WellArchitectedAssessmentFunction
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      SourceArn: !GetAtt ScheduledAssessmentRule.Arn

  # CloudWatch Dashboard for Well-Architected metrics
  WellArchitectedDashboard:
    Type: AWS::CloudWatch::Dashboard
    Properties:
      DashboardName: well-architected-overview
      DashboardBody: !Sub |
        {
          "widgets": [
            {
              "type": "metric",
              "x": 0,
              "y": 0,
              "width": 12,
              "height": 6,
              "properties": {
                "metrics": [
                  [ "AWS/Lambda", "Duration", "FunctionName", "${WellArchitectedAssessmentFunction}" ],
                  [ ".", "Errors", ".", "." ],
                  [ ".", "Invocations", ".", "." ]
                ],
                "period": 300,
                "stat": "Average",
                "region": "${AWS::Region}",
                "title": "Assessment Function Metrics"
              }
            },
            {
              "type": "log",
              "x": 0,
              "y": 6,
              "width": 24,
              "height": 6,
              "properties": {
                "query": "SOURCE '/aws/lambda/${WellArchitectedAssessmentFunction}'\n| fields @timestamp, @message\n| filter @message like /ERROR/\n| sort @timestamp desc\n| limit 100",
                "region": "${AWS::Region}",
                "title": "Assessment Errors"
              }
            }
          ]
        }

Outputs:
  AssessmentReportsBucket:
    Description: S3 bucket containing Well-Architected assessment reports
    Value: !Ref AssessmentReportsBucket
    Export:
      Name: !Sub '${AWS::StackName}-ReportsBucket'
  LambdaFunctionName:
    Description: Lambda function for Well-Architected assessments
    Value: !Ref WellArchitectedAssessmentFunction
    Export:
      Name: !Sub '${AWS::StackName}-LambdaFunction'
  DashboardURL:
    Description: CloudWatch dashboard for Well-Architected metrics
    Value: !Sub 'https://${AWS::Region}.console.aws.amazon.com/cloudwatch/home?region=${AWS::Region}#dashboards:name=${WellArchitectedDashboard}'
4. Integration with CI/CD for Continuous Architecture Review
# .github/workflows/well-architected-review.yml
name: Well-Architected Continuous Review

on:
  schedule:
    - cron: '0 9 * * 1' # Every Monday at 9 AM
  workflow_dispatch:
    inputs:
      workload_name:
        description: 'Workload name to assess'
        required: true
        default: 'production-workload'
      create_github_issue:
        description: 'Create GitHub issue for findings'
        type: boolean
        default: true

jobs:
  well-architected-review:
    runs-on: ubuntu-latest
    permissions:
      issues: write
      contents: read
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: eu-west-1

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.9'

      - name: Install dependencies
        run: |
          pip install boto3 requests

      - name: Run Well-Architected Assessment
        id: assessment
        run: |
          python scripts/well_architected_automation.py \
            --workload-name "${{ github.event.inputs.workload_name || 'production-workload' }}" \
            --output-format json > assessment_report.json

          # Extract key metrics
          HEALTH_SCORE=$(jq -r '.overall_health_score' assessment_report.json)
          HIGH_PRIORITY_ISSUES=$(jq -r '.summary.high_priority_issues' assessment_report.json)
          TOTAL_FINDINGS=$(jq -r '.summary.total_findings' assessment_report.json)
          POTENTIAL_SAVINGS=$(jq -r '.summary.potential_annual_savings' assessment_report.json)

          echo "health_score=$HEALTH_SCORE" >> $GITHUB_OUTPUT
          echo "high_priority_issues=$HIGH_PRIORITY_ISSUES" >> $GITHUB_OUTPUT
          echo "total_findings=$TOTAL_FINDINGS" >> $GITHUB_OUTPUT
          echo "potential_savings=$POTENTIAL_SAVINGS" >> $GITHUB_OUTPUT

      - name: Upload assessment report
        # upload-artifact v3 is deprecated and no longer runs on GitHub-hosted runners
        uses: actions/upload-artifact@v4
        with:
          name: well-architected-report
          path: assessment_report.json

      - name: Create GitHub Issue for High Priority Findings
        # Inputs are empty on scheduled runs, so compare against the explicit
        # opt-out ('false') instead of requiring an explicit 'true' — otherwise
        # the weekly scheduled run would never open an issue.
        if: steps.assessment.outputs.high_priority_issues > 0 && github.event.inputs.create_github_issue != 'false'
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
            const report = JSON.parse(fs.readFileSync('assessment_report.json', 'utf8'));

            let issueBody = `## Well-Architected Assessment Results\n\n`;
            issueBody += `**Workload:** ${report.workload_name}\n`;
            issueBody += `**Assessment Date:** ${report.assessment_date}\n`;
            issueBody += `**Health Score:** ${report.overall_health_score}\n`;
            issueBody += `**High Priority Issues:** ${report.summary.high_priority_issues}\n`;
            issueBody += `**Total Findings:** ${report.summary.total_findings}\n`;
            issueBody += `**Potential Annual Savings:** £${report.summary.potential_annual_savings}\n\n`;

            issueBody += `## Top Priority Actions\n\n`;
            report.next_steps.slice(0, 5).forEach((action, index) => {
              issueBody += `${index + 1}. **${action.action}** (${action.pillar})\n`;
              issueBody += `   - Priority: ${action.priority}\n`;
              issueBody += `   - Impact: ${action.impact}\n`;
              issueBody += `   - Timeline: ${action.timeline}\n`;
              if (action.savings) {
                issueBody += `   - Savings: ${action.savings}\n`;
              }
              issueBody += `\n`;
            });

            issueBody += `## Security Findings\n\n`;
            report.pillars.security.findings.forEach(finding => {
              issueBody += `- **${finding.finding}** (${finding.severity})\n`;
              issueBody += `  - Risk: ${finding.risk}\n`;
              issueBody += `  - Recommendation: ${finding.recommendation}\n\n`;
            });

            await github.rest.issues.create({
              owner: context.repo.owner,
              repo: context.repo.repo,
              title: `Well-Architected Review: ${report.summary.high_priority_issues} High Priority Issues Found`,
              body: issueBody,
              labels: ['well-architected', 'architecture', 'high-priority']
            });

      - name: Comment on PR if triggered by pull request
        # NOTE: this step only fires if a pull_request trigger is added under `on:`
        if: github.event_name == 'pull_request'
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
            const report = JSON.parse(fs.readFileSync('assessment_report.json', 'utf8'));

            const comment = `## 🏗️ Well-Architected Assessment Results
            **Health Score:** ${report.overall_health_score}
            **High Priority Issues:** ${report.summary.high_priority_issues}
            **Potential Annual Savings:** £${report.summary.potential_annual_savings}
            ${report.summary.high_priority_issues > 0 ? '⚠️ High priority architecture issues detected. Please review the full report.' : '✅ No high priority issues found.'}
            `;

            await github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: comment
            });
Why It Matters
- Continuous Improvement: Regular assessments prevent architecture drift and technical debt
- Cost Optimization: Automated discovery of savings opportunities worth thousands annually
- Risk Mitigation: Early detection of security and reliability issues before they cause problems
- Team Efficiency: Eliminate manual review overhead while improving assessment quality
- Compliance: Systematic documentation of architecture decisions and improvements
Try This Week
- Run the assessment script – Analyze your current AWS environment
- Deploy Config rules – Set up automated compliance monitoring for one pillar
- Create a workload assessment – Use AWS Well-Architected Tool for one application
- Implement one quick win – Address the highest-impact, lowest-effort finding
Quick Well-Architected Assessment
#!/bin/bash
# Quick manual Well-Architected assessment.
# Read-only describe/list calls only — nothing in this script changes resources.

printf '%s\n' "=== Quick Well-Architected Assessment ==="
echo

# --- Cost Optimization checks ------------------------------------------------
printf '%s\n' "💰 Cost Optimization Checks:"
printf '%s\n' "Unused EBS volumes:"
aws ec2 describe-volumes --filters "Name=status,Values=available" --query 'Volumes[*].[VolumeId,Size,CreateTime]' --output table
echo
printf '%s\n' "Unattached Elastic IPs:"
aws ec2 describe-addresses --query 'Addresses[?!InstanceId].[AllocationId,PublicIp]' --output table
echo

# --- Security checks ---------------------------------------------------------
printf '%s\n' "🔒 Security Quick Checks:"
printf '%s\n' "Public S3 buckets (manual review required):"
aws s3api list-buckets --query 'Buckets[*].Name' --output text | head -5
echo
printf '%s\n' "Security groups with 0.0.0.0/0:"
aws ec2 describe-security-groups --query 'SecurityGroups[?IpPermissions[?IpRanges[?CidrIp==`0.0.0.0/0`]]].[GroupId,GroupName]' --output table
echo

# --- Reliability checks ------------------------------------------------------
printf '%s\n' "⚡ Reliability Quick Checks:"
printf '%s\n' "RDS instances without Multi-AZ:"
aws rds describe-db-instances --query 'DBInstances[?!MultiAZ].[DBInstanceIdentifier,Engine,MultiAZ]' --output table
echo

# --- Summary recommendations -------------------------------------------------
printf '%s\n' "🎯 Quick recommendations:"
printf '%s\n' "1. Delete unused EBS volumes and Elastic IPs"
printf '%s\n' "2. Review and restrict overly permissive security groups"
printf '%s\n' "3. Enable Multi-AZ for production RDS instances"
printf '%s\n' "4. Implement S3 bucket policies and block public access"
printf '%s\n' "5. Set up automated backups for all critical resources"
printf '%s\n' "6. Use the full automation script for comprehensive analysis"
Common Well-Architected Mistakes
- One-time reviews: Treating architecture review as a checkbox rather than ongoing practice
- Manual processes: Relying on spreadsheets and manual checks that become outdated
- Ignoring cost optimization: Focusing only on security and reliability while missing savings
- No prioritization: Trying to fix everything at once instead of addressing high-impact issues
- No automation: Missing opportunities to catch issues early through continuous monitoring
Advanced Automation Patterns
- Policy as Code: Infrastructure validation rules in CI/CD pipelines
- Custom Lenses: Industry-specific Well-Architected assessments
- Integration with ticketing: Automatic creation of remediation tasks
- Trend analysis: Tracking architecture health improvements over time
Pro Tip: Start with cost optimization automation – it provides immediate, measurable value that funds further architecture improvements. A single automated cost optimization run often pays for months of additional tooling and team time.








