Untitled

 avatar
unknown
plain_text
2 years ago
2.4 kB
5
Indexable
def update_polling_counts_for_batch(client, aggregated_values, timestamp):
    """Apply one batch of aggregated polling counters to the device-health table.

    Builds one PartiQL ``UPDATE`` statement per data center found in
    ``aggregated_values[POLLING_COUNT]`` and sends them all in a single
    ``client.batch_execute_statement`` call.

    Each statement:
      * adds the batch's polling count, polling errors and connection errors
        to the current-month attributes returned by
        ``__get_current_month_ddb_attributes()``;
      * overwrites the current-minute polling/error counters with the batch
        values;
      * stores ``timestamp`` as the last-processed marker.

    Args:
        client: object exposing ``batch_execute_statement(Statements=...)``
            (presumably a boto3 DynamoDB client -- confirm with the caller).
        aggregated_values: mapping with ``POLLING_COUNT``, ``POLLING_ERROR``
            and ``CONNECTION_ERROR`` entries, each a Counter keyed by data
            center name.
        timestamp: value written (stringified, as a DynamoDB number) to the
            ``LAST_PROCESSED`` attribute.

    Returns:
        The response of ``batch_execute_statement``; ``{"Responses": []}``
        when there is no data center to update (the API rejects an empty
        statement list, so no call is made in that case).

    Raises:
        ValueError: if the table-name or polling-type environment variable
            is not set.
    """
    table_name = os.getenv(MEGLO_DEVICE_HEALTH_TABLE_NAME, default=None)
    polling_type = os.getenv(LAMBDA_TYPE, default=None)
    if not table_name or not polling_type:
        # Fail early with a clear message instead of sending "UPDATE None"
        # or crashing later on ``None.lower()``.
        raise ValueError(
            "Both the device-health table name and the polling type must be "
            "set in the environment"
        )
    polling_type_key = polling_type.lower()

    current_month_attributes = __get_current_month_ddb_attributes()
    polling_attribute = current_month_attributes[POLLING_COUNT]
    err_attribute = current_month_attributes[POLLING_ERROR]
    conn_err_attribute = current_month_attributes[CONNECTION_ERROR]

    # these are all Counter type
    aggregated_polling_count = aggregated_values[POLLING_COUNT]
    aggregated_polling_errors = aggregated_values[POLLING_ERROR]
    aggregated_connection_errors = aggregated_values[CONNECTION_ERROR]

    # PartiQL for DynamoDB only supports positional "?" placeholders, and
    # several assignments are written as repeated SET clauses. The statement
    # text is identical for every data center, so build it once.
    # The table name is double-quoted so names containing e.g. "-" are valid.
    statement = (
        f'UPDATE "{table_name}" '
        f"SET {polling_attribute} = {polling_attribute} + ? "
        f"SET {err_attribute} = {err_attribute} + ? "
        f"SET {conn_err_attribute} = {conn_err_attribute} + ? "
        f"SET {POLLING_COUNT_CURRENT_MIN} = ? "
        f"SET {ERROR_COUNT_CURRENT_MIN} = ? "
        f"SET {LAST_PROCESSED} = ? "
        f"WHERE {DATA_CENTER} = ? AND {POLLING_TYPE} = ?"
    )

    statements = []
    # aggregated_polling_count contains each data center processed
    for data_center, polling_count in aggregated_polling_count.items():
        polling_errors = aggregated_polling_errors[data_center]
        connection_errors = aggregated_connection_errors[data_center]

        # One AttributeValue per "?", in the exact order they appear in the
        # statement. Counters are numbers ("N") so the additions are valid;
        # count and errors are repeated because each "?" binds separately.
        parameters = [
            {"N": str(polling_count)},      # monthly polling count increment
            {"N": str(polling_errors)},     # monthly polling error increment
            {"N": str(connection_errors)},  # monthly connection error increment
            {"N": str(polling_count)},      # current-minute polling count
            {"N": str(polling_errors)},     # current-minute error count
            {"N": str(timestamp)},          # last processed
            {"S": data_center.lower()},     # WHERE data center
            {"S": polling_type_key},        # WHERE polling type
        ]

        statements.append(
            {"Statement": statement, "Parameters": parameters, "ConsistentRead": True}
        )

    print("Statements are:", statements)

    if not statements:
        # batch_execute_statement requires at least one statement.
        return {"Responses": []}

    # Execute batch statements
    # NOTE(review): the API accepts a limited number of statements per call
    # (25 per the DynamoDB docs); confirm the data-center count stays below it.
    return client.batch_execute_statement(Statements=statements)
Editor is loading...