Apache Airflow Example
unknown
python
a year ago
2.5 kB
7
Indexable
Never
from datetime import datetime, timedelta from airflow import DAG from airflow.providers.postgres.operators.postgres import PostgresOperator # Default DAG arguments default_args = { 'owner': 'steeeeeeeeve', 'depends_on_past': False, 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } # Instantiate a DAG dag = DAG( 'example_postgres_operator_with_formatted_sql', default_args=default_args, description='An example using formatted SQL with PostgresOperator', schedule_interval=timedelta(days=1), start_date=datetime(2023, 9, 1), catchup=False, ) # Task to create example_table create_example_table = PostgresOperator( task_id='create_example_table', postgres_conn_id='your_postgres_conn_id', sql=""" CREATE TABLE IF NOT EXISTS example_table ( id SERIAL PRIMARY KEY, name VARCHAR(255) NOT NULL, age INT ); """, dag=dag, ) # Task to create another_table create_another_table = PostgresOperator( task_id='create_another_table', postgres_conn_id='your_postgres_conn_id', sql=""" CREATE TABLE IF NOT EXISTS another_table ( id SERIAL PRIMARY KEY, occupation VARCHAR(255) NOT NULL ); """, dag=dag, ) # Task to insert data into example_table insert_into_example_table = PostgresOperator( task_id='insert_into_example_table', postgres_conn_id='your_postgres_conn_id', sql=""" INSERT INTO example_table (name, age) VALUES ('Alice', 30), ('Bob', 40); """, dag=dag, ) # Task to insert data into another_table insert_into_another_table = PostgresOperator( task_id='insert_into_another_table', postgres_conn_id='your_postgres_conn_id', sql=""" INSERT INTO another_table (occupation) VALUES ('Engineer'), ('Doctor'); """, dag=dag, ) # Task to perform SELECT with JOIN select_with_join = PostgresOperator( task_id='select_with_join', postgres_conn_id='your_postgres_conn_id', sql=""" SELECT e.name, e.age, a.occupation FROM example_table e JOIN another_table a ON e.id = a.id; """, dag=dag, ) # Set task order create_example_table >> insert_into_example_table create_another_table >> insert_into_another_table [insert_into_example_table, insert_into_another_table] >> select_with_join