Monday Cloud Tip: AWS EBS Volume Optimization – Finding and Fixing Waste

Your weekly dose of actionable cloud wisdom to start the week right

The Problem

Your AWS bill shows $3,500/month in EBS storage costs, but you only have 50 active EC2 instances. You’ve got 200 unattached volumes from terminated test instances still racking up charges, your production workloads are running on expensive gp2 volumes when gp3 would be 20% cheaper, and you’re storing 18 months of snapshots that nobody will ever restore. Meanwhile, your finance team is asking why storage costs have grown 300% in six months while compute costs stayed flat.

The Solution

Implement a comprehensive EBS optimization strategy using automated detection of zombie volumes, gp2-to-gp3 migration, snapshot lifecycle policies, and volume right-sizing. EBS costs can typically be reduced by 30-50% through systematic cleanup, modernization, and intelligent capacity management – without sacrificing performance or reliability.
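Those headline figures are easy to sanity-check before writing any automation. A minimal sketch of the arithmetic, using illustrative us-east-1 prices and a hypothetical fleet of 200 unattached 50 GB gp2 volumes (both are assumptions for illustration, not figures from a real bill):

```python
# Illustrative prices (us-east-1): gp2 $0.10/GB-month, gp3 $0.08/GB-month

def gp3_storage_savings_pct(gp2_price: float = 0.10, gp3_price: float = 0.08) -> float:
    """Percent saved on storage alone by moving gp2 -> gp3."""
    return round((gp2_price - gp3_price) / gp2_price * 100, 1)

def unattached_share_pct(monthly_bill: float, unattached_gb: int,
                         gp2_price: float = 0.10) -> float:
    """Share of the monthly EBS bill burned by unattached gp2 volumes."""
    return round(unattached_gb * gp2_price / monthly_bill * 100, 1)

print(gp3_storage_savings_pct())             # 20.0
print(unattached_share_pct(3500, 200 * 50))  # 28.6 - over a quarter of the bill
```

The scripts below replace these guesses with measured volume sizes, types, and ages.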

Essential EBS Cost Optimization Strategies

1. Unattached Volume Detection and Automated Cleanup


# Python
# AWS EBS volume waste detection and cleanup automation
import boto3
import os
from datetime import datetime, timedelta
from typing import List, Dict, Any

class EBSVolumeOptimizer:
    def __init__(self, regions: List[str] = None):
        """
        Initialize EBS optimizer for multiple regions
        """
        if regions is None:
            # Get all available regions
            ec2_client = boto3.client('ec2')
            regions = [region['RegionName'] 
                      for region in ec2_client.describe_regions()['Regions']]
        
        self.regions = regions
        self.sns_topic_arn = os.environ.get('SNS_TOPIC_ARN')
        self.cost_savings = {
            'unattached_volumes': 0.0,
            'gp2_to_gp3_migration': 0.0,
            'oversized_volumes': 0.0,
            'snapshot_cleanup': 0.0
        }
        
    def find_unattached_volumes(self, min_age_days: int = 7) -> Dict[str, List[Dict]]:
        """
        Find unattached EBS volumes across all regions
        
        Args:
            min_age_days: Only flag volumes unattached for this many days
        
        Returns:
            Dictionary of region -> list of unattached volumes
        """
        cutoff_date = datetime.now() - timedelta(days=min_age_days)
        results = {}
        
        for region in self.regions:
            try:
                ec2 = boto3.client('ec2', region_name=region)
                
                # Get all available (unattached) volumes
                volumes = ec2.describe_volumes(
                    Filters=[{'Name': 'status', 'Values': ['available']}]
                )['Volumes']
                
                unattached_volumes = []
                
                for volume in volumes:
                    volume_age = datetime.now() - volume['CreateTime'].replace(tzinfo=None)
                    
                    if volume_age.days >= min_age_days:
                        # Calculate monthly cost
                        size_gb = volume['Size']
                        volume_type = volume['VolumeType']
                        
                        # Pricing per GB-month (us-east-1 rates)
                        pricing = {
                            'gp2': 0.10,
                            'gp3': 0.08,
                            'io1': 0.125,
                            'io2': 0.125,
                            'st1': 0.045,
                            'sc1': 0.015
                        }
                        
                        monthly_cost = size_gb * pricing.get(volume_type, 0.10)
                        
                        volume_info = {
                            'volume_id': volume['VolumeId'],
                            'size_gb': size_gb,
                            'volume_type': volume_type,
                            'create_time': volume['CreateTime'].isoformat(),
                            'age_days': volume_age.days,
                            'monthly_cost_usd': round(monthly_cost, 2),
                            'annual_waste_usd': round(monthly_cost * 12, 2),
                            'tags': {tag['Key']: tag['Value'] 
                                   for tag in volume.get('Tags', [])}
                        }
                        
                        unattached_volumes.append(volume_info)
                        self.cost_savings['unattached_volumes'] += monthly_cost
                
                if unattached_volumes:
                    results[region] = unattached_volumes
                    print(f"✓ Found {len(unattached_volumes)} unattached volumes in {region}")
                    
            except Exception as e:
                print(f"✗ Error scanning {region}: {str(e)}")
                
        return results
    
    def create_snapshots_before_deletion(
        self, 
        volumes: List[Dict], 
        region: str
    ) -> Dict[str, str]:
        """
        Create final snapshots before deleting volumes
        
        Returns:
            Dictionary mapping volume_id -> snapshot_id
        """
        ec2 = boto3.client('ec2', region_name=region)
        snapshot_map = {}
        
        for volume in volumes:
            volume_id = volume['volume_id']
            
            try:
                # Create snapshot with descriptive tags
                snapshot = ec2.create_snapshot(
                    VolumeId=volume_id,
                    Description=f"Final snapshot before cleanup - {datetime.now().strftime('%Y-%m-%d')}",
                    TagSpecifications=[{
                        'ResourceType': 'snapshot',
                        'Tags': [
                            {'Key': 'Name', 'Value': f"cleanup-{volume_id}"},
                            {'Key': 'OriginalVolumeId', 'Value': volume_id},
                            {'Key': 'CleanupDate', 'Value': datetime.now().isoformat()},
                            {'Key': 'RetentionPolicy', 'Value': '90-days'}
                        ]
                    }]
                )
                
                snapshot_map[volume_id] = snapshot['SnapshotId']
                print(f"✓ Created snapshot {snapshot['SnapshotId']} for {volume_id}")
                
            except Exception as e:
                print(f"✗ Failed to snapshot {volume_id}: {str(e)}")
                
        return snapshot_map
    
    def delete_volumes_with_safety(
        self, 
        volumes: List[Dict], 
        region: str,
        require_snapshot: bool = True,
        dry_run: bool = True
    ) -> Dict[str, Any]:
        """
        Safely delete volumes with optional snapshot creation
        
        Args:
            require_snapshot: Create snapshot before deletion
            dry_run: If True, only simulate deletion
        """
        ec2 = boto3.client('ec2', region_name=region)
        results = {
            'deleted': [],
            'failed': [],
            'snapshotted': []
        }
        
        # Create snapshots first if required
        if require_snapshot and not dry_run:
            snapshot_map = self.create_snapshots_before_deletion(volumes, region)
            results['snapshotted'] = list(snapshot_map.values())
        
        # Delete volumes
        for volume in volumes:
            volume_id = volume['volume_id']
            
            try:
                if dry_run:
                    print(f"[DRY RUN] Would delete {volume_id} "
                          f"({volume['size_gb']}GB, ${volume['monthly_cost_usd']}/mo)")
                else:
                    ec2.delete_volume(VolumeId=volume_id)
                    print(f"✓ Deleted {volume_id}")
                
                # In dry-run mode this records what *would* have been deleted
                results['deleted'].append(volume_id)
                
            except Exception as e:
                print(f"✗ Failed to delete {volume_id}: {str(e)}")
                results['failed'].append({
                    'volume_id': volume_id,
                    'error': str(e)
                })
        
        return results
    
    def generate_cleanup_report(
        self, 
        unattached_volumes: Dict[str, List[Dict]]
    ) -> str:
        """
        Generate a detailed cleanup report
        """
        total_volumes = sum(len(vols) for vols in unattached_volumes.values())
        total_size_gb = sum(
            vol['size_gb'] 
            for vols in unattached_volumes.values() 
            for vol in vols
        )
        monthly_waste = sum(
            vol['monthly_cost_usd'] 
            for vols in unattached_volumes.values() 
            for vol in vols
        )
        annual_waste = monthly_waste * 12
        
        report = f"""
╔═══════════════════════════════════════════════════════════╗
║          AWS EBS UNATTACHED VOLUMES REPORT                ║
╚═══════════════════════════════════════════════════════════╝

📊 Summary:
  • Total unattached volumes: {total_volumes}
  • Total capacity wasted: {total_size_gb:,} GB
  • Monthly waste: ${monthly_waste:,.2f}
  • Annual waste: ${annual_waste:,.2f}
  • Regions affected: {len(unattached_volumes)}

📍 Breakdown by Region:
"""
        
        for region, volumes in sorted(unattached_volumes.items()):
            region_cost = sum(v['monthly_cost_usd'] for v in volumes)
            report += f"\n  {region}:\n"
            report += f"    - Volumes: {len(volumes)}\n"
            report += f"    - Monthly cost: ${region_cost:,.2f}\n"
            
            # Show top 3 most expensive volumes
            top_volumes = sorted(volumes, 
                               key=lambda x: x['monthly_cost_usd'], 
                               reverse=True)[:3]
            
            for vol in top_volumes:
                report += f"      • {vol['volume_id']}: {vol['size_gb']}GB " \
                         f"({vol['volume_type']}) - ${vol['monthly_cost_usd']}/mo, " \
                         f"unattached for {vol['age_days']} days\n"
        
        report += "\n💡 Recommended Actions:\n"
        report += "  1. Review volumes with 'Name' or 'Environment' tags\n"
        report += "  2. Create final snapshots for volumes > 30 days old\n"
        report += "  3. Delete volumes unattached > 60 days (after snapshot)\n"
        report += "  4. Set up 'DeleteOnTermination' for future instances\n"
        
        return report

# Example usage
optimizer = EBSVolumeOptimizer(regions=['us-east-1', 'eu-west-1', 'ap-southeast-1'])

# Find unattached volumes (older than 7 days)
unattached = optimizer.find_unattached_volumes(min_age_days=7)

# Generate and print report
report = optimizer.generate_cleanup_report(unattached)
print(report)

# Cleanup workflow (with dry run first)
for region, volumes in unattached.items():
    # Filter: only delete volumes > 60 days old
    old_volumes = [v for v in volumes if v['age_days'] > 60]
    
    if old_volumes:
        # First, do a dry run
        print(f"\n🔍 Dry run for {region}:")
        optimizer.delete_volumes_with_safety(
            old_volumes, 
            region, 
            require_snapshot=True,
            dry_run=True
        )
        
        # Uncomment to actually delete (after review):
        # optimizer.delete_volumes_with_safety(
        #     old_volumes, 
        #     region, 
        #     require_snapshot=True,
        #     dry_run=False
        # )

2. gp2 to gp3 Migration for Instant Savings

# Python
# Automated gp2 to gp3 migration with cost analysis
import boto3
from typing import List, Dict

class GP2ToGP3Migrator:
    def __init__(self, region: str = 'us-east-1'):
        self.ec2 = boto3.client('ec2', region_name=region)
        self.region = region
        
        # Pricing per GB-month
        self.gp2_price = 0.10
        self.gp3_price = 0.08
        
    def analyze_gp2_volumes(self) -> List[Dict]:
        """
        Find all gp2 volumes and calculate migration savings
        """
        volumes = self.ec2.describe_volumes(
            Filters=[{'Name': 'volume-type', 'Values': ['gp2']}]
        )['Volumes']
        
        migration_candidates = []
        
        for volume in volumes:
            size_gb = volume['Size']
            volume_id = volume['VolumeId']
            
            # gp2 performance characteristics
            # 3 IOPS per GB, min 100, max 16,000
            gp2_iops = min(max(size_gb * 3, 100), 16000)
            
            # gp3 baseline: 3,000 IOPS, 125 MB/s (included free)
            gp3_baseline_iops = 3000
            
            # Calculate costs
            current_monthly_cost = size_gb * self.gp2_price
            gp3_storage_cost = size_gb * self.gp3_price
            
            # Additional IOPS cost (if gp2 IOPS > gp3 baseline)
            extra_iops_needed = max(0, gp2_iops - gp3_baseline_iops)
            iops_cost = extra_iops_needed * 0.005  # $0.005 per provisioned IOPS-month above baseline
            
            new_monthly_cost = gp3_storage_cost + iops_cost
            monthly_savings = current_monthly_cost - new_monthly_cost
            annual_savings = monthly_savings * 12
            
            # Determine if migration is beneficial
            is_beneficial = monthly_savings > 0
            
            migration_candidates.append({
                'volume_id': volume_id,
                'size_gb': size_gb,
                'current_type': 'gp2',
                'gp2_iops': gp2_iops,
                'gp3_iops_needed': max(gp2_iops, gp3_baseline_iops),
                'current_monthly_cost': round(current_monthly_cost, 2),
                'new_monthly_cost': round(new_monthly_cost, 2),
                'monthly_savings': round(monthly_savings, 2),
                'annual_savings': round(annual_savings, 2),
                'savings_percentage': round((monthly_savings / current_monthly_cost * 100), 1),
                'is_beneficial': is_beneficial,
                'state': volume['State'],
                'attachments': volume.get('Attachments', []),
                'tags': {tag['Key']: tag['Value'] for tag in volume.get('Tags', [])}
            })
        
        return sorted(migration_candidates, 
                     key=lambda x: x['annual_savings'], 
                     reverse=True)
    
    def migrate_volume(
        self, 
        volume_id: str, 
        target_iops: int = 3000,
        target_throughput: int = 125,
        dry_run: bool = True
    ) -> Dict:
        """
        Migrate a single volume from gp2 to gp3
        
        Note: This is a zero-downtime operation
        """
        try:
            if dry_run:
                print(f"[DRY RUN] Would migrate {volume_id} to gp3 "
                      f"with {target_iops} IOPS and {target_throughput} MB/s")
                return {
                    'status': 'dry_run',
                    'volume_id': volume_id
                }
            
            # Modify volume type
            response = self.ec2.modify_volume(
                VolumeId=volume_id,
                VolumeType='gp3',
                Iops=target_iops,
                Throughput=target_throughput
            )
            
            modification_state = response['VolumeModification']
            
            print(f"✓ Migration initiated for {volume_id}")
            print(f"  Status: {modification_state['ModificationState']}")
            print(f"  Progress: {modification_state.get('Progress', 0)}%")
            
            return {
                'status': 'success',
                'volume_id': volume_id,
                'modification_id': modification_state['ModificationId'],
                'state': modification_state['ModificationState']
            }
            
        except Exception as e:
            print(f"✗ Failed to migrate {volume_id}: {str(e)}")
            return {
                'status': 'failed',
                'volume_id': volume_id,
                'error': str(e)
            }
    
    def check_migration_status(self, volume_id: str) -> Dict:
        """
        Check the status of an ongoing volume modification
        """
        try:
            modifications = self.ec2.describe_volumes_modifications(
                VolumeIds=[volume_id]
            )['VolumesModifications']
            
            if not modifications:
                return {'status': 'no_modifications'}
            
            latest = modifications[0]
            
            return {
                'volume_id': volume_id,
                'state': latest['ModificationState'],
                'progress': latest.get('Progress', 0),
                'start_time': latest['StartTime'].isoformat(),
                'target_volume_type': latest.get('TargetVolumeType'),
                'target_iops': latest.get('TargetIops'),
                'target_throughput': latest.get('TargetThroughput')
            }
            
        except Exception as e:
            return {
                'status': 'error',
                'error': str(e)
            }
    
    def generate_migration_plan(self, candidates: List[Dict]) -> str:
        """
        Generate a migration execution plan
        """
        total_volumes = len(candidates)
        beneficial = [c for c in candidates if c['is_beneficial']]
        total_annual_savings = sum(c['annual_savings'] for c in beneficial)
        
        plan = f"""
╔═══════════════════════════════════════════════════════════╗
║          GP2 TO GP3 MIGRATION PLAN                        ║
╚═══════════════════════════════════════════════════════════╝

📊 Summary:
  • Total gp2 volumes found: {total_volumes}
  • Beneficial migrations: {len(beneficial)}
  • Total annual savings: ${total_annual_savings:,.2f}
  • Average savings per volume: ${(total_annual_savings / len(beneficial)) if beneficial else 0:,.2f}

🎯 Top 10 Volumes by Savings:
"""
        
        for i, candidate in enumerate(beneficial[:10], 1):
            plan += f"\n  {i}. {candidate['volume_id']}\n"
            plan += f"     Size: {candidate['size_gb']} GB\n"
            plan += f"     Current: gp2 ({candidate['gp2_iops']} IOPS)\n"
            plan += f"     New: gp3 ({candidate['gp3_iops_needed']} IOPS)\n"
            plan += f"     Savings: ${candidate['monthly_savings']}/mo " \
                   f"(${candidate['annual_savings']}/yr, {candidate['savings_percentage']}%)\n"
            
            if candidate['attachments']:
                instance_id = candidate['attachments'][0]['InstanceId']
                plan += f"     Attached to: {instance_id}\n"
        
        plan += "\n📋 Migration Steps:\n"
        plan += "  1. Start with non-production volumes for testing\n"
        plan += "  2. Migrate during maintenance window (zero downtime, but monitor)\n"
        plan += "  3. Monitor IOPS/throughput for 24 hours post-migration\n"
        plan += "  4. Proceed with production volumes in batches\n"
        plan += "  5. Track savings in AWS Cost Explorer\n"
        
        plan += "\n⚠️  Notes:\n"
        plan += "  • Migration is zero-downtime but volume enters 'optimizing' state\n"
        plan += "  • Full performance available immediately, optimization continues in background\n"
        plan += "  • A volume can only be modified once every 6 hours\n"
        plan += "  • Rolling back requires another modification - wait out the 6-hour window\n"
        
        return plan

# Example usage
migrator = GP2ToGP3Migrator(region='us-east-1')

# Analyze all gp2 volumes
print("🔍 Analyzing gp2 volumes...")
candidates = migrator.analyze_gp2_volumes()

# Generate migration plan
plan = migrator.generate_migration_plan(candidates)
print(plan)

# Execute migrations (start with top 5)
print("\n🚀 Starting migrations (dry run)...")
for candidate in candidates[:5]:
    if candidate['is_beneficial']:
        result = migrator.migrate_volume(
            candidate['volume_id'],
            target_iops=candidate['gp3_iops_needed'],
            dry_run=True  # Set to False to actually migrate
        )
        
        # Check status after a few minutes
        # status = migrator.check_migration_status(candidate['volume_id'])
        # print(f"Migration status: {status}")

3. Snapshot Lifecycle Management with Amazon DLM

# Python
# Automated snapshot lifecycle policy implementation
import boto3
from datetime import datetime
from typing import Dict, List

class SnapshotLifecycleManager:
    def __init__(self, region: str = 'us-east-1'):
        self.dlm = boto3.client('dlm', region_name=region)
        self.ec2 = boto3.client('ec2', region_name=region)
        self.region = region
        
    def create_snapshot_policy(
        self,
        policy_name: str,
        target_tags: Dict[str, str],
        retention_days: int = 7,
        schedule_interval: int = 24,
        schedule_times: List[str] = None,
        copy_to_regions: List[str] = None
    ) -> str:
        """
        Create a DLM snapshot lifecycle policy
        
        Args:
            policy_name: Name for the policy
            target_tags: Tags to identify volumes (e.g., {'Backup': 'Daily'})
            retention_days: How many days to keep snapshots
            schedule_interval: Hours between snapshots
            schedule_times: Times to create snapshots (UTC); defaults to ['03:00']
            copy_to_regions: Optional list of regions for DR copies
        """
        if schedule_times is None:
            schedule_times = ['03:00']
        
        # Build schedule configuration
        schedule = {
            'Name': f'{policy_name}-schedule',
            'CreateRule': {
                'Interval': schedule_interval,
                'IntervalUnit': 'HOURS',
                'Times': schedule_times
            },
            # One snapshot per interval, so retaining `retention_days` snapshots
            # gives the desired retention window for a daily schedule
            'RetainRule': {
                'Count': retention_days
            },
            'TagsToAdd': [
                {'Key': 'CreatedBy', 'Value': 'DLM'},
                {'Key': 'PolicyName', 'Value': policy_name}
            ],
            'CopyTags': True
        }
        
        # Add cross-region copy if specified
        if copy_to_regions:
            schedule['CrossRegionCopyRules'] = [
                {
                    'TargetRegion': region,
                    'Encrypted': True,
                    'CopyTags': True,
                    'RetainRule': {
                        'Interval': 7,
                        'IntervalUnit': 'DAYS'
                    }
                }
                for region in copy_to_regions
            ]
        
        # Build policy document
        policy_details = {
            'PolicyType': 'EBS_SNAPSHOT_MANAGEMENT',
            'ResourceTypes': ['VOLUME'],
            'TargetTags': [
                {'Key': k, 'Value': v} 
                for k, v in target_tags.items()
            ],
            'Schedules': [schedule]
        }
        
        try:
            response = self.dlm.create_lifecycle_policy(
                ExecutionRoleArn=f'arn:aws:iam::{self._get_account_id()}:role/AWSDataLifecycleManagerDefaultRole',
                Description=f'Automated snapshot management: {policy_name}',
                State='ENABLED',
                PolicyDetails=policy_details
            )
            
            policy_id = response['PolicyId']
            print(f"✓ Created snapshot policy: {policy_id}")
            return policy_id
            
        except Exception as e:
            print(f"✗ Failed to create policy: {str(e)}")
            raise
    
    def _get_account_id(self) -> str:
        """Get AWS account ID"""
        sts = boto3.client('sts')
        return sts.get_caller_identity()['Account']
    
    def analyze_snapshot_waste(self, max_age_days: int = 90) -> Dict:
        """
        Find old or redundant snapshots for cleanup
        """
        snapshots = self.ec2.describe_snapshots(OwnerIds=['self'])['Snapshots']
        
        analysis = {
            'total_snapshots': len(snapshots),
            'old_snapshots': [],
            'orphaned_snapshots': [],
            'large_snapshots': [],
            'total_size_gb': 0,
            'estimated_monthly_cost': 0
        }
        
        # Get all current volume IDs
        volumes = self.ec2.describe_volumes()['Volumes']
        current_volume_ids = {v['VolumeId'] for v in volumes}
        
        for snapshot in snapshots:
            snapshot_age = (datetime.now() - snapshot['StartTime'].replace(tzinfo=None)).days
            size_gb = snapshot['VolumeSize']
            
            analysis['total_size_gb'] += size_gb
            
            # Flag old snapshots
            if snapshot_age > max_age_days:
                analysis['old_snapshots'].append({
                    'snapshot_id': snapshot['SnapshotId'],
                    'age_days': snapshot_age,
                    'size_gb': size_gb,
                    'volume_id': snapshot.get('VolumeId', 'N/A')
                })
            
            # Flag orphaned snapshots (volume doesn't exist anymore)
            volume_id = snapshot.get('VolumeId')
            if volume_id and volume_id not in current_volume_ids:
                analysis['orphaned_snapshots'].append({
                    'snapshot_id': snapshot['SnapshotId'],
                    'volume_id': volume_id,
                    'size_gb': size_gb,
                    'age_days': snapshot_age
                })
            
            # Flag large snapshots for archive consideration
            if size_gb > 500:  # > 500 GB
                analysis['large_snapshots'].append({
                    'snapshot_id': snapshot['SnapshotId'],
                    'size_gb': size_gb,
                    'age_days': snapshot_age
                })
        
        # Calculate costs ($0.05 per GB-month)
        analysis['estimated_monthly_cost'] = round(analysis['total_size_gb'] * 0.05, 2)
        
        # Potential savings from cleanup
        old_snapshot_size = sum(s['size_gb'] for s in analysis['old_snapshots'])
        analysis['potential_monthly_savings'] = round(old_snapshot_size * 0.05, 2)
        
        return analysis
    
    def archive_old_snapshots(
        self, 
        snapshot_ids: List[str],
        dry_run: bool = True
    ) -> Dict:
        """
        Move snapshots to archive tier (75% cost reduction)
        
        Archive tier: $0.0125 per GB-month (vs $0.05 standard)
        Retrieval: 24-72 hours
        """
        results = {'archived': [], 'failed': []}
        
        for snapshot_id in snapshot_ids:
            try:
                if dry_run:
                    print(f"[DRY RUN] Would archive {snapshot_id}")
                else:
                    self.ec2.modify_snapshot_tier(
                        SnapshotId=snapshot_id,
                        StorageTier='archive'
                    )
                    print(f"✓ Archived {snapshot_id}")
                
                results['archived'].append(snapshot_id)
                
            except Exception as e:
                print(f"✗ Failed to archive {snapshot_id}: {str(e)}")
                results['failed'].append({
                    'snapshot_id': snapshot_id,
                    'error': str(e)
                })
        
        return results
    
    def generate_snapshot_report(self, analysis: Dict) -> str:
        """
        Generate snapshot optimization report
        """
        report = f"""
╔═══════════════════════════════════════════════════════════╗
║          EBS SNAPSHOT OPTIMIZATION REPORT                 ║
╚═══════════════════════════════════════════════════════════╝

📊 Current State:
  • Total snapshots: {analysis['total_snapshots']:,}
  • Total storage: {analysis['total_size_gb']:,} GB
  • Monthly cost: ${analysis['estimated_monthly_cost']:,}
  • Annual cost: ${analysis['estimated_monthly_cost'] * 12:,}

🗑️  Cleanup Opportunities:
  • Old snapshots (>90 days): {len(analysis['old_snapshots'])}
  • Orphaned snapshots: {len(analysis['orphaned_snapshots'])}
  • Large snapshots (>500GB): {len(analysis['large_snapshots'])}
  • Potential monthly savings: ${analysis['potential_monthly_savings']}

📦 Top Orphaned Snapshots:
"""
        
        for snap in sorted(analysis['orphaned_snapshots'], 
                          key=lambda x: x['size_gb'], 
                          reverse=True)[:5]:
            report += f"\n  • {snap['snapshot_id']}\n"
            report += f"    Volume: {snap['volume_id']} (deleted)\n"
            report += f"    Size: {snap['size_gb']} GB, Age: {snap['age_days']} days\n"
            report += f"    Cost: ${snap['size_gb'] * 0.05:.2f}/month\n"
        
        report += "\n💡 Recommendations:\n"
        report += "  1. Delete snapshots of deleted volumes (after verification)\n"
        report += "  2. Archive snapshots >180 days old (75% cost reduction)\n"
        report += "  3. Implement DLM policies for automated retention\n"
        report += "  4. Set up 7/14/30-day retention tiers based on criticality\n"
        
        return report

# Example usage
slm = SnapshotLifecycleManager(region='us-east-1')

# Create production backup policy
production_policy = slm.create_snapshot_policy(
    policy_name='production-daily-backup',
    target_tags={'Environment': 'production', 'Backup': 'Daily'},
    retention_days=30,
    schedule_interval=24,
    schedule_times=['03:00'],
    copy_to_regions=['eu-west-1']  # DR copy
)

# Create development backup policy (less frequent, shorter retention)
dev_policy = slm.create_snapshot_policy(
    policy_name='development-weekly-backup',
    target_tags={'Environment': 'development'},
    retention_days=7,
    schedule_interval=168,  # Weekly
    schedule_times=['02:00']
)

# Analyze existing snapshots for waste
print("\n🔍 Analyzing snapshot waste...")
analysis = slm.analyze_snapshot_waste(max_age_days=90)
report = slm.generate_snapshot_report(analysis)
print(report)

# Archive old snapshots
old_snapshot_ids = [s['snapshot_id'] for s in analysis['old_snapshots']]
if old_snapshot_ids:
    print(f"\n📦 Archiving {len(old_snapshot_ids)} old snapshots...")
    slm.archive_old_snapshots(old_snapshot_ids, dry_run=True)

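The report’s first recommendation, deleting snapshots whose source volume no longer exists, isn’t covered by `archive_old_snapshots`. A minimal sketch that reuses the `orphaned_snapshots` output of `analyze_snapshot_waste` and keeps the same dry-run convention (the helper names are assumptions, not part of the class above):

```python
def select_orphans(orphaned: list, min_age_days: int = 30) -> list:
    """Pick orphaned snapshots old enough to delete with confidence."""
    return [s['snapshot_id'] for s in orphaned if s['age_days'] >= min_age_days]

def delete_snapshots(snapshot_ids: list, region: str = 'us-east-1',
                     dry_run: bool = True) -> None:
    """Delete (or, by default, just list) the given snapshots."""
    import boto3  # imported here so select_orphans() stays dependency-free
    ec2 = boto3.client('ec2', region_name=region)
    for snapshot_id in snapshot_ids:
        if dry_run:
            print(f"[DRY RUN] Would delete {snapshot_id}")
        else:
            ec2.delete_snapshot(SnapshotId=snapshot_id)

# targets = select_orphans(analysis['orphaned_snapshots'], min_age_days=30)
# delete_snapshots(targets, dry_run=True)
```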
4. Volume Right-Sizing with CloudWatch Metrics

# Python
# Automated EBS volume right-sizing analysis
import boto3
from datetime import datetime, timedelta
from typing import Dict, List

class VolumeRightSizer:
    def __init__(self, region: str = 'us-east-1'):
        self.ec2 = boto3.client('ec2', region_name=region)
        self.cloudwatch = boto3.client('cloudwatch', region_name=region)
        self.region = region
        
    def analyze_volume_utilization(
        self, 
        volume_id: str,
        days_back: int = 14
    ) -> Dict:
        """
        Analyze volume IOPS and throughput utilization
        """
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(days=days_back)
        
        metrics_to_check = [
            'VolumeReadOps',
            'VolumeWriteOps',
            'VolumeReadBytes',
            'VolumeWriteBytes'
        ]
        
        utilization = {}
        
        for metric_name in metrics_to_check:
            try:
                response = self.cloudwatch.get_metric_statistics(
                    Namespace='AWS/EBS',
                    MetricName=metric_name,
                    Dimensions=[
                        {'Name': 'VolumeId', 'Value': volume_id}
                    ],
                    StartTime=start_time,
                    EndTime=end_time,
                    Period=3600,  # 1 hour
                    Statistics=['Average', 'Maximum']
                )
                
                if response['Datapoints']:
                    datapoints = response['Datapoints']
                    utilization[metric_name] = {
                        'average': sum(d['Average'] for d in datapoints) / len(datapoints),
                        'maximum': max(d['Maximum'] for d in datapoints)
                    }
                else:
                    utilization[metric_name] = {'average': 0, 'maximum': 0}
                    
            except Exception as e:
                print(f"Error getting metric {metric_name}: {str(e)}")
                utilization[metric_name] = {'average': 0, 'maximum': 0}
        
        # Convert op counts to IOPS. EBS reports VolumeReadOps/VolumeWriteOps as
        # totals per metric period; dividing by 60 assumes 1-minute granularity
        # (volumes reporting at 5-minute intervals would need /300)
        read_ops = utilization.get('VolumeReadOps', {})
        write_ops = utilization.get('VolumeWriteOps', {})
        
        avg_iops = (read_ops.get('average', 0) + write_ops.get('average', 0)) / 60
        max_iops = (read_ops.get('maximum', 0) + write_ops.get('maximum', 0)) / 60
        
        # Convert byte counts to MB/s (same 1-minute granularity assumption)
        read_bytes = utilization.get('VolumeReadBytes', {})
        write_bytes = utilization.get('VolumeWriteBytes', {})
        
        avg_throughput_mb = (
            read_bytes.get('average', 0) + 
            write_bytes.get('average', 0)
        ) / (1024 * 1024 * 60)
        
        max_throughput_mb = (
            read_bytes.get('maximum', 0) + 
            write_bytes.get('maximum', 0)
        ) / (1024 * 1024 * 60)
        
        return {
            'volume_id': volume_id,
            'avg_iops': round(avg_iops, 2),
            'max_iops': round(max_iops, 2),
            'avg_throughput_mb_per_sec': round(avg_throughput_mb, 2),
            'max_throughput_mb_per_sec': round(max_throughput_mb, 2),
            'analysis_period_days': days_back
        }
    
    def recommend_volume_size(
        self, 
        volume_id: str,
        utilization: Dict
    ) -> Dict:
        """
        Recommend optimal volume configuration based on utilization
        """
        # Get current volume configuration
        volume = self.ec2.describe_volumes(
            VolumeIds=[volume_id]
        )['Volumes'][0]
        
        current_type = volume['VolumeType']
        current_size = volume['Size']
        current_iops = volume.get('Iops', 0)
        current_throughput = volume.get('Throughput', 0)
        
        # Calculate current cost ($/GB-month, us-east-1 on-demand rates)
        pricing = {
            'gp2': 0.10,
            'gp3': 0.08,
            'io1': 0.125,
            'io2': 0.125,
            'st1': 0.045,
            'sc1': 0.015
        }
        
        current_monthly_cost = current_size * pricing.get(current_type, 0.10)
        
        if current_type == 'gp3':
            # Add provisioned IOPS and throughput costs
            # ($0.005 per IOPS-month over 3,000; $0.04 per MB/s-month over 125)
            if current_iops > 3000:
                current_monthly_cost += (current_iops - 3000) * 0.005
            if current_throughput > 125:
                current_monthly_cost += (current_throughput - 125) * 0.04
        
        # Determine if volume is over-provisioned
        max_iops = utilization['max_iops']
        max_throughput = utilization['max_throughput_mb_per_sec']
        
        recommendations = []
        estimated_new_cost = current_monthly_cost
        
        # Check for gp3 optimization
        if current_type == 'gp3':
            # Check if provisioned IOPS are underutilized
            if current_iops > 3000 and max_iops < current_iops * 0.5:
                recommended_iops = max(3000, int(max_iops * 1.2))  # 20% buffer
                iops_savings = (current_iops - recommended_iops) * 0.005
                estimated_new_cost -= iops_savings
                
                recommendations.append({
                    'type': 'reduce_iops',
                    'current': current_iops,
                    'recommended': recommended_iops,
                    'monthly_savings': round(iops_savings, 2)
                })
            
            # Check if provisioned throughput is underutilized
            if current_throughput > 125 and max_throughput < current_throughput * 0.5:
                recommended_throughput = max(125, int(max_throughput * 1.2))
                throughput_savings = (current_throughput - recommended_throughput) * 0.04
                estimated_new_cost -= throughput_savings
                
                recommendations.append({
                    'type': 'reduce_throughput',
                    'current': current_throughput,
                    'recommended': recommended_throughput,
                    'monthly_savings': round(throughput_savings, 2)
                })
        
        # Check if volume type should be changed
        if current_type == 'gp2' and max_iops <= 16000:
            gp3_cost = current_size * 0.08
            # Determine if extra IOPS needed
            if max_iops > 3000:
                gp3_cost += (max_iops - 3000) * 0.005
            
            type_savings = current_monthly_cost - gp3_cost
            if type_savings > 0:
                recommendations.append({
                    'type': 'change_volume_type',
                    'current': 'gp2',
                    'recommended': 'gp3',
                    'monthly_savings': round(type_savings, 2)
                })
                estimated_new_cost = gp3_cost
        
        # Check for very low utilization (consider sc1 or st1)
        if max_iops < 250 and current_type in ['gp2', 'gp3', 'io1', 'io2']:
            sc1_cost = current_size * 0.015
            type_savings = current_monthly_cost - sc1_cost
            
            recommendations.append({
                'type': 'consider_cold_storage',
                'current': current_type,
                'recommended': 'sc1',
                'note': 'Low IOPS usage detected - consider Cold HDD for significant savings',
                'monthly_savings': round(type_savings, 2)
            })
        
        total_savings = current_monthly_cost - estimated_new_cost
        
        return {
            'volume_id': volume_id,
            'current_config': {
                'type': current_type,
                'size_gb': current_size,
                'iops': current_iops,
                'throughput': current_throughput,
                'monthly_cost': round(current_monthly_cost, 2)
            },
            'utilization': utilization,
            'recommendations': recommendations,
            'estimated_monthly_cost': round(estimated_new_cost, 2),
            'monthly_savings': round(total_savings, 2),
            'annual_savings': round(total_savings * 12, 2)
        }
    
    def scan_all_volumes(self) -> List[Dict]:
        """
        Scan all volumes and generate optimization recommendations
        """
        volumes = self.ec2.describe_volumes()['Volumes']
        results = []
        
        print(f"🔍 Analyzing {len(volumes)} volumes...")
        
        for i, volume in enumerate(volumes, 1):
            volume_id = volume['VolumeId']
            print(f"  [{i}/{len(volumes)}] Analyzing {volume_id}...")
            
            try:
                utilization = self.analyze_volume_utilization(volume_id)
                recommendation = self.recommend_volume_size(volume_id, utilization)
                
                if recommendation['recommendations']:
                    results.append(recommendation)
                    
            except Exception as e:
                print(f"    ✗ Error: {str(e)}")
        
        return sorted(results, key=lambda x: x['annual_savings'], reverse=True)

# Example usage
rightsizer = VolumeRightSizer(region='us-east-1')

# Scan all volumes for optimization opportunities
print("🔍 Starting comprehensive volume analysis...\n")
recommendations = rightsizer.scan_all_volumes()

# Display results
print(f"\n╔═══════════════════════════════════════════════════════════╗")
print(f"║          VOLUME RIGHT-SIZING RECOMMENDATIONS              ║")
print(f"╚═══════════════════════════════════════════════════════════╝\n")

total_savings = sum(r['annual_savings'] for r in recommendations)
print(f"📊 Found {len(recommendations)} optimization opportunities")
print(f"💰 Total potential annual savings: ${total_savings:,.2f}\n")

for i, rec in enumerate(recommendations[:10], 1):
    print(f"{i}. Volume: {rec['volume_id']}")
    print(f"   Current: {rec['current_config']['type']} "
          f"({rec['current_config']['size_gb']}GB) - "
          f"${rec['current_config']['monthly_cost']}/mo")
    print(f"   Utilization: {rec['utilization']['avg_iops']:.0f} avg IOPS, "
          f"{rec['utilization']['max_iops']:.0f} max IOPS")
    
    for recommendation in rec['recommendations']:
        print(f"   → {recommendation['type']}: ${recommendation['monthly_savings']}/mo savings")
        if recommendation['type'] == 'reduce_iops':
            print(f"      Reduce IOPS from {recommendation['current']} to {recommendation['recommended']}")
        elif recommendation['type'] == 'change_volume_type':
            print(f"      Migrate from {recommendation['current']} to {recommendation['recommended']}")
    
    print(f"   💡 Annual savings: ${rec['annual_savings']:.2f}\n")
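
Acting on a reduce_iops or reduce_throughput recommendation is a single online ModifyVolume call. A minimal sketch (the helper and function names here are illustrative, not part of the class above; note that AWS permits only one modification per volume every six hours):

```python
# Python
# Apply a right-sizing recommendation via ModifyVolume (online, no downtime)

def modify_volume_params(volume_id: str, iops: int = None,
                         throughput: int = None) -> dict:
    """Build the ModifyVolume keyword arguments, omitting unset options."""
    params = {'VolumeId': volume_id}
    if iops is not None:
        params['Iops'] = iops
    if throughput is not None:
        params['Throughput'] = throughput
    return params

def apply_recommendation(volume_id: str, iops: int = None,
                         throughput: int = None,
                         region: str = 'us-east-1') -> None:
    import boto3  # imported lazily so the helper above needs no AWS access
    ec2 = boto3.client('ec2', region_name=region)
    # AWS allows one modification per volume every 6 hours
    ec2.modify_volume(**modify_volume_params(volume_id, iops, throughput))
```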

Why It Matters

  • Cost Control: EBS waste is often the largest hidden cost in AWS accounts – easily 20-30% of your total cloud bill
  • Budget Predictability: Uncontrolled volume sprawl makes capacity planning impossible
  • Performance Optimization: Over-provisioned volumes waste money while under-provisioned ones hurt performance
  • Compliance: Automated snapshot policies ensure you meet backup SLAs without manual intervention
  • Quick Wins: gp2 to gp3 migration alone typically saves 15-20% with zero downtime
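
The gp2-to-gp3 quick win is likewise a one-line online change. A sketch, using the same us-east-1 per-GB rates as the right-sizer above (verify against current pricing for your region):

```python
# Python
# One-line online gp2 -> gp3 migration with a baseline savings estimate

GP2_PRICE_PER_GB = 0.10  # $/GB-month, us-east-1 (check current pricing)
GP3_PRICE_PER_GB = 0.08

def gp2_to_gp3_monthly_savings(size_gb: int) -> float:
    """Baseline saving from the per-GB price gap alone (20%)."""
    return round(size_gb * (GP2_PRICE_PER_GB - GP3_PRICE_PER_GB), 2)

def migrate_to_gp3(volume_id: str, region: str = 'us-east-1') -> None:
    """The volume stays attached and in use during the change."""
    import boto3  # lazy import: the savings helper needs no AWS access
    ec2 = boto3.client('ec2', region_name=region)
    ec2.modify_volume(VolumeId=volume_id, VolumeType='gp3')
```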

Try This Week

  1. Run the unattached volume scan – Find your zombie volumes in 5 minutes
  2. Analyze gp2 volumes – Calculate your migration savings potential
  3. Set up one DLM policy – Automate snapshot management for production volumes
  4. Check volume utilization – Review CloudWatch metrics for your top 10 volumes by cost
  5. Enable ‘Delete on Termination’ – Prevent future orphaned volumes
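
Step 5 can also be applied retroactively to instances that are already running, via ModifyInstanceAttribute. A sketch; the default device name is an assumption, so check the instance's actual root device first:

```python
# Python
# Retroactively enable Delete on Termination for an attached volume

def delete_on_termination_mapping(device_name: str) -> dict:
    """Build the BlockDeviceMappings entry ModifyInstanceAttribute expects."""
    return {'DeviceName': device_name, 'Ebs': {'DeleteOnTermination': True}}

def enable_delete_on_termination(instance_id: str,
                                 device_name: str = '/dev/xvda',  # assumption
                                 region: str = 'us-east-1') -> None:
    import boto3  # lazy import: the mapping helper needs no AWS access
    ec2 = boto3.client('ec2', region_name=region)
    ec2.modify_instance_attribute(
        InstanceId=instance_id,
        BlockDeviceMappings=[delete_on_termination_mapping(device_name)]
    )
```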

Quick EBS Assessment Script

# Bash
#!/bin/bash
# Quick EBS cost assessment across all regions

echo "╔═══════════════════════════════════════════════════════════╗"
echo "║          AWS EBS QUICK COST ASSESSMENT                    ║"
echo "╚═══════════════════════════════════════════════════════════╝"
echo ""

# Get all regions
regions=$(aws ec2 describe-regions --query 'Regions[].RegionName' --output text)

total_unattached=0
total_gp2=0
total_snapshots=0

for region in $regions; do
    echo "📍 Scanning $region..."
    
    # Count unattached volumes
    unattached=$(aws ec2 describe-volumes \
        --region $region \
        --filters "Name=status,Values=available" \
        --query 'length(Volumes)' \
        --output text 2>/dev/null || echo "0")
    
    # Count gp2 volumes
    gp2=$(aws ec2 describe-volumes \
        --region $region \
        --filters "Name=volume-type,Values=gp2" \
        --query 'length(Volumes)' \
        --output text 2>/dev/null || echo "0")
    
    # Count snapshots
    snapshots=$(aws ec2 describe-snapshots \
        --region $region \
        --owner-ids self \
        --query 'length(Snapshots)' \
        --output text 2>/dev/null || echo "0")
    
    if [ "$unattached" != "0" ] || [ "$gp2" != "0" ] || [ "$snapshots" != "0" ]; then
        echo "  Unattached volumes: $unattached"
        echo "  gp2 volumes: $gp2"
        echo "  Snapshots: $snapshots"
        echo ""
    fi
    
    total_unattached=$((total_unattached + unattached))
    total_gp2=$((total_gp2 + gp2))
    total_snapshots=$((total_snapshots + snapshots))
done

echo "╔═══════════════════════════════════════════════════════════╗"
echo "║          SUMMARY                                          ║"
echo "╚═══════════════════════════════════════════════════════════╝"
echo ""
echo "🗑️  Unattached volumes: $total_unattached"
echo "📀 gp2 volumes (migration candidates): $total_gp2"
echo "📦 Total snapshots: $total_snapshots"
echo ""
echo "💡 Estimated quick wins:"
# Rough assumptions: ~100GB average volume, so ~$8/mo per unattached volume
# and ~$2/mo saved per gp2 volume migrated ($0.02/GB price gap)
echo "   • Unattached volumes: Save ~\$$(($total_unattached * 8))/month"
echo "   • gp2→gp3 migration: Save ~\$$(($total_gp2 * 2))/month"
echo ""
echo "🎯 Next steps:"
echo "   1. Review unattached volumes > 30 days old"
echo "   2. Plan gp2 to gp3 migration for production volumes"
echo "   3. Set up DLM policies for automated snapshot management"
echo "   4. Enable 'Delete on Termination' for new instances"

Common EBS Cost Mistakes

  1. Not setting ‘Delete on Termination’: The default is true for root volumes but false for additional data volumes – those extra volumes become the orphans
  2. Keeping all snapshots forever: “Just in case” mentality leads to massive waste
  3. Ignoring gp3: Still creating new gp2 volumes by habit
  4. No snapshot lifecycle policies: Manual management always fails at scale
  5. Over-provisioning IOPS: Paying for 10,000 IOPS when using 500
  6. Not monitoring unattached volumes: Out of sight, out of mind – until the bill arrives
  7. Forgetting AMI-associated snapshots: Deregistering AMIs doesn’t delete underlying snapshots
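
Mistake 7 is easy to audit: collect the snapshot IDs still referenced by registered AMIs and flag the rest. A sketch of the core check (the boto3 wiring is the standard describe_images/describe_snapshots pair; for large accounts, use the paginators):

```python
# Python
# Find snapshots that no registered AMI references anymore

def find_unreferenced_snapshots(snapshots: list, images: list) -> list:
    """snapshots/images: lists as returned by describe_snapshots()/describe_images()."""
    referenced = {
        bdm['Ebs']['SnapshotId']
        for image in images
        for bdm in image.get('BlockDeviceMappings', [])
        if 'SnapshotId' in bdm.get('Ebs', {})  # skips ephemeral mappings
    }
    return [s['SnapshotId'] for s in snapshots if s['SnapshotId'] not in referenced]

def audit_ami_snapshots(region: str = 'us-east-1') -> list:
    import boto3  # lazy import: the check above is testable without AWS access
    ec2 = boto3.client('ec2', region_name=region)
    snapshots = ec2.describe_snapshots(OwnerIds=['self'])['Snapshots']
    images = ec2.describe_images(Owners=['self'])['Images']
    return find_unreferenced_snapshots(snapshots, images)
```

Unreferenced snapshots still need a manual review before deletion – some may back DLM policies or manual restore points rather than AMIs.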

Advanced Cost Optimization Strategies

Multi-Region Snapshot Cleanup

# Bash
# Find snapshots older than 180 days across all regions
# (date -d is GNU date; on macOS use e.g. date -v-180d +%Y-%m-%dT%H:%M:%S)
for region in $(aws ec2 describe-regions --query 'Regions[].RegionName' --output text); do
    echo "Checking $region..."
    aws ec2 describe-snapshots \
        --region $region \
        --owner-ids self \
        --query "Snapshots[?StartTime<='$(date -d '180 days ago' -Iseconds)'].[SnapshotId,VolumeSize,StartTime]" \
        --output table
done

Lambda-based Auto-Cleanup

# Python
# Deploy this as a weekly Lambda function
import boto3
import json
import os
from datetime import datetime, timedelta

def lambda_handler(event, context):
    """
    Weekly cleanup of unattached volumes > 60 days old
    """
    ec2 = boto3.resource('ec2')
    sns = boto3.client('sns')
    
    # Compare in UTC to match volume.create_time (timezone-aware UTC)
    cutoff = datetime.utcnow() - timedelta(days=60)
    deleted_volumes = []
    
    for volume in ec2.volumes.filter(Filters=[{'Name': 'status', 'Values': ['available']}]):
        if volume.create_time.replace(tzinfo=None) < cutoff:
            # Create final snapshot
            snapshot = volume.create_snapshot(
                Description=f'Pre-cleanup snapshot of {volume.id}'
            )
            
            # Delete volume
            volume.delete()
            deleted_volumes.append({
                'volume_id': volume.id,
                'size': volume.size,
                'snapshot_id': snapshot.id
            })
    
    # Send notification
    if deleted_volumes:
        message = f"Cleaned up {len(deleted_volumes)} unattached volumes\n\n"
        message += "\n".join([f"- {v['volume_id']} ({v['size']}GB) → {v['snapshot_id']}" 
                             for v in deleted_volumes])
        
        sns.publish(
            TopicArn=os.environ['SNS_TOPIC_ARN'],
            Subject='EBS Weekly Cleanup Report',
            Message=message
        )
    
    return {
        'statusCode': 200,
        'body': json.dumps(f'Cleaned up {len(deleted_volumes)} volumes')
    }

Cost Allocation Tags for Chargeback

# Python
# Tag all volumes with cost center information
import boto3

def tag_volumes_by_instance():
    """
    Propagate instance tags to attached volumes
    """
    ec2 = boto3.resource('ec2')
    
    for instance in ec2.instances.all():
        instance_tags = {tag['Key']: tag['Value'] for tag in instance.tags or []}
        
        # Get cost allocation tags
        cost_tags = {
            k: v for k, v in instance_tags.items() 
            if k in ['CostCenter', 'Project', 'Environment', 'Owner']
        }
        
        # Apply to all attached volumes
        for volume in instance.volumes.all():
            volume.create_tags(Tags=[
                {'Key': k, 'Value': v} for k, v in cost_tags.items()
            ])
            
            print(f"✓ Tagged {volume.id} with {len(cost_tags)} cost allocation tags")

Pro Tips

Start with unattached volumes: This is the easiest money to save – no performance testing, no migration planning, just cleanup.

Use AWS Compute Optimizer: It provides ML-powered recommendations for volume right-sizing based on actual usage patterns. Enable it in the AWS Console under “Compute Optimizer”.

Implement tagging on day one: Every volume should have Environment, Project, Owner, and CostCenter tags. Set up tag policies using AWS Organizations to enforce this.

gp3 migration is zero-downtime: You can migrate live production volumes. The change happens online, and full performance is available immediately while optimization continues in the background.

Archive snapshots strategically: Use the archive tier for compliance snapshots you’ll never access. It’s 75% cheaper than standard tier ($0.0125 vs $0.05 per GB-month), but retrieval takes 24-72 hours.

Set up billing alerts: Create CloudWatch alarms for unexpected EBS cost increases. A sudden spike usually means someone created a large volume or io2 volume by mistake.
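
A sketch of such an alarm on the EstimatedCharges metric, assuming billing alerts are enabled in the account. EBS charges roll up under the AmazonEC2 service dimension, and billing metrics exist only in us-east-1; the alarm name and threshold below are placeholders:

```python
# Python
# CloudWatch billing alarm for unexpected EC2/EBS cost growth

ALARM = {
    'AlarmName': 'ec2-ebs-cost-spike',  # placeholder name
    'Namespace': 'AWS/Billing',
    'MetricName': 'EstimatedCharges',
    'Dimensions': [
        {'Name': 'Currency', 'Value': 'USD'},
        {'Name': 'ServiceName', 'Value': 'AmazonEC2'},  # EBS bills under EC2
    ],
    'Statistic': 'Maximum',
    'Period': 21600,       # billing datapoints arrive every few hours
    'EvaluationPeriods': 1,
    'Threshold': 4000.0,   # placeholder: alert above $4,000 month-to-date
    'ComparisonOperator': 'GreaterThanThreshold',
}

def create_billing_alarm(alarm: dict = ALARM) -> None:
    import boto3  # billing metrics live only in us-east-1
    cloudwatch = boto3.client('cloudwatch', region_name='us-east-1')
    cloudwatch.put_metric_alarm(**alarm)
```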

Leverage spot instances with instance store: For temporary compute workloads, use instance types with NVMe instance store instead of EBS. It’s included free with the instance.

The 60-6 rule: If a volume has been unattached for 60 days and you’ve modified it fewer than 6 times, it’s safe to snapshot and delete. Real volumes are actively managed.