Untitled
unknown
python
a year ago
2.7 kB
7
Indexable
def update_appflow_flow(entity_name, mapping_list):
if entity_name in table_with_timestamp_field:
appflow_client = boto3.client('appflow')
flow_name = f'{appflow_name_prefix}{entity_name}'
flow_description = f'{appflow_description}{entity_name}'
flow_trigger_type = 'Scheduled'
trigger_properties = {
"scheduleExpression": "rate(1 hour)",
"dataPullMode": "Incremental",
"scheduleStartTime": int(after_now_2min),
"timezone": "Asia/Saigon",
"scheduleOffset": 3600
}
timestampField = table_with_timestamp_field[entity_name]
# Create the flow
appflow_client.update_flow(
flowName=flow_name,
description=flow_description,
triggerConfig={
"triggerType": flow_trigger_type,
"triggerProperties": {
"Scheduled": trigger_properties
}
},
sourceFlowConfig={
"connectorType": "SAPOData",
"connectorProfileName": sap_connector_profile_name,
"sourceConnectorProperties": {
"SAPOData": {
"objectPath": f"/sap/opu/odata/sap/ZSD06_SRV/{entity_name}Set",
"paginationConfig": {"maxPageSize": batch_size},
"parallelismConfig": {"maxParallelism": parallelism_process}
}
},
"incrementalPullConfig": {
"datetimeTypeFieldName": timestampField,
}
},
destinationFlowConfigList=[
{
"connectorType": "Redshift",
"connectorProfileName": redshift_connector_profile_name,
"destinationConnectorProperties": {
"Redshift": {
"object": f"poc.{entity_name.lower()}",
"intermediateBucketName": intermediate_bucket_name,
"errorHandlingConfig": {
"failOnFirstDestinationError": True,
"bucketPrefix": f"{error_handling_bucket_prefix}/{entity_name}",
"bucketName": intermediate_bucket_name
}
}
}
}
],
tasks=mapping_list
)
print(f"AppFlow flow for entity {entity_name} created successfully")
else:
print(
f"Entity {entity_name} does not have timestamp field, skip update flow")Editor is loading...
Leave a Comment