Untitled
unknown
plain_text
6 months ago
3.1 kB
5
Indexable
from aws_cdk import Stack
from principal_environment import PrincipalEnvironment
from aws_constructs.bucket import BucketConstruct
from aws_constructs.lambdas import LambdaConstruct
from aws_constructs.glue import GlueConstruct
from aws_constructs.step_function import StepFunctionConstruct
from aws_constructs.iam_role import IAMRoleConstruct


class CurationHubConstruct:
    """Create the resources listed in a resource map on a CDK stack.

    Instantiating the class is what triggers creation; the instance keeps
    no state of its own. Resource groups are processed in a fixed order:
    ``iam-resources``, ``buckets``, ``lambdas``, ``glueetls``,
    ``step-functions``.
    """

    def __init__(
        self,
        scope: Stack,
        all_resources: dict,
        env: PrincipalEnvironment,
        config: dict,
    ):
        """Create every resource group found in *all_resources*.

        Args:
            scope: Stack handed to every construct helper.
            all_resources: Mapping of group name to a list of resource
                entries. A missing group, or a group whose value is
                ``None`` or empty, is skipped.
            env: Environment object forwarded to every construct helper.
            config: Settings forwarded to the helpers. This class reads
                ``config["name"]``, ``config["identifier"]`` and
                ``config["lifecycle"]`` directly.
        """
        self.__create_iam_resources(
            scope, config, self.__entries(all_resources, "iam-resources"), env
        )
        self.__create_buckets(
            scope, config, self.__entries(all_resources, "buckets"), env
        )
        self.__create_lambdas(
            scope, config, self.__entries(all_resources, "lambdas"), env
        )
        self.__create_glueetls(
            scope, config, self.__entries(all_resources, "glueetls"), env
        )
        self.__create_step_functions(
            scope, config, self.__entries(all_resources, "step-functions"), env
        )

    @staticmethod
    def __entries(all_resources: dict, group: str) -> list:
        """Return the entries of *group*, or an empty list when unset.

        ``dict.get(key, [])`` only applies its default when the key is
        absent; a key that is present with a ``None`` value would
        otherwise reach a ``for`` loop and raise ``TypeError``.
        """
        return all_resources.get(group) or []

    @staticmethod
    def __create_buckets(
        scope: Stack, config: dict, resources: list, env: PrincipalEnvironment
    ) -> None:
        """Create one bucket per entry and apply its lifecycle rules.

        Raises:
            KeyError: If ``config["lifecycle"]`` has no item for an entry.
        """
        for resource in resources:
            if not resource:
                continue
            bucket = BucketConstruct.create_bucket(
                scope, f"{config['name']}-{resource}", env
            )
            # "lifecyle" (sic) is the spelling used by BucketConstruct.
            BucketConstruct.create_lifecyle_rules(
                bucket, config["lifecycle"][resource]
            )

    @staticmethod
    def __create_lambdas(
        scope: Stack, config: dict, resources: list, env: PrincipalEnvironment
    ) -> None:
        """Create one Lambda per non-empty entry."""
        for resource in resources:
            if not resource:
                continue
            LambdaConstruct.create_lambda(
                scope,
                config["name"],
                resource,
                env,
                config["identifier"],
            )

    @staticmethod
    def __create_glueetls(
        scope: Stack, config: dict, resources: list, env: PrincipalEnvironment
    ) -> None:
        """Create one Glue job per non-empty entry.

        An entry is either a plain job name (``str``) or a mapping whose
        first ``name: job_type`` pair describes the job.
        """
        for resource in resources:
            if not resource:
                continue
            if isinstance(resource, str):
                GlueConstruct.create_glue_job(scope, config, env, resource)
                continue
            # Only the first pair of a mapping entry is used; any further
            # pairs are ignored, as before.
            job_name, job_type = next(iter(resource.items()))
            GlueConstruct.create_glue_job(scope, config, env, job_name, job_type)

    @staticmethod
    def __create_step_functions(
        scope: Stack, config: dict, resources: list, env: PrincipalEnvironment
    ) -> None:
        """Create one step function per non-empty entry."""
        for resource in resources:
            if not resource:
                continue
            StepFunctionConstruct.create_step_function(scope, config, resource, env)

    @staticmethod
    def __create_iam_resources(
        scope: Stack, config: dict, resources: list, env: PrincipalEnvironment
    ) -> None:
        """Create one custom IAM role per non-empty entry."""
        for resource in resources:
            if not resource:
                continue
            IAMRoleConstruct.create_custom_role(
                scope, config["name"], env, resource
            )
Editor is loading...
Leave a Comment