Untitled

 avatar
unknown
plain_text
2 years ago
4.7 kB
4
Indexable
import boto3
import os
import time
import logging
import json
import os
import urllib3


http = urllib3.PoolManager()
logger = logging.getLogger()
logger.setLevel(logging.INFO)

WEBHOOK_URL=os.environ['slackWebhookUrl']
SLACK_CHANNEL = os.environ['slackChannel']


def lambda_handler(event, context):
    """lambda function to monitor EBS volumes in AWS account

    Args:
        event (dict)): JSON-formatted document that contains data for a Lambda function to process
        context (object): provide information about the invocation, function, and runtime environment
    """
    ec2_client = boto3.client('ec2')
    ssm_client = boto3.client('ssm')

    # throttling - paginator to iterate over the results 
    paginator = ec2_client.get_paginator('describe_instances')
    page_iterator = paginator.paginate()
    instance_ids = []
    for page in page_iterator:
        for reservation in page['Reservations']:
            for instance in reservation['Instances']:
                # Get the instance IDs of all EC2 instances in the account
                instance_ids.append(instance['InstanceId']) 

    # Get the EBS volumes attached to each instance
    for instance_id in instance_ids:
        try:
            response = ssm_client.send_command( 
                InstanceIds=[instance_id],
                DocumentName="AWS-RunShellScript",
                TimeoutSeconds=240,
                Parameters={
                    "commands": [
                        "df -h | awk 'NR>1{print $1,$2,$3,$4,$5,$6}'"
                    ]
                }
            )

            # Retrieve the output of the command
            command_id = response['Command']['CommandId']
            logger.info(f"instance_id: {instance_id}")
            tries = 0
            output = 'False'
            ''' In this way, the code is not iteratively making API calls, but adding a backoff in the code so it 
             can wait before making a new call and also with this logic, it will only retry 10 times 
            '''
            while tries < 10:
                tries = tries + 1
                try:
                    # Add a delay to avoid hitting API rate limits
                    time.sleep(0.5)  
                    result = ssm_client.get_command_invocation(
                        CommandId=command_id,
                        InstanceId=instance_id,
                    )
                    if result['Status'] == 'InProgress':
                        continue
                    output = result['StandardOutputContent']
                    break
                # in case send_command API call fails and command_id doen't exist
                except ssm_client.exceptions.InvocationDoesNotExist:
                    continue
            
            volumes = output.splitlines()
            for volume in volumes:
                mount, size, used, available, percentage_used, name = volume.split()
                try:
                    #check only nsg named volumes
                    if name.startswith('/nsg'):
                        if percentage_used.endswith('%'):
                            percentage_used = int(percentage_used[:-1])
                        else:
                            percentage_used = 0
                        if int(percentage_used) > 80:
                            message = f"EBS volume {name} on instance ({instance_id} / {instance_name(instance_id)}) is {percentage_used}% full\n size: {size} | available: {available} | used: {used}"
                            # Send message to Slack or raise alarm
                            send_alert(message)
                except Exception as e:
                    logger.error(e)

        except Exception as e:
            logger.error(e)

 # send message to Slack or raise alarm
def send_alert(message):
    slack_message = {
        'channel': SLACK_CHANNEL,
        'text': f"{message}"
    }
    headers={'Content-Type': 'application/json'}
    try:
        response = http.request('PUT', WEBHOOK_URL, headers=headers, body=json.dumps(slack_message))
        logger.info(f"Status code: {response.status}")
    except Exception as e:
        logger.error(e)

# tag related information
def instance_name(instance_id):
    ec2_resource = boto3.resource('ec2')
    instance_info = ec2_resource.instances.filter(
        InstanceIds=[
            instance_id,
        ],
    )
    for instance in instance_info:
        logger.info(f"EC2 instance {instance.id} tags")
        if len(instance.tags) > 0:
            for tag in instance.tags:
                if tag["Key"] == "Name":
                   return tag["Value"]
    
    return "Name tag missing"