Untitled

 avatar
unknown
plain_text
6 months ago
3.1 kB
5
Indexable
from aws_cdk import Stack
from principal_environment import PrincipalEnvironment
from aws_constructs.bucket import BucketConstruct
from aws_constructs.lambdas import LambdaConstruct
from aws_constructs.glue import GlueConstruct
from aws_constructs.step_function import StepFunctionConstruct
from aws_constructs.iam_role import IAMRoleConstruct


class CurationHubConstruct:
    """Create the resources declared for one curation hub.

    Instantiating the class walks ``all_resources`` and delegates to the
    project construct helpers, in this order: IAM roles, buckets, lambdas,
    Glue ETL jobs, step functions. Nothing is stored on the instance.
    """

    def __init__(
        self, scope: Stack, all_resources: dict, env: PrincipalEnvironment, config: dict
    ):
        """Create every resource group listed in ``all_resources``.

        Args:
            scope: Stack handed unchanged to every construct helper.
            all_resources: Mapping of group name to a list of entries. The
                groups read are ``"iam-resources"``, ``"buckets"``,
                ``"lambdas"``, ``"glueetls"`` and ``"step-functions"``.
                A group that is missing *or* explicitly null is treated as
                empty; falsy entries inside a group are skipped.
            env: Environment handed unchanged to every construct helper.
            config: Hub configuration. ``config["name"]`` is read for IAM
                roles, buckets and lambdas; ``config["identifier"]`` for
                lambdas; ``config["lifecycle"][<bucket>]`` for buckets.
        """
        # Creation order is kept exactly as before (IAM first).
        creators = (
            ("iam-resources", self.__create_iam_resources),
            ("buckets", self.__create_buckets),
            ("lambdas", self.__create_lambdas),
            ("glueetls", self.__create_glueetls),
            ("step-functions", self.__create_step_functions),
        )
        for group, create in creators:
            # No ``[]`` default here: ``dict.get`` only applies a default when
            # the key is absent, so a key present with a null value would
            # still yield ``None``. Each creator normalises ``None`` itself.
            create(scope, config, all_resources.get(group), env)

    @staticmethod
    def __non_empty(resources: list) -> list:
        """Return the truthy entries of ``resources``; ``None`` counts as empty."""
        return [entry for entry in (resources or []) if entry]

    @staticmethod
    def __create_buckets(
        scope: Stack, config: dict, resources: list, env: PrincipalEnvironment
    ) -> None:
        """Create one bucket per entry and attach its lifecycle rules.

        The bucket is created under the name ``"<config name>-<entry>"`` and
        its rules are read from ``config["lifecycle"][entry]``, so a bucket
        without a lifecycle entry raises ``KeyError``.
        """
        for resource in CurationHubConstruct.__non_empty(resources):
            bucket = BucketConstruct.create_bucket(
                scope, f"{config['name']}-{resource}", env
            )
            # "create_lifecyle_rules" (sic) is the helper's actual name.
            BucketConstruct.create_lifecyle_rules(bucket, config["lifecycle"][resource])

    @staticmethod
    def __create_lambdas(
        scope: Stack, config: dict, resources: list, env: PrincipalEnvironment
    ) -> None:
        """Create one lambda per entry via ``LambdaConstruct.create_lambda``."""
        for resource in CurationHubConstruct.__non_empty(resources):
            LambdaConstruct.create_lambda(
                scope,
                config["name"],
                resource,
                env,
                config["identifier"],
            )

    @staticmethod
    def __create_glueetls(
        scope: Stack, config: dict, resources: list, env: PrincipalEnvironment
    ) -> None:
        """Create one Glue job per entry.

        An entry is either a plain string (the job name) or a mapping whose
        first ``name: job_type`` pair is used; any further pairs in the same
        mapping are ignored, as before.
        """
        for resource in CurationHubConstruct.__non_empty(resources):
            if isinstance(resource, str):
                GlueConstruct.create_glue_job(scope, config, env, resource)
                continue
            # Non-empty is guaranteed by __non_empty, so next() cannot raise
            # StopIteration here.
            job_name, job_type = next(iter(resource.items()))
            GlueConstruct.create_glue_job(scope, config, env, job_name, job_type)

    @staticmethod
    def __create_step_functions(
        scope: Stack, config: dict, resources: list, env: PrincipalEnvironment
    ) -> None:
        """Create one step function per entry."""
        for resource in CurationHubConstruct.__non_empty(resources):
            StepFunctionConstruct.create_step_function(scope, config, resource, env)

    @staticmethod
    def __create_iam_resources(
        scope: Stack, config: dict, resources: list, env: PrincipalEnvironment
    ) -> None:
        """Create one custom IAM role per entry."""
        for resource in CurationHubConstruct.__non_empty(resources):
            IAMRoleConstruct.create_custom_role(
                scope, config["name"], env, resource
            )
Editor is loading...
Leave a Comment