Using load_params_from_yaml in B.py without instantiating PerfWorkflowService
Code in A.py

class PerfWorkflowService(__BasePerfWorkflowService):
    def __init__(
        self,
        workflow_yaml: str,
        workflow_timeout: int = None,
        workflow_id: str = None,
        workflow_prefix: str = None,
        custom_binaries: list = None,
        **kwargs,
    ):
        """Constructs AuroraPerfWorkflow object.

        Params:
        :workflow_yaml, str, workflow yaml config.
        """
        workflow_config = None
        user_config = None
        if workflow_yaml and not workflow_id:
            # Parse the workflow YAML file into a workflow config.
            workflow_config = load_params_from_yaml(workflow_yaml)
            if is_minimal_yaml(workflow_config):
                validate_yaml_attribute_values(workflow_config)
                validate_minimal_yaml_mandatory_fields(workflow_config)
                user_config = workflow_config
                workflow_config = build_full_yaml_from_minimal_yaml(workflow_config)
            # Update workflow_config with custom steps
            if custom_binaries:
                update_custom_steps(workflow_config, custom_binaries=custom_binaries)
        else:
            # Existing workflow: fetch the stored config from the result store instead.
            workflow_result_mgr = get_result_manager(workflow_id=None)
            workflow_config = workflow_result_mgr.get_workflow_config_for_workload_id(workflow_id)
            assert workflow_config, "workflow config is not set in resultdb"
        generate_db_cluster_name_suffix(workflow_config)
        update_workflow_for_io_sku(workflow_config)
        super().__init__(
            "PerfWorkflowService",
            workflow_config,
            user_config,
            workflow_timeout=workflow_timeout,
            workflow_prefix=workflow_prefix,
            **kwargs,
        )

Code in B.py

def collect_ams_engine_version_logs(self, instance_id, workflow_path):
    """
    Gather ams engine versions
    """
    credentials = self.get_pas_credentials(PAS_ADMIN_OPS_ROLE)
    # Build the list of shell commands that will be run on the instance via SSM.
    cmds = self.get_rds_s3_file_transfer_template(
        credentials["AccessKeyId"], credentials["SecretAccessKey"]
    )
    dbname1 = "auroraperf"
    dbpassword1 = "rdsbmsperf"
    dbuser1 = "rdsbmsperf"
    cmds.append("export logfile=/tmp/engine_commit_id.log")
    cmds.append("sudo date >> $logfile 2>&1")
    cmds.append(
        f"engine_public_version=$(/rdsdbbin/oscar/bin/mysql --user={dbuser1} --database={dbname1} -e \"SELECT aurora_version();\" --password={dbpassword1} | grep -oP '[\\d.]+' | tr -d '\\n')"
    )
    cmds.append(
        "MySQL_grep=$(echo $(strings /rdsdbbin/oscar/bin/mysqld | grep 'git-head' -m1));"
    )
    cmds.append(
        "engine_internal_version=$(echo $MySQL_grep | grep -P 'OscarMysql[\\d]+-[\\d.]+' -o | grep -P '[\\d.]+' | tr -d '\\n');"
    )
    cmds.append(
        "engine_build_time=$(echo $MySQL_grep | grep -P '\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}UTC' -o | tr -d '\\n');"
    )
    cmds.append(
        "engine_git_commit=$(echo $MySQL_grep | grep -P '[\\w\\d]{40}' -o -m1 | tr -d '\\n');"
    )
    cmds.append(
        'echo "engine_public_version: $engine_public_version engine_internal_version: $engine_internal_version engine_build_time: $engine_build_time engine_git_commit: $engine_git_commit" >> $logfile 2>&1'
    )
    # Upload the collected version log to the per-instance S3 log directory.
    cmds.append(
        "aws s3 cp ${logfile} "
        + f"{get_pas_logs_s3_dir_for_instance(workflow_path, instance_id[0].strip())} "
        + ">> $logfile 2>&1"
    )
    self.__ssm_client.run_cmd(
        instance_id[1],
        cmds,
    )

I want to use workflow_config = load_params_from_yaml(workflow_yaml) in B.py as well, but without creating an instance of the PerfWorkflowService class from A.py. How can I do that?
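Since load_params_from_yaml is called as a plain function inside PerfWorkflowService.__init__ rather than as a method, it is a module-level helper that A.py either defines or imports. B.py can therefore import that helper directly and call it without ever constructing the class. Below is a minimal sketch under that assumption; the import path "A", the load_workflow_config wrapper, and the workflow_yaml path in the usage comment are illustrative names, not part of the original code. If the helper actually lives in a shared utils module, import it from there instead of from A.

# B.py (sketch, assumptions noted above)
from A import load_params_from_yaml  # works if A.py defines or imports the helper at module level


def load_workflow_config(workflow_yaml):
    # Hypothetical convenience wrapper added for illustration: parses the YAML
    # config directly, with no PerfWorkflowService instance involved.
    return load_params_from_yaml(workflow_yaml)


# Example usage, e.g. inside collect_ams_engine_version_logs or any other function:
# workflow_config = load_workflow_config("/path/to/workflow.yaml")

Importing the existing helper keeps the YAML parsing behaviour identical in A.py and B.py. Only if a direct import would create a circular dependency between the two files would it be worth moving the helper into a small standalone module that both import.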