import json
import math
import os
import time
from datetime import datetime

import numpy as np
import pandas as pd
import requests
from dotenv import load_dotenv
from sqlalchemy import create_engine
from sqlalchemy.engine import URL

def load_streaming_data(file_name):
    with open(os.path.join(os.path.dirname(__file__), file_name),"r") as f:
        json_config = json.load(f)
    return json_config
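
# Example (illustrative only, not taken from the original project): STREAMING_CONFIG
# is assumed to point to a JSON file that maps SQL file names (located under
# SQL_STATEMENTS/) to Power BI push-dataset URLs, which is how main() consumes
# streaming_config.items():
#
# {
#     "sales_overview.sql": "https://api.powerbi.com/beta/<tenant-id>/datasets/<dataset-id>/rows?key=<api-key>",
#     "open_orders.sql": "https://api.powerbi.com/beta/<tenant-id>/datasets/<dataset-id>/rows?key=<api-key>"
# }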

def connect_database(CONNECTION_STRING):
    connection_url = URL.create("mssql+pyodbc", query={"odbc_connect": CONNECTION_STRING})
    return create_engine(connection_url)
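
# Illustrative .env entry (an assumption, not part of the original project):
# CONNECTION_STRING is expected to hold a raw ODBC connection string, e.g.
#   CONNECTION_STRING=Driver={ODBC Driver 17 for SQL Server};Server=myserver;Database=mydb;UID=user;PWD=secret
# URL.create("mssql+pyodbc", query={"odbc_connect": ...}) passes it through
# to pyodbc unchanged.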

def to_datetime(df, format):
    # Try to parse every object column as datetime; columns that cannot be
    # parsed are left unchanged.
    for col in df.columns:
        if df[col].dtype == 'object':
            try:
                df[col] = pd.to_datetime(df[col])
            except (ValueError, TypeError):
                pass

    # Render all datetime columns as strings in the requested format
    # (here: the timestamp format expected by Power BI).
    for col in df.columns:
        if df[col].dtype == 'datetime64[ns]':
            df[col] = df[col].dt.strftime(format)

    return df
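
# Illustrative example (assumed data): a column holding date strings such as
# ["2023-05-01", "2023-05-02"] is parsed to datetime64[ns] by the first loop
# and then rendered as ["2023-05-01T00:00:00Z", "2023-05-02T00:00:00Z"] by
# to_datetime(df, "%Y-%m-%dT%H:%M:%SZ").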

def post_streaming_data(sql_connection, sql_file, powerbi_url):
    
    # Read SQL File
    with open(os.path.join(os.path.dirname(__file__), 'SQL_STATEMENTS', sql_file),"r") as f:
        sql = f.read()

    # Execute SQL File
    df = pd.read_sql_query(sql, sql_connection)

    # Insert Column timestamp
    df.insert(0, 'timestamp', pd.to_datetime(datetime.now()).replace(microsecond=0))

    # Change date columns to the PowerBI timestamp format
    df = to_datetime(df, "%Y-%m-%dT%H:%M:%SZ")

    # Split DataFrame into batches and upload them
    # (df.size counts cells, i.e. rows * columns, so each batch stays below
    # roughly 5000 cells per request)
    split_size = math.ceil(df.size / 5000)
    counter = 0
    
    for array in np.array_split(df, split_size):
        body = bytes(array.to_json(orient='records'), encoding='utf-8')

        with requests.Session() as s:
            res = s.post(powerbi_url, body)

        if res.status_code == 200:
            counter += 1
        else:
            raise ValueError(f'The upload to PowerBI failed for {sql_file.split(".")[0]}.')
        time.sleep(1)

    # If a batch was lost, raise an error
    if split_size > counter:
        raise ValueError(f'A data batch was lost for {sql_file.split(".")[0]}: SplitSize: {split_size}, Counter: {counter}')


        
def main():
    try:
        load_dotenv()
        CONNECTION_STRING = os.getenv('CONNECTION_STRING')
        STREAMING_CONFIG = os.getenv('STREAMING_CONFIG')
        MAIL_SERVER = os.getenv('MAIL_SERVER')
        MAIL_TOKEN = os.getenv('MAIL_TOKEN')

        connection = connect_database(CONNECTION_STRING)
        streaming_config = load_streaming_data(STREAMING_CONFIG)


        for sql_file, powerbi_url in streaming_config.items():
            post_streaming_data(connection, sql_file, powerbi_url)
 

    except Exception as ex:
        headers = {
            "token": MAIL_TOKEN
        }
        body = {
            "subject": "Fehler POWERBI Streaming",
            "html": f'<p>{ex}</p>'
        }
        with requests.Session() as s:
            s.post(MAIL_SERVER, body, headers=headers, verify=False)

       
if __name__ == '__main__':
    main()
    # To run continuously, e.g. once per minute:
    # while True:
    #     main()
    #     time.sleep(60)

    
        
