# Paste-site metadata (not part of the program):
#   Untitled / unknown / plain_text / 2 years ago / 2.6 kB / 12 / Indexable
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from cassandra.cluster import Cluster
import pandas as pd
from cassandra.auth import PlainTextAuthProvider
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
def execute_cassandra_query():
    """Query the recon summary table, aggregate it, and e-mail the result.

    Reads up to 10 rows from ``hos_event_recon_summary`` in the
    ``hobs_ods_staging`` keyspace, sums the event counters per
    (recon start date, event type), renders the aggregate as an HTML table
    and sends it through the SMTP relay.

    Returns:
        None.

    Raises:
        Any exception raised by the Cassandra driver, pandas or smtplib is
        propagated unchanged so the calling task fails (and can be retried).
    """
    # NOTE(review): credentials and hosts are hard-coded in source; consider
    # moving them to an Airflow Connection / secrets backend.
    auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra@123')
    cluster = Cluster(['172.16.177.58'], auth_provider=auth_provider)
    try:
        session = cluster.connect('hobs_ods_staging')
        cassandra_query = "SELECT * FROM hos_event_recon_summary limit 10"
        # Materialize the rows while the connection is still open; the
        # driver's result set is consumed lazily.
        rows = list(session.execute(cassandra_query))
    finally:
        # Always release the driver's connections and background threads,
        # even when connecting or querying fails.
        cluster.shutdown()

    # NOTE(review): the labels below are applied positionally to the columns
    # returned by ``SELECT *`` -- confirm they match the table's column order.
    df = pd.DataFrame(
        rows,
        columns=[
            'recon_start_date',
            'recon_end_date',
            'event_type',
            'no_of_failed_events',
            'no_of_failed_events_pubished_to_kafka',
            'no_of_passed_events',
            'no_of_passed_events_via_recon',
            'total_no_of_events',
        ],
    )

    # Reduce the start timestamp to a calendar date so rows group per day.
    df['recon_start_date'] = pd.to_datetime(df['recon_start_date']).dt.date
    result_df = df.groupby(['recon_start_date', 'event_type']).agg({
        'total_no_of_events': 'sum',
        'no_of_passed_events': 'sum',
        'no_of_failed_events': 'sum',
        'no_of_passed_events_via_recon': 'sum',
    }).reset_index()

    msg = MIMEMultipart()
    # NOTE(review): both addresses look like redaction placeholders -- replace
    # with real sender/recipient addresses before deploying.
    msg['From'] = '[email protected]'
    msg['To'] = '[email protected]'
    msg['Subject'] = 'Cassandra Query Results'

    html = result_df.to_html(index=False)
    body = MIMEText(html, 'html')
    msg.attach(body)

    smtp_server = '10.16.16.90'
    smtp_port = 25
    with smtplib.SMTP(smtp_server, smtp_port) as server:
        server.sendmail(msg['From'], msg['To'], msg.as_string())
# Defaults applied to the tasks of the DAG: owner, first schedulable date,
# and a single retry five minutes after a failure.
default_args = dict(
    owner='airflow',
    start_date=datetime(2023, 1, 1),
    retries=1,
    retry_delay=timedelta(minutes=5),
)
# Instantiate the DAG: one run per day, using the shared default_args.
dag = DAG(
    'cassandra_query_dag',
    default_args=default_args,
    description='DAG for executing Cassandra query and sending results via email',
    schedule_interval=timedelta(days=1),  # Adjust the schedule interval as needed
    # start_date lies in the past; without this the scheduler would backfill
    # one run (and send one e-mail) for every day missed since then.
    catchup=False,
)
# Register the DAG's only task: a PythonOperator that calls
# execute_cassandra_query on each run.
with dag:
    execute_query_task = PythonOperator(
        python_callable=execute_cassandra_query,
        task_id='execute_cassandra_query_task',
    )

# Single-task DAG: there is nothing to chain, so this bare reference is a no-op.
execute_query_task

# When the file is executed directly, expose the DAG's own command line.
if __name__ == "__main__":
    dag.cli()
# Paste-site footer (not part of the program): Editor is loading... / Leave a Comment