Untitled

mail@pastecode.io avatar
unknown
plain_text
10 months ago
3.7 kB
6
Indexable
Code in A.py
class PerfWorkflowService(__BasePerfWorkflowService):
    def __init__(
        self,
        workflow_yaml: str,
        workflow_timeout: int = None,
        workflow_id: str = None,
        workflow_prefix: str = None,
        custom_binaries: list = None,
        **kwargs,
    ):
        """Construct a PerfWorkflowService object.

        The workflow configuration comes from one of two places:

        * When ``workflow_yaml`` is given and ``workflow_id`` is not, the
          config is loaded with ``load_params_from_yaml``. A "minimal" yaml
          is validated, kept as the user config, and expanded into a full
          config. ``custom_binaries``, if any, are then applied to the
          resulting config.
        * Otherwise the config is fetched from the result manager using
          ``workflow_id``. ``custom_binaries`` is not applied on this path.

        Params:
        :workflow_yaml, str, workflow yaml config.
        :workflow_timeout, int, forwarded unchanged to the base class.
        :workflow_id, str, id used to look up a stored workflow config.
        :workflow_prefix, str, forwarded unchanged to the base class.
        :custom_binaries, list, custom steps merged into a yaml-based config.
        :kwargs, forwarded unchanged to the base class.

        Raises:
        AssertionError: if no workflow config is found for ``workflow_id``.
        """
        user_config = None
        if workflow_yaml and not workflow_id:
            workflow_config = load_params_from_yaml(workflow_yaml)

            if is_minimal_yaml(workflow_config):
                validate_yaml_attribute_values(workflow_config)
                validate_minimal_yaml_mandatory_fields(workflow_config)
                # Keep the user's original minimal config before expanding it.
                user_config = workflow_config
                workflow_config = build_full_yaml_from_minimal_yaml(workflow_config)
            # Update workflow_config with custom steps
            if custom_binaries:
                update_custom_steps(workflow_config, custom_binaries=custom_binaries)
        else:
            # NOTE(review): the result manager is created with workflow_id=None
            # even though a workflow_id may be available here - confirm this is
            # intentional.
            workflow_result_mgr = get_result_manager(workflow_id=None)
            workflow_config = workflow_result_mgr.get_workflow_config_for_workload_id(workflow_id)
            # Raised explicitly (instead of via an `assert` statement) so the
            # check is not stripped when Python runs with -O. The exception
            # type and message are unchanged for existing callers.
            if not workflow_config:
                raise AssertionError("workflow config is not set in resultdb")

        generate_db_cluster_name_suffix(workflow_config)
        update_workflow_for_io_sku(workflow_config)

        super().__init__(
            "PerfWorkflowService",
            workflow_config,
            user_config,
            workflow_timeout=workflow_timeout,
            workflow_prefix=workflow_prefix,
            **kwargs,
        )

Code in B.py
def collect_ams_engine_version_logs(self, instance_id, workflow_path):
        """
        Gather ams engine versions.

        Builds a shell script that writes the public engine version (queried
        through the mysql client) and the internal version, build time and git
        commit (extracted from the ``git-head`` string of the mysqld binary)
        to /tmp/engine_commit_id.log, copies that log to the S3 directory
        returned by ``get_pas_logs_s3_dir_for_instance``, and runs the script
        through the SSM client.

        Params:
        :instance_id, indexable pair; element 0 (stripped) is used to build
            the S3 destination and element 1 is the target passed to
            ``run_cmd``.
        :workflow_path, passed through to ``get_pas_logs_s3_dir_for_instance``.
        """
        creds = self.get_pas_credentials(PAS_ADMIN_OPS_ROLE)
        access_key, secret_key = creds["AccessKeyId"], creds["SecretAccessKey"]
        cmds = self.get_rds_s3_file_transfer_template(access_key, secret_key)

        # NOTE(review): database credentials are hard-coded and end up on the
        # mysql command line - consider sourcing them from configuration.
        db_name = "auroraperf"
        db_password = "rdsbmsperf"
        db_user = "rdsbmsperf"

        cmds.extend(
            [
                # Start the log with a timestamp.
                "export logfile=/tmp/engine_commit_id.log",
                "sudo date >> $logfile 2>&1",
                # Public version as reported by the running engine.
                f"engine_public_version=$(/rdsdbbin/oscar/bin/mysql --user={db_user} --database={db_name} -e \"SELECT aurora_version();\" --password={db_password} | grep -oP '[\\d.]+' | tr -d '\\n')",
                # Internal version, build time and commit come from the first
                # 'git-head' string embedded in the mysqld binary.
                "MySQL_grep=$(echo $(strings /rdsdbbin/oscar/bin/mysqld | grep 'git-head' -m1));",
                "engine_internal_version=$(echo $MySQL_grep | grep -P 'OscarMysql[\\d]+-[\\d.]+' -o | grep -P '[\\d.]+' | tr -d '\\n');",
                "engine_build_time=$(echo $MySQL_grep | grep -P '\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}UTC' -o | tr -d '\\n');",
                "engine_git_commit=$(echo $MySQL_grep | grep -P '[\\w\\d]{40}' -o -m1 | tr -d '\\n');",
                'echo "engine_public_version: $engine_public_version engine_internal_version: $engine_internal_version engine_build_time: $engine_build_time engine_git_commit: $engine_git_commit" >> $logfile 2>&1',
            ]
        )

        # Upload the finished log next to the other logs of this instance.
        s3_destination = get_pas_logs_s3_dir_for_instance(
            workflow_path, instance_id[0].strip()
        )
        cmds.append("aws s3 cp ${logfile} " + f"{s3_destination} >> $logfile 2>&1")

        self.__ssm_client.run_cmd(instance_id[1], cmds)


I want to call workflow_config = load_params_from_yaml(workflow_yaml) in B.py without creating an instance of the class.
Leave a Comment