from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
# Arguments passed to the DAG as ``default_args``: owner name, no dependency
# on past runs, no e-mail on failure or retry, and a single retry after a
# five-minute delay.
default_args = {
    "owner": "steeeeeeeeve",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}
# Build the DAG object that every task below attaches to: one run per day,
# starting from 2023-09-01, with catchup disabled.
# NOTE(review): `schedule_interval` is deprecated in favour of `schedule` in
# newer Airflow releases -- confirm the target Airflow version before changing.
dag = DAG(
    dag_id="example_postgres_operator_with_formatted_sql",
    description="An example using formatted SQL with PostgresOperator",
    default_args=default_args,
    start_date=datetime(2023, 9, 1),
    schedule_interval=timedelta(days=1),
    catchup=False,
)
# Task that creates example_table (id, name, age) when it does not exist yet.
create_example_table = PostgresOperator(
    dag=dag,
    task_id="create_example_table",
    postgres_conn_id="your_postgres_conn_id",
    sql="""
CREATE TABLE IF NOT EXISTS example_table (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
age INT
);
""",
)
# Task that creates another_table (id, occupation) when it does not exist yet.
create_another_table = PostgresOperator(
    dag=dag,
    task_id="create_another_table",
    postgres_conn_id="your_postgres_conn_id",
    sql="""
CREATE TABLE IF NOT EXISTS another_table (
id SERIAL PRIMARY KEY,
occupation VARCHAR(255) NOT NULL
);
""",
)
# Task to seed example_table with its two sample rows.
# The statement is idempotent: a sample row is inserted only when no row with
# the same (name, age) is already present. A plain INSERT would append the
# same rows again on every task retry and on every scheduled run of the DAG.
insert_into_example_table = PostgresOperator(
    task_id='insert_into_example_table',
    postgres_conn_id='your_postgres_conn_id',
    sql="""
        INSERT INTO example_table (name, age)
        SELECT v.name, v.age
        FROM (VALUES ('Alice', 30), ('Bob', 40)) AS v (name, age)
        WHERE NOT EXISTS (
            SELECT 1
            FROM example_table e
            WHERE e.name = v.name
              AND e.age = v.age
        );
    """,
    dag=dag,
)
# Task to seed another_table with its two sample rows.
# The statement is idempotent: a sample row is inserted only when no row with
# the same occupation is already present. A plain INSERT would append the
# same rows again on every task retry and on every scheduled run of the DAG.
insert_into_another_table = PostgresOperator(
    task_id='insert_into_another_table',
    postgres_conn_id='your_postgres_conn_id',
    sql="""
        INSERT INTO another_table (occupation)
        SELECT v.occupation
        FROM (VALUES ('Engineer'), ('Doctor')) AS v (occupation)
        WHERE NOT EXISTS (
            SELECT 1
            FROM another_table a
            WHERE a.occupation = v.occupation
        );
    """,
    dag=dag,
)
# Task that runs a SELECT joining example_table to another_table on equal id
# values, reading name, age and occupation.
select_with_join = PostgresOperator(
    dag=dag,
    task_id="select_with_join",
    postgres_conn_id="your_postgres_conn_id",
    sql="""
SELECT e.name, e.age, a.occupation
FROM example_table e
JOIN another_table a ON e.id = a.id;
""",
)
# Wire the dependencies: each table is created before it is populated, and
# the join query runs only after both insert tasks.
create_example_table >> insert_into_example_table >> select_with_join
create_another_table >> insert_into_another_table >> select_with_join