Untitled
unknown
python
a year ago
4.5 kB
14
Indexable
import datetime
import time

import psycopg2
import requests
import json
import pandas as pd
import numpy as np

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from airflow.hooks.base import BaseHook
from airflow.models.xcom import XCom


### POSTGRESQL settings ###

def _open_connection(airflow_conn):
    """Open a psycopg2 connection to the 'de' database for an Airflow connection.

    Keyword arguments are used instead of a hand-built DSN string so that
    credentials containing quotes or spaces are passed through safely.
    """
    return psycopg2.connect(
        dbname='de',
        port=airflow_conn.port,
        user=airflow_conn.login,
        host=airflow_conn.host,
        password=airflow_conn.password,
    )


# Set the PostgreSQL connection from BaseHook.
psql_conn = BaseHook.get_connection('postgresql_de')

# Init test connection.
# NOTE(review): this runs every time the scheduler parses the file; consider
# moving the check into a task if parse-time database access is not wanted.
conn = _open_connection(psql_conn)
cur = conn.cursor()
cur.close()
conn.close()


# One INSERT that writes a single, fully populated weekly row.
# ``customer_week`` aggregates the week's sales per customer; the outer SELECT
# folds those per-customer figures into the mart columns.
# NOTE(review): the week filter compares week numbers only, so the same week of
# a different year also matches - confirm whether d_calendar offers a year
# column to filter on.
# NOTE(review): re-running the task for the same week inserts another row.
_CUSTOMER_RETENTION_SQL = """
INSERT INTO mart.f_customer_retention (
    new_customers_count,
    returning_customers_count,
    refunded_customer_count,
    period_name,
    period_id,
    new_customers_revenue,
    returning_customers_revenue,
    customers_refunded
)
WITH customer_week AS (
    SELECT
        count(customer_id) AS orders_count,
        sum(payment_amount) AS revenue,
        count(payment_amount) FILTER (WHERE payment_amount < 0) AS refund_rows
    FROM mart.f_sales
    JOIN mart.d_calendar ON f_sales.date_id = d_calendar.date_id
    WHERE week_of_year = DATE_PART('week', %(ds)s::DATE)
    GROUP BY customer_id
)
SELECT
    count(*) FILTER (WHERE orders_count = 1),
    count(*) FILTER (WHERE orders_count > 1),
    count(*) FILTER (WHERE refund_rows > 0),
    'weekly',
    DATE_PART('week', %(ds)s::DATE)::INT,
    sum(revenue) FILTER (WHERE orders_count = 1),
    sum(revenue) FILTER (WHERE orders_count > 1),
    COALESCE(sum(refund_rows), 0)
FROM customer_week;
"""


# 4. Update the data marts (f_* tables)
def customer_retention_view(ti, ds=None):
    """Insert the weekly customer-retention row into mart.f_customer_retention.

    Args:
        ti: Airflow task instance, injected by PythonOperator.
        ds: Logical date as 'YYYY-MM-DD'. Airflow injects it from the task
            context because the parameter is named ``ds``; when it is not
            supplied it is read from the task instance's template context.

    Returns:
        200 on success (the PythonOperator stores the return value in XCom).

    Raises:
        psycopg2.Error: if connecting or executing the statement fails; the
            transaction is rolled back and the connection is closed.
    """
    if ds is None:
        # Jinja templates such as '{{ ds }}' are not rendered inside a Python
        # callable's string literals, so the date must come from the context.
        ds = ti.get_template_context()['ds']

    # Connection to the database.
    connection = _open_connection(BaseHook.get_connection('postgresql_de'))
    try:
        # ``with connection`` commits on success and rolls back on error;
        # it does not close the connection, hence the ``finally`` below.
        with connection, connection.cursor() as cursor:
            cursor.execute(_CUSTOMER_RETENTION_SQL, {'ds': ds})
    finally:
        connection.close()

    return 200


# Declare the DAG
dag = DAG(
    dag_id='create_customer_retention_view',
    schedule_interval="0 11 * * MON",
    # A fixed start_date: a value computed from "today" moves on every parse,
    # so the first weekly interval may never be considered complete.
    # catchup=False keeps past intervals from being backfilled.
    start_date=datetime.datetime(2022, 1, 1),
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=60),
)

t_customer_retention_view = PythonOperator(
    task_id='customer_retention_view',
    python_callable=customer_retention_view,
    dag=dag,
)

t_customer_retention_view
Editor is loading...
Leave a Comment