Untitled
unknown
plain_text
2 years ago
3.7 kB
13
Indexable
Code in A.py
class PerfWorkflowService(__BasePerfWorkflowService):
    """Perf workflow service configured from a workflow YAML or from resultdb."""

    def __init__(
        self,
        workflow_yaml: str,
        workflow_timeout: int = None,
        workflow_id: str = None,
        workflow_prefix: str = None,
        custom_binaries: list = None,
        **kwargs,
    ):
        """Constructs AuroraPerfWorkflow object.

        The workflow config is loaded from ``workflow_yaml`` when a yaml is
        given and no ``workflow_id`` is supplied; otherwise it is fetched
        through the result manager using ``workflow_id``.

        Params:
        :workflow_yaml, str, workflow yaml config.
        :workflow_timeout, int, forwarded unchanged to the base class.
        :workflow_id, str, when truthy the config is looked up via the
            result manager instead of being loaded from ``workflow_yaml``.
        :workflow_prefix, str, forwarded unchanged to the base class.
        :custom_binaries, list, when truthy (yaml path only) passed to
            ``update_custom_steps`` to update the loaded config.
        :kwargs, forwarded unchanged to the base class.

        Raises:
        AssertionError, if the result manager returns no workflow config.
        """
        # NOTE(review): the nesting below was reconstructed from a paste that
        # had lost its indentation -- confirm against the real A.py, in
        # particular that the two calls after the if/else apply to both paths.
        workflow_config = None
        user_config = None
        if workflow_yaml and not workflow_id:
            workflow_config = load_params_from_yaml(workflow_yaml)
            if is_minimal_yaml(workflow_config):
                validate_yaml_attribute_values(workflow_config)
                validate_minimal_yaml_mandatory_fields(workflow_config)
                # Keep the user's minimal yaml before it is expanded.
                user_config = workflow_config
                workflow_config = build_full_yaml_from_minimal_yaml(workflow_config)
            # Update workflow_config with custom steps
            if custom_binaries:
                update_custom_steps(workflow_config, custom_binaries=custom_binaries)
        else:
            # NOTE(review): this branch is also taken when both workflow_yaml
            # and workflow_id are falsy, and the result manager is created
            # with workflow_id=None even when workflow_id is set -- confirm
            # both are intended.
            workflow_result_mgr = get_result_manager(workflow_id=None)
            workflow_config = workflow_result_mgr.get_workflow_config_for_workload_id(
                workflow_id
            )
            if not workflow_config:
                # Raised explicitly (rather than via `assert`) so the check
                # still runs under `python -O`; the exception type is unchanged.
                raise AssertionError("workflow config is not set in resultdb")
        generate_db_cluster_name_suffix(workflow_config)
        update_workflow_for_io_sku(workflow_config)
        super().__init__(
            "PerfWorkflowService",
            workflow_config,
            user_config,
            workflow_timeout=workflow_timeout,
            workflow_prefix=workflow_prefix,
            **kwargs,
        )
Code in B.py
def collect_ams_engine_version_logs(self, instance_id, workflow_path):
    """
    Record the engine version details of an instance and copy them to S3.

    Builds a list of shell commands on top of the S3 file-transfer template,
    writing the public version, internal version, build time and git commit
    to /tmp/engine_commit_id.log, then copies that log to the S3 directory
    returned by get_pas_logs_s3_dir_for_instance and runs everything through
    the SSM client.

    Params:
    :instance_id, indexable pair; element 0 (stripped) is used to build the
        S3 destination, element 1 is the target handed to run_cmd.
    :workflow_path, passed through to get_pas_logs_s3_dir_for_instance.
    """
    pas_creds = self.get_pas_credentials(PAS_ADMIN_OPS_ROLE)
    script = self.get_rds_s3_file_transfer_template(
        pas_creds["AccessKeyId"], pas_creds["SecretAccessKey"]
    )

    # NOTE(review): database credentials are hard-coded and end up on the
    # mysql command line -- consider sourcing them from configuration.
    db_name = "auroraperf"
    db_user = "rdsbmsperf"
    db_password = "rdsbmsperf"

    script.extend(
        [
            "export logfile=/tmp/engine_commit_id.log",
            "sudo date >> $logfile 2>&1",
            # Public version as reported by the running engine.
            f"engine_public_version=$(/rdsdbbin/oscar/bin/mysql --user={db_user} --database={db_name} -e \"SELECT aurora_version();\" --password={db_password} | grep -oP '[\\d.]+' | tr -d '\\n')",
            # First 'git-head' string embedded in the mysqld binary; the
            # remaining fields are parsed out of it.
            "MySQL_grep=$(echo $(strings /rdsdbbin/oscar/bin/mysqld | grep 'git-head' -m1));",
            "engine_internal_version=$(echo $MySQL_grep | grep -P 'OscarMysql[\\d]+-[\\d.]+' -o | grep -P '[\\d.]+' | tr -d '\\n');",
            "engine_build_time=$(echo $MySQL_grep | grep -P '\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}UTC' -o | tr -d '\\n');",
            "engine_git_commit=$(echo $MySQL_grep | grep -P '[\\w\\d]{40}' -o -m1 | tr -d '\\n');",
            'echo "engine_public_version: $engine_public_version engine_internal_version: $engine_internal_version engine_build_time: $engine_build_time engine_git_commit: $engine_git_commit" >> $logfile 2>&1',
        ]
    )

    # Copy the collected log to the per-instance S3 directory.
    s3_target = get_pas_logs_s3_dir_for_instance(workflow_path, instance_id[0].strip())
    script.append(f"aws s3 cp ${{logfile}} {s3_target} >> $logfile 2>&1")

    self.__ssm_client.run_cmd(instance_id[1], script)
I want to use `workflow_config = load_params_from_yaml(workflow_yaml)` in B.py, but without creating an instance of the class.
Editor is loading...
Leave a Comment