Untitled

 avatar
unknown
plain_text
2 years ago
6.4 kB
6
Indexable
import os
import sys
import boto3
import datetime
from jira import JIRA
import logging

# Logger configuration
log_level = os.getenv('LOG_LEVEL', 'INFO').upper()
logging.basicConfig(level=log_level,
                    format='[%(asctime)s][%(levelname)s] - %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S')
logger = logging.getLogger(__name__)

# Check that all required environment variables are set
try:
    JIRA_BASE_URL = os.environ['JIRA_BASE_URL']
    JIRA_PROJECT_NAME = os.environ['JIRA_PROJECT_NAME']
    JIRA_BOARD_NAME = os.environ['JIRA_BOARD_NAME']
    JIRA_PROJECT_KEY = os.environ['JIRA_PROJECT_KEY']
    JIRA_SSL_RESPONSIBLE_STREAM = os.environ['JIRA_SSL_RESPONSIBLE_STREAM']
    JIRA_EPIC_BASE_NAME = os.environ['JIRA_EPIC_BASE_NAME']
    JIRA_PAT = os.environ['JIRA_PAT']
    DAYS_TO_EXPIRATION_THRESHOLD = int(os.environ['DAYS_TO_EXPIRATION_THRESHOLD'])
except KeyError as e:
    logger.error(f"Environment variable {e} is not set. Please set all required environment variables.")
    raise

jira_client = JIRA(server=JIRA_BASE_URL, token_auth=JIRA_PAT)

def check_ssl_expiration(aws_regions: list):
    due_certs = {}

    def certificate_expiring_soon(cert_detail):
        expiration = cert_detail['NotAfter']
        days_to_expiration = (expiration - datetime.datetime.now(datetime.timezone.utc)).days
        logger.debug(f"{cert_detail['DomainName']} expiring in {days_to_expiration} days")
        return days_to_expiration <= DAYS_TO_EXPIRATION_THRESHOLD

    def is_imported(cert_detail):
        certificate_type = cert_detail['Type']
        if certificate_type == 'IMPORTED':
            return True
        else:
            logger.debug(f"{cert_detail['DomainName']} SSL type is {certificate_type}. Skipping...")
            return False

    # Iterate through regions and record expiring in dict variable due_certs
    for region in aws_regions:
        session = boto3.Session(region_name=region)
        acm_client = session.client('acm')
        logger.info(f"Checking SSL certificates in region: {region}")
        certificates = acm_client.list_certificates(CertificateStatuses=['ISSUED'])

        for cert_detail in certificates['CertificateSummaryList']:
            if is_imported(cert_detail) and certificate_expiring_soon(cert_detail):
                domain_name = cert_detail['DomainName']
                certificate_arn = cert_detail['CertificateArn']
                if domain_name not in due_certs:
                    due_certs[domain_name] = []
                due_certs[domain_name].append(certificate_arn)
    if due_certs:
        logger.info(f"Found {len(due_certs)} certificates due for renewal.")
        return due_certs
    else:
        logger.info("No certificates due for renewal.")
        sys.exit(0)

# Look for active tickets that match [DPS][AUTO] SSL renewal - {domain} format
def search_ticket(domain: str):
    if "*" in domain:
        domain = domain.replace("*", "\\\*")
    search_query = f'project = "{JIRA_PROJECT_NAME}" AND status != Done AND status != Completed AND summary ~ "\\\[DPS\\\]\\\[AUTO\\\] SSL renewal - {domain}"'
    issues = jira_client.search_issues(jql_str = search_query)
    return issues

def get_board_id():
    boards = jira_client.boards(name=JIRA_BOARD_NAME, projectKeyOrID=JIRA_PROJECT_KEY)
    if boards:
        return boards[0].id
    else:
        logger.error("Board not found.")
        sys.exit(1)

def get_active_sprint_id(board_id: str):
    sprint = jira_client.sprints(board_id=board_id, state = "active")
    if sprint:
        return sprint[0].id
    else:
        logger.error("Sprint not found.")
        sys.exit(1)

def get_epic_issue_key():
    search_query = f'project = "{JIRA_PROJECT_NAME}" AND summary ~ "{JIRA_EPIC_BASE_NAME}" AND type = Epic AND status != Done AND status != Completed'
    issues = jira_client.search_issues(jql_str = search_query)
    if not issues:
        logger.warning("Epic not found. Tickets will be created without Epic Link")
        return
    elif len(issues) > 1:
        logger.warning("Found more than one matching Epic. The ticket may have incorrect Epic Link")
    epic_key = issues[-1].key # get latest epic if more than one found
    logger.info(f"Epic key is {epic_key}")
    return epic_key

def create_jira_issue(sprint_id: str, domain: str, arns: list, epic_key: str):

    fields = {
        'project': {
            'key': JIRA_PROJECT_KEY
        },
        'issuetype': {
            'name': 'Story'
        },
        'priority': {
            'name': 'Low'
        },
        'labels': [JIRA_SSL_RESPONSIBLE_STREAM],
        'summary': f'[DPS][AUTO] SSL Renewal - {domain}',
        'description': 'Certificate ARN\n' + '\n'.join(arns),
        'customfield_10005': sprint_id, # customfield_10005 is configured as "Sprint" field in our Jira
        'customfield_10006': epic_key # customfield_10006 is configured as "Epic Link" field in our Jira
    }

    new_issue = jira_client.create_issue(fields)
    return f"Ticket ID is {new_issue.key}"

# Check if existing ticket description contains all ARNs of expiring certificates and append if not
def update_description(issue_key: str, arn_list: list):
    issue = jira_client.issue(issue_key)
    current_description = issue.fields.description
    existing_arns = set(current_description.strip().split('\n'))
    missing_arns = [arn for arn in arn_list if arn not in existing_arns]

    if missing_arns:
        new_description = current_description + "\n" + "\n".join(missing_arns)
        issue.update(fields={'description': new_description})
        return f"Updated description with {len(missing_arns)} missing ARNs."
    else:
        return "No missing ARNs to update."

def main():

    aws_regions = ['us-east-1', 'eu-west-1']
    due_certs = check_ssl_expiration(aws_regions)
    board_id = get_board_id()
    sprint_id = get_active_sprint_id(board_id)
    epic_key = get_epic_issue_key()
    for domain, arns_list in due_certs.items():
        jira_issue = search_ticket(domain)
        if not jira_issue:
            logger.info(f"Creating ticket for {domain}")
            logger.info(create_jira_issue(sprint_id, domain, arns_list, epic_key))
        else:
            logger.info(f"Ticket for {domain} already exists")
            logger.info(update_description(jira_issue[0].key, arns_list))

if __name__ == '__main__':
    main()
Editor is loading...
Leave a Comment