# --- Paste-site page header (not part of the script) ---
# Untitled
# unknown
# plain_text
# 2 years ago
# 6.4 kB
# 11
# Indexable
import os
import sys
import boto3
import datetime
from jira import JIRA
import logging
# Logger configuration.
# Verbosity comes from the LOG_LEVEL environment variable (default: INFO) and is
# upper-cased before being handed to logging, so "debug" and "DEBUG" are equivalent.
_LOG_FORMAT = '[%(asctime)s][%(levelname)s] - %(message)s'
_LOG_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'

log_level = os.environ.get('LOG_LEVEL', 'INFO').upper()
logging.basicConfig(
    level=log_level,
    format=_LOG_FORMAT,
    datefmt=_LOG_DATE_FORMAT,
)
logger = logging.getLogger(__name__)
# Check that all required environment variables are set.
# Configuration is read once, at import time. A missing variable or a
# non-integer threshold is logged and the original exception is re-raised, so
# the script stops before any AWS or Jira call is made.
try:
    JIRA_BASE_URL = os.environ['JIRA_BASE_URL']
    JIRA_PROJECT_NAME = os.environ['JIRA_PROJECT_NAME']
    JIRA_BOARD_NAME = os.environ['JIRA_BOARD_NAME']
    JIRA_PROJECT_KEY = os.environ['JIRA_PROJECT_KEY']
    JIRA_SSL_RESPONSIBLE_STREAM = os.environ['JIRA_SSL_RESPONSIBLE_STREAM']
    JIRA_EPIC_BASE_NAME = os.environ['JIRA_EPIC_BASE_NAME']
    JIRA_PAT = os.environ['JIRA_PAT']
    DAYS_TO_EXPIRATION_THRESHOLD = int(os.environ['DAYS_TO_EXPIRATION_THRESHOLD'])
except KeyError as e:
    logger.error(f"Environment variable {e} is not set. Please set all required environment variables.")
    raise
except ValueError:
    # Only the int() conversion above can raise ValueError inside this try block.
    logger.error(
        "Environment variable 'DAYS_TO_EXPIRATION_THRESHOLD' must be an integer, "
        f"got {os.environ['DAYS_TO_EXPIRATION_THRESHOLD']!r}."
    )
    raise

# Shared Jira client used by every Jira helper below; authenticates via token_auth with JIRA_PAT.
jira_client = JIRA(server=JIRA_BASE_URL, token_auth=JIRA_PAT)
def check_ssl_expiration(aws_regions: list):
    """Collect imported ACM certificates that are close to expiring.

    For every region, certificates in ``ISSUED`` status are listed (following
    pagination) and kept when their ``Type`` is ``IMPORTED`` and their
    ``NotAfter`` is at most ``DAYS_TO_EXPIRATION_THRESHOLD`` days away.

    Args:
        aws_regions: AWS region names to scan.

    Returns:
        A dict mapping domain name to the list of certificate ARNs due for
        renewal. If nothing is due, the function does not return: it logs a
        message and exits the process with status 0.
    """
    due_certs = {}

    def certificate_expiring_soon(cert_detail):
        """Return True when the certificate expires within the configured threshold."""
        expiration = cert_detail['NotAfter']
        # "now" is timezone-aware (UTC); subtracting it requires NotAfter to be aware too.
        days_to_expiration = (expiration - datetime.datetime.now(datetime.timezone.utc)).days
        logger.debug(f"{cert_detail['DomainName']} expiring in {days_to_expiration} days")
        return days_to_expiration <= DAYS_TO_EXPIRATION_THRESHOLD

    def is_imported(cert_detail):
        """Return True for certificates whose Type is IMPORTED; log and skip the rest."""
        certificate_type = cert_detail['Type']
        if certificate_type == 'IMPORTED':
            return True
        logger.debug(f"{cert_detail['DomainName']} SSL type is {certificate_type}. Skipping...")
        return False

    # Iterate through regions and record expiring certificates in due_certs.
    for region in aws_regions:
        session = boto3.Session(region_name=region)
        acm_client = session.client('acm')
        logger.info(f"Checking SSL certificates in region: {region}")
        # A single list_certificates call returns only one page of results;
        # the paginator follows NextToken so no certificate is silently skipped.
        # NOTE(review): ListCertificates applies a default key-type filter unless
        # Includes.keyTypes is passed - confirm no imported certificates are missed.
        paginator = acm_client.get_paginator('list_certificates')
        for page in paginator.paginate(CertificateStatuses=['ISSUED']):
            for cert_detail in page['CertificateSummaryList']:
                if is_imported(cert_detail) and certificate_expiring_soon(cert_detail):
                    domain_name = cert_detail['DomainName']
                    certificate_arn = cert_detail['CertificateArn']
                    due_certs.setdefault(domain_name, []).append(certificate_arn)

    if not due_certs:
        logger.info("No certificates due for renewal.")
        sys.exit(0)

    # Count certificates (ARNs), not domains: one domain may have several ARNs.
    total_certificates = sum(len(arns) for arns in due_certs.values())
    logger.info(f"Found {total_certificates} certificates due for renewal.")
    return due_certs
