Untitled

 avatar
unknown
plain_text
a year ago
11 kB
7
Indexable
This is my code in A file
from aurora_performance_data_service.helpers.database.DBEngine import DBEngine
from aurora_performance_data_service.helpers.ssm_controller.ssm_cmd_utils import SSMControllerHosts
from aurora_performance_data_service.helpers.workflow.pas_workflow_type import PASWorkflowType
from aurora_performance_data_service.helpers.workflow.workflow_utils import (
    build_full_yaml_from_minimal_yaml,
    generate_db_cluster_name_suffix,
    get_custom_endpoint_config,
    get_instance_size,
    get_verified_params_for_workflow,
    is_minimal_yaml,
    load_params_from_yaml,
    namespace_to_dict,
    update_custom_steps,
    update_workflow_for_io_sku,
    validate_minimal_yaml_mandatory_fields,
    validate_yaml_attribute_values,
)
from aurora_performance_data_service.workflow.rdsbms_workflow_manager import (
    get_rdsbms_workflow_manager,
)
from aurora_performance_data_service.workflow.workflow_summary import get_result_manager

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)


class __BasePerfWorkflowService:
    """Base class for Perf workflows.

    Holds a workflow configuration dict together with the verified workflow
    params built from it, and exposes read-only accessors over both.
    """

    def __init__(
        self,
        name: str,
        workflow_config: dict,
        user_config: dict,
        workflow_prefix: str = None,
        **kwargs,
    ):
        """
        Initialize base workflow
        Params:
        :name, str, Workflow name.
        :workflow_config, dict, Workflow configuration; must contain a "config" entry.
        :user_config, dict, User configuration; a deep copy is stored.
        :workflow_prefix, str, Optional prefix used when building the internal workflow id.
        :kwargs, only "workflow_user" is read here.
        """
        # Placeholders; several of these are filled lazily by the properties below.
        self.__engine = None
        self.__workflow_type = None
        self.__rdsbms_manager = None
        self.__perf_manager = None
        self.__parent_workflow_id = None
        self.__internal_workflow_id = None
        self.__pas_workflow_type = None
        self.__workflow_config = None
        self.__workflow_params = None

        # Presets
        self.__name = name
        self.__workflow_prefix = workflow_prefix
        self.__user_config = copy.deepcopy(user_config)

        # Must run before anything reads self.workflow_config / self.workflow_params.
        self._set_workflow_params(workflow_config)

        self.__db_instance_size = get_instance_size(self.workflow_config["config"])
        self.__workflow_user = kwargs.get("workflow_user")

    @property
    def internal_workflow_id(self):
        """Internal workflow id: "[<prefix>-]<engine>-<uuid4 hex>".

        Generated on first access and cached afterwards. Returns None while
        no engine is available.
        """
        if self.__internal_workflow_id:
            return self.__internal_workflow_id

        engine = self.engine
        if not engine:
            return None

        prefix = f"{self.__workflow_prefix}-" if self.__workflow_prefix else ""
        self.__internal_workflow_id = f"{prefix}{engine.strip()}-{uuid.uuid4().hex}"
        return self.__internal_workflow_id

    @property
    def parent_workflow_id(self):
        """Parent workflow id; this class only ever initializes it to None."""
        # TODO : This is a temporary fix.
        # Need to add `workflow` attr.
        return self.__parent_workflow_id

    @property
    def pas_workflow_type(self):
        """The config's "pasWorkflowType" when it is a valid PASWorkflowType value.

        Returns None when the entry is absent or not one of the enum values.
        A valid value is cached after the first successful lookup.
        """
        if not self.__pas_workflow_type:
            if (
                "config" in self.workflow_config
                and "pasWorkflowType" in self.workflow_config["config"]
            ):
                candidate = self.workflow_config["config"]["pasWorkflowType"]
                allowed = [workflow_type.value for workflow_type in PASWorkflowType]
                if candidate in allowed:
                    self.__pas_workflow_type = candidate

        return self.__pas_workflow_type

    @property
    def workflow_config(self):
        """Workflow configuration dict stored by `_set_workflow_params`."""
        return self.__workflow_config

    @property
    def user_config(self):
        """Deep copy of the user configuration taken at construction time."""
        return self.__user_config

    @property
    def name(self):
        """Workflow name passed to the constructor."""
        return self.__name

    @property
    def rdsbms_manager(self):
        """BMS Workflow Manager (obtained on first access, then cached)."""
        if not self.__rdsbms_manager:
            self.__rdsbms_manager = get_rdsbms_workflow_manager()

        return self.__rdsbms_manager

    @property
    def workflow_params(self):
        """Verified workflow params produced by `get_verified_params_for_workflow`."""
        return self.__workflow_params

    @property
    def workflow_type(self):
        """The `workflow_type` attribute of the workflow params config."""
        return self.workflow_params.config.workflow_type

    # something along lines of {"instance_type": A, "num_slots": B, "plugin": C
    # TODO: would need script changes to allow for different plugins and slot count.
    @property
    def is_logical_replication_enabled(self):
        """Checks if logical replication parameters were passed as part of the configurations

        Returns the `logicalreplication` attribute itself when present
        (callers rely on its truthiness), otherwise False.
        """
        return getattr(self.workflow_params, "logicalreplication", False)

    @property
    def is_sidekick_enabled(self):
        """Checks if sidekick parameters were passed as part of the configurations

        Returns the `sidekick` attribute itself when present (callers rely on
        its truthiness), otherwise False.
        """
        return getattr(self.workflow_params, "sidekick", False)

    @property
    def is_ssm_controller_rds_enabled(self):
        """Checks if the RDS SSM controller host is listed in the perf service configs

        Always returns a bool; an empty or None service-configs section counts
        as disabled instead of raising.
        """
        if PERF_SERVICE_CONFIGS_YAML_KEY not in self.workflow_config:
            return False

        service_configs = self.workflow_config[PERF_SERVICE_CONFIGS_YAML_KEY]
        return bool(service_configs) and SSMControllerHosts.RDS.value in service_configs

    @property
    def get_workflow_resource_retention_time(self):
        """Retrieves the resource retention time based on configuration

        For a "regression" test with sidekick enabled and the retention tag
        present on the sidekick params, the sidekick's own
        `resource_retention_time_seconds` is used. Otherwise the sidekick or
        the general default applies, depending on whether sidekick is enabled.
        """
        sidekick_enabled = self.is_sidekick_enabled
        # NOTE(review): the presence check uses RESOURCE_RETENTION_TAG while the read
        # uses `resource_retention_time_seconds`; presumably the same name - confirm.
        if (
            self.test_type == "regression"
            and sidekick_enabled
            and hasattr(self.workflow_params.sidekick, RESOURCE_RETENTION_TAG)
        ):
            return self.workflow_params.sidekick.resource_retention_time_seconds

        if sidekick_enabled:
            return DEFAULT_SIDEKICK_RESOURCE_RETENTION_SECONDS
        return DEFAULT_RESOURCE_RETENTION_SECONDS

    @property
    def ssm_controller_rds_features(self):
        """The "features" entry of the RDS SSM controller config.

        Returns an empty list when the controller is disabled, when its section
        is empty, or when no "features" entry is set, so callers never get None.
        """
        if not self.is_ssm_controller_rds_enabled:
            return []

        rds_config = self.workflow_config[PERF_SERVICE_CONFIGS_YAML_KEY][
            SSMControllerHosts.RDS.value
        ]
        if not rds_config:
            return []

        features = rds_config.get("features")
        return [] if features is None else features

    @property
    def emails(self):
        """Retrieves the email list from yaml (None when not configured)"""
        return getattr(self.workflow_params.config, "emails", None)

    @property
    def engine(self):
        """The `engine` attribute of the workflow params config, cached once truthy.

        Returns None when the params config has no `engine` attribute.
        """
        if not self.__engine:
            if hasattr(self.workflow_params.config, "engine"):
                self.__engine = self.workflow_params.config.engine
        return self.__engine

    @property
    def db_instance_size(self):
        """
        Note: This only supports one db instance
        """
        return self.__db_instance_size

    @property
    def test_type(self):
        """
        Get the test_type

        Returns "regression" only when perf_service_configs.test_type is exactly
        "regression"; every other case (missing, empty, other value) is "adhoc".
        """
        service_configs = (
            self.workflow_config["perf_service_configs"]
            if "perf_service_configs" in self.workflow_config
            else None
        )
        if (
            service_configs
            and "test_type" in service_configs
            and service_configs["test_type"] == "regression"
        ):
            return "regression"

        # default to adhoc if incorrect input
        return "adhoc"

    @property
    def retain_ec2_instance(self):
        """
        Get the retain_ec2_instance parameter

        True only when resource_retention.retain_ec2_instance is the boolean
        True; a missing attribute or any non-bool value yields False.
        """
        retention = getattr(self.workflow_params, "resource_retention", None)
        return getattr(retention, "retain_ec2_instance", False) is True

    def _set_workflow_params(self, workflow_config):
        """Performs workflow params adjustment(s).

        Populates the stored workflow config and the verified workflow params.

        With a non-empty config "endpoints" entry (custom endpoint):
        the stored config is a deep copy whose "dbInstanceClasses" and
        "EngineVersion" come from `get_custom_endpoint_config`. The caller's
        dict gets its "endpoints" replaced in place when the custom endpoint
        config provides one, and is then passed uncopied to
        `get_verified_params_for_workflow`.

        Otherwise the params are verified from a deep copy, and the caller's
        dict itself is stored by reference.
        """
        if workflow_config["config"].get("endpoints"):
            adjusted_config = copy.deepcopy(workflow_config)
            custom_endpoint_config = get_custom_endpoint_config(adjusted_config)
            for field in ("dbInstanceClasses", "EngineVersion"):
                adjusted_config["config"][field] = custom_endpoint_config[field]
            # Second copy keeps the stored config independent of objects held by
            # custom_endpoint_config, which are also written to the caller's dict below.
            self.__workflow_config = copy.deepcopy(adjusted_config)
            # this is a walk around for custom endpoint
            # TODO: need to revisit
            if "endpoints" in custom_endpoint_config:
                workflow_config["config"]["endpoints"] = custom_endpoint_config["endpoints"]
            self.__workflow_params = get_verified_params_for_workflow(workflow_config)

        else:
            # workflow reference
            self.__workflow_params = get_verified_params_for_workflow(
                copy.deepcopy(workflow_config)
            )
            # PAS reference
            self.__workflow_config = workflow_config

    @property
    def has_io_sku(self):
        """Checks if io sku storage type passed to configurations

        True when config "storageType" is "aurora-iopt1" or "aurora".
        """
        if "config" not in self.workflow_config:
            return False

        config = self.workflow_config["config"]
        return "storageType" in config and config["storageType"] in ["aurora-iopt1", "aurora"]

    def override_parameters(self, parameters: str):
        """
        Apply override parameters in workflow_config

        Params:
        :parameters, str, Python literal for a sequence of (key_path, value)
            pairs, e.g. "(('config:engine', 'mysql'),)". key_path is a
            ":"-separated path into workflow_config; missing intermediate
            dicts are created.

        String values are converted: "{a:b:1}" becomes a list of its
        ":"-separated parts and numeric strings (per `_is_num`) become ints.
        Non-string literal values are stored as they are.

        Does nothing when workflow_config is empty or not a dict.

        Raises:
        ValueError when a key path is empty or runs through a non-dict value.
        """
        if not self.workflow_config or not isinstance(self.workflow_config, dict):
            return
        # ast.literal_eval() should be a safe choice for this purpose,
        # since it only accept limited python literal structures
        overrides: tuple[tuple[str, object], ...] = ast.literal_eval(parameters)
        for keys_str, value in overrides:
            keys: list = [k for k in keys_str.split(":") if k]
            if not keys:
                # Without this, keys[-1] below would fail with a bare IndexError.
                raise ValueError(
                    f"Override parameter failed due to empty key path. Key in question: {keys_str}"
                )
            current_dict = self.workflow_config
            for key in keys[:-1]:
                if key not in current_dict:
                    current_dict[key] = {}
                current_dict = current_dict[key]
                if not isinstance(current_dict, dict):
                    raise ValueError(
                        f"Override parameter failed due to mismatch structure. Key in question: {keys_str}"
                    )
            current_dict[keys[-1]] = self._parse_override_value(value)

    @staticmethod
    def _parse_override_value(value):
        """Convert one override value into the form stored in workflow_config."""
        if not isinstance(value, str):
            # literal_eval may already have produced an int/bool/list; keep it.
            return value
        if value.startswith("{") and value.endswith("}"):
            return [int(v) if _is_num(v) else v for v in value[1:-1].split(":") if v]
        if _is_num(value):
            return int(value)
        return value

    def rdsbms_params_dict(self):
        """Returns params dict (the workflow params config converted by `namespace_to_dict`)."""
        return namespace_to_dict(self.workflow_params.config)

I want to use the value obtained from rdsbms_params_dict in another file without creating an instance.
Editor is loading...
Leave a Comment