2 years ago
4.7 kB
import json
import logging
import os
import time

import boto3
import urllib3

# Shared connection pool, reused across warm Lambda invocations.
http = urllib3.PoolManager()

logger = logging.getLogger()
logger.setLevel(logging.INFO)

WEBHOOK_URL = os.environ['slackWebhookUrl']
SLACK_CHANNEL = os.environ['slackChannel']

# Only mount points starting with this prefix are checked.
MOUNT_PREFIX = '/nsg'
# An alert is sent when usage is strictly greater than this percentage.
USAGE_THRESHOLD_PERCENT = 80
# Polling budget for the SSM command result: at most
# MAX_POLL_ATTEMPTS calls, POLL_INTERVAL_SECONDS apart.
MAX_POLL_ATTEMPTS = 10
POLL_INTERVAL_SECONDS = 0.5
# SSM invocation statuses that mean the command has not finished yet.
UNFINISHED_STATUSES = frozenset({'Pending', 'InProgress', 'Delayed'})
# Prints six space-separated df columns per filesystem, header row skipped:
# filesystem, size, used, available, use%, mount point.
DF_COMMAND = "df -h | awk 'NR>1{print $1,$2,$3,$4,$5,$6}'"


def lambda_handler(event, context):
    """Check disk usage on every running EC2 instance and alert on full volumes.

    For each running instance, ``df -h`` is executed through SSM Run Command;
    every mount point under ``MOUNT_PREFIX`` whose usage exceeds
    ``USAGE_THRESHOLD_PERCENT`` triggers a Slack alert. A failure on one
    instance is logged and does not stop the remaining instances.

    Args:
        event (dict): JSON-formatted document that contains data for a
            Lambda function to process (unused).
        context (object): Provides information about the invocation,
            function, and runtime environment (unused).
    """
    ec2_client = boto3.client('ec2')
    ssm_client = boto3.client('ssm')

    for instance_id in _running_instance_ids(ec2_client):
        try:
            _check_instance(ssm_client, instance_id)
        except Exception:
            # Best effort: e.g. the instance is not managed by SSM.
            logger.exception("Failed to check instance %s", instance_id)


def _running_instance_ids(ec2_client):
    """Return the IDs of all running EC2 instances, following pagination."""
    paginator = ec2_client.get_paginator('describe_instances')
    # Stopped/terminated instances cannot execute SSM commands, so asking
    # them would only produce a guaranteed error per instance.
    pages = paginator.paginate(
        Filters=[{'Name': 'instance-state-name', 'Values': ['running']}],
    )
    return [
        instance['InstanceId']
        for page in pages
        for reservation in page['Reservations']
        for instance in reservation['Instances']
    ]


def _check_instance(ssm_client, instance_id):
    """Run df on one instance and send an alert for each over-full volume."""
    response = ssm_client.send_command(
        InstanceIds=[instance_id],
        DocumentName="AWS-RunShellScript",
        TimeoutSeconds=240,
        Parameters={"commands": [DF_COMMAND]},
    )
    command_id = response['Command']['CommandId']
    logger.info(f"instance_id: {instance_id}")

    output = _wait_for_output(ssm_client, command_id, instance_id)
    for line in output.splitlines():
        # Each line is handled on its own so one malformed line cannot
        # hide the remaining volumes of the instance.
        try:
            _check_volume(instance_id, line)
        except Exception:
            logger.exception(
                "Failed to process df line %r on instance %s",
                line, instance_id,
            )


def _wait_for_output(ssm_client, command_id, instance_id):
    """Poll SSM until the command finishes; return its stdout ('' on give-up)."""
    for _ in range(MAX_POLL_ATTEMPTS):
        # Delay between calls to avoid hitting API rate limits.
        time.sleep(POLL_INTERVAL_SECONDS)
        try:
            result = ssm_client.get_command_invocation(
                CommandId=command_id,
                InstanceId=instance_id,
            )
        except ssm_client.exceptions.InvocationDoesNotExist:
            # The invocation may not be registered yet right after
            # send_command; try again on the next round.
            continue

        status = result['Status']
        if status in UNFINISHED_STATUSES:
            continue
        if status != 'Success':
            logger.warning(
                "Command %s on instance %s ended with status %s",
                command_id, instance_id, status,
            )
        return result['StandardOutputContent']

    logger.warning(
        "Command %s on instance %s did not finish after %d polls",
        command_id, instance_id, MAX_POLL_ATTEMPTS,
    )
    return ''


def _check_volume(instance_id, line):
    """Parse one df output line and alert if a monitored volume is too full."""
    # maxsplit keeps everything after the fifth column as the mount point.
    fields = line.split(None, 5)
    if len(fields) != 6:
        logger.warning(
            "Skipping unexpected df line on instance %s: %r",
            instance_id, line,
        )
        return

    _filesystem, size, used, available, percentage_used, name = fields
    # check only nsg named volumes
    if not name.startswith(MOUNT_PREFIX):
        return

    if percentage_used.endswith('%'):
        percentage_used = int(percentage_used[:-1])
    else:
        # No usable percentage reported (e.g. "-"): treat as not full.
        percentage_used = 0

    if percentage_used > USAGE_THRESHOLD_PERCENT:
        message = f"EBS volume {name} on instance ({instance_id} / {instance_name(instance_id)}) is {percentage_used}% full\n size: {size} | available: {available} | used: {used}"
        # Send message to Slack or raise alarm
        send_alert(message)


# send message to Slack or raise alarm
def send_alert(message):
    """Post ``message`` to the configured Slack channel via the webhook.

    Errors are logged and swallowed so that a failed notification does not
    interrupt the scan of the remaining volumes and instances.

    Args:
        message (str): Text of the alert.
    """
    slack_message = {
        'channel': SLACK_CHANNEL,
        'text': f"{message}",
    }
    headers = {'Content-Type': 'application/json'}
    try:
        # Slack incoming webhooks are documented as HTTP POST endpoints.
        response = http.request(
            'POST',
            WEBHOOK_URL,
            headers=headers,
            body=json.dumps(slack_message).encode('utf-8'),
        )
        logger.info(f"Status code: {response.status}")
    except Exception:
        logger.exception("Failed to send Slack alert")


# tag related information
def instance_name(instance_id):
    """Return the value of the instance's ``Name`` tag.

    Args:
        instance_id (str): ID of the EC2 instance to look up.

    Returns:
        str: The ``Name`` tag value, or ``"Name tag missing"`` when the
        instance has no such tag (or no tags at all).
    """
    ec2_resource = boto3.resource('ec2')
    instance_info = ec2_resource.instances.filter(
        InstanceIds=[
            instance_id,
        ],
    )
    for instance in instance_info:
        logger.info(f"EC2 instance {instance.id} tags")
        # boto3 reports ``tags`` as None for an untagged instance.
        for tag in instance.tags or []:
            if tag["Key"] == "Name":
                return tag["Value"]
    return "Name tag missing"
Editor is loading...