import json
import logging
import os
import time

import boto3
import urllib3
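# Objects created at module level (HTTP pool, logger, configuration) are
# initialised once per execution environment and reused across warm Lambda invocations.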
http = urllib3.PoolManager()
logger = logging.getLogger()
logger.setLevel(logging.INFO)
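# Both environment variables below are expected to be configured on the Lambda function:
# slackWebhookUrl (Slack incoming webhook URL) and slackChannel (channel the alert is posted to).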
WEBHOOK_URL = os.environ['slackWebhookUrl']
SLACK_CHANNEL = os.environ['slackChannel']
def lambda_handler(event, context):
"""lambda function to monitor EBS volumes in AWS account
Args:
event (dict)): JSON-formatted document that contains data for a Lambda function to process
context (object): provide information about the invocation, function, and runtime environment
"""
ec2_client = boto3.client('ec2')
ssm_client = boto3.client('ssm')
    # Use a paginator to iterate over the results and avoid API throttling
paginator = ec2_client.get_paginator('describe_instances')
page_iterator = paginator.paginate()
    # Collect the instance IDs of all EC2 instances in the account
    instance_ids = []
    for page in page_iterator:
        for reservation in page['Reservations']:
            for instance in reservation['Instances']:
                instance_ids.append(instance['InstanceId'])
    # Check filesystem usage on each instance by running df via SSM Run Command
for instance_id in instance_ids:
try:
response = ssm_client.send_command(
InstanceIds=[instance_id],
DocumentName="AWS-RunShellScript",
TimeoutSeconds=240,
Parameters={
"commands": [
"df -h | awk 'NR>1{print $1,$2,$3,$4,$5,$6}'"
]
}
)
# Retrieve the output of the command
command_id = response['Command']['CommandId']
logger.info(f"instance_id: {instance_id}")
tries = 0
            output = ''
            ''' Rather than polling the API in a tight loop, wait briefly between
            calls and retry at most 10 times before giving up.
            '''
while tries < 10:
tries = tries + 1
try:
# Add a delay to avoid hitting API rate limits
time.sleep(0.5)
result = ssm_client.get_command_invocation(
CommandId=command_id,
InstanceId=instance_id,
)
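                    # Keep polling while the command is still running; once it finishes, capture its stdout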
if result['Status'] == 'InProgress':
continue
output = result['StandardOutputContent']
break
                # the invocation may not be registered yet right after send_command, so retry if it doesn't exist
except ssm_client.exceptions.InvocationDoesNotExist:
continue
            volumes = output.splitlines()
            for volume in volumes:
                try:
                    # df output fields: filesystem, size, used, available, use%, mount point
                    filesystem, size, used, available, percentage_used, mount_point = volume.split()
                    # Only check volumes mounted under /nsg
                    if mount_point.startswith('/nsg'):
                        if percentage_used.endswith('%'):
                            percentage_used = int(percentage_used[:-1])
                        else:
                            percentage_used = 0
                        if percentage_used > 80:
                            message = f"EBS volume {mount_point} on instance ({instance_id} / {instance_name(instance_id)}) is {percentage_used}% full\n size: {size} | available: {available} | used: {used}"
                            # Send message to Slack or raise alarm
                            send_alert(message)
                except Exception as e:
                    logger.error(e)
except Exception as e:
logger.error(e)
# send message to Slack or raise alarm
def send_alert(message):
    slack_message = {
        'channel': SLACK_CHANNEL,
        'text': message
    }
    headers = {'Content-Type': 'application/json'}
    try:
        # Slack incoming webhooks expect an HTTP POST with a JSON body
        response = http.request('POST', WEBHOOK_URL, headers=headers, body=json.dumps(slack_message))
        logger.info(f"Status code: {response.status}")
    except Exception as e:
        logger.error(e)
# Look up the value of the instance's Name tag
def instance_name(instance_id):
ec2_resource = boto3.resource('ec2')
instance_info = ec2_resource.instances.filter(
InstanceIds=[
instance_id,
],
)
for instance in instance_info:
logger.info(f"EC2 instance {instance.id} tags")
        if instance.tags:
for tag in instance.tags:
if tag["Key"] == "Name":
return tag["Value"]
return "Name tag missing"