pydantic class loop zaloop

 avatar
unknown
python
2 years ago
1.5 kB
7
Indexable
import yaml
from pydantic import BaseModel, constr, Field, ValidationError
from typing import Dict, List, Optional, Set

ENVS = {'prod', 'preprod', 'prod_internal', 'preprod_internal'}

for env in ENVS:
    class WorkflowConfig(BaseModel, extra='forbid'):
        input_triggers:             Optional[List[constr(pattern="/cloud/dwh/" + env + "/.*_ready")]] = None
        query_parameters:           Dict[str, constr(pattern="//home/cloud-dwh/data/" + env + "/.*")]
        destination_path:           constr(pattern="//home/cloud-dwh/data/" + env + "/.*")
        lag_monitor:                Optional[Dict[str, str]] = None
        execution_frequency_hours:  Optional[int] = Field(None, ge=1, le=24)
        execution_cron:             Optional[str] = None

    WorkflowConfig.__name__ = env
    globals()[env] = WorkflowConfig


class Parameters(BaseModel, extra="forbid"):
    prod: globals()['prod']
    preprod: Optional[globals()['preprod']] = None
    prod_internal: Optional[globals()['prod_internal']] = None
    preprod_internal: Optional[globals()['preprod_internal']] = None
    used_query_parameters: Set[str]


def parse_from_yaml(path_to_yaml):
    with open(path_to_yaml) as f:
        config = yaml.safe_load(f)
        config['used_query_parameters'] = {'accounts', 'billingaccounts'}
        parameters = Parameters(**config)

    return parameters


try:
    parameters = parse_from_yaml('parameters.yaml')
except ValidationError as e:
    print(str(e))
Editor is loading...