import json
import math
import os
import time
from datetime import datetime

import numpy as np
import pandas as pd
import requests
from dotenv import load_dotenv
from sqlalchemy import create_engine
from sqlalchemy.engine import URL


def load_streaming_data(file_name):
    # Load the JSON config that maps SQL files to Power BI streaming push URLs
    with open(os.path.join(os.path.dirname(__file__), file_name), "r") as f:
        json_config = json.load(f)
    return json_config


def connect_database(connection_string):
    # Build a SQLAlchemy engine from a raw ODBC connection string (MSSQL via pyodbc)
    connection_url = URL.create("mssql+pyodbc", query={"odbc_connect": connection_string})
    return create_engine(connection_url)


def to_datetime(df, date_format):
    # Parse object columns that look like dates, then render all datetime
    # columns as strings in the given format
    for col in df.columns:
        if df[col].dtype == 'object':
            try:
                df[col] = pd.to_datetime(df[col])
            except ValueError:
                pass
    for col in df.columns:
        if df[col].dtype == 'datetime64[ns]':
            df[col] = df[col].dt.strftime(date_format)
    return df


def post_streaming_data(sql_connection, sql_file, powerbi_url):
    # Read the SQL statement
    with open(os.path.join(os.path.dirname(__file__), 'SQL_STATEMENTS', sql_file), "r") as f:
        sql = f.read()

    # Execute it against the database
    df = pd.read_sql_query(sql, sql_connection)

    # Insert a timestamp column (without microseconds)
    df.insert(0, 'timestamp', pd.to_datetime(datetime.now()).replace(microsecond=0))

    # Change the format of date columns to the Power BI datetime format
    df = to_datetime(df, "%Y-%m-%dT%H:%M:%SZ")

    # Split the DataFrame into batches of at most ~5000 cells
    # (df.size = rows * columns) and upload them one per second
    split_size = math.ceil(df.size / 5000)
    counter = 0
    for batch in np.array_split(df, split_size):
        body = bytes(batch.to_json(orient='records'), encoding='utf-8')
        with requests.Session() as s:
            res = s.post(powerbi_url, body)
            if res.status_code == 200:
                counter += 1
            else:
                raise ValueError(f'The upload to Power BI failed for {sql_file.split(".")[0]}.')
        time.sleep(1)

    # If a batch was lost, raise an error
    if split_size > counter:
        raise ValueError(
            f'A data batch for {sql_file.split(".")[0]} was lost: SplitSize:{split_size}, Counter:{counter}'
        )


def main():
    # Load settings from .env before the try block so the mail credentials
    # are still defined inside the exception handler
    load_dotenv()
    CONNECTION_STRING = os.getenv('CONNECTION_STRING')
    STREAMING_CONFIG = os.getenv('STREAMING_CONFIG')
    MAIL_SERVER = os.getenv('MAIL_SERVER')
    MAIL_TOKEN = os.getenv('MAIL_TOKEN')

    try:
        connection = connect_database(CONNECTION_STRING)
        streaming_config = load_streaming_data(STREAMING_CONFIG)
        for sql_file, powerbi_url in streaming_config.items():
            post_streaming_data(connection, sql_file, powerbi_url)
    except Exception as ex:
        # Report the failure by mail
        headers = {"token": MAIL_TOKEN}
        body = {
            "subject": "Error POWERBI Streaming",
            "html": f'<p>{ex}</p>'
        }
        with requests.Session() as s:
            s.post(MAIL_SERVER, body, headers=headers, verify=False)


if __name__ == '__main__':
    main()

"""
while True:
    main()
    time.sleep(60)
"""
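For reference, a minimal sketch of the files this script expects. The variable names come from the script itself, but the file names, connection values, and URLs below are illustrative assumptions, not part of the original paste:

# .env (read by load_dotenv)
CONNECTION_STRING=DRIVER={ODBC Driver 17 for SQL Server};SERVER=myserver;DATABASE=mydb;UID=user;PWD=secret
STREAMING_CONFIG=streaming_config.json
MAIL_SERVER=https://mail.example.internal/send
MAIL_TOKEN=changeme

# streaming_config.json: keys are files under SQL_STATEMENTS/, values are
# Power BI streaming dataset push URLs (placeholders shown, not real IDs)
{
    "sales_overview.sql": "https://api.powerbi.com/beta/<tenant>/datasets/<dataset-id>/rows?key=<key>"
}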