# Apache Airflow Example
#
# mail@pastecode.io avatar
# unknown
# python
# a year ago
# 2.5 kB
# 7
# Indexable
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

# Arguments handed to the DAG as ``default_args``: one retry after a
# five-minute wait, no e-mail notifications, no dependency on the previous run.
default_args = dict(
    owner='steeeeeeeeve',
    depends_on_past=False,
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# The DAG object that every task in this file is attached to via ``dag=dag``.
dag = DAG(
    dag_id='example_postgres_operator_with_formatted_sql',
    description='An example using formatted SQL with PostgresOperator',
    default_args=default_args,
    # Daily cadence, anchored at 2023-09-01.
    start_date=datetime(2023, 9, 1),
    schedule_interval=timedelta(days=1),
    # Do not back-fill the intervals between start_date and now.
    catchup=False,
)

# Task that makes sure example_table exists (id, name, age).
# IF NOT EXISTS keeps the statement safe to run on every DAG run.
create_example_table = PostgresOperator(
    dag=dag,
    task_id="create_example_table",
    postgres_conn_id="your_postgres_conn_id",
    sql="""
        CREATE TABLE IF NOT EXISTS example_table (
            id SERIAL PRIMARY KEY, 
            name VARCHAR(255) NOT NULL, 
            age INT
        );
    """,
)

# Task that makes sure another_table exists (id, occupation).
# IF NOT EXISTS keeps the statement safe to run on every DAG run.
create_another_table = PostgresOperator(
    dag=dag,
    task_id="create_another_table",
    postgres_conn_id="your_postgres_conn_id",
    sql="""
        CREATE TABLE IF NOT EXISTS another_table (
            id SERIAL PRIMARY KEY, 
            occupation VARCHAR(255) NOT NULL
        );
    """,
)

# Task to seed example_table with the two sample people.
#
# The DAG is scheduled daily and tasks can be cleared and re-run, so a plain
# INSERT would append another copy of the seed rows on every execution.
# Each row is therefore inserted only when no row with that name exists yet,
# which makes the task idempotent. The rows use separate statements, in the
# original order, so on a fresh table the SERIAL ids are still handed out
# Alice first, Bob second.
insert_into_example_table = PostgresOperator(
    task_id='insert_into_example_table',
    postgres_conn_id='your_postgres_conn_id',
    sql="""
        INSERT INTO example_table (name, age)
        SELECT 'Alice', 30
        WHERE NOT EXISTS (
            SELECT 1 FROM example_table WHERE name = 'Alice'
        );

        INSERT INTO example_table (name, age)
        SELECT 'Bob', 40
        WHERE NOT EXISTS (
            SELECT 1 FROM example_table WHERE name = 'Bob'
        );
    """,
    dag=dag,
)

# Task to seed another_table with the two sample occupations.
#
# The DAG is scheduled daily and tasks can be cleared and re-run, so a plain
# INSERT would append another copy of the seed rows on every execution.
# Each row is therefore inserted only when no row with that occupation exists
# yet, which makes the task idempotent. The rows use separate statements, in
# the original order, so on a fresh table the SERIAL ids are still handed out
# Engineer first, Doctor second.
insert_into_another_table = PostgresOperator(
    task_id='insert_into_another_table',
    postgres_conn_id='your_postgres_conn_id',
    sql="""
        INSERT INTO another_table (occupation)
        SELECT 'Engineer'
        WHERE NOT EXISTS (
            SELECT 1 FROM another_table WHERE occupation = 'Engineer'
        );

        INSERT INTO another_table (occupation)
        SELECT 'Doctor'
        WHERE NOT EXISTS (
            SELECT 1 FROM another_table WHERE occupation = 'Doctor'
        );
    """,
    dag=dag,
)

# Task that reads both tables back in a single query.
# Rows are paired purely on equal id values (e.id = a.id).
select_with_join = PostgresOperator(
    dag=dag,
    task_id="select_with_join",
    postgres_conn_id="your_postgres_conn_id",
    sql="""
        SELECT e.name, e.age, a.occupation 
        FROM example_table e 
        JOIN another_table a ON e.id = a.id;
    """,
)

# Wire up the execution order:
#   each table is created before it is populated, and the join query runs
#   only after both inserts have finished.
create_example_table.set_downstream(insert_into_example_table)
create_another_table.set_downstream(insert_into_another_table)
select_with_join.set_upstream(
    [insert_into_example_table, insert_into_another_table]
)