Untitled
unknown
plain_text
15 days ago
2.3 kB
3
Indexable
"""Airflow DAG that scrapes TradingView reference data to JSON files.

The ``exchanges_task`` runs the TradingView data-coverage crawler. A second
step that loads the scraped JSON files into Cassandra is defined below, but
its task is currently commented out, so the DAG contains only the scrape task.
"""
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import sys
from dotenv import load_dotenv
import json
import glob

load_dotenv(override=True)

# Make the TradingView scraper modules importable from this DAG file.
sys.path.append('/app/scraper/src/fetchers/reference_data/tradingview')
from data_coverage_scraper import crawler_data_coverage
# from database.reference_data.cassandra_client import CassandraClient

# Directory passed to the scraper as ``tradingview_path`` and read back by the
# Cassandra loader; kept in one place so the two steps cannot drift apart.
TRADINGVIEW_DATA_PATH = '/data/tradingview_data'

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2025, 3, 4),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


# def countries_tradingview_task_callable():
#     return countries_scraper(tradingview_path=TRADINGVIEW_DATA_PATH)


def exchanges_tradingview_task_callable():
    """Run the TradingView data-coverage crawler against TRADINGVIEW_DATA_PATH.

    Returns:
        Whatever ``crawler_data_coverage`` returns (pushed to XCom by Airflow).
    """
    return crawler_data_coverage(tradingview_path=TRADINGVIEW_DATA_PATH)


def load_to_cassandra_task_callable():
    """Load every ``*.json`` file in TRADINGVIEW_DATA_PATH into Cassandra.

    A file whose first record contains a ``'countries'`` key is treated as a
    list of region records (each with ``'region'`` and ``'countries'``); every
    country is inserted via ``insert_country(region, country)``. Any other
    file, including an empty list, is treated as a list of exchange records
    inserted via ``insert_exchange``.

    Returns:
        A confirmation message once all files have been processed.
    """
    # Imported here rather than at module level (where the import is commented
    # out): without it this function raised NameError, and a function-scope
    # import is never executed while Airflow merely parses the DAG file.
    from database.reference_data.cassandra_client import CassandraClient

    client = CassandraClient()
    client.connect()
    try:
        # Sorted so files are loaded in a deterministic order; glob's order is
        # arbitrary.
        for json_file in sorted(glob.glob(f"{TRADINGVIEW_DATA_PATH}/*.json")):
            with open(json_file, 'r', encoding='utf-8') as f:
                data = json.load(f)

            # Guard against an empty JSON list, which made data[0] raise.
            if data and 'countries' in data[0]:
                for region_data in data:
                    region = region_data['region']
                    for country in region_data['countries']:
                        client.insert_country(region, country)
            else:
                for exchange in data:
                    client.insert_exchange(exchange)
    finally:
        # Always release the connection, even if a file fails to parse or an
        # insert raises.
        client.close()
    return "Data loaded to Cassandra successfully!"


with DAG(
    "web_scraping_dag",
    default_args=default_args,
    # NOTE(review): `schedule_interval` is deprecated since Airflow 2.4 in
    # favour of `schedule`; switch once the deployed Airflow version is
    # confirmed to be >= 2.4.
    schedule_interval=None,
    catchup=False,
) as dag:

    # countries_task = PythonOperator(
    #     task_id="countries_task",
    #     python_callable=countries_tradingview_task_callable
    # )

    exchanges_task = PythonOperator(
        task_id="exchanges_task",
        python_callable=exchanges_tradingview_task_callable,
    )

    # load_to_cassandra_task = PythonOperator(
    #     task_id="load_to_cassandra",
    #     python_callable=load_to_cassandra_task_callable
    # )

    # countries_task >> exchanges_task >> load_to_cassandra_task
    # exchanges_task >> load_to_cassandra_task
Editor is loading...
Leave a Comment