Untitled
unknown
plain_text
a year ago
4.2 kB
10
Indexable
"""Module responsible for the stack creation code for Data Reservoir."""
import os.path
from aws_cdk import Stack, aws_kms
from aws_constructs.lambdas import LambdaConstruct
from constructs import Construct
from data_foundation_constructs.helpers import sns
from data_foundation_constructs.ingestion_hub.helpers.preprocess import (
PreprocessConfig, PreprocessType)
from data_foundation_constructs.ingestion_hub.ingestion_hub import IngestionHub
from data_foundation_constructs.ingestion_hub.ingestion_hub_settings import \
generate_settings
from pgi_cdk_standard_library.helpers.s3 import deploy_to_s3
from principal_environment import PrincipalEnvironment
from ..aws_constructs import s3_bucket
class PrinamHkFaDataReservoir(Stack):
    """CDK stack that provisions the Data Reservoir resources when instantiated.

    Construction performs, in order: creation of the ingestion hub for the
    supplied data source, lookup of the foundation-managed KMS alias,
    creation of the reservoir S3 bucket, and (only when enabled for the
    current environment) creation of the trigger Lambda.
    """

    def __init__(
        self,
        scope: Construct,
        stack_id: str,
        app_name: str,
        env: PrincipalEnvironment,
        data_source: dict,
        **kwargs: dict,
    ):
        """Create the stack and every resource it owns.

        Args:
            scope: Parent construct this stack is attached to.
            stack_id: Identifier forwarded to the base ``Stack``.
            app_name: Application name, forwarded to the SNS topic import and
                to the ingestion hub settings.
            env: Deployment environment; ``env.to_cdk_env`` is handed to the
                base ``Stack`` and the object itself is kept on ``self.env``.
            data_source: Data source configuration. The keys read here and in
                ``_create_ingestion_stack`` are ``"name"``,
                ``"shared_xmatters_topic"``, and the optional ``"preprocess"``
                and ``"trigger"``.
            **kwargs: Additional keyword arguments forwarded to the base
                ``Stack``.
        """
        super().__init__(scope, stack_id, env=env.to_cdk_env, **kwargs)

        self.app_name = app_name
        self.env = env

        self._create_ingestion_stack(data_source)

        # Reference the foundation-managed KMS key through its alias.
        self.kms = aws_kms.Alias.from_alias_name(
            scope=self,
            id="GetKMSKey",
            alias_name="alias/pfg/foundation/standard",
        )

        self.bucket = s3_bucket.create_bucket(
            scope=self,
            bucket="prinam-hk-fa-data-reservoir",
            env=env,
            encryption_key=self.kms,
        )

        # The trigger Lambda is opt-in per environment: "trigger" maps an
        # environment name to a flag, and a missing entry means "disabled".
        environment_name = env.aws_environment_name
        trigger_flags = data_source.get("trigger", {environment_name: False})
        if trigger_flags.get(environment_name):
            LambdaConstruct.create_lambda(self, data_source["name"], "trigger", env)

    def _create_ingestion_stack(self, data_source: dict) -> None:
        """Create the ingestion hub for ``data_source``.

        Imports the shared xMatters SNS topic configured for the current
        environment and region, builds an optional preprocess configuration,
        generates the hub settings and instantiates the ``IngestionHub``.
        When the preprocess type is ``GLUE``, the preprocess code is also
        deployed to the hub's reservoir bucket.

        Args:
            data_source: Data source configuration (see ``__init__``).
        """
        topics_by_env = data_source["shared_xmatters_topic"]
        topic_name = topics_by_env[self.env.aws_environment_name][self.env.region]
        xmatters_topic = sns.import_sns_topic(
            scope=self,
            app_name=self.app_name,
            topic_name=topic_name,
            identifier="xmatters",
        )

        # An absent or empty "preprocess" section means no preprocessing.
        preprocess_options = data_source.get("preprocess", {})
        if preprocess_options:
            source_name = data_source["name"]
            type_key = preprocess_options.get("type", "none").upper()
            # Directory holding the Lambda preprocess code, resolved
            # relative to this file rather than the working directory.
            lambda_preprocess_root = os.path.abspath(
                os.path.join(
                    os.path.dirname(__file__),
                    "..",
                    "..",
                    "..",
                    "lambdas",
                    "preprocess",
                )
            )
            preprocess_config = PreprocessConfig(
                data_source_name=source_name,
                secret_manager=preprocess_options.get("secret_manager", False),
                preprocess_type=PreprocessType[type_key],
                preprocess_root_directory=lambda_preprocess_root,
                code_path=preprocess_options.get("code_path", source_name),
                additional_python_modules=[],
            )
        else:
            preprocess_config = None

        settings = generate_settings(
            app_name=self.app_name,
            env=self.env,
            data_source=data_source,
            preprocess_config=preprocess_config,
            shared_xmatters_topic=xmatters_topic,
            notify_topic=None,
        )
        ingestion_hub = IngestionHub(
            scope=self,
            construct_id="ingestion-hub",
            ingestion_hub_settings=settings,
        )

        # Only Glue-based preprocessing needs its code uploaded to S3.
        if (
            not preprocess_config
            or preprocess_config.preprocess_type is not PreprocessType.GLUE
        ):
            return

        curation_hub = ingestion_hub.curation_hub
        deploy_to_s3(
            scope=self,
            bucket_deploy_id="preprocess-upload",
            dest_bucket=curation_hub.reservoir_bucket,
            path=f"../../glue/preprocess/{preprocess_config.code_path}",
            prefix=f"glue/preprocess/{preprocess_config.code_path}",
            role=curation_hub.bucket_deployment_role,
        )
Editor is loading...
Leave a Comment