Untitled
unknown
plain_text
a year ago
1.4 kB
6
Indexable
host = 'https://app.mode.com' ws = 'getethos' username = ‘xxxx' password = ‘yyyyy' def sleep(seconds): time.sleep(seconds) def export_report_to_json(): report_token = startTrigger.data.get('report_token') # Validate the existence of report_token if not report_token: raise ValueError("Report token not found in startTrigger data.") auth = (username, password) params = {"parameters": startTrigger.data} response = requests.post(f"{host}/api/{ws}/reports/{report_token}/runs", json=params, auth=auth) if response.status_code not in range(200, 300): raise Exception(f"Failed to start report run: {response.status_code}") most_recent_report_run_token = response.json()['token'] attempts = 0 max_attempts = 10 retry_interval = 3 # seconds while attempts < max_attempts: report_response = requests.get(f"{host}/api/{ws}/reports/{report_token}/runs/{most_recent_report_run_token}/results/content.csv", auth=auth) if report_response.status_code != 404: try: df = pd.read_csv(StringIO(report_response.text)) return df.to_json(orient='records') except Exception as e: raise e sleep(retry_interval) attempts += 1 raise Exception('Report not available after maximum attempts') # Usage return export_report_to_json()
Editor is loading...
Leave a Comment