Untitled

 avatar
unknown
plain_text
a month ago
2.3 kB
2
Indexable
import logging
import pathlib
from datetime import datetime, timedelta

import pendulum
import pytz
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s

from dateutil.relativedelta import relativedelta
from kubernetes.client import models as k8s
from zpcommon.utils import (
    get_node_settings,
)

# Deployment environment this DAG file targets; interpolated into the image
# file name, the ConfigMap name, the dag_id and the task/pod names below.
env = "uat"

# The Docker image reference is read from the first line of a text file named
# "etl-image-<env>.txt" located in the same directory as this DAG file.
with open(pathlib.Path(__file__).parent.resolve().joinpath(f"etl-image-{env}.txt")) as f:
    # readline() keeps the line terminator; strip it (and any stray spaces)
    # so the value handed to the pod operator is a clean image reference.
    ETL_DOCKER_IMAGE = f.readline().strip()

# Logger registered under the "airflow.task" name.
task_logger = logging.getLogger("airflow.task")

# Arguments handed to the DAG as `default_args`: start date, the
# "environment" param, and the retry policy (3 retries, 1 minute apart).
core_args = dict(
    start_date=pendulum.datetime(2023, 3, 8),
    params={"environment": "UAT"},
    retries=3,
    retry_delay=timedelta(minutes=1),
)


# Date helpers evaluated whenever this module is imported/parsed.
# NOTE(review): none of these names are referenced elsewhere in the visible
# source — confirm whether anything external still relies on them.
tz = pytz.timezone("Asia/Singapore")
today = datetime.now(tz=tz)
# Previous calendar day in Singapore time.
yesterday = today + relativedelta(days=-1)
# Formatted as MM_DD_YYYY.
day_path = f"{yesterday:%m_%d_%Y}"


# envFrom sources for the ETL pod: a single reference to the
# environment-specific ConfigMap. Passed to the pod operator as `env_from`.
_config_map_ref = k8s.V1ConfigMapEnvSource(name=f"eztracker-airflow-eztracker-{env}")
configmaps = [k8s.V1EnvFromSource(config_map_ref=_config_map_ref)]

#####################
# DAG for EzTracker Inbound Master ETL
#####################
# Defines a single-task DAG that launches the ETL image in a Kubernetes pod
# and runs `poetry run eztracker_inbound_master` inside it. Scheduling,
# retries and params come from `schedule_interval` and `core_args`.
with DAG(
    dag_id=f"eztracker_inbound_master_{env}",
    schedule_interval="0/20 * * * *",
    default_args=core_args,
    catchup=False,
    tags=["owner:ezTracker", "app:ezTracker", "system:ezTracker"],
) as dag:

    eztracker_inbound_master = KubernetesPodOperator(
        namespace="airflow",
        image=ETL_DOCKER_IMAGE,
        # Pull secret used to fetch the ETL image from its registry.
        image_pull_secrets=[k8s.V1LocalObjectReference(name="airflow-docker-secret")],
        cmds=["poetry", "run", "eztracker_inbound_master"],
        name=f"{env}_eztracker_inbound_master",
        is_delete_operator_pod=True,
        in_cluster=True,
        # NOTE(review): unlike `name` above, this id has no "_" between the
        # env prefix and the task name. Left unchanged on purpose: renaming a
        # task id detaches it from its existing run history — confirm before
        # aligning the two.
        task_id=f"{env}eztracker_inbound_master",
        get_logs=True,
        image_pull_policy="Always",
        container_resources=k8s.V1ResourceRequirements(
            limits={"memory": "200Mi", "cpu": "400m"},
            requests={"memory": "100Mi", "cpu": "200m"},
        ),
        log_events_on_failure=True,
        sla=timedelta(minutes=30),
        # Node placement settings come from the shared zpcommon helper.
        **get_node_settings("airflow16"),
        env_from=configmaps,
    )
    # The operator is attached to the DAG by being created inside the `with`
    # block; a single task needs no dependency wiring, so the former bare
    # `eztracker_inbound_master` expression statement (a no-op) was dropped.

Editor is loading...
Leave a Comment