Untitled
unknown
plain_text
2 years ago
1.4 kB
10
Indexable
# Connection settings for the Mode API calls made below.
host = 'https://app.mode.com'
ws = 'getethos'
# NOTE(review): credentials are hard-coded placeholders; real values should
# presumably come from the host environment's secret store -- confirm.
username = 'xxxx'
password = 'yyyyy'
def sleep(seconds):
    """Block the current thread for ``seconds`` seconds.

    Thin wrapper around ``time.sleep``.
    """
    # Imported locally because this snippet has no module-level imports.
    import time

    time.sleep(seconds)
def export_report_to_json():
    """Run a Mode report and return its results as a JSON string.

    Reads ``report_token`` from ``startTrigger.data`` (a name supplied by the
    host environment, not defined in this snippet), starts a new run of that
    report, then polls the run's CSV results endpoint until the content is
    available.

    Returns:
        The CSV results converted with ``DataFrame.to_json(orient='records')``.

    Raises:
        ValueError: If ``startTrigger.data`` has no ``report_token``.
        RuntimeError: If the run cannot be started, the results endpoint
            answers with a non-404 error status, or the results are still
            unavailable after all polling attempts.
    """
    # Imported locally because this snippet has no module-level imports.
    from io import StringIO

    import pandas as pd
    import requests

    report_token = startTrigger.data.get('report_token')
    # Validate the existence of report_token
    if not report_token:
        raise ValueError("Report token not found in startTrigger data.")

    auth = (username, password)
    request_timeout = 30  # seconds; without it a stalled connection blocks forever
    runs_url = f"{host}/api/{ws}/reports/{report_token}/runs"

    # NOTE(review): the whole trigger payload (including report_token) is sent
    # as the run parameters -- confirm this is intended.
    params = {"parameters": startTrigger.data}
    response = requests.post(runs_url, json=params, auth=auth, timeout=request_timeout)
    if not 200 <= response.status_code < 300:
        raise RuntimeError(f"Failed to start report run: {response.status_code}")
    most_recent_report_run_token = response.json()['token']

    results_url = f"{runs_url}/{most_recent_report_run_token}/results/content.csv"
    max_attempts = 10
    retry_interval = 3  # seconds

    for attempt in range(1, max_attempts + 1):
        report_response = requests.get(results_url, auth=auth, timeout=request_timeout)
        status = report_response.status_code
        if 200 <= status < 300:
            df = pd.read_csv(StringIO(report_response.text))
            return df.to_json(orient='records')
        if status != 404:
            # Only 404 is retried (treated as "results not ready yet"); any
            # other failure is reported instead of being parsed as CSV.
            raise RuntimeError(f"Failed to fetch report results: {status}")
        if attempt < max_attempts:
            # No point waiting after the final attempt.
            sleep(retry_interval)

    raise RuntimeError('Report not available after maximum attempts')
# Usage
return export_report_to_json()Editor is loading...
Leave a Comment