# Look for active tickets that match [DPS][AUTO] SSL renewal - {domain} format
def search_ticket(domain: str):
    """Search the Jira project for an open SSL-renewal ticket for *domain*.

    Args:
        domain: Certificate domain name; may contain ``*`` (wildcard certificates).

    Returns:
        The result of ``jira_client.search_issues`` for issues in
        ``JIRA_PROJECT_NAME`` whose status is neither Done nor Completed and
        whose summary text-matches ``[DPS][AUTO] SSL renewal - <domain>``.
    """
    # "*", "[" and "]" are sent to Jira preceded by two literal backslashes.
    # Each of those backslashes is written as "\\" in the Python literal, which
    # avoids the invalid "\*" / "\[" / "\]" escape sequences.
    escaped_domain = domain.replace("*", "\\\\*")
    search_query = (
        f'project = "{JIRA_PROJECT_NAME}" '
        'AND status != Done AND status != Completed '
        f'AND summary ~ "\\\\[DPS\\\\]\\\\[AUTO\\\\] SSL renewal - {escaped_domain}"'
    )
    # NOTE(review): "~" is a text-search operator, not an exact comparison -
    # confirm it cannot match a ticket that belongs to a different domain.
    issues = jira_client.search_issues(jql_str=search_query)
    return issues
def get_board_id():
    """Return the id of the first board found for JIRA_BOARD_NAME / JIRA_PROJECT_KEY.

    Logs an error and exits the process with status 1 when no board is returned.
    """
    matching_boards = jira_client.boards(name=JIRA_BOARD_NAME, projectKeyOrID=JIRA_PROJECT_KEY)
    if not matching_boards:
        logger.error("Board not found.")
        sys.exit(1)
    first_board = matching_boards[0]
    return first_board.id
def get_active_sprint_id(board_id: str):
    """Return the id of the first active sprint on the given board.

    Args:
        board_id: Identifier of the board to query.

    Logs an error and exits the process with status 1 when the board has no
    sprint in the "active" state.
    """
    active_sprints = jira_client.sprints(board_id=board_id, state="active")
    if not active_sprints:
        logger.error("Sprint not found.")
        sys.exit(1)
    current_sprint = active_sprints[0]
    return current_sprint.id
def get_epic_issue_key():
    """Return the key of the most recently created open Epic matching JIRA_EPIC_BASE_NAME.

    Returns:
        The Epic's issue key, or ``None`` (after logging a warning) when no
        open Epic matches. When several match, a warning is logged and the
        newest one is used.
    """
    # ORDER BY makes "latest" well defined; without it the result order is unspecified.
    search_query = (
        f'project = "{JIRA_PROJECT_NAME}" AND summary ~ "{JIRA_EPIC_BASE_NAME}" '
        'AND type = Epic AND status != Done AND status != Completed '
        'ORDER BY created DESC'
    )
    issues = jira_client.search_issues(jql_str=search_query)
    if not issues:
        logger.warning("Epic not found. Tickets will be created without Epic Link")
        return None
    if len(issues) > 1:
        logger.warning("Found more than one matching Epic. The ticket may have incorrect Epic Link")
    epic_key = issues[0].key  # newest first because of ORDER BY created DESC
    logger.info(f"Epic key is {epic_key}")
    return epic_key
def create_jira_issue(sprint_id: str, domain: str, arns: list, epic_key: str):
    """Create a low-priority Story asking for renewal of the certificates of *domain*.

    Args:
        sprint_id: Sprint the ticket is placed in.
        domain: Certificate domain name, used in the ticket summary.
        arns: Certificate ARNs, listed one per line in the description.
        epic_key: Key of the Epic to link, or ``None`` to create the ticket
            without an Epic Link.

    Returns:
        A human-readable message containing the key of the created issue.
    """
    fields = {
        'project': {
            'key': JIRA_PROJECT_KEY,
        },
        'issuetype': {
            'name': 'Story',
        },
        'priority': {
            'name': 'Low',
        },
        'labels': [JIRA_SSL_RESPONSIBLE_STREAM],
        'summary': f'[DPS][AUTO] SSL Renewal - {domain}',
        'description': 'Certificate ARN\n' + '\n'.join(arns),
        'customfield_10005': sprint_id,  # customfield_10005 is configured as "Sprint" field in our Jira
    }
    # Only send the Epic Link when an Epic was actually found, instead of
    # posting an explicit null for the field.
    if epic_key:
        fields['customfield_10006'] = epic_key  # customfield_10006 is configured as "Epic Link" field in our Jira
    new_issue = jira_client.create_issue(fields)
    return f"Ticket ID is {new_issue.key}"
# Check if existing ticket description contains all ARNs of expiring certificates and append if not
def update_description(issue_key: str, arn_list: list):
    """Append to the ticket description any ARN from *arn_list* that is not already listed.

    Args:
        issue_key: Key of the existing Jira issue.
        arn_list: ARNs of the certificates currently due for renewal.

    Returns:
        A human-readable message saying how many ARNs were appended, or that
        nothing had to be updated.
    """
    issue = jira_client.issue(issue_key)
    # The description can be empty/None (e.g. cleared by hand); treat it as "".
    current_description = issue.fields.description or ""
    # splitlines() + strip() tolerate CRLF line endings and stray whitespace, so
    # ARNs that are already present are not appended again on every run.
    existing_lines = {line.strip() for line in current_description.splitlines()}
    missing_arns = [arn for arn in arn_list if arn not in existing_lines]
    if not missing_arns:
        return "No missing ARNs to update."

    # Avoid a leading blank line when the description was empty.
    kept_text = current_description.rstrip()
    new_description = "\n".join(([kept_text] if kept_text else []) + missing_arns)
    issue.update(fields={'description': new_description})
    return f"Updated description with {len(missing_arns)} missing ARNs."
def main():
    """Scan ACM for expiring certificates and make sure each domain has a Jira ticket.

    Board, active sprint and Epic are resolved once; then, per domain, either a
    new ticket is created or the existing ticket's description is updated.
    """
    regions_to_scan = ['us-east-1', 'eu-west-1']
    expiring = check_ssl_expiration(regions_to_scan)

    active_sprint = get_active_sprint_id(get_board_id())
    epic = get_epic_issue_key()

    for domain, cert_arns in expiring.items():
        existing_tickets = search_ticket(domain)
        if existing_tickets:
            logger.info(f"Ticket for {domain} already exists")
            outcome = update_description(existing_tickets[0].key, cert_arns)
            logger.info(outcome)
            continue
        logger.info(f"Creating ticket for {domain}")
        outcome = create_jira_issue(active_sprint, domain, cert_arns, epic)
        logger.info(outcome)
# Run the check only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
# (Paste-site page footer, not part of the script: "Leave a Comment")