Untitled
unknown
plain_text
2 years ago
2.4 kB
8
Indexable
def update_polling_counts_for_batch(client, aggregated_values, timestamp):
    """Apply one batch of aggregated polling counters to the device-health table.

    One PartiQL ``UPDATE`` statement is built per data center found in
    ``aggregated_values[POLLING_COUNT]``; all statements are then submitted in
    a single ``client.batch_execute_statement`` call.

    Args:
        client: Object exposing ``batch_execute_statement(Statements=...)``
            (presumably a boto3 DynamoDB client -- confirm against the caller).
        aggregated_values: Mapping that holds, under ``POLLING_COUNT``,
            ``POLLING_ERROR`` and ``CONNECTION_ERROR``, one Counter each keyed
            by data center. The ``POLLING_COUNT`` counter defines which data
            centers are processed.
        timestamp: Value written to the ``LAST_PROCESSED`` attribute; it is
            sent as a DynamoDB number.

    Returns:
        The response returned by ``client.batch_execute_statement``, or
        ``{"Responses": []}`` when there is no data center to update (the
        service is not called in that case).

    Raises:
        ValueError: If the table-name or polling-type environment variable is
            not set.
    """
    table_name = os.getenv(MEGLO_DEVICE_HEALTH_TABLE_NAME)
    polling_type = os.getenv(LAMBDA_TYPE)
    if not table_name or not polling_type:
        # Fail early with a clear message instead of sending "UPDATE None ..."
        # or crashing later on None.lower().
        raise ValueError(
            f"Environment variables {MEGLO_DEVICE_HEALTH_TABLE_NAME} and "
            f"{LAMBDA_TYPE} must both be set"
        )

    current_month_attributes = __get_current_month_ddb_attributes()
    polling_attribute = current_month_attributes[POLLING_COUNT]
    err_attribute = current_month_attributes[POLLING_ERROR]
    conn_err_attribute = current_month_attributes[CONNECTION_ERROR]

    # These are all Counter type.
    aggregated_polling_count = aggregated_values[POLLING_COUNT]
    aggregated_polling_errors = aggregated_values[POLLING_ERROR]
    aggregated_connection_errors = aggregated_values[CONNECTION_ERROR]

    # DynamoDB PartiQL only supports positional "?" placeholders and expects
    # one SET clause per attribute. The statement text is identical for every
    # data center, so it is built once; only the parameters vary.
    statement = (
        f"UPDATE {table_name} "
        f"SET {polling_attribute} = {polling_attribute} + ? "
        f"SET {err_attribute} = {err_attribute} + ? "
        f"SET {conn_err_attribute} = {conn_err_attribute} + ? "
        f"SET {POLLING_COUNT_CURRENT_MIN} = ? "
        f"SET {ERROR_COUNT_CURRENT_MIN} = ? "
        f"SET {LAST_PROCESSED} = ? "
        f"WHERE {DATA_CENTER} = ? AND {POLLING_TYPE} = ?"
    )
    polling_type_value = {"S": polling_type.lower()}
    last_processed_value = {"N": str(timestamp)}

    statements = []
    # aggregated_polling_count contains each data center processed.
    for data_center, polling_count in aggregated_polling_count.items():
        # Counters are numeric, so they must be sent as "N" for the "+" in the
        # statement to be valid.
        count_value = {"N": str(polling_count)}
        errors_value = {"N": str(aggregated_polling_errors[data_center])}
        conn_errors_value = {"N": str(aggregated_connection_errors[data_center])}

        # Order must match the "?" placeholders in the statement; count and
        # errors each appear twice (monthly total and current-minute value).
        parameters = [
            count_value,
            errors_value,
            conn_errors_value,
            count_value,
            errors_value,
            last_processed_value,
            {"S": data_center.lower()},
            polling_type_value,
        ]
        # NOTE(review): ConsistentRead is kept from the original request shape;
        # confirm whether it is needed for UPDATE statements.
        statements.append(
            {"Statement": statement, "Parameters": parameters, "ConsistentRead": True}
        )

    if not statements:
        # Nothing was aggregated; an empty Statements list would be rejected.
        return {"Responses": []}

    # NOTE(review): BatchExecuteStatement limits the number of statements per
    # call; confirm the number of data centers stays within that limit.
    # Execute batch statements
    print("Statements are:", statements)
    response = client.batch_execute_statement(Statements=statements)
    return response