pydantic class loop zaloop
unknown
python
2 years ago
1.5 kB
7
Indexable
import yaml from pydantic import BaseModel, constr, Field, ValidationError from typing import Dict, List, Optional, Set ENVS = {'prod', 'preprod', 'prod_internal', 'preprod_internal'} for env in ENVS: class WorkflowConfig(BaseModel, extra='forbid'): input_triggers: Optional[List[constr(pattern="/cloud/dwh/" + env + "/.*_ready")]] = None query_parameters: Dict[str, constr(pattern="//home/cloud-dwh/data/" + env + "/.*")] destination_path: constr(pattern="//home/cloud-dwh/data/" + env + "/.*") lag_monitor: Optional[Dict[str, str]] = None execution_frequency_hours: Optional[int] = Field(None, ge=1, le=24) execution_cron: Optional[str] = None WorkflowConfig.__name__ = env globals()[env] = WorkflowConfig class Parameters(BaseModel, extra="forbid"): prod: globals()['prod'] preprod: Optional[globals()['preprod']] = None prod_internal: Optional[globals()['prod_internal']] = None preprod_internal: Optional[globals()['preprod_internal']] = None used_query_parameters: Set[str] def parse_from_yaml(path_to_yaml): with open(path_to_yaml) as f: config = yaml.safe_load(f) config['used_query_parameters'] = {'accounts', 'billingaccounts'} parameters = Parameters(**config) return parameters try: parameters = parse_from_yaml('parameters.yaml') except ValidationError as e: print(str(e))
Editor is loading...