Untitled

 avatar
unknown
plain_text
a year ago
4.3 kB
25
Indexable
import logging

import boto3
from coral import coralrpc
from odin_client.aws_credentials_provider import AWSCredentialsProvider
from odin_client.refresher import TimedRefresher
from rds.admin.internal.describecustomerdbinstancesrequest import DescribeCustomerDBInstancesRequest
from rds.admin.internal.describedbclusterrequest import DescribeDBClusterRequest
from rds.admin.internal.describedbinstancehostsmessage import DescribeDbInstanceHostsMessage
from rds.admin.internal.rdsinternalwebservice import RDSInternalWebServiceClient

from aurora_performance_data_service.helpers.constants import (
    AURORA_PERF_ANALYSIS_ACCT,
    AWS_DEFAULT_REGION,
    MATERIAL_SET,
    RDS_DOMAINS,
    WORKFLOW_CLIENT_TIMEOUT,
)

logger = logging.getLogger(__name__)


def get_aws_api_endpoint(domain):
    if domain == "integ":
        return RDS_DOMAINS["integ"]
    elif domain == "prod":
        return RDS_DOMAINS["prod"]
    elif domain == "elephant":
        return RDS_DOMAINS["elephant"]
    elif domain == "orion":
        return RDS_DOMAINS["orion"]
    elif domain == "mammoth":
        return RDS_DOMAINS["mammoth"]
    elif domain == "qa":
        return RDS_DOMAINS["qa"]
    else:
        return None


def get_aws_api_credentials(domain, region):
    """
        Get AWS API credentials
    :param domain:
    :param region:
    :return:
    """
    try:
        credentials = boto3.Session(profile_name="default").get_credentials()
    except Exception as e:
        logger.warning(f"Failed to get Turtle credentials: {e}, Getting Odin credentials")
        credentials_provider = AWSCredentialsProvider(MATERIAL_SET, TimedRefresher(90))
        credentials = credentials_provider.aws_access_key_pair

    return credentials


class IWSClient:
    def __init__(self, domain, region, read_timeout=WORKFLOW_CLIENT_TIMEOUT):
        self.__iws_client = self.__get_iws_client(domain, region, read_timeout)

    def __get_iws_client(self, domain, region, read_timeout):
        credentials = get_aws_api_credentials(domain, region)
        endpoint = get_aws_api_endpoint(domain)
        orchestrator = coralrpc.new_orchestrator(
            endpoint=endpoint,
            aws_region=region,
            aws_service="rds",
            aws_access_key=credentials.access_key.encode(),
            aws_secret_key=credentials.secret_key.encode(),
            aws_security_token=credentials.token.encode() if credentials.token else None,
            signature_algorithm="v4",
            timeout=(30, read_timeout),
        )
        return RDSInternalWebServiceClient(orchestrator)

    def get_db_instance_host_details(self, db_instance_id, customer_id):
        request = DescribeDbInstanceHostsMessage(
            instance_id=db_instance_id, customer_id=customer_id
        )
        response = None
        try:
            response = self.__iws_client.describe_db_instance_hosts(request)
        except Exception as e:
            logger.exception(e)
        if not response:
            return None
        return response

    def describe_db_cluster(self, customer_cluster_identifier, customer_aws_id, db_cluster_id=None):
        request = DescribeDBClusterRequest(
            customer_cluster_identifier=customer_cluster_identifier,
            customer_aws_id=customer_aws_id,
            db_cluster_id=db_cluster_id,
        )
        response = None
        try:
            response = self.__iws_client.describe_db_cluster(request)
        except Exception as e:
            logger.exception(e)
        if not response:
            return None
        return response

    def get_customer_db_instances(
        self,
        customer_id=AURORA_PERF_ANALYSIS_ACCT,
        db_instance_identifier=None,
    ):
        request = DescribeCustomerDBInstancesRequest(
            customer_id=customer_id, db_instance_identifier_search_string=db_instance_identifier
        )
        response = None
        try:
            response = self.__iws_client.describe_customer_db_instances(request)
        except Exception as e:
            logger.exception(e)
        if not response:
            return None
        return response.customer_db_instances


def get_iws_client(domain, region=AWS_DEFAULT_REGION):
    return IWSClient(domain, region)
Editor is loading...
Leave a Comment