Untitled
unknown
plain_text
2 years ago
11 kB
13
Indexable
This is my code in A file
from aurora_performance_data_service.helpers.database.DBEngine import DBEngine
from aurora_performance_data_service.helpers.ssm_controller.ssm_cmd_utils import SSMControllerHosts
from aurora_performance_data_service.helpers.workflow.pas_workflow_type import PASWorkflowType
from aurora_performance_data_service.helpers.workflow.workflow_utils import (
build_full_yaml_from_minimal_yaml,
generate_db_cluster_name_suffix,
get_custom_endpoint_config,
get_instance_size,
get_verified_params_for_workflow,
is_minimal_yaml,
load_params_from_yaml,
namespace_to_dict,
update_custom_steps,
update_workflow_for_io_sku,
validate_minimal_yaml_mandatory_fields,
validate_yaml_attribute_values,
)
from aurora_performance_data_service.workflow.rdsbms_workflow_manager import (
get_rdsbms_workflow_manager,
)
from aurora_performance_data_service.workflow.workflow_summary import get_result_manager
logger = logging.getLogger(__name__)
class __BasePerfWorkflowService:
"""Base class for Perf workflows"""
def __init__(
self,
name: str,
workflow_config: dict,
user_config: dict,
workflow_prefix: str = None,
**kwargs,
):
"""
Initialize base workflow
Params:
:workflow_params, dict, Workflow params.
"""
self.__engine = None
self.__workflow_type = None
self.__rdsbms_manager = None
self.__perf_manager = None
self.__parent_workflow_id = None
self.__internal_workflow_id = None
self.__pas_workflow_type = None
self.__workflow_config = None
self.__workflow_params = None
# Presets
self.__name = name
self.__workflow_prefix = workflow_prefix
self.__user_config = copy.deepcopy(user_config)
self._set_workflow_params(workflow_config)
self.__db_instance_size = get_instance_size(self.workflow_config["config"])
self.__workflow_user = kwargs.get("workflow_user", None)
@property
def internal_workflow_id(self):
if self.__internal_workflow_id:
return self.__internal_workflow_id
if not self.engine:
return
prefix = f"{self.__workflow_prefix}-" if self.__workflow_prefix else ""
self.__internal_workflow_id = prefix + self.engine.strip() + f"-{uuid.uuid4().hex}"
return self.__internal_workflow_id
@property
def parent_workflow_id(self):
""" """
# TODO : This is a temporary fix.
# Need to add `workflow` attr.
return self.__parent_workflow_id
@property
def pas_workflow_type(self):
if not self.__pas_workflow_type:
if (
"config" in self.workflow_config
and "pasWorkflowType" in self.workflow_config["config"]
and self.workflow_config["config"]["pasWorkflowType"]
in [workflow_type.value for workflow_type in PASWorkflowType]
):
self.__pas_workflow_type = self.workflow_config["config"]["pasWorkflowType"]
return self.__pas_workflow_type
@property
def workflow_config(self):
""" """
return self.__workflow_config
@property
def user_config(self):
return self.__user_config
@property
def name(self):
""" """
return self.__name
@property
def rdsbms_manager(self):
"""BMS Workflow Manager"""
if not self.__rdsbms_manager:
self.__rdsbms_manager = get_rdsbms_workflow_manager()
return self.__rdsbms_manager
@property
def workflow_params(self):
""" """
return self.__workflow_params
@property
def workflow_type(self):
""" """
return self.workflow_params.config.workflow_type
# something along lines of {"instance_type": A, "num_slots": B, "plugin": C
# TODO: would need script changes to allow for different plugins and slot count.
@property
def is_logical_replication_enabled(self):
"""Checks if logical replication parameters were passed as part of the configurations"""
return (
hasattr(self.workflow_params, "logicalreplication")
and self.workflow_params.logicalreplication
)
@property
def is_sidekick_enabled(self):
"""Checks if sidekick parameters were passed as part of the configurations"""
return hasattr(self.workflow_params, "sidekick") and self.workflow_params.sidekick
@property
def is_ssm_controller_rds_enabled(self):
""""""
return (
PERF_SERVICE_CONFIGS_YAML_KEY in self.workflow_config
and SSMControllerHosts.RDS.value in self.workflow_config[PERF_SERVICE_CONFIGS_YAML_KEY]
)
@property
def get_workflow_resource_retention_time(self):
"""Retrieves the resource retention time based on configuration"""
test_type = self.test_type
if (
test_type == "regression"
and self.is_sidekick_enabled
and hasattr(self.workflow_params.sidekick, RESOURCE_RETENTION_TAG)
):
return self.workflow_params.sidekick.resource_retention_time_seconds
else:
return (
DEFAULT_SIDEKICK_RESOURCE_RETENTION_SECONDS
if self.is_sidekick_enabled
else DEFAULT_RESOURCE_RETENTION_SECONDS
)
@property
def ssm_controller_rds_features(self):
if self.is_ssm_controller_rds_enabled:
return self.workflow_config[PERF_SERVICE_CONFIGS_YAML_KEY][
SSMControllerHosts.RDS.value
].get("features")
return []
@property
def emails(self):
"""Retrieves the email list from yaml"""
if hasattr(self.workflow_params.config, "emails"):
return self.workflow_params.config.emails
@property
def engine(self):
if not self.__engine:
if hasattr(self.workflow_params.config, "engine"):
self.__engine = self.workflow_params.config.engine
return self.__engine
@property
def db_instance_size(self):
"""
Note: This only supports one db instance
"""
return self.__db_instance_size
@property
def test_type(self):
"""
Get the test_type
"""
if (
"perf_service_configs" in self.workflow_config
and self.workflow_config["perf_service_configs"]
and "test_type" in self.workflow_config["perf_service_configs"]
):
if self.workflow_config["perf_service_configs"]["test_type"] == "regression":
return "regression"
# default to adhoc if incorrect input
return "adhoc"
@property
def retain_ec2_instance(self):
"""
Get the retain_ec2_instance parameter
"""
if (
hasattr(self.workflow_params, "resource_retention")
and hasattr(self.workflow_params.resource_retention, "retain_ec2_instance")
and isinstance(self.workflow_params.resource_retention.retain_ec2_instance, bool)
):
return self.workflow_params.resource_retention.retain_ec2_instance is True
else:
return False
def _set_workflow_params(self, workflow_config):
"""Performs workflow params adjustment(s)."""
if "endpoints" in workflow_config["config"] and workflow_config["config"].get("endpoints"):
_workflow_config_cpy = copy.deepcopy(workflow_config)
_custom_endpoint_config = get_custom_endpoint_config(_workflow_config_cpy)
_workflow_config_cpy["config"]["dbInstanceClasses"] = _custom_endpoint_config[
"dbInstanceClasses"
]
_workflow_config_cpy["config"]["EngineVersion"] = _custom_endpoint_config[
"EngineVersion"
]
self.__workflow_config = copy.deepcopy(_workflow_config_cpy)
del _workflow_config_cpy
# this is a walk around for custom endpoint
# TODO: need to revisit
if "endpoints" in _custom_endpoint_config:
workflow_config["config"]["endpoints"] = _custom_endpoint_config["endpoints"]
self.__workflow_params = get_verified_params_for_workflow(workflow_config)
else:
# workflow reference
self.__workflow_params = get_verified_params_for_workflow(
copy.deepcopy(workflow_config)
)
# PAS reference
self.__workflow_config = workflow_config
@property
def has_io_sku(self):
"""Checks if io sku storage type passed to configurations"""
if "config" in self.workflow_config and "storageType" in self.workflow_config["config"]:
return self.workflow_config["config"]["storageType"] in ["aurora-iopt1", "aurora"]
else:
return False
def override_parameters(self, parameters: str):
"""
Apply override parameters in workflow_config
"""
if not self.workflow_config or not isinstance(self.workflow_config, dict):
return
# ast.literal_eval() should be a safe choice for this purpose,
# since it only accept limited python literal structures
parameters_tuple: tuple[str, str] = ast.literal_eval(parameters)
for keys_str, value in parameters_tuple:
current_dict = self.workflow_config
keys: list = [k for k in keys_str.split(":") if k]
for key in keys[:-1]:
if key not in current_dict:
current_dict[key] = {}
current_dict = current_dict[key]
if not isinstance(current_dict, dict):
raise ValueError(
f"Override parameter failed due to mismatch structure. Key in question: {keys_str}"
)
if value.startswith("{") and value.endswith("}"):
value = value[1:-1]
value = [int(v) if _is_num(v) else v for v in value.split(":") if v]
elif _is_num(value):
value = int(value)
current_dict[keys[-1]] = value
def rdsbms_params_dict(self):
"""Returns params dict."""
return namespace_to_dict(self.workflow_params.config)
I want to use the value obtained fromm dsbms_params_dict in another file wihout creating an instacneEditor is loading...
Leave a Comment