Untitled
unknown
plain_text
2 years ago
6.4 kB
6
Indexable
import datetime
import logging
import os
import sys

import boto3
from jira import JIRA

# Logger configuration
log_level = os.getenv('LOG_LEVEL', 'INFO').upper()
logging.basicConfig(
    level=log_level,
    format='[%(asctime)s][%(levelname)s] - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
logger = logging.getLogger(__name__)

# Check that all required environment variables are set
try:
    JIRA_BASE_URL = os.environ['JIRA_BASE_URL']
    JIRA_PROJECT_NAME = os.environ['JIRA_PROJECT_NAME']
    JIRA_BOARD_NAME = os.environ['JIRA_BOARD_NAME']
    JIRA_PROJECT_KEY = os.environ['JIRA_PROJECT_KEY']
    JIRA_SSL_RESPONSIBLE_STREAM = os.environ['JIRA_SSL_RESPONSIBLE_STREAM']
    JIRA_EPIC_BASE_NAME = os.environ['JIRA_EPIC_BASE_NAME']
    JIRA_PAT = os.environ['JIRA_PAT']
    DAYS_TO_EXPIRATION_THRESHOLD = int(os.environ['DAYS_TO_EXPIRATION_THRESHOLD'])
except KeyError as e:
    logger.error(f"Environment variable {e} is not set. Please set all required environment variables.")
    raise
except ValueError:
    # Raised by int() when the threshold is present but not a whole number.
    logger.error("Environment variable DAYS_TO_EXPIRATION_THRESHOLD must be an integer.")
    raise

jira_client = JIRA(server=JIRA_BASE_URL, token_auth=JIRA_PAT)


def check_ssl_expiration(aws_regions: list):
    """Collect imported ACM certificates that are close to expiring.

    For every region in ``aws_regions`` the issued certificates are listed
    and those that are of type ``IMPORTED`` and expire within
    ``DAYS_TO_EXPIRATION_THRESHOLD`` days are recorded.

    Args:
        aws_regions: AWS region names to scan.

    Returns:
        A dict mapping each domain name to the list of certificate ARNs
        that are due for renewal.

    Note:
        When nothing is due for renewal the function logs that fact and
        terminates the process with ``sys.exit(0)`` instead of returning.
    """
    due_certs = {}

    def certificate_expiring_soon(cert_detail):
        # 'NotAfter' is compared against an aware UTC "now", so it must be a
        # timezone-aware datetime.
        expiration = cert_detail['NotAfter']
        days_to_expiration = (expiration - datetime.datetime.now(datetime.timezone.utc)).days
        logger.debug(f"{cert_detail['DomainName']} expiring in {days_to_expiration} days")
        return days_to_expiration <= DAYS_TO_EXPIRATION_THRESHOLD

    def is_imported(cert_detail):
        certificate_type = cert_detail['Type']
        if certificate_type == 'IMPORTED':
            return True
        logger.debug(f"{cert_detail['DomainName']} SSL type is {certificate_type}. Skipping...")
        return False

    # Iterate through regions and record expiring certificates in due_certs
    for region in aws_regions:
        session = boto3.Session(region_name=region)
        acm_client = session.client('acm')
        logger.info(f"Checking SSL certificates in region: {region}")
        # Use the paginator: a single list_certificates call only returns the
        # first page, so certificates beyond it would be silently missed.
        # NOTE(review): ListCertificates may filter by key algorithm by
        # default - confirm whether an Includes/keyTypes filter is needed.
        paginator = acm_client.get_paginator('list_certificates')
        for page in paginator.paginate(CertificateStatuses=['ISSUED']):
            for cert_detail in page['CertificateSummaryList']:
                if is_imported(cert_detail) and certificate_expiring_soon(cert_detail):
                    domain_name = cert_detail['DomainName']
                    certificate_arn = cert_detail['CertificateArn']
                    due_certs.setdefault(domain_name, []).append(certificate_arn)

    if due_certs:
        logger.info(f"Found {len(due_certs)} certificates due for renewal.")
        return due_certs

    logger.info("No certificates due for renewal.")
    sys.exit(0)


# Look for active tickets that match [DPS][AUTO] SSL renewal - {domain} format
def search_ticket(domain: str):
    """Search the Jira project for an open SSL renewal ticket for ``domain``.

    Args:
        domain: Certificate domain name; may contain a ``*`` wildcard.

    Returns:
        The result of ``jira_client.search_issues`` for issues in
        ``JIRA_PROJECT_NAME`` whose status is neither Done nor Completed and
        whose summary matches the renewal ticket text for the domain.
    """
    # Raw strings keep the double backslash that escapes the reserved
    # characters in the query without relying on invalid Python escape
    # sequences such as "\*" or "\[".
    if "*" in domain:
        domain = domain.replace("*", r"\\*")
    search_query = (
        f'project = "{JIRA_PROJECT_NAME}" AND status != Done AND status != Completed '
        rf'AND summary ~ "\\[DPS\\]\\[AUTO\\] SSL renewal - {domain}"'
    )
    issues = jira_client.search_issues(jql_str=search_query)
    return issues


def get_board_id():
    """Return the id of the first board matching the configured name and project.

    Logs an error and exits the process with status 1 when no board is found.
    """
    boards = jira_client.boards(name=JIRA_BOARD_NAME, projectKeyOrID=JIRA_PROJECT_KEY)
    if boards:
        return boards[0].id
    logger.error("Board not found.")
    sys.exit(1)


def get_active_sprint_id(board_id: str):
    """Return the id of the first active sprint on the given board.

    Args:
        board_id: Identifier of the board to query.

    Logs an error and exits the process with status 1 when the board has no
    active sprint.
    """
    sprint = jira_client.sprints(board_id=board_id, state="active")
    if sprint:
        return sprint[0].id
    logger.error("Sprint not found.")
    sys.exit(1)


def get_epic_issue_key():
    """Find the open Epic whose summary matches ``JIRA_EPIC_BASE_NAME``.

    Returns:
        The key of the matching Epic, or ``None`` when no Epic is found. When
        several Epics match, a warning is logged and the most recently
        created one is returned.
    """
    # Explicit ordering makes the last result the most recently created Epic;
    # without ORDER BY the "latest" pick below would depend on server defaults.
    search_query = (
        f'project = "{JIRA_PROJECT_NAME}" AND summary ~ "{JIRA_EPIC_BASE_NAME}" '
        'AND type = Epic AND status != Done AND status != Completed '
        'ORDER BY created ASC'
    )
    issues = jira_client.search_issues(jql_str=search_query)
    if not issues:
        logger.warning("Epic not found. Tickets will be created without Epic Link")
        return None
    if len(issues) > 1:
        logger.warning("Found more than one matching Epic. The ticket may have incorrect Epic Link")
    epic_key = issues[-1].key  # get latest epic if more than one found
    logger.info(f"Epic key is {epic_key}")
    return epic_key


def create_jira_issue(sprint_id: str, domain: str, arns: list, epic_key: str):
    """Create a low-priority Story asking for renewal of a domain's certificates.

    Args:
        sprint_id: Sprint the ticket is placed in.
        domain: Domain name used in the ticket summary.
        arns: Certificate ARNs listed in the ticket description.
        epic_key: Key of the Epic to link, or ``None`` to create the ticket
            without an Epic Link.

    Returns:
        A message containing the key of the newly created issue.
    """
    fields = {
        'project': {'key': JIRA_PROJECT_KEY},
        'issuetype': {'name': 'Story'},
        'priority': {'name': 'Low'},
        'labels': [JIRA_SSL_RESPONSIBLE_STREAM],
        'summary': f'[DPS][AUTO] SSL Renewal - {domain}',
        'description': 'Certificate ARN\n' + '\n'.join(arns),
        'customfield_10005': sprint_id,  # customfield_10005 is configured as "Sprint" field in our Jira
    }
    if epic_key is not None:
        # customfield_10006 is configured as "Epic Link" field in our Jira;
        # it is left out entirely when no Epic was found.
        fields['customfield_10006'] = epic_key
    new_issue = jira_client.create_issue(fields)
    return f"Ticket ID is {new_issue.key}"


# Check if existing ticket description contains all ARNs of expiring certificates and append if not
def update_description(issue_key: str, arn_list: list):
    """Append to an existing ticket any ARNs missing from its description.

    Args:
        issue_key: Key of the ticket to inspect.
        arn_list: ARNs of the certificates that are due for renewal.

    Returns:
        A message stating how many ARNs were appended, or that none were
        missing.
    """
    issue = jira_client.issue(issue_key)
    # The description may be empty (None); treat that as an empty text.
    current_description = issue.fields.description or ""
    # splitlines() + strip() tolerate "\r\n" line endings and stray spaces, so
    # ARNs already present are not appended a second time.
    existing_arns = {line.strip() for line in current_description.splitlines()}
    missing_arns = [arn for arn in arn_list if arn not in existing_arns]
    if not missing_arns:
        return "No missing ARNs to update."

    appended = "\n".join(missing_arns)
    if current_description:
        new_description = current_description + "\n" + appended
    else:
        new_description = appended
    issue.update(fields={'description': new_description})
    return f"Updated description with {len(missing_arns)} missing ARNs."


def main():
    """Scan the configured regions and open or update a renewal ticket per domain.

    For each domain returned by ``check_ssl_expiration`` an existing open
    ticket gets its description updated with any missing ARNs; otherwise a
    new ticket is created in the active sprint and linked to the Epic key
    returned by ``get_epic_issue_key``.
    """
    regions = ['us-east-1', 'eu-west-1']
    expiring = check_ssl_expiration(regions)

    # Resolve the Jira context once, before touching any ticket.
    sprint_id = get_active_sprint_id(get_board_id())
    epic_key = get_epic_issue_key()

    for domain in expiring:
        arns = expiring[domain]
        matches = search_ticket(domain)
        if matches:
            logger.info(f"Ticket for {domain} already exists")
            logger.info(update_description(matches[0].key, arns))
            continue
        logger.info(f"Creating ticket for {domain}")
        logger.info(create_jira_issue(sprint_id, domain, arns, epic_key))


if __name__ == '__main__':
    main()
Editor is loading...
Leave a Comment