Untitled
unknown
plain_text
a year ago
3.1 kB
9
Indexable
from aws_cdk import Stack
from principal_environment import PrincipalEnvironment
from aws_constructs.bucket import BucketConstruct
from aws_constructs.lambdas import LambdaConstruct
from aws_constructs.glue import GlueConstruct
from aws_constructs.step_function import StepFunctionConstruct
from aws_constructs.iam_role import IAMRoleConstruct
class CurationHubConstruct:
    """Provision the resources of a curation hub inside a CDK stack.

    Instantiating the class is what triggers the work: ``__init__`` reads the
    resource lists out of ``all_resources`` and forwards each list to the
    matching private builder, which in turn delegates to the project's
    ``*Construct`` helper classes.
    """

    def __init__(
        self, scope: Stack, all_resources: dict, env: PrincipalEnvironment, config: dict
    ):
        """Build every resource group listed in ``all_resources``.

        Args:
            scope: Stack handed through to every construct helper.
            all_resources: Mapping read with the keys ``"iam-resources"``,
                ``"buckets"``, ``"lambdas"``, ``"glueetls"`` and
                ``"step-functions"``; a missing key is treated as an empty
                list.
            env: Environment object handed through to every construct helper.
            config: Hub configuration handed through to every builder.
        """
        # (section key, builder) pairs, processed top to bottom:
        # IAM, buckets, lambdas, Glue jobs, step functions.
        build_plan = (
            ("iam-resources", self.__create_iam_resources),
            ("buckets", self.__create_buckets),
            ("lambdas", self.__create_lambdas),
            ("glueetls", self.__create_glueetls),
            ("step-functions", self.__create_step_functions),
        )
        for section, build in build_plan:
            build(scope, config, all_resources.get(section, []), env)

    @staticmethod
    def __create_buckets(
        scope: Stack, config: dict, resources: list, env: PrincipalEnvironment
    ) -> None:
        """Create one bucket per truthy entry and apply its lifecycle rules.

        The bucket identifier passed to ``BucketConstruct.create_bucket`` is
        ``"<config['name']>-<entry>"``; the lifecycle settings are looked up
        at ``config["lifecycle"][entry]``.
        """
        for bucket_key in resources:
            if not bucket_key:
                # Falsy entries are skipped.
                continue
            bucket_id = f"{config['name']}-{bucket_key}"
            created = BucketConstruct.create_bucket(scope, bucket_id, env)
            lifecycle_cfg = config["lifecycle"][bucket_key]
            BucketConstruct.create_lifecyle_rules(created, lifecycle_cfg)

    @staticmethod
    def __create_lambdas(
        scope: Stack, config: dict, resources: list, env: PrincipalEnvironment
    ) -> None:
        """Call ``LambdaConstruct.create_lambda`` for every truthy entry.

        Each call receives ``config["name"]`` and ``config["identifier"]``
        alongside the entry itself.
        """
        # filter(None, ...) drops falsy entries.
        for lambda_entry in filter(None, resources):
            LambdaConstruct.create_lambda(
                scope, config["name"], lambda_entry, env, config["identifier"]
            )

    @staticmethod
    def __create_glueetls(
        scope: Stack, config: dict, resources: list, env: PrincipalEnvironment
    ) -> None:
        """Call ``GlueConstruct.create_glue_job`` for every truthy entry.

        A string entry is passed on as-is. Any other entry is treated as a
        mapping: only its first key/value pair is used, and the value is
        passed as an extra positional argument after the key.
        """
        for job_entry in resources:
            if not job_entry:
                continue
            if not isinstance(job_entry, str):
                # Only the first pair of the mapping is consulted.
                job_name, job_type = next(iter(job_entry.items()))
                GlueConstruct.create_glue_job(scope, config, env, job_name, job_type)
                continue
            GlueConstruct.create_glue_job(scope, config, env, job_entry)

    @staticmethod
    def __create_step_functions(
        scope: Stack, config: dict, resources: list, env: PrincipalEnvironment
    ) -> None:
        """Call ``StepFunctionConstruct.create_step_function`` per truthy entry."""
        for machine_entry in filter(None, resources):
            StepFunctionConstruct.create_step_function(
                scope, config, machine_entry, env
            )

    @staticmethod
    def __create_iam_resources(
        scope: Stack, config: dict, resources: list, env: PrincipalEnvironment
    ) -> None:
        """Call ``IAMRoleConstruct.create_custom_role`` per truthy entry.

        Each call receives ``config["name"]`` together with the entry.
        """
        for role_entry in filter(None, resources):
            IAMRoleConstruct.create_custom_role(
                scope, config["name"], env, role_entry
            )
Leave a Comment