Untitled
unknown
plain_text
a year ago
7.0 kB
12
Indexable
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace CustomerMDMBatchProcessor
{
/// <summary>
/// Event Hub triggered function that concatenates the bodies of a batch of events into a
/// single JSON array string, passes that string to a stored procedure, and reports any
/// failures to a separate logging Event Hub.
/// </summary>
public class CustomerMDMBatchProcessor
{
    // Settings are read from environment variables once, when the type is first used.
    private static readonly string connstr = Environment.GetEnvironmentVariable("GTSAppDB");
    private static readonly string storedProcedure = Environment.GetEnvironmentVariable("StoredProcedureName");
    private static readonly bool logEnabled = ReadBooleanSetting("logenabled");
    private static readonly string eventhubConnStr_Log = Environment.GetEnvironmentVariable("EventHubConnectionString_Log");
    private static readonly string eventhubName_Log = Environment.GetEnvironmentVariable("EventHubName_Log");

    // Guards creation/replacement of the shared producer client; static state may be touched
    // by more than one invocation at a time.
    private static readonly object producerClientLock = new object();

    // Created on first use rather than in a static initializer, so that a bad logging
    // Event Hub configuration cannot fail type initialization and disable the whole function.
    private static EventHubProducerClient _producerClient;

    /// <summary>
    /// Gets the shared producer client for the logging Event Hub, creating a new one when
    /// none exists yet or the current one has been closed.
    /// </summary>
    private static EventHubProducerClient ProducerClient
    {
        get
        {
            lock (producerClientLock)
            {
                if (_producerClient == null || _producerClient.IsClosed)
                {
                    _producerClient = InitializeEventHubProducerClient();
                }

                return _producerClient;
            }
        }
    }

    /// <summary>
    /// Reads an environment variable as a boolean.
    /// </summary>
    /// <param name="name">Name of the environment variable.</param>
    /// <returns>
    /// The parsed value; <c>false</c> when the variable is missing, empty, or not a valid boolean.
    /// </returns>
    private static bool ReadBooleanSetting(string name)
    {
        bool parsed;
        return bool.TryParse(Environment.GetEnvironmentVariable(name), out parsed) && parsed;
    }

    /// <summary>
    /// Creates a producer client for the logging Event Hub from the configured connection
    /// string and hub name.
    /// </summary>
    /// <returns>A new <see cref="EventHubProducerClient"/>.</returns>
    /// <exception cref="Exception">Rethrown after being written to the console when construction fails.</exception>
    private static EventHubProducerClient InitializeEventHubProducerClient()
    {
        try
        {
            return new EventHubProducerClient(eventhubConnStr_Log, eventhubName_Log);
        }
        catch (Exception ex)
        {
            // No ILogger is available at this point, so the failure is written to the console.
            Console.WriteLine($"Error creating EventHubProducerClient: {ex.Message}");
            throw;
        }
    }

    /// <summary>
    /// Function entry point. Joins all event bodies into one JSON array string, sends it to the
    /// configured stored procedure as <c>@EventMessage</c>, and pushes any collected errors to
    /// the logging Event Hub.
    /// </summary>
    /// <param name="events">Batch of events delivered by the Event Hub trigger.</param>
    /// <param name="log">Logger supplied by the Functions host for this invocation.</param>
    /// <returns>A task that completes when the batch has been processed.</returns>
    /// <remarks>
    /// Exceptions are logged and reported to the logging Event Hub but are not rethrown,
    /// so the invocation completes normally even when the stored procedure call fails.
    /// </remarks>
    [FunctionName("CustomerMDMBatchProcessor")]
    public static async Task Run(
        [EventHubTrigger("%EventHub%", Connection = "EventHubConnectionString", ConsumerGroup = "%ConsumerGroup%")] EventData[] events,
        ILogger log)
    {
        // Tuple layout: (event body, error message, reprocess flag, state code).
        var errorList = new List<Tuple<string, string, string, int>>();
        var messages = new StringBuilder();

        try
        {
            if (logEnabled)
            {
                log.LogInformation($"Count of messages: {events.Length}");
            }

            foreach (EventData eventData in events)
            {
                try
                {
                    messages.Append(eventData.EventBody.ToString()).Append(',');
                }
                catch (Exception ex)
                {
                    log.LogError(ex, "Exception while processing event: {ErrorMessage}", ex.Message);

                    // The body is not re-read here: reading it is what just failed, and the
                    // first tuple item is not used when the error is published.
                    errorList.Add(Tuple.Create(string.Empty, ex.Message, "N", 104));
                }
            }

            // Remove the trailing comma and wrap the messages in square brackets.
            string batchMessages = messages.Length > 0
                ? $"[{messages.ToString().TrimEnd(',')}]"
                : string.Empty;

            // Store batch messages to the database if there are any.
            if (!string.IsNullOrEmpty(batchMessages))
            {
                Stopwatch sw = Stopwatch.StartNew();

                using (SqlConnection conn = new SqlConnection(connstr))
                using (SqlCommand cmd = conn.CreateCommand())
                {
                    cmd.CommandText = storedProcedure;
                    cmd.CommandType = CommandType.StoredProcedure;
                    cmd.CommandTimeout = 120;

                    // Explicit NVARCHAR(MAX) (size -1) instead of letting AddWithValue infer the type.
                    cmd.Parameters.Add("@EventMessage", SqlDbType.NVarChar, -1).Value = batchMessages;

                    await conn.OpenAsync();
                    await cmd.ExecuteNonQueryAsync();
                }

                sw.Stop();
                if (logEnabled)
                {
                    log.LogInformation($"SQL execution time: {sw.ElapsedMilliseconds}ms");
                }
            }

            // Handle errors if any.
            if (errorList.Count > 0)
            {
                await PushErrorMessagesToEventHub(errorList, log);
            }
        }
        catch (Exception ex)
        {
            log.LogError(ex, "Exception in CustomerMDMBatchProcessor: {ErrorMessage}", ex.Message);

            // PushErrorMessagesToEventHub never throws, so reaching this handler means the
            // per-event errors collected above have not been published yet; send them together
            // with the batch-level failure instead of dropping them.
            errorList.Add(Tuple.Create(string.Empty, $"Exception in CustomerMDMBatchProcessor: {ex.Message}", "N", 106));
            await PushErrorMessagesToEventHub(errorList, log);
        }
    }

    /// <summary>
    /// Serializes each error to JSON and publishes the results to the logging Event Hub,
    /// starting a new batch whenever the current one cannot accept another event.
    /// </summary>
    /// <param name="errorList">Errors to publish, as (event body, message, reprocess flag, state code) tuples.</param>
    /// <param name="log">Logger used to report publishing failures.</param>
    /// <returns>A task that completes when publishing has finished or failed.</returns>
    /// <remarks>Publishing failures are logged and swallowed; this method does not throw.</remarks>
    private static async Task PushErrorMessagesToEventHub(List<Tuple<string, string, string, int>> errorList, ILogger log)
    {
        // Managed manually (not with a using statement) because the batch is replaced
        // when it fills up, and a using variable cannot be reassigned.
        EventDataBatch eventBatch = null;
        try
        {
            EventHubProducerClient producer = ProducerClient;
            eventBatch = await producer.CreateBatchAsync();

            foreach (var error in errorList)
            {
                // NOTE(review): EntityId and ReprocessFlag are both populated from Item3;
                // confirm whether EntityId was meant to come from a different field.
                var logMessage = new
                {
                    Message = error.Item2,
                    EntityId = error.Item3,
                    State = error.Item4,
                    ReprocessFlag = error.Item3
                };
                byte[] messageBody = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(logMessage));

                if (eventBatch.TryAdd(new EventData(messageBody)))
                {
                    continue;
                }

                // Current batch is full: flush it, release it, and retry in a fresh batch.
                if (eventBatch.Count > 0)
                {
                    await producer.SendAsync(eventBatch);
                }

                eventBatch.Dispose();
                eventBatch = null;
                eventBatch = await producer.CreateBatchAsync();

                if (!eventBatch.TryAdd(new EventData(messageBody)))
                {
                    log.LogError("Failed to add the error message to the new batch.");
                }
            }

            if (eventBatch.Count > 0)
            {
                await producer.SendAsync(eventBatch);
            }
        }
        catch (Exception ex)
        {
            log.LogError(ex, "Error occurred while pushing error messages to Event Hub: {ErrorMessage}", ex.Message);
        }
        finally
        {
            eventBatch?.Dispose();
        }
    }
}
}
Editor is loading...
Leave a Comment