
Monday Cloud Tip: AWS Well-Architected Review Automation – Architecture That Scales

Your weekly dose of actionable cloud wisdom to start the week right

The Problem

Your AWS architecture has grown organically over the years, nobody’s sure whether it still follows best practices, and manual Well-Architected reviews happen once a year (if at all). You’re missing cost optimization opportunities, security gaps, and performance improvements because there’s no systematic way to assess your architecture against AWS standards. Meanwhile, your team spends weeks preparing for architecture reviews instead of building features.

The Solution

Automate AWS Well-Architected Framework assessments using AWS Config rules, custom Lambda functions, and infrastructure analysis tools. Create continuous architecture monitoring that identifies improvements, tracks compliance, and provides actionable recommendations without manual intervention.
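Before wiring up custom checks, it's worth knowing that the Well-Architected Tool itself exposes the lens review's improvement items via the API. A minimal sketch of pulling and summarising them with boto3 (the workload ID is a placeholder you'd replace with your own):

```python
# Sketch: summarise improvement items from an existing lens review.
from collections import Counter


def summarise_improvements(items):
    """Count lens-review improvement items per (pillar, risk) pair."""
    return Counter((item["PillarId"], item["Risk"]) for item in items)


def fetch_improvement_summaries(workload_id, region="eu-west-1"):
    """Pull improvement items for the 'wellarchitected' lens review."""
    import boto3  # local import keeps the pure helper above dependency-free
    wa = boto3.client("wellarchitected", region_name=region)
    resp = wa.list_lens_review_improvements(
        WorkloadId=workload_id, LensAlias="wellarchitected"
    )
    return resp["ImprovementSummaries"]


# Usage (workload ID is hypothetical):
# summarise_improvements(fetch_improvement_summaries("<workload-id>"))
```

This gives you AWS's own prioritised findings to merge with the custom analysis below.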

Essential Well-Architected Automation Techniques:

1. Automated Well-Architected Assessment Framework

# well_architected_automation.py
import boto3
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any

class WellArchitectedAutomation:
    def __init__(self, region='eu-west-1'):
        self.wellarchitected = boto3.client('wellarchitected', region_name=region)
        self.config = boto3.client('config', region_name=region)
        self.cloudwatch = boto3.client('cloudwatch', region_name=region)
        self.ec2 = boto3.client('ec2', region_name=region)
        self.rds = boto3.client('rds', region_name=region)
        self.s3 = boto3.client('s3')
        
    def create_workload_assessment(self, workload_name: str, description: str, 
                                 environment: str = 'PRODUCTION') -> str:
        """
        Create a new Well-Architected workload assessment
        """
        try:
            response = self.wellarchitected.create_workload(
                WorkloadName=workload_name,
                Description=description,
                Environment=environment,
                Lenses=['wellarchitected'],  # required; attaches the WA Framework lens
                ReviewOwner='architecture-team@company.com',
                ArchitecturalDesign='https://wiki.company.com/architecture/' + workload_name.lower(),
                PillarPriorities=[
                    'costOptimization',
                    'security',
                    'reliability',
                    'performance',
                    'operationalExcellence',
                    'sustainability'
                ],
                Tags={
                    'Environment': environment,
                    'ReviewType': 'Automated',
                    'CreatedBy': 'WellArchitectedAutomation',
                    'CreatedDate': datetime.now().strftime('%Y-%m-%d')
                }
            )
            
            workload_id = response['WorkloadId']
            print(f"Created workload assessment: {workload_id}")
            
            # Attaching the lens creates its review automatically (there is no
            # create_lens_review API); snapshot the initial state as a milestone
            self.wellarchitected.create_milestone(
                WorkloadId=workload_id,
                MilestoneName='baseline-' + datetime.now().strftime('%Y-%m-%d'),
                ClientRequestToken=workload_id
            )
            
            return workload_id
            
        except Exception as e:
            print(f"Error creating workload: {str(e)}")
            return None
    
    def analyze_cost_optimization(self) -> Dict[str, Any]:
        """
        Automated cost optimization analysis
        """
        recommendations = []
        potential_savings = 0
        
        # 1. Analyze unused EBS volumes
        unused_volumes = self.find_unused_ebs_volumes()
        if unused_volumes:
            volume_cost = len(unused_volumes) * 30 * 0.08  # ~£0.08/GB/month, assuming ~30 GB per volume
            recommendations.append({
                'pillar': 'Cost Optimization',
                'finding': f"Found {len(unused_volumes)} unused EBS volumes",
                'impact': 'MEDIUM',
                'effort': 'LOW',
                'savings_per_month': volume_cost,
                'recommendation': 'Delete unused EBS volumes or create snapshots if needed',
                'resources': unused_volumes
            })
            potential_savings += volume_cost
        
        # 2. Analyze unattached Elastic IPs
        unused_eips = self.find_unused_elastic_ips()
        if unused_eips:
            eip_cost = len(unused_eips) * 3.65  # £3.65/month per unused EIP
            recommendations.append({
                'pillar': 'Cost Optimization',
                'finding': f"Found {len(unused_eips)} unused Elastic IPs",
                'impact': 'MEDIUM',
                'effort': 'LOW',
                'savings_per_month': eip_cost,
                'recommendation': 'Release unused Elastic IP addresses',
                'resources': unused_eips
            })
            potential_savings += eip_cost
        
        # 3. Analyze oversized instances
        oversized_instances = self.find_oversized_instances()
        if oversized_instances:
            instance_savings = sum([inst['potential_savings'] for inst in oversized_instances])
            recommendations.append({
                'pillar': 'Cost Optimization',
                'finding': f"Found {len(oversized_instances)} potentially oversized instances",
                'impact': 'HIGH',
                'effort': 'MEDIUM',
                'savings_per_month': instance_savings,
                'recommendation': 'Right-size EC2 instances based on utilization metrics',
                'resources': oversized_instances
            })
            potential_savings += instance_savings
        
        # 4. Analyze old snapshots
        old_snapshots = self.find_old_snapshots()
        if old_snapshots:
            snapshot_cost = sum([snap['size_gb'] for snap in old_snapshots]) * 0.05  # £0.05/GB/month
            recommendations.append({
                'pillar': 'Cost Optimization',
                'finding': f"Found {len(old_snapshots)} snapshots older than 90 days",
                'impact': 'MEDIUM',
                'effort': 'LOW',
                'savings_per_month': snapshot_cost,
                'recommendation': 'Implement snapshot lifecycle policies',
                'resources': old_snapshots
            })
            potential_savings += snapshot_cost
        
        return {
            'pillar': 'Cost Optimization',
            'total_potential_savings': potential_savings,
            'annual_savings': potential_savings * 12,
            'recommendations': recommendations,
            'assessment_date': datetime.now().isoformat()
        }
    
    def analyze_security_posture(self) -> Dict[str, Any]:
        """
        Automated security analysis based on Well-Architected principles
        """
        security_findings = []
        
        # 1. Check for public S3 buckets
        public_buckets = self.find_public_s3_buckets()
        if public_buckets:
            security_findings.append({
                'pillar': 'Security',
                'finding': f"Found {len(public_buckets)} publicly accessible S3 buckets",
                'severity': 'HIGH',
                'risk': 'Data exposure, potential data breach',
                'recommendation': 'Review bucket policies and remove public access unless required',
                'resources': public_buckets,
                'remediation_steps': [
                    'Review bucket contents for sensitive data',
                    'Implement bucket policies with least privilege',
                    'Enable S3 Block Public Access',
                    'Set up CloudTrail logging for bucket access'
                ]
            })
        
        # 2. Check for security groups with overly permissive rules
        open_security_groups = self.find_overly_permissive_security_groups()
        if open_security_groups:
            security_findings.append({
                'pillar': 'Security',
                'finding': f"Found {len(open_security_groups)} security groups with 0.0.0.0/0 access",
                'severity': 'HIGH',
                'risk': 'Unauthorized access, potential intrusion',
                'recommendation': 'Implement least privilege access controls',
                'resources': open_security_groups,
                'remediation_steps': [
                    'Review and restrict CIDR ranges',
                    'Use specific IP ranges or security group references',
                    'Implement network ACLs for additional protection',
                    'Enable VPC Flow Logs for monitoring'
                ]
            })
        
        # 3. Check for unencrypted EBS volumes
        unencrypted_volumes = self.find_unencrypted_ebs_volumes()
        if unencrypted_volumes:
            security_findings.append({
                'pillar': 'Security',
                'finding': f"Found {len(unencrypted_volumes)} unencrypted EBS volumes",
                'severity': 'MEDIUM',
                'risk': 'Data exposure if storage media is compromised',
                'recommendation': 'Enable encryption for all EBS volumes',
                'resources': unencrypted_volumes,
                'remediation_steps': [
                    'Create encrypted snapshots of existing volumes',
                    'Replace volumes with encrypted versions',
                    'Enable encryption by default for new volumes',
                    'Use AWS KMS for key management'
                ]
            })
        
        # 4. Check for RDS instances without backup
        unprotected_databases = self.find_rds_without_backup()
        if unprotected_databases:
            security_findings.append({
                'pillar': 'Security',
                'finding': f"Found {len(unprotected_databases)} RDS instances with inadequate backup",
                'severity': 'MEDIUM',
                'risk': 'Data loss, inability to recover from incidents',
                'recommendation': 'Configure automated backups and point-in-time recovery',
                'resources': unprotected_databases,
                'remediation_steps': [
                    'Enable automated backups with 7+ day retention',
                    'Enable point-in-time recovery',
                    'Configure backup encryption',
                    'Test backup restoration procedures'
                ]
            })
        
        return {
            'pillar': 'Security',
            'total_findings': len(security_findings),
            'high_severity': len([f for f in security_findings if f['severity'] == 'HIGH']),
            'medium_severity': len([f for f in security_findings if f['severity'] == 'MEDIUM']),
            'findings': security_findings,
            'assessment_date': datetime.now().isoformat()
        }
    
    def analyze_reliability(self) -> Dict[str, Any]:
        """
        Automated reliability analysis
        """
        reliability_findings = []
        
        # 1. Check for single AZ deployments
        single_az_resources = self.find_single_az_resources()
        if single_az_resources:
            reliability_findings.append({
                'pillar': 'Reliability',
                'finding': f"Found {len(single_az_resources)} resources in single AZ",
                'impact': 'HIGH',
                'recommendation': 'Deploy across multiple Availability Zones',
                'resources': single_az_resources,
                'improvement_steps': [
                    'Use Auto Scaling Groups across multiple AZs',
                    'Deploy RDS with Multi-AZ configuration',
                    'Use Application Load Balancers with multiple AZ targets',
                    'Implement cross-AZ data replication'
                ]
            })
        
        # 2. Check for missing health checks
        resources_without_health_checks = self.find_resources_without_health_checks()
        if resources_without_health_checks:
            reliability_findings.append({
                'pillar': 'Reliability',
                'finding': f"Found {len(resources_without_health_checks)} resources without health checks",
                'impact': 'MEDIUM',
                'recommendation': 'Implement comprehensive health monitoring',
                'resources': resources_without_health_checks,
                'improvement_steps': [
                    'Configure ELB health checks',
                    'Set up CloudWatch alarms',
                    'Implement application-level health endpoints',
                    'Configure automatic failure detection and recovery'
                ]
            })
        
        return {
            'pillar': 'Reliability',
            'total_findings': len(reliability_findings),
            'critical_issues': len([f for f in reliability_findings if f['impact'] == 'HIGH']),
            'findings': reliability_findings,
            'assessment_date': datetime.now().isoformat()
        }
    
    def generate_comprehensive_report(self, workload_name: str) -> Dict[str, Any]:
        """
        Generate a comprehensive Well-Architected assessment report
        """
        print(f"Generating Well-Architected assessment for: {workload_name}")
        
        # Run all assessments
        cost_analysis = self.analyze_cost_optimization()
        security_analysis = self.analyze_security_posture()
        reliability_analysis = self.analyze_reliability()
        
        # Calculate overall score
        total_findings = (len(cost_analysis['recommendations']) + 
                         len(security_analysis['findings']) + 
                         len(reliability_analysis['findings']))
        
        high_priority_issues = (
            len([r for r in cost_analysis['recommendations'] if r['impact'] == 'HIGH']) +
            len([f for f in security_analysis['findings'] if f['severity'] == 'HIGH']) +
            len([f for f in reliability_analysis['findings'] if f['impact'] == 'HIGH'])
        )
        
        # Overall health score (simplified)
        if high_priority_issues == 0 and total_findings <= 5:
            health_score = "EXCELLENT"
        elif high_priority_issues <= 2 and total_findings <= 10:
            health_score = "GOOD"
        elif high_priority_issues <= 5 and total_findings <= 20:
            health_score = "FAIR"
        else:
            health_score = "NEEDS_IMPROVEMENT"
        
        report = {
            'workload_name': workload_name,
            'assessment_date': datetime.now().isoformat(),
            'overall_health_score': health_score,
            'summary': {
                'total_findings': total_findings,
                'high_priority_issues': high_priority_issues,
                'potential_annual_savings': cost_analysis['annual_savings']
            },
            'pillars': {
                'cost_optimization': cost_analysis,
                'security': security_analysis,
                'reliability': reliability_analysis
            },
            'next_steps': self.generate_action_plan(cost_analysis, security_analysis, reliability_analysis)
        }
        
        return report
    
    def generate_action_plan(self, cost_analysis, security_analysis, reliability_analysis) -> List[Dict]:
        """
        Generate prioritized action plan based on findings
        """
        all_actions = []
        
        # High-priority security issues first
        for finding in security_analysis['findings']:
            if finding['severity'] == 'HIGH':
                all_actions.append({
                    'priority': 1,
                    'pillar': 'Security',
                    'action': finding['recommendation'],
                    'effort': 'HIGH',
                    'impact': 'HIGH',
                    'timeline': 'Immediate (1-2 weeks)'
                })
        
        # High-impact cost optimizations
        for rec in cost_analysis['recommendations']:
            if rec['impact'] == 'HIGH' and rec['effort'] == 'LOW':
                all_actions.append({
                    'priority': 2,
                    'pillar': 'Cost Optimization',
                    'action': rec['recommendation'],
                    'effort': rec['effort'],
                    'impact': rec['impact'],
                    'savings': f"£{rec['savings_per_month']:.2f}/month",
                    'timeline': 'Short term (2-4 weeks)'
                })
        
        # Critical reliability issues
        for finding in reliability_analysis['findings']:
            if finding['impact'] == 'HIGH':
                all_actions.append({
                    'priority': 3,
                    'pillar': 'Reliability',
                    'action': finding['recommendation'],
                    'effort': 'MEDIUM',
                    'impact': finding['impact'],
                    'timeline': 'Medium term (1-2 months)'
                })
        
        return sorted(all_actions, key=lambda x: x['priority'])
    
    # Helper methods for resource analysis
    def find_unused_ebs_volumes(self) -> List[str]:
        """Find unattached EBS volumes"""
        try:
            response = self.ec2.describe_volumes(
                Filters=[{'Name': 'status', 'Values': ['available']}]
            )
            return [vol['VolumeId'] for vol in response['Volumes']]
        except Exception:
            return []
    
    def find_unused_elastic_ips(self) -> List[str]:
        """Find unassociated Elastic IPs"""
        try:
            response = self.ec2.describe_addresses()
            # An EIP with no AssociationId is attached to nothing (instance or ENI)
            return [addr['AllocationId'] for addr in response['Addresses']
                    if 'AssociationId' not in addr]
        except Exception:
            return []
    
    def find_public_s3_buckets(self) -> List[str]:
        """Find S3 buckets with public access"""
        public_buckets = []
        try:
            buckets = self.s3.list_buckets()['Buckets']
            for bucket in buckets:
                try:
                    # Check bucket ACL and policy for public access
                    acl = self.s3.get_bucket_acl(Bucket=bucket['Name'])
                    for grant in acl['Grants']:
                        if 'URI' in grant.get('Grantee', {}):
                            if 'AllUsers' in grant['Grantee']['URI']:
                                public_buckets.append(bucket['Name'])
                                break
                except Exception:
                    continue
        except Exception:
            pass
        return public_buckets
    
    def find_overly_permissive_security_groups(self) -> List[Dict]:
        """Find security groups with 0.0.0.0/0 access"""
        permissive_sgs = []
        try:
            response = self.ec2.describe_security_groups()
            for sg in response['SecurityGroups']:
                for rule in sg['IpPermissions']:
                    for ip_range in rule.get('IpRanges', []):
                        if ip_range.get('CidrIp') == '0.0.0.0/0':
                            permissive_sgs.append({
                                'group_id': sg['GroupId'],
                                'group_name': sg['GroupName'],
                                'port': rule.get('FromPort'),
                                'protocol': rule.get('IpProtocol')
                            })
                            break
        except Exception:
            pass
        return permissive_sgs
    
    def find_unencrypted_ebs_volumes(self) -> List[str]:
        """Find unencrypted EBS volumes"""
        try:
            response = self.ec2.describe_volumes(
                Filters=[{'Name': 'encrypted', 'Values': ['false']}]
            )
            return [vol['VolumeId'] for vol in response['Volumes']]
        except Exception:
            return []
    
    def find_oversized_instances(self) -> List[Dict]:
        """Find potentially oversized EC2 instances based on CloudWatch metrics"""
        oversized = []
        try:
            # Get running instances
            instances = self.ec2.describe_instances(
                Filters=[{'Name': 'instance-state-name', 'Values': ['running']}]
            )
            
            for reservation in instances['Reservations']:
                for instance in reservation['Instances']:
                    instance_id = instance['InstanceId']
                    instance_type = instance['InstanceType']
                    
                    # Get CPU utilization for last 7 days
                    end_time = datetime.utcnow()
                    start_time = end_time - timedelta(days=7)
                    
                    cpu_metrics = self.cloudwatch.get_metric_statistics(
                        Namespace='AWS/EC2',
                        MetricName='CPUUtilization',
                        Dimensions=[{'Name': 'InstanceId', 'Value': instance_id}],
                        StartTime=start_time,
                        EndTime=end_time,
                        Period=3600,
                        Statistics=['Average']
                    )
                    
                    if cpu_metrics['Datapoints']:
                        avg_cpu = sum([dp['Average'] for dp in cpu_metrics['Datapoints']]) / len(cpu_metrics['Datapoints'])
                        if avg_cpu < 20:  # Less than 20% average CPU utilization
                            oversized.append({
                                'instance_id': instance_id,
                                'instance_type': instance_type,
                                'avg_cpu_utilization': round(avg_cpu, 2),
                                'potential_savings': 50  # Simplified calculation
                            })
        except Exception:
            pass
        return oversized
    
    def find_old_snapshots(self) -> List[Dict]:
        """Find snapshots older than 90 days"""
        old_snapshots = []
        try:
            cutoff_date = datetime.utcnow() - timedelta(days=90)
            response = self.ec2.describe_snapshots(OwnerIds=['self'])
            
            for snapshot in response['Snapshots']:
                if snapshot['StartTime'].replace(tzinfo=None) < cutoff_date:
                    old_snapshots.append({
                        'snapshot_id': snapshot['SnapshotId'],
                        'start_time': snapshot['StartTime'].isoformat(),
                        'size_gb': snapshot['VolumeSize']
                    })
        except Exception:
            pass
        return old_snapshots
    
    def find_rds_without_backup(self) -> List[Dict]:
        """Find RDS instances with inadequate backup configuration"""
        unprotected_db = []
        try:
            response = self.rds.describe_db_instances()
            for db in response['DBInstances']:
                if (db['BackupRetentionPeriod'] < 7 or 
                    not db.get('DeletionProtection', False)):
                    unprotected_db.append({
                        'db_instance_id': db['DBInstanceIdentifier'],
                        'backup_retention': db['BackupRetentionPeriod'],
                        'deletion_protection': db.get('DeletionProtection', False)
                    })
        except Exception:
            pass
        return unprotected_db
    
    def find_single_az_resources(self) -> List[Dict]:
        """Find resources deployed in single AZ"""
        single_az = []
        # Implementation would check RDS, ELB, Auto Scaling Groups, etc.
        # Simplified for example
        return single_az
    
    def find_resources_without_health_checks(self) -> List[Dict]:
        """Find resources without proper health checks"""
        no_health_checks = []
        # Implementation would check ELB targets, Auto Scaling Groups, etc.
        # Simplified for example
        return no_health_checks

# Usage example
if __name__ == "__main__":
    wa_automation = WellArchitectedAutomation()
    
    # Generate comprehensive report
    report = wa_automation.generate_comprehensive_report("production-web-app")
    
    # Print summary
    print("=== Well-Architected Assessment Report ===")
    print(f"Workload: {report['workload_name']}")
    print(f"Overall Health Score: {report['overall_health_score']}")
    print(f"Total Findings: {report['summary']['total_findings']}")
    print(f"High Priority Issues: {report['summary']['high_priority_issues']}")
    print(f"Potential Annual Savings: £{report['summary']['potential_annual_savings']:.2f}")
    print()
    
    print("=== Top Priority Actions ===")
    for i, action in enumerate(report['next_steps'][:5], 1):
        print(f"{i}. {action['action']} ({action['pillar']})")
        print(f"   Priority: {action['priority']}, Impact: {action['impact']}, Timeline: {action['timeline']}")
        if 'savings' in action:
            print(f"   Savings: {action['savings']}")
        print()
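If the workload is also registered in the Well-Architected Tool, each automated run can be recorded as a milestone so the review history accumulates over time. A minimal sketch using `create_milestone` (the workload ID is again a placeholder):

```python
# Sketch: record each automated assessment run as a WA Tool milestone.
import uuid
from datetime import datetime, timezone


def milestone_name(prefix="automated-assessment", when=None):
    """Build a date-stamped milestone name, e.g. 'automated-assessment-2024-03-04'."""
    when = when or datetime.now(timezone.utc)
    return f"{prefix}-{when.strftime('%Y-%m-%d')}"


def record_milestone(workload_id, region="eu-west-1"):
    """Snapshot the current lens review state as a milestone."""
    import boto3
    wa = boto3.client("wellarchitected", region_name=region)
    wa.create_milestone(
        WorkloadId=workload_id,
        MilestoneName=milestone_name(),
        ClientRequestToken=str(uuid.uuid4()),  # idempotency token
    )
```

Calling `record_milestone` at the end of each weekly run gives you a point-in-time trail to compare assessments against.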

2. AWS Config Rules for Continuous Compliance

{
  "ConfigRuleName": "wa-restricted-common-ports",
  "Description": "Well-Architected Security pillar: block unrestricted access to admin and database ports",
  "Source": {
    "Owner": "AWS",
    "SourceIdentifier": "RESTRICTED_INCOMING_TRAFFIC"
  },
  "Scope": {
    "ComplianceResourceTypes": [
      "AWS::EC2::SecurityGroup"
    ]
  },
  "InputParameters": "{\"blockedPort1\":\"22\",\"blockedPort2\":\"3389\",\"blockedPort3\":\"3306\",\"blockedPort4\":\"4333\"}"
}

#!/bin/bash
# Deploy Well-Architected Config Rules

echo "=== Deploying Well-Architected Config Rules ==="

# Cost Optimization Rules
aws configservice put-config-rule \
    --config-rule '{
      "ConfigRuleName": "wa-unused-ebs-volumes",
      "Source": {
        "Owner": "AWS",
        "SourceIdentifier": "EC2_VOLUME_INUSE_CHECK"
      }
    }'

aws configservice put-config-rule \
    --config-rule '{
      "ConfigRuleName": "wa-unattached-elastic-ips",
      "Source": {
        "Owner": "AWS",
        "SourceIdentifier": "EIP_ATTACHED"
      }
    }'

# Security Rules
aws configservice put-config-rule \
    --config-rule '{
      "ConfigRuleName": "wa-s3-bucket-public-access-prohibited",
      "Source": {
        "Owner": "AWS",
        "SourceIdentifier": "S3_BUCKET_LEVEL_PUBLIC_ACCESS_PROHIBITED"
      }
    }'

aws configservice put-config-rule \
    --config-rule '{
      "ConfigRuleName": "wa-ebs-snapshot-public-restorable-check",
      "Source": {
        "Owner": "AWS",
        "SourceIdentifier": "EBS_SNAPSHOT_PUBLIC_RESTORABLE_CHECK"
      }
    }'

# Reliability Rules
aws configservice put-config-rule \
    --config-rule '{
      "ConfigRuleName": "wa-rds-multi-az-support",
      "Source": {
        "Owner": "AWS",
        "SourceIdentifier": "RDS_MULTI_AZ_SUPPORT"
      }
    }'

aws configservice put-config-rule \
    --config-rule '{
      "ConfigRuleName": "wa-elb-cross-zone-load-balancing-enabled",
      "Source": {
        "Owner": "AWS",
        "SourceIdentifier": "ELB_CROSS_ZONE_LOAD_BALANCING_ENABLED"
      }
    }'

echo "✅ Well-Architected Config Rules deployed successfully"
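Once the rules are active, their compliance state can be polled from the same automation and folded into the weekly report. A sketch, assuming the rule names from the script above:

```python
# Sketch: pull compliance state for the deployed wa-* Config rules.
def non_compliant_rules(results):
    """Filter describe_compliance_by_config_rule output to failing rule names."""
    return [
        r["ConfigRuleName"]
        for r in results
        if r["Compliance"]["ComplianceType"] == "NON_COMPLIANT"
    ]


def check_wa_rules(rule_names, region="eu-west-1"):
    """Return the subset of the given Config rules currently non-compliant."""
    import boto3
    config = boto3.client("config", region_name=region)
    resp = config.describe_compliance_by_config_rule(ConfigRuleNames=rule_names)
    return non_compliant_rules(resp["ComplianceByConfigRules"])


# Usage:
# check_wa_rules(["wa-unused-ebs-volumes", "wa-unattached-elastic-ips"])
```

Feeding the returned rule names into the report's findings list keeps custom analysis and Config compliance in one place.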

3. CloudFormation Template for Automated Review Infrastructure

# well-architected-automation.yaml
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Well-Architected Review Automation Infrastructure'

Parameters:
  NotificationEmail:
    Type: String
    Description: Email address for Well-Architected findings notifications
    Default: architecture-team@company.com

Resources:
  # S3 Bucket for storing assessment reports
  AssessmentReportsBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub 'wa-assessment-reports-${AWS::AccountId}'
      VersioningConfiguration:
        Status: Enabled
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      LifecycleConfiguration:
        Rules:
          - Id: DeleteOldReports
            Status: Enabled
            ExpirationInDays: 365

  # IAM Role for Lambda execution
  WellArchitectedLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: WellArchitectedAssessmentPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - wellarchitected:*
                  - config:DescribeConfigRules
                  - config:GetComplianceDetailsByConfigRule
                  - ec2:Describe*
                  - rds:Describe*
                  - s3:GetBucket*
                  - s3:ListBucket*
                  - s3:ListAllMyBuckets
                  - cloudwatch:GetMetricStatistics
                  - cloudwatch:ListMetrics
                  - sns:Publish
                Resource: '*'
              - Effect: Allow
                Action:
                  - s3:PutObject
                  - s3:GetObject
                Resource: !Sub '${AssessmentReportsBucket.Arn}/*'

  # Lambda function for automated assessment
  WellArchitectedAssessmentFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: well-architected-assessment
      Runtime: python3.12
      Handler: index.lambda_handler
      Role: !GetAtt WellArchitectedLambdaRole.Arn
      Timeout: 300
      MemorySize: 512
      Environment:
        Variables:
          REPORTS_BUCKET: !Ref AssessmentReportsBucket
          SNS_TOPIC_ARN: !Ref NotificationTopic
      Code:
        ZipFile: |
          import json
          import boto3
          import os
          from datetime import datetime
          
          def lambda_handler(event, context):
              # Import the WellArchitectedAutomation class here
              # (In practice, this would be packaged as a deployment artifact)
              
              reports_bucket = os.environ['REPORTS_BUCKET']
              sns_topic = os.environ['SNS_TOPIC_ARN']
              
              try:
                  # Initialize automation
                  wa_automation = WellArchitectedAutomation()
                  
                  # Generate report
                  workload_name = event.get('workload_name', 'automated-assessment')
                  report = wa_automation.generate_comprehensive_report(workload_name)
                  
                  # Save report to S3
                  s3 = boto3.client('s3')
                  report_key = f"reports/{workload_name}/{datetime.now().strftime('%Y-%m-%d')}.json"
                  
                  s3.put_object(
                      Bucket=reports_bucket,
                      Key=report_key,
                      Body=json.dumps(report, indent=2),
                      ContentType='application/json'
                  )
                  
                  # Send notification if high-priority issues found
                  if report['summary']['high_priority_issues'] > 0:
                      sns = boto3.client('sns')
                      sns.publish(
                          TopicArn=sns_topic,
                          Subject=f"Well-Architected Alert: {workload_name}",
                          Message=f"High priority issues found: {report['summary']['high_priority_issues']}\n"
                                 f"Potential annual savings: £{report['summary']['potential_annual_savings']:.2f}\n"
                                 f"Report: s3://{reports_bucket}/{report_key}"
                      )
                  
                  return {
                      'statusCode': 200,
                      'body': json.dumps({
                          'message': 'Assessment completed successfully',
                          'report_location': f's3://{reports_bucket}/{report_key}',
                          'health_score': report['overall_health_score']
                      })
                  }
                  
              except Exception as e:
                  print(f"Error: {str(e)}")
                  return {
                      'statusCode': 500,
                      'body': json.dumps({'error': str(e)})
                  }

  # SNS Topic for notifications
  NotificationTopic:
    Type: AWS::SNS::Topic
    Properties:
      TopicName: well-architected-notifications
      DisplayName: Well-Architected Assessment Notifications

  # SNS Subscription
  NotificationSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      Protocol: email
      TopicArn: !Ref NotificationTopic
      Endpoint: !Ref NotificationEmail

  # EventBridge Rule for scheduled assessments
  ScheduledAssessmentRule:
    Type: AWS::Events::Rule
    Properties:
      Name: well-architected-weekly-assessment
      Description: Trigger Well-Architected assessment weekly
      ScheduleExpression: 'cron(0 9 ? * MON *)'  # Every Monday at 09:00 UTC
      State: ENABLED
      Targets:
        - Arn: !GetAtt WellArchitectedAssessmentFunction.Arn
          Id: WellArchitectedAssessmentTarget
          Input: |
            {
              "workload_name": "production-infrastructure",
              "assessment_type": "scheduled"
            }

  # Permission for EventBridge to invoke Lambda
  LambdaInvokePermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref WellArchitectedAssessmentFunction
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      SourceArn: !GetAtt ScheduledAssessmentRule.Arn

  # CloudWatch Dashboard for Well-Architected metrics
  WellArchitectedDashboard:
    Type: AWS::CloudWatch::Dashboard
    Properties:
      DashboardName: well-architected-overview
      DashboardBody: !Sub |
        {
          "widgets": [
            {
              "type": "metric",
              "x": 0,
              "y": 0,
              "width": 12,
              "height": 6,
              "properties": {
                "metrics": [
                  [ "AWS/Lambda", "Duration", "FunctionName", "${WellArchitectedAssessmentFunction}" ],
                   [ ".", "Errors", ".", ".", { "stat": "Sum" } ],
                   [ ".", "Invocations", ".", ".", { "stat": "Sum" } ]
                ],
                "period": 300,
                "stat": "Average",
                "region": "${AWS::Region}",
                "title": "Assessment Function Metrics"
              }
            },
            {
              "type": "log",
              "x": 0,
              "y": 6,
              "width": 24,
              "height": 6,
              "properties": {
                "query": "SOURCE '/aws/lambda/${WellArchitectedAssessmentFunction}'\n| fields @timestamp, @message\n| filter @message like /ERROR/\n| sort @timestamp desc\n| limit 100",
                "region": "${AWS::Region}",
                "title": "Assessment Errors"
              }
            }
          ]
        }

Outputs:
  AssessmentReportsBucket:
    Description: S3 bucket containing Well-Architected assessment reports
    Value: !Ref AssessmentReportsBucket
    Export:
      Name: !Sub '${AWS::StackName}-ReportsBucket'

  LambdaFunctionName:
    Description: Lambda function for Well-Architected assessments
    Value: !Ref WellArchitectedAssessmentFunction
    Export:
      Name: !Sub '${AWS::StackName}-LambdaFunction'

  DashboardURL:
    Description: CloudWatch dashboard for Well-Architected metrics
    Value: !Sub 'https://${AWS::Region}.console.aws.amazon.com/cloudwatch/home?region=${AWS::Region}#dashboards:name=${WellArchitectedDashboard}'
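
Once the stack is deployed, you don't have to wait for Monday's schedule: the assessment Lambda can be invoked on demand. A minimal sketch, assuming AWS credentials are configured; the function name below is a placeholder, so substitute the real name from the `LambdaFunctionName` stack output.

```python
import json

def build_assessment_event(workload_name: str, assessment_type: str = "on-demand") -> dict:
    """Build the event payload the assessment Lambda expects."""
    return {"workload_name": workload_name, "assessment_type": assessment_type}

def trigger_assessment(function_name: str, workload_name: str, region: str = "eu-west-1") -> dict:
    """Invoke the assessment Lambda synchronously and return its parsed response."""
    import boto3  # local import so the pure helper above works without boto3 installed
    lambda_client = boto3.client("lambda", region_name=region)
    response = lambda_client.invoke(
        FunctionName=function_name,
        InvocationType="RequestResponse",
        Payload=json.dumps(build_assessment_event(workload_name)),
    )
    return json.loads(response["Payload"].read())

if __name__ == "__main__":
    # Placeholder name: use the LambdaFunctionName stack output for the real value
    print(trigger_assessment("well-architected-assessment", "production-infrastructure"))
```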

4. Integration with CI/CD for Continuous Architecture Review

# .github/workflows/well-architected-review.yml
name: Well-Architected Continuous Review

on:
  schedule:
    - cron: '0 9 * * 1'  # Every Monday at 09:00 UTC
  pull_request:
    branches: [main]  # enables the "Comment on PR" step at the end of the job
  workflow_dispatch:
    inputs:
      workload_name:
        description: 'Workload name to assess'
        required: true
        default: 'production-workload'
      create_github_issue:
        description: 'Create GitHub issue for findings'
        type: boolean
        default: true

jobs:
  well-architected-review:
    runs-on: ubuntu-latest
    permissions:
      issues: write
      contents: read
    
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: eu-west-1
      
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      
      - name: Install dependencies
        run: |
          pip install boto3 requests
      
      - name: Run Well-Architected Assessment
        id: assessment
        run: |
          python scripts/well_architected_automation.py \
            --workload-name "${{ github.event.inputs.workload_name || 'production-workload' }}" \
            --output-format json > assessment_report.json
          
          # Extract key metrics
          HEALTH_SCORE=$(jq -r '.overall_health_score' assessment_report.json)
          HIGH_PRIORITY_ISSUES=$(jq -r '.summary.high_priority_issues' assessment_report.json)
          TOTAL_FINDINGS=$(jq -r '.summary.total_findings' assessment_report.json)
          POTENTIAL_SAVINGS=$(jq -r '.summary.potential_annual_savings' assessment_report.json)
          
          echo "health_score=$HEALTH_SCORE" >> $GITHUB_OUTPUT
          echo "high_priority_issues=$HIGH_PRIORITY_ISSUES" >> $GITHUB_OUTPUT
          echo "total_findings=$TOTAL_FINDINGS" >> $GITHUB_OUTPUT
          echo "potential_savings=$POTENTIAL_SAVINGS" >> $GITHUB_OUTPUT
      
      - name: Upload assessment report
        uses: actions/upload-artifact@v4
        with:
          name: well-architected-report
          path: assessment_report.json
      
      - name: Create GitHub Issue for High Priority Findings
        if: steps.assessment.outputs.high_priority_issues > 0 && (github.event_name != 'workflow_dispatch' || github.event.inputs.create_github_issue == 'true')
        uses: actions/github-script@v6
        with:
          script: |
            const fs = require('fs');
            const report = JSON.parse(fs.readFileSync('assessment_report.json', 'utf8'));
            
            let issueBody = `## Well-Architected Assessment Results\n\n`;
            issueBody += `**Workload:** ${report.workload_name}\n`;
            issueBody += `**Assessment Date:** ${report.assessment_date}\n`;
            issueBody += `**Health Score:** ${report.overall_health_score}\n`;
            issueBody += `**High Priority Issues:** ${report.summary.high_priority_issues}\n`;
            issueBody += `**Total Findings:** ${report.summary.total_findings}\n`;
            issueBody += `**Potential Annual Savings:** £${report.summary.potential_annual_savings}\n\n`;
            
            issueBody += `## Top Priority Actions\n\n`;
            report.next_steps.slice(0, 5).forEach((action, index) => {
              issueBody += `${index + 1}. **${action.action}** (${action.pillar})\n`;
              issueBody += `   - Priority: ${action.priority}\n`;
              issueBody += `   - Impact: ${action.impact}\n`;
              issueBody += `   - Timeline: ${action.timeline}\n`;
              if (action.savings) {
                issueBody += `   - Savings: ${action.savings}\n`;
              }
              issueBody += `\n`;
            });
            
            issueBody += `## Security Findings\n\n`;
            report.pillars.security.findings.forEach(finding => {
              issueBody += `- **${finding.finding}** (${finding.severity})\n`;
              issueBody += `  - Risk: ${finding.risk}\n`;
              issueBody += `  - Recommendation: ${finding.recommendation}\n\n`;
            });
            
            await github.rest.issues.create({
              owner: context.repo.owner,
              repo: context.repo.repo,
              title: `Well-Architected Review: ${report.summary.high_priority_issues} High Priority Issues Found`,
              body: issueBody,
              labels: ['well-architected', 'architecture', 'high-priority']
            });
      
      - name: Comment on PR if triggered by pull request
        if: github.event_name == 'pull_request'
        uses: actions/github-script@v6
        with:
          script: |
            const fs = require('fs');
            const report = JSON.parse(fs.readFileSync('assessment_report.json', 'utf8'));
            
            const comment = `## 🏗️ Well-Architected Assessment Results
            
            **Health Score:** ${report.overall_health_score}
            **High Priority Issues:** ${report.summary.high_priority_issues}
            **Potential Annual Savings:** £${report.summary.potential_annual_savings}
            
            ${report.summary.high_priority_issues > 0 ? '⚠️ High priority architecture issues detected. Please review the full report.' : '✅ No high priority issues found.'}
            `;
            
            await github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: comment
            });
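
The workflow above invokes scripts/well_architected_automation.py with --workload-name and --output-format flags that the Python class shown earlier doesn't yet parse. A hedged sketch of what that CLI entrypoint might look like; the placeholder report only shows the field shape the workflow's jq queries read, and the full script would call generate_comprehensive_report instead.

```python
import argparse
import json
import sys

def parse_args(argv=None):
    """Parse the flags the CI workflow passes to the assessment script."""
    parser = argparse.ArgumentParser(description="Run a Well-Architected assessment")
    parser.add_argument("--workload-name", required=True)
    parser.add_argument("--output-format", choices=["json", "text"], default="json")
    return parser.parse_args(argv)

def main(argv=None):
    args = parse_args(argv)
    # In the full script this would be
    # WellArchitectedAutomation().generate_comprehensive_report(args.workload_name);
    # the placeholder below only illustrates the keys the workflow extracts with jq.
    report = {
        "workload_name": args.workload_name,
        "overall_health_score": 0,
        "summary": {
            "high_priority_issues": 0,
            "total_findings": 0,
            "potential_annual_savings": 0,
        },
    }
    if args.output_format == "json":
        json.dump(report, sys.stdout, indent=2)

if __name__ == "__main__":
    main()
```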

Why It Matters

  • Continuous Improvement: Regular assessments prevent architecture drift and technical debt
  • Cost Optimization: Automated discovery of savings opportunities worth thousands annually
  • Risk Mitigation: Early detection of security and reliability issues before they cause problems
  • Team Efficiency: Eliminate manual review overhead while improving assessment quality
  • Compliance: Systematic documentation of architecture decisions and improvements

Try This Week

  1. Run the assessment script – Analyze your current AWS environment
  2. Deploy Config rules – Set up automated compliance monitoring for one pillar
  3. Create a workload assessment – Use AWS Well-Architected Tool for one application
  4. Implement one quick win – Address the highest-impact, lowest-effort finding
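
For step 2, the quickest route is AWS-managed Config rules, which need no custom Lambda. A sketch assuming an AWS Config recorder is already enabled in the region; the rule names are arbitrary choices, while the SourceIdentifier values are AWS-managed rule identifiers.

```python
def managed_rule(name: str, source_identifier: str) -> dict:
    """Build a ConfigRule definition for an AWS-managed rule."""
    return {
        "ConfigRuleName": name,
        "Source": {"Owner": "AWS", "SourceIdentifier": source_identifier},
    }

def deploy_security_pillar_rules(region: str = "eu-west-1") -> None:
    """Deploy managed Config rules covering Security pillar basics.

    Assumes an AWS Config recorder is already enabled in this region.
    """
    import boto3  # local import keeps the pure helper usable without boto3
    config = boto3.client("config", region_name=region)
    rules = [
        managed_rule("encrypted-ebs-volumes", "ENCRYPTED_VOLUMES"),
        managed_rule("s3-public-read-prohibited", "S3_BUCKET_PUBLIC_READ_PROHIBITED"),
        managed_rule("restricted-ssh", "INCOMING_SSH_DISABLED"),
    ]
    for rule in rules:
        config.put_config_rule(ConfigRule=rule)

if __name__ == "__main__":
    deploy_security_pillar_rules()
```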

Quick Well-Architected Assessment

#!/bin/bash
# Quick manual Well-Architected assessment

echo "=== Quick Well-Architected Assessment ==="
echo

# Cost Optimization Quick Checks
echo "💰 Cost Optimization Checks:"
echo "Unused EBS volumes:"
aws ec2 describe-volumes --filters "Name=status,Values=available" --query 'Volumes[*].[VolumeId,Size,CreateTime]' --output table

echo
echo "Unattached Elastic IPs:"
aws ec2 describe-addresses --query 'Addresses[?!InstanceId].[AllocationId,PublicIp]' --output table

echo
echo "🔒 Security Quick Checks:"
echo "Public S3 buckets (policy status check; ACLs still need manual review):"
for bucket in $(aws s3api list-buckets --query 'Buckets[*].Name' --output text); do
  public=$(aws s3api get-bucket-policy-status --bucket "$bucket" \
    --query 'PolicyStatus.IsPublic' --output text 2>/dev/null)
  [ "$public" = "True" ] && echo "  PUBLIC: $bucket"
done

echo
echo "Security groups with 0.0.0.0/0:"
aws ec2 describe-security-groups --query 'SecurityGroups[?IpPermissions[?IpRanges[?CidrIp==`0.0.0.0/0`]]].[GroupId,GroupName]' --output table

echo
echo "⚡ Reliability Quick Checks:"
echo "RDS instances without Multi-AZ:"
aws rds describe-db-instances --query 'DBInstances[?!MultiAZ].[DBInstanceIdentifier,Engine,MultiAZ]' --output table

echo
echo "🎯 Quick recommendations:"
echo "1. Delete unused EBS volumes and Elastic IPs"
echo "2. Review and restrict overly permissive security groups"
echo "3. Enable Multi-AZ for production RDS instances"
echo "4. Implement S3 bucket policies and block public access"
echo "5. Set up automated backups for all critical resources"
echo "6. Use the full automation script for comprehensive analysis"

Common Well-Architected Mistakes

  • One-time reviews: Treating architecture review as a checkbox rather than ongoing practice
  • Manual processes: Relying on spreadsheets and manual checks that become outdated
  • Ignoring cost optimization: Focusing only on security and reliability while missing savings
  • No prioritization: Trying to fix everything at once instead of addressing high-impact issues
  • No automation: Missing opportunities to catch issues early through continuous monitoring

Advanced Automation Patterns

  • Policy as Code: Infrastructure validation rules in CI/CD pipelines
  • Custom Lenses: Industry-specific Well-Architected assessments
  • Integration with ticketing: Automatic creation of remediation tasks
  • Trend analysis: Tracking architecture health improvements over time

Pro Tip: Start with cost optimization automation – it provides immediate, measurable value that funds further architecture improvements. A single automated cost optimization run often pays for months of additional tooling and team time.
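
As a first cost-optimization win, the unattached-volume check from the bash script above can become a recurring savings estimate. A sketch; the per-GB price is a ballpark placeholder, not current AWS pricing, so substitute your region's actual rate.

```python
# Ballpark gp3 price used for the estimate -- substitute current pricing
GBP_PER_GB_MONTH = 0.08

def monthly_waste(volume_sizes_gb, price_per_gb=GBP_PER_GB_MONTH):
    """Estimate monthly spend on unattached volumes from their sizes in GiB."""
    return sum(volume_sizes_gb) * price_per_gb

def unattached_volume_sizes(region="eu-west-1"):
    """Return the sizes of all 'available' (unattached) EBS volumes."""
    import boto3  # local import keeps monthly_waste usable without boto3
    ec2 = boto3.client("ec2", region_name=region)
    sizes = []
    paginator = ec2.get_paginator("describe_volumes")
    for page in paginator.paginate(Filters=[{"Name": "status", "Values": ["available"]}]):
        sizes.extend(v["Size"] for v in page["Volumes"])
    return sizes

if __name__ == "__main__":
    sizes = unattached_volume_sizes()
    print(f"{len(sizes)} unattached volumes, ~£{monthly_waste(sizes):.2f}/month wasted")
```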