Untitled
unknown
python
6 months ago
2.7 kB
3
Indexable
def update_appflow_flow(entity_name, mapping_list): if entity_name in table_with_timestamp_field: appflow_client = boto3.client('appflow') flow_name = f'{appflow_name_prefix}{entity_name}' flow_description = f'{appflow_description}{entity_name}' flow_trigger_type = 'Scheduled' trigger_properties = { "scheduleExpression": "rate(1 hour)", "dataPullMode": "Incremental", "scheduleStartTime": int(after_now_2min), "timezone": "Asia/Saigon", "scheduleOffset": 3600 } timestampField = table_with_timestamp_field[entity_name] # Create the flow appflow_client.update_flow( flowName=flow_name, description=flow_description, triggerConfig={ "triggerType": flow_trigger_type, "triggerProperties": { "Scheduled": trigger_properties } }, sourceFlowConfig={ "connectorType": "SAPOData", "connectorProfileName": sap_connector_profile_name, "sourceConnectorProperties": { "SAPOData": { "objectPath": f"/sap/opu/odata/sap/ZSD06_SRV/{entity_name}Set", "paginationConfig": {"maxPageSize": batch_size}, "parallelismConfig": {"maxParallelism": parallelism_process} } }, "incrementalPullConfig": { "datetimeTypeFieldName": timestampField, } }, destinationFlowConfigList=[ { "connectorType": "Redshift", "connectorProfileName": redshift_connector_profile_name, "destinationConnectorProperties": { "Redshift": { "object": f"poc.{entity_name.lower()}", "intermediateBucketName": intermediate_bucket_name, "errorHandlingConfig": { "failOnFirstDestinationError": True, "bucketPrefix": f"{error_handling_bucket_prefix}/{entity_name}", "bucketName": intermediate_bucket_name } } } } ], tasks=mapping_list ) print(f"AppFlow flow for entity {entity_name} created successfully") else: print( f"Entity {entity_name} does not have timestamp field, skip update flow")
Editor is loading...
Leave a Comment