using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace CustomerMDMBatchProcessor
{
    /// <summary>
    /// Event Hub triggered function that concatenates the bodies of a batch of events
    /// into a single JSON array, passes that array to a SQL stored procedure, and
    /// reports failures to a separate "log" Event Hub.
    /// </summary>
    public class CustomerMDMBatchProcessor
    {
        // Configuration is read once from environment variables when the type is loaded.
        private static readonly string connstr = Environment.GetEnvironmentVariable("GTSAppDB");
        private static readonly string storedProcedure = Environment.GetEnvironmentVariable("StoredProcedureName");
        private static readonly bool logEnabled = ReadBooleanSetting("logenabled");
        private static readonly string eventhubConnStr_Log = Environment.GetEnvironmentVariable("EventHubConnectionString_Log");
        private static readonly string eventhubName_Log = Environment.GetEnvironmentVariable("EventHubName_Log");

        // Guards creation/replacement of the shared producer client.
        private static readonly object ProducerLock = new object();

        // Created on first use rather than in a static initializer, so that a bad
        // log Event Hub setting cannot fail type initialization for the whole function.
        private static EventHubProducerClient _producerClient;

        /// <summary>
        /// Shared producer for the log Event Hub. A new client is created when none
        /// exists yet or when the current one reports <c>IsClosed</c>.
        /// </summary>
        private static EventHubProducerClient ProducerClient
        {
            get
            {
                lock (ProducerLock)
                {
                    if (_producerClient == null || _producerClient.IsClosed)
                    {
                        _producerClient = InitializeEventHubProducerClient();
                    }

                    return _producerClient;
                }
            }
        }

        /// <summary>
        /// Reads a boolean environment variable. Returns <c>false</c> when the variable
        /// is missing or is not a value <see cref="bool.TryParse(string, out bool)"/> accepts.
        /// </summary>
        /// <param name="name">Name of the environment variable.</param>
        private static bool ReadBooleanSetting(string name)
        {
            bool parsed;
            return bool.TryParse(Environment.GetEnvironmentVariable(name), out parsed) && parsed;
        }

        /// <summary>
        /// Creates the producer client for the log Event Hub from the configured
        /// connection string and hub name.
        /// </summary>
        /// <returns>A new <see cref="EventHubProducerClient"/>.</returns>
        /// <exception cref="Exception">
        /// Any exception thrown by the client constructor is written to the console and rethrown.
        /// </exception>
        private static EventHubProducerClient InitializeEventHubProducerClient()
        {
            try
            {
                return new EventHubProducerClient(eventhubConnStr_Log, eventhubName_Log);
            }
            catch (Exception ex)
            {
                // No ILogger is available in this static context.
                Console.WriteLine($"Error creating EventHubProducerClient: {ex.Message}");
                throw;
            }
        }

        /// <summary>
        /// Function entry point. Builds a JSON array from the event bodies, executes the
        /// configured stored procedure with that array as <c>@EventMessage</c>, and pushes
        /// any collected errors to the log Event Hub.
        /// </summary>
        /// <param name="events">Batch of events delivered by the Event Hub trigger.</param>
        /// <param name="log">Logger supplied by the Functions host.</param>
        /// <remarks>
        /// Exceptions are caught, logged and reported to the log Event Hub; they are not
        /// rethrown, so this method completes normally even when the SQL call fails.
        /// NOTE(review): presumably that means the host treats the batch as processed —
        /// confirm this is the intended delivery semantics.
        /// </remarks>
        [FunctionName("CustomerMDMBatchProcessor")]
        public static async Task Run(
            [EventHubTrigger("%EventHub%", Connection = "EventHubConnectionString", ConsumerGroup = "%ConsumerGroup%")] EventData[] events,
            ILogger log)
        {
            List<ErrorRecord> errorList = new List<ErrorRecord>();

            try
            {
                if (logEnabled)
                {
                    log.LogInformation("Count of messages: {Count}", events.Length);
                }

                List<string> payloads = new List<string>(events.Length);

                foreach (EventData eventData in events)
                {
                    // Captured outside the try so the catch block never has to re-read
                    // the body (the read itself may be what failed).
                    string payload = string.Empty;

                    try
                    {
                        payload = eventData.EventBody.ToString();

                        if (string.IsNullOrWhiteSpace(payload))
                        {
                            // A blank element would turn the array into invalid JSON ("[a,,b]").
                            log.LogWarning("Skipping event with an empty body.");
                            continue;
                        }

                        payloads.Add(payload);
                    }
                    catch (Exception ex)
                    {
                        log.LogError(ex, "Exception while processing event: {ErrorMessage}", ex.Message);
                        errorList.Add(new ErrorRecord(payload, ex.Message, "N", 104));
                    }
                }

                // Bodies are joined verbatim; each body is assumed to already be a JSON value.
                string batchMessages = payloads.Count > 0
                    ? "[" + string.Join(",", payloads) + "]"
                    : string.Empty;

                // Store batch messages to the database if there are any
                if (!string.IsNullOrEmpty(batchMessages))
                {
                    Stopwatch sw = Stopwatch.StartNew();

                    using (SqlConnection conn = new SqlConnection(connstr))
                    {
                        await conn.OpenAsync();

                        using (SqlCommand cmd = conn.CreateCommand())
                        {
                            cmd.CommandText = storedProcedure;
                            cmd.CommandType = CommandType.StoredProcedure;
                            cmd.CommandTimeout = 120; // seconds
                            cmd.Parameters.AddWithValue("@EventMessage", batchMessages);

                            await cmd.ExecuteNonQueryAsync();
                        }
                    }

                    sw.Stop();

                    if (logEnabled)
                    {
                        log.LogInformation("SQL execution time: {ElapsedMilliseconds}ms", sw.ElapsedMilliseconds);
                    }
                }

                // Handle errors if any
                if (errorList.Count > 0)
                {
                    await PushErrorMessagesToEventHub(errorList, log);
                }
            }
            catch (Exception ex)
            {
                log.LogError(ex, "Exception in CustomerMDMBatchProcessor: {ErrorMessage}", ex.Message);

                List<ErrorRecord> el = new List<ErrorRecord>
                {
                    new ErrorRecord(string.Empty, $"Exception in CustomerMDMBatchProcessor: {ex.Message}", "N", 106)
                };

                await PushErrorMessagesToEventHub(el, log);
            }
        }

        /// <summary>
        /// Serializes each error to JSON and sends the results to the log Event Hub,
        /// starting a new batch whenever the current one cannot accept another message.
        /// </summary>
        /// <param name="errorList">Errors to publish.</param>
        /// <param name="log">Logger used to report publishing failures.</param>
        /// <remarks>
        /// Best effort: any exception raised while publishing is logged and swallowed.
        /// </remarks>
        private static async Task PushErrorMessagesToEventHub(List<ErrorRecord> errorList, ILogger log)
        {
            // Managed manually instead of with a using statement because the batch is
            // replaced mid-loop, and a using variable cannot be reassigned.
            EventDataBatch eventBatch = null;

            try
            {
                EventHubProducerClient producer = ProducerClient;
                eventBatch = await producer.CreateBatchAsync();

                foreach (ErrorRecord error in errorList)
                {
                    // NOTE(review): EntityId carries the same value as ReprocessFlag, exactly
                    // as before. That looks unintended, but the correct source for EntityId
                    // is not visible in this file — confirm with the consumer of this hub.
                    var logMessage = new
                    {
                        Message = error.Message,
                        EntityId = error.ReprocessFlag,
                        State = error.State,
                        ReprocessFlag = error.ReprocessFlag
                    };

                    byte[] messageBody = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(logMessage));

                    if (eventBatch.TryAdd(new EventData(messageBody)))
                    {
                        continue;
                    }

                    // Current batch rejected the message: flush what it holds and start a new one.
                    if (eventBatch.Count > 0)
                    {
                        await producer.SendAsync(eventBatch);
                    }

                    eventBatch.Dispose();
                    eventBatch = null;
                    eventBatch = await producer.CreateBatchAsync();

                    if (!eventBatch.TryAdd(new EventData(messageBody)))
                    {
                        // Rejected by an empty batch as well; the message is dropped.
                        log.LogError("Failed to add the error message to the new batch.");
                    }
                }

                if (eventBatch.Count > 0)
                {
                    await producer.SendAsync(eventBatch);
                }
            }
            catch (Exception ex)
            {
                log.LogError(ex, "Error occurred while pushing error messages to Event Hub: {ErrorMessage}", ex.Message);
            }
            finally
            {
                if (eventBatch != null)
                {
                    eventBatch.Dispose();
                }
            }
        }

        /// <summary>
        /// One failure captured while handling a batch.
        /// </summary>
        private sealed class ErrorRecord
        {
            /// <summary>Creates an error record.</summary>
            /// <param name="payload">Event body associated with the failure, or empty when unavailable.</param>
            /// <param name="message">Human-readable error description.</param>
            /// <param name="reprocessFlag">Flag emitted as <c>ReprocessFlag</c>; callers in this file always pass "N".</param>
            /// <param name="state">Numeric code emitted as <c>State</c>; this file uses 104 for per-event failures and 106 for batch-level failures.</param>
            public ErrorRecord(string payload, string message, string reprocessFlag, int state)
            {
                Payload = payload;
                Message = message;
                ReprocessFlag = reprocessFlag;
                State = state;
            }

            /// <summary>Event body associated with the failure. Not included in the published message.</summary>
            public string Payload { get; }

            /// <summary>Error description.</summary>
            public string Message { get; }

            /// <summary>Reprocess flag value.</summary>
            public string ReprocessFlag { get; }

            /// <summary>Numeric state/error code.</summary>
            public int State { get; }
        }
    }
}
Leave a Comment