Untitled

 avatar
unknown
python
6 months ago
2.7 kB
3
Indexable
def update_appflow_flow(entity_name, mapping_list):
    """Update the AppFlow flow for an SAP OData entity to an hourly incremental pull.

    The flow reads ``/sap/opu/odata/sap/ZSD06_SRV/{entity_name}Set`` through
    the SAP OData connector and writes to the Redshift object
    ``poc.{entity_name.lower()}``. Entities without an entry in
    ``table_with_timestamp_field`` are skipped, because the incremental pull
    needs a datetime field to track.

    Relies on module-level names defined outside this function:
    ``boto3``, ``table_with_timestamp_field``, ``appflow_name_prefix``,
    ``appflow_description``, ``after_now_2min``,
    ``sap_connector_profile_name``, ``redshift_connector_profile_name``,
    ``batch_size``, ``parallelism_process``, ``intermediate_bucket_name``
    and ``error_handling_bucket_prefix``.

    Args:
        entity_name: Entity name; used to build the flow name, the OData
            object path, the Redshift object and the error bucket prefix.
        mapping_list: Passed unchanged as the ``tasks`` argument of
            ``update_flow``.

    Returns:
        None. Progress is reported via ``print``.

    Raises:
        Any exception raised by ``boto3`` / ``update_flow`` propagates to
        the caller; nothing is caught here.
    """
    # Guard clause: without a timestamp field there is nothing to configure.
    if entity_name not in table_with_timestamp_field:
        print(
            f"Entity {entity_name} does not have timestamp field, skip update flow")
        return

    timestamp_field = table_with_timestamp_field[entity_name]

    appflow_client = boto3.client('appflow')
    flow_name = f'{appflow_name_prefix}{entity_name}'
    flow_description = f'{appflow_description}{entity_name}'

    flow_trigger_type = 'Scheduled'
    trigger_properties = {
        "scheduleExpression": "rate(1 hour)",
        "dataPullMode": "Incremental",
        # NOTE(review): after_now_2min is defined elsewhere; presumably a
        # timestamp shortly in the future -- confirm it is refreshed before
        # each call rather than computed once at import time.
        "scheduleStartTime": int(after_now_2min),
        "timezone": "Asia/Saigon",
        "scheduleOffset": 3600,
    }

    source_flow_config = {
        "connectorType": "SAPOData",
        "connectorProfileName": sap_connector_profile_name,
        "sourceConnectorProperties": {
            "SAPOData": {
                "objectPath": f"/sap/opu/odata/sap/ZSD06_SRV/{entity_name}Set",
                "paginationConfig": {"maxPageSize": batch_size},
                "parallelismConfig": {"maxParallelism": parallelism_process},
            }
        },
        "incrementalPullConfig": {
            "datetimeTypeFieldName": timestamp_field,
        },
    }

    redshift_destination = {
        "connectorType": "Redshift",
        "connectorProfileName": redshift_connector_profile_name,
        "destinationConnectorProperties": {
            "Redshift": {
                "object": f"poc.{entity_name.lower()}",
                "intermediateBucketName": intermediate_bucket_name,
                "errorHandlingConfig": {
                    "failOnFirstDestinationError": True,
                    "bucketPrefix": f"{error_handling_bucket_prefix}/{entity_name}",
                    "bucketName": intermediate_bucket_name,
                },
            }
        },
    }

    # Update the existing flow definition in place.
    appflow_client.update_flow(
        flowName=flow_name,
        description=flow_description,
        triggerConfig={
            "triggerType": flow_trigger_type,
            "triggerProperties": {
                "Scheduled": trigger_properties,
            },
        },
        sourceFlowConfig=source_flow_config,
        destinationFlowConfigList=[redshift_destination],
        tasks=mapping_list,
    )

    # The message says "updated" because this function calls update_flow,
    # not create_flow.
    print(f"AppFlow flow for entity {entity_name} updated successfully")
Editor is loading...
Leave a Comment