Untitled

mail@pastecode.io avatar
unknown
plain_text
18 days ago
7.0 kB
2
Indexable
Never
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace CustomerMDMBatchProcessor
{
    /// <summary>
    /// Event Hubs-triggered function that concatenates the bodies of a batch of events
    /// into a single JSON array, passes that array to a SQL stored procedure, and
    /// publishes any failures to a separate "log" Event Hub.
    /// </summary>
    /// <remarks>
    /// All configuration is read from environment variables once, when the type is
    /// first used: <c>GTSAppDB</c>, <c>StoredProcedureName</c>, <c>logenabled</c>,
    /// <c>EventHubConnectionString_Log</c> and <c>EventHubName_Log</c>.
    /// </remarks>
    public class CustomerMDMBatchProcessor
    {
        private static readonly string connstr = Environment.GetEnvironmentVariable("GTSAppDB");
        private static readonly string storedProcedure = Environment.GetEnvironmentVariable("StoredProcedureName");

        // bool.TryParse instead of Convert.ToBoolean: a malformed or empty setting must
        // not throw from the type initializer and take the whole function down.
        private static readonly bool logEnabled = ParseFlag(Environment.GetEnvironmentVariable("logenabled"));

        private static readonly string eventhubConnStr_Log = Environment.GetEnvironmentVariable("EventHubConnectionString_Log");
        private static readonly string eventhubName_Log = Environment.GetEnvironmentVariable("EventHubName_Log");

        // Guards creation/replacement of the shared producer client.
        private static readonly object producerLock = new object();

        // Created on first use (see ProducerClient) rather than in a field initializer,
        // so a bad log-hub configuration only affects error publishing.
        private static EventHubProducerClient _producerClient;

        /// <summary>
        /// Gets the shared producer for the log Event Hub, creating it on first use and
        /// re-creating it if the current instance has been closed.
        /// </summary>
        private static EventHubProducerClient ProducerClient
        {
            get
            {
                lock (producerLock)
                {
                    if (_producerClient == null || _producerClient.IsClosed)
                    {
                        _producerClient = InitializeEventHubProducerClient();
                    }

                    return _producerClient;
                }
            }
        }

        /// <summary>
        /// Parses a boolean configuration value; anything that is not a valid
        /// boolean literal (including <c>null</c> and the empty string) yields <c>false</c>.
        /// </summary>
        private static bool ParseFlag(string raw)
        {
            bool parsed;
            return bool.TryParse(raw, out parsed) && parsed;
        }

        /// <summary>
        /// Creates a producer client for the log Event Hub from the configured
        /// connection string and hub name.
        /// </summary>
        /// <returns>A new <see cref="EventHubProducerClient"/>.</returns>
        /// <remarks>Any construction failure is written to the console and rethrown.</remarks>
        private static EventHubProducerClient InitializeEventHubProducerClient()
        {
            try
            {
                return new EventHubProducerClient(eventhubConnStr_Log, eventhubName_Log);
            }
            catch (Exception ex)
            {
                // No ILogger is available at this point, hence the console.
                Console.WriteLine($"Error creating EventHubProducerClient: {ex.Message}");
                throw;
            }
        }

        /// <summary>
        /// Function entry point: forwards the bodies of <paramref name="events"/> to the
        /// configured stored procedure as one JSON array and reports failures to the
        /// log Event Hub.
        /// </summary>
        /// <param name="events">The batch of events delivered by the trigger.</param>
        /// <param name="log">Logger supplied by the Functions host.</param>
        /// <remarks>
        /// Exceptions are logged and published to the log Event Hub but NOT rethrown,
        /// so the invocation completes normally even when the database call fails.
        /// </remarks>
        [FunctionName("CustomerMDMBatchProcessor")]
        public static async Task Run(
            [EventHubTrigger("%EventHub%", Connection = "EventHubConnectionString", ConsumerGroup = "%ConsumerGroup%")] EventData[] events,
            ILogger log)
        {
            // Tuple layout: (event body, error message, reprocess flag, state code).
            var errorList = new List<Tuple<string, string, string, int>>();

            try
            {
                if (logEnabled)
                {
                    log.LogInformation("Count of messages: {Count}", events.Length);
                }

                string batchMessages = BuildJsonArray(events, errorList, log);

                // Store batch messages to the database if there are any.
                if (!string.IsNullOrEmpty(batchMessages))
                {
                    Stopwatch sw = Stopwatch.StartNew();
                    await StoreBatchAsync(batchMessages);
                    sw.Stop();

                    if (logEnabled)
                    {
                        log.LogInformation("SQL execution time: {ElapsedMs}ms", sw.ElapsedMilliseconds);
                    }
                }

                // Handle per-event errors, if any.
                if (errorList.Count > 0)
                {
                    await PushErrorMessagesToEventHub(errorList, log);
                }
            }
            catch (Exception ex)
            {
                log.LogError(ex, "Exception in CustomerMDMBatchProcessor: {Message}", ex.Message);

                // Append to errorList (rather than starting a new list) so per-event
                // errors collected before the failure are still published.
                // PushErrorMessagesToEventHub never throws, so reaching this handler
                // means errorList has not been published yet.
                errorList.Add(new Tuple<string, string, string, int>(
                    string.Empty, $"Exception in CustomerMDMBatchProcessor: {ex.Message}", "N", 106));
                await PushErrorMessagesToEventHub(errorList, log);
            }
        }

        /// <summary>
        /// Joins the event bodies with commas and wraps them in square brackets.
        /// </summary>
        /// <param name="events">Events whose bodies are concatenated.</param>
        /// <param name="errorList">Receives one entry (state 104) per event whose body could not be read.</param>
        /// <param name="log">Logger for per-event failures.</param>
        /// <returns>The bracketed array text, or <see cref="string.Empty"/> when no body was usable.</returns>
        /// <remarks>Bodies are inserted verbatim; they are not parsed or validated as JSON here.</remarks>
        private static string BuildJsonArray(
            EventData[] events,
            List<Tuple<string, string, string, int>> errorList,
            ILogger log)
        {
            var messages = new StringBuilder();

            foreach (EventData eventData in events)
            {
                try
                {
                    string jsonString = eventData.EventBody.ToString();

                    // An empty body would leave a dangling comma ("a,,b") in the array.
                    if (string.IsNullOrEmpty(jsonString))
                    {
                        continue;
                    }

                    if (messages.Length > 0)
                    {
                        messages.Append(',');
                    }

                    messages.Append(jsonString);
                }
                catch (Exception ex)
                {
                    log.LogError(ex, "Exception while processing event: {Message}", ex.Message);

                    // The body is what failed to read, so do not touch it again here:
                    // a second failure would escape this handler and abort the batch.
                    errorList.Add(new Tuple<string, string, string, int>(string.Empty, ex.Message, "N", 104));
                }
            }

            return messages.Length > 0 ? $"[{messages}]" : string.Empty;
        }

        /// <summary>
        /// Executes the configured stored procedure with <paramref name="batchMessages"/>
        /// as its <c>@EventMessage</c> parameter.
        /// </summary>
        /// <param name="batchMessages">The JSON array text to store.</param>
        private static async Task StoreBatchAsync(string batchMessages)
        {
            using (SqlConnection conn = new SqlConnection(connstr))
            {
                await conn.OpenAsync();

                using (SqlCommand cmd = conn.CreateCommand())
                {
                    cmd.CommandText = storedProcedure;
                    cmd.CommandType = CommandType.StoredProcedure;
                    cmd.CommandTimeout = 120; // seconds

                    // Explicit NVARCHAR(MAX) (size -1) instead of AddWithValue, which
                    // infers the parameter size from each value's length.
                    cmd.Parameters.Add("@EventMessage", SqlDbType.NVarChar, -1).Value = batchMessages;

                    await cmd.ExecuteNonQueryAsync();
                }
            }
        }

        /// <summary>
        /// Serializes each error to JSON and publishes the results to the log Event Hub,
        /// starting a new batch whenever the current one is full.
        /// </summary>
        /// <param name="errorList">Errors as (event body, message, reprocess flag, state code).</param>
        /// <param name="log">Logger for publishing failures.</param>
        /// <remarks>Best effort: any exception is logged and swallowed.</remarks>
        private static async Task PushErrorMessagesToEventHub(List<Tuple<string, string, string, int>> errorList, ILogger log)
        {
            // Managed by hand rather than with a using statement: the batch must be
            // replaced mid-loop, and a using variable cannot be reassigned (CS1656).
            EventDataBatch eventBatch = null;

            try
            {
                EventHubProducerClient producer = ProducerClient;
                eventBatch = await producer.CreateBatchAsync();

                foreach (var error in errorList)
                {
                    var logMessage = new
                    {
                        Message = error.Item2,
                        // NOTE(review): EntityId carries the same value as ReprocessFlag
                        // (Item3) and Item1 (the event body) is never sent. This looks
                        // unintentional, but the payload is kept as-is; confirm the
                        // intended source of EntityId with the log consumer.
                        EntityId = error.Item3,
                        State = error.Item4,
                        ReprocessFlag = error.Item3
                    };

                    byte[] messageBody = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(logMessage));
                    if (eventBatch.TryAdd(new EventData(messageBody)))
                    {
                        continue;
                    }

                    // Current batch is full: flush it, release it, and start a new one.
                    if (eventBatch.Count > 0)
                    {
                        await producer.SendAsync(eventBatch);
                    }

                    eventBatch.Dispose();
                    eventBatch = null; // avoid a double dispose if CreateBatchAsync throws
                    eventBatch = await producer.CreateBatchAsync();

                    if (!eventBatch.TryAdd(new EventData(messageBody)))
                    {
                        // Does not fit even in an empty batch; it is dropped.
                        log.LogError("Failed to add the error message to the new batch.");
                    }
                }

                if (eventBatch.Count > 0)
                {
                    await producer.SendAsync(eventBatch);
                }
            }
            catch (Exception ex)
            {
                log.LogError(ex, "Error occurred while pushing error messages to Event Hub: {Message}", ex.Message);
            }
            finally
            {
                eventBatch?.Dispose();
            }
        }
    }
}
Leave a Comment