import requests
import os
import pandas as pd
import time
from dotenv import load_dotenv
from sqlalchemy import create_engine
from sqlalchemy.engine import URL
import numpy as np
from datetime import datetime
import math
import json
def load_streaming_data(file_name):
    # Load the streaming configuration (SQL file -> Power BI push URL) from a JSON file
    with open(os.path.join(os.path.dirname(__file__), file_name), "r") as f:
        json_config = json.load(f)
    return json_config
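
# A minimal sketch of what the streaming config JSON is assumed to look like.
# The file names and URLs below are hypothetical placeholders, not from the
# original source; each key is a file in SQL_STATEMENTS/ and each value is a
# Power BI push-dataset URL:
#
# {
#     "sales.sql": "https://api.powerbi.com/beta/<workspace-id>/datasets/<dataset-id>/rows?key=<key>",
#     "orders.sql": "https://api.powerbi.com/beta/<workspace-id>/datasets/<dataset-id>/rows?key=<key>"
# }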
def connect_database(connection_string):
    # Build a SQLAlchemy engine for SQL Server via pyodbc from a raw ODBC connection string
    connection_url = URL.create("mssql+pyodbc", query={"odbc_connect": connection_string})
    return create_engine(connection_url)
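
# Sketch of the kind of ODBC connection string this function expects (the
# values are placeholders, assuming the standard Microsoft ODBC driver):
#
# DRIVER={ODBC Driver 17 for SQL Server};SERVER=myserver;DATABASE=mydb;UID=user;PWD=secret
#
# URL.create with the "odbc_connect" query key passes the raw string through
# to pyodbc unchanged.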
def to_datetime(df, fmt):
    # Try to parse object columns as datetimes; leave columns that fail as-is
    for col in df.columns:
        if df[col].dtype == 'object':
            try:
                df[col] = pd.to_datetime(df[col])
            except (ValueError, TypeError):
                pass
    # Render every datetime column as a string in the given format
    for col in df.columns:
        if df[col].dtype == 'datetime64[ns]':
            df[col] = df[col].dt.strftime(fmt)
    return df
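
# Illustration of the conversion this helper performs (column name and value
# are made up for the example):
#
#   df = pd.DataFrame({"order_date": ["2022-01-31 14:05:00"]})
#   df = to_datetime(df, "%Y-%m-%dT%H:%M:%SZ")
#   # df["order_date"] now holds ["2022-01-31T14:05:00Z"]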
def post_streaming_data(sql_connection, sql_file, powerbi_url):
    # Read the SQL file from the SQL_STATEMENTS directory
    with open(os.path.join(os.path.dirname(__file__), 'SQL_STATEMENTS', sql_file), "r") as f:
        sql = f.read()
    # Execute the SQL query
    df = pd.read_sql_query(sql, sql_connection)
    # Insert a timestamp column (seconds precision)
    df.insert(0, 'timestamp', pd.to_datetime(datetime.now()).replace(microsecond=0))
    # Change the format of date columns to the Power BI format
    df = to_datetime(df, "%Y-%m-%dT%H:%M:%SZ")
    # Split the DataFrame into batches of at most 5000 rows and upload them
    split_size = max(1, math.ceil(len(df) / 5000))
    counter = 0
    for batch in np.array_split(df, split_size):
        body = bytes(batch.to_json(orient='records'), encoding='utf-8')
        with requests.Session() as s:
            res = s.post(powerbi_url, body)
        if res.status_code == 200:
            counter += 1
        else:
            raise ValueError(f'The upload to Power BI failed for {sql_file.split(".")[0]}.')
        time.sleep(1)
    # If a batch was lost, raise an error
    if split_size > counter:
        raise ValueError(f'A data batch for {sql_file.split(".")[0]} was lost: SplitSize: {split_size}, Counter: {counter}')
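
# The request body posted above is a JSON array of row objects produced by
# to_json(orient='records'), e.g. (column names illustrative):
#
#   [{"timestamp": "2022-01-31T14:05:00Z", "value": 42}]
#
# which is the shape the Power BI push ("streaming") dataset endpoint accepts
# for its rows URL.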
def main():
    load_dotenv()
    # Read the configuration up front so the except block can always send mail
    CONNECTION_STRING = os.getenv('CONNECTION_STRING')
    STREAMING_CONFIG = os.getenv('STREAMING_CONFIG')
    MAIL_SERVER = os.getenv('MAIL_SERVER')
    MAIL_TOKEN = os.getenv('MAIL_TOKEN')
    try:
        connection = connect_database(CONNECTION_STRING)
        streaming_config = load_streaming_data(STREAMING_CONFIG)
        for sql_file, powerbi_url in streaming_config.items():
            post_streaming_data(connection, sql_file, powerbi_url)
    except Exception as ex:
        # Report the failure via the mail service
        headers = {
            "token": MAIL_TOKEN
        }
        body = {
            "subject": "Error: Power BI streaming",
            "html": f'<p>{ex}</p>'
        }
        with requests.Session() as s:
            s.post(MAIL_SERVER, body, headers=headers, verify=False)


if __name__ == '__main__':
    main()
""" while True:
main()
time.sleep(60) """
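
# A minimal sketch of the .env file load_dotenv() reads; every value below is
# a hypothetical placeholder:
#
# CONNECTION_STRING=DRIVER={ODBC Driver 17 for SQL Server};SERVER=myserver;DATABASE=mydb;UID=user;PWD=secret
# STREAMING_CONFIG=streaming_config.json
# MAIL_SERVER=https://mail.example.com/api/send
# MAIL_TOKEN=<token>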