Untitled
unknown
plain_text
a year ago
11 kB
7
Indexable
This is my code in A file from aurora_performance_data_service.helpers.database.DBEngine import DBEngine from aurora_performance_data_service.helpers.ssm_controller.ssm_cmd_utils import SSMControllerHosts from aurora_performance_data_service.helpers.workflow.pas_workflow_type import PASWorkflowType from aurora_performance_data_service.helpers.workflow.workflow_utils import ( build_full_yaml_from_minimal_yaml, generate_db_cluster_name_suffix, get_custom_endpoint_config, get_instance_size, get_verified_params_for_workflow, is_minimal_yaml, load_params_from_yaml, namespace_to_dict, update_custom_steps, update_workflow_for_io_sku, validate_minimal_yaml_mandatory_fields, validate_yaml_attribute_values, ) from aurora_performance_data_service.workflow.rdsbms_workflow_manager import ( get_rdsbms_workflow_manager, ) from aurora_performance_data_service.workflow.workflow_summary import get_result_manager logger = logging.getLogger(__name__) class __BasePerfWorkflowService: """Base class for Perf workflows""" def __init__( self, name: str, workflow_config: dict, user_config: dict, workflow_prefix: str = None, **kwargs, ): """ Initialize base workflow Params: :workflow_params, dict, Workflow params. """ self.__engine = None self.__workflow_type = None self.__rdsbms_manager = None self.__perf_manager = None self.__parent_workflow_id = None self.__internal_workflow_id = None self.__pas_workflow_type = None self.__workflow_config = None self.__workflow_params = None # Presets self.__name = name self.__workflow_prefix = workflow_prefix self.__user_config = copy.deepcopy(user_config) self._set_workflow_params(workflow_config) self.__db_instance_size = get_instance_size(self.workflow_config["config"]) self.__workflow_user = kwargs.get("workflow_user", None) @property def internal_workflow_id(self): if self.__internal_workflow_id: return self.__internal_workflow_id if not self.engine: return prefix = f"{self.__workflow_prefix}-" if self.__workflow_prefix else "" self.__internal_workflow_id = prefix + self.engine.strip() + f"-{uuid.uuid4().hex}" return self.__internal_workflow_id @property def parent_workflow_id(self): """ """ # TODO : This is a temporary fix. # Need to add `workflow` attr. return self.__parent_workflow_id @property def pas_workflow_type(self): if not self.__pas_workflow_type: if ( "config" in self.workflow_config and "pasWorkflowType" in self.workflow_config["config"] and self.workflow_config["config"]["pasWorkflowType"] in [workflow_type.value for workflow_type in PASWorkflowType] ): self.__pas_workflow_type = self.workflow_config["config"]["pasWorkflowType"] return self.__pas_workflow_type @property def workflow_config(self): """ """ return self.__workflow_config @property def user_config(self): return self.__user_config @property def name(self): """ """ return self.__name @property def rdsbms_manager(self): """BMS Workflow Manager""" if not self.__rdsbms_manager: self.__rdsbms_manager = get_rdsbms_workflow_manager() return self.__rdsbms_manager @property def workflow_params(self): """ """ return self.__workflow_params @property def workflow_type(self): """ """ return self.workflow_params.config.workflow_type # something along lines of {"instance_type": A, "num_slots": B, "plugin": C # TODO: would need script changes to allow for different plugins and slot count. @property def is_logical_replication_enabled(self): """Checks if logical replication parameters were passed as part of the configurations""" return ( hasattr(self.workflow_params, "logicalreplication") and self.workflow_params.logicalreplication ) @property def is_sidekick_enabled(self): """Checks if sidekick parameters were passed as part of the configurations""" return hasattr(self.workflow_params, "sidekick") and self.workflow_params.sidekick @property def is_ssm_controller_rds_enabled(self): """""" return ( PERF_SERVICE_CONFIGS_YAML_KEY in self.workflow_config and SSMControllerHosts.RDS.value in self.workflow_config[PERF_SERVICE_CONFIGS_YAML_KEY] ) @property def get_workflow_resource_retention_time(self): """Retrieves the resource retention time based on configuration""" test_type = self.test_type if ( test_type == "regression" and self.is_sidekick_enabled and hasattr(self.workflow_params.sidekick, RESOURCE_RETENTION_TAG) ): return self.workflow_params.sidekick.resource_retention_time_seconds else: return ( DEFAULT_SIDEKICK_RESOURCE_RETENTION_SECONDS if self.is_sidekick_enabled else DEFAULT_RESOURCE_RETENTION_SECONDS ) @property def ssm_controller_rds_features(self): if self.is_ssm_controller_rds_enabled: return self.workflow_config[PERF_SERVICE_CONFIGS_YAML_KEY][ SSMControllerHosts.RDS.value ].get("features") return [] @property def emails(self): """Retrieves the email list from yaml""" if hasattr(self.workflow_params.config, "emails"): return self.workflow_params.config.emails @property def engine(self): if not self.__engine: if hasattr(self.workflow_params.config, "engine"): self.__engine = self.workflow_params.config.engine return self.__engine @property def db_instance_size(self): """ Note: This only supports one db instance """ return self.__db_instance_size @property def test_type(self): """ Get the test_type """ if ( "perf_service_configs" in self.workflow_config and self.workflow_config["perf_service_configs"] and "test_type" in self.workflow_config["perf_service_configs"] ): if self.workflow_config["perf_service_configs"]["test_type"] == "regression": return "regression" # default to adhoc if incorrect input return "adhoc" @property def retain_ec2_instance(self): """ Get the retain_ec2_instance parameter """ if ( hasattr(self.workflow_params, "resource_retention") and hasattr(self.workflow_params.resource_retention, "retain_ec2_instance") and isinstance(self.workflow_params.resource_retention.retain_ec2_instance, bool) ): return self.workflow_params.resource_retention.retain_ec2_instance is True else: return False def _set_workflow_params(self, workflow_config): """Performs workflow params adjustment(s).""" if "endpoints" in workflow_config["config"] and workflow_config["config"].get("endpoints"): _workflow_config_cpy = copy.deepcopy(workflow_config) _custom_endpoint_config = get_custom_endpoint_config(_workflow_config_cpy) _workflow_config_cpy["config"]["dbInstanceClasses"] = _custom_endpoint_config[ "dbInstanceClasses" ] _workflow_config_cpy["config"]["EngineVersion"] = _custom_endpoint_config[ "EngineVersion" ] self.__workflow_config = copy.deepcopy(_workflow_config_cpy) del _workflow_config_cpy # this is a walk around for custom endpoint # TODO: need to revisit if "endpoints" in _custom_endpoint_config: workflow_config["config"]["endpoints"] = _custom_endpoint_config["endpoints"] self.__workflow_params = get_verified_params_for_workflow(workflow_config) else: # workflow reference self.__workflow_params = get_verified_params_for_workflow( copy.deepcopy(workflow_config) ) # PAS reference self.__workflow_config = workflow_config @property def has_io_sku(self): """Checks if io sku storage type passed to configurations""" if "config" in self.workflow_config and "storageType" in self.workflow_config["config"]: return self.workflow_config["config"]["storageType"] in ["aurora-iopt1", "aurora"] else: return False def override_parameters(self, parameters: str): """ Apply override parameters in workflow_config """ if not self.workflow_config or not isinstance(self.workflow_config, dict): return # ast.literal_eval() should be a safe choice for this purpose, # since it only accept limited python literal structures parameters_tuple: tuple[str, str] = ast.literal_eval(parameters) for keys_str, value in parameters_tuple: current_dict = self.workflow_config keys: list = [k for k in keys_str.split(":") if k] for key in keys[:-1]: if key not in current_dict: current_dict[key] = {} current_dict = current_dict[key] if not isinstance(current_dict, dict): raise ValueError( f"Override parameter failed due to mismatch structure. Key in question: {keys_str}" ) if value.startswith("{") and value.endswith("}"): value = value[1:-1] value = [int(v) if _is_num(v) else v for v in value.split(":") if v] elif _is_num(value): value = int(value) current_dict[keys[-1]] = value def rdsbms_params_dict(self): """Returns params dict.""" return namespace_to_dict(self.workflow_params.config) I want to use the value obtained fromm dsbms_params_dict in another file wihout creating an instacne
Editor is loading...
Leave a Comment