pydantic class loop zaloop
unknown
python
2 years ago
1.5 kB
10
Indexable
import yaml
from pydantic import BaseModel, constr, Field, ValidationError
from typing import Dict, List, Optional, Set
ENVS = {'prod', 'preprod', 'prod_internal', 'preprod_internal'}

# Build one strict pydantic model per environment and publish it as a
# module-level global named after the environment (e.g. ``prod``).
# sorted() only makes the creation order reproducible; ENVS itself is a set.
for env in sorted(ENVS):
    # The patterns are anchored with ^...$ so the WHOLE value has to match.
    # NOTE(review): pydantic v2 applies ``pattern`` as a regex search with its
    # default engine, so without anchors any string merely containing the
    # prefix would be accepted -- confirm against the pinned pydantic version.
    _trigger_pattern = "^/cloud/dwh/" + env + "/.*_ready$"
    _data_path_pattern = "^//home/cloud-dwh/data/" + env + "/.*$"

    # Per-environment workflow settings; unknown keys are rejected
    # (extra='forbid').
    class WorkflowConfig(BaseModel, extra='forbid'):
        # Optional list of trigger paths, each of the form
        # /cloud/dwh/<env>/..._ready.
        input_triggers: Optional[List[constr(pattern=_trigger_pattern)]] = None
        # Required mapping of parameter name -> path under
        # //home/cloud-dwh/data/<env>/.
        query_parameters: Dict[str, constr(pattern=_data_path_pattern)]
        # Required output path under //home/cloud-dwh/data/<env>/.
        destination_path: constr(pattern=_data_path_pattern)
        # Optional free-form string-to-string mapping.
        lag_monitor: Optional[Dict[str, str]] = None
        # Optional integer between 1 and 24 inclusive.
        execution_frequency_hours: Optional[int] = Field(None, ge=1, le=24)
        # Optional string; its format is not validated here.
        execution_cron: Optional[str] = None

    # Rename the class after its environment so reprs show which one it is.
    WorkflowConfig.__name__ = env
    WorkflowConfig.__qualname__ = env
    # Register under the environment name; lookups such as globals()['prod']
    # rely on this.
    globals()[env] = WorkflowConfig
# Top-level schema of the parameters document: one section per environment
# plus the set of query-parameter names in use. Unknown keys are rejected
# (extra="forbid").
class Parameters(BaseModel, extra="forbid"):
    # The per-environment model classes are registered in the module globals
    # under the environment name, so they are fetched by name here.
    # ``prod`` is the only section that must be present.
    prod: globals()["prod"]
    preprod: Optional[globals()["preprod"]] = None
    prod_internal: Optional[globals()["prod_internal"]] = None
    preprod_internal: Optional[globals()["preprod_internal"]] = None
    # Required set of strings.
    used_query_parameters: Set[str]
def parse_from_yaml(
    path_to_yaml,
    *,
    used_query_parameters: Optional[Set[str]] = None,
) -> "Parameters":
    """Load a YAML file and validate its content as ``Parameters``.

    Args:
        path_to_yaml: Path of the YAML file to read.
        used_query_parameters: Names stored under the
            ``used_query_parameters`` key before validation. Defaults to
            ``{'accounts', 'billingaccounts'}``. Any value for that key
            present in the file is overwritten.

    Returns:
        The validated ``Parameters`` instance.

    Raises:
        OSError: If the file cannot be opened.
        TypeError: If the top level of the YAML document is not a mapping.
        ValidationError: If the content does not satisfy the schema.
    """
    with open(path_to_yaml, encoding="utf-8") as f:
        config = yaml.safe_load(f)

    # safe_load returns None for an empty document; treat that as an empty
    # mapping so the caller gets a schema error, not an opaque TypeError.
    if config is None:
        config = {}
    if not isinstance(config, dict):
        raise TypeError(
            "top level of the YAML document must be a mapping, got "
            + type(config).__name__
        )

    if used_query_parameters is None:
        used_query_parameters = {'accounts', 'billingaccounts'}
    # Copy so the caller's collection is never shared with the model input.
    config['used_query_parameters'] = set(used_query_parameters)
    return Parameters(**config)
# Load and validate 'parameters.yaml' when the module is executed/imported.
# A schema violation is reported on stdout instead of being raised.
try:
    parameters = parse_from_yaml('parameters.yaml')
except ValidationError as e:
    # NOTE: ``parameters`` is left undefined when validation fails.
    print(str(e))