Untitled

 avatar
unknown
plain_text
10 months ago
17 kB
7
Indexable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json;
using System.Text;
using System.Data;
using System.Data.SqlClient;
using Azure.Data.Tables;
using Azure.Messaging.EventHubs;
using System.Text.RegularExpressions;
using Azure;
using System.Net.Sockets;
using System.Diagnostics;
using System.Collections.Concurrent;
using Microsoft.Azure.Amqp.Framing;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.VisualBasic;

namespace CustomerMDMBatchProcessor
{
    public class CustomeMDMBatchProcessor
    {
        private static ILogger _log;
        private static string connstr = Environment.GetEnvironmentVariable("GTSAppDB");
        private static string storedProcedure = Environment.GetEnvironmentVariable("StoredProcedureName");
        private static bool logEnabled = Convert.ToBoolean(Environment.GetEnvironmentVariable("logenabled"));
        public static ConcurrentDictionary<string, string> eventdataMapping;
        private static string jsonErrorString = string.Empty;
        public static string eventhubConnStr_Log = Environment.GetEnvironmentVariable("EventHubConnectionString_Log");
        public static string eventhubName_Log = Environment.GetEnvironmentVariable("EventHubName_Log");
        private static EventHubProducerClient _producerClient = InitializeEventHubProducerClient();

        private static EventHubProducerClient ProducerClient
        {
            get
            {
                if (_producerClient == null || _producerClient.IsClosed)
                {
                    _producerClient = InitializeEventHubProducerClient();
                }
                return _producerClient;
            }
        }

        private static EventHubProducerClient InitializeEventHubProducerClient()
        {
            try
            {
                return new EventHubProducerClient(eventhubConnStr_Log, eventhubName_Log);
            }
            catch
            {
                Console.WriteLine($"Error on creating EventHubProducerClient for EventHub | ConnString - {eventhubConnStr_Log}");
                throw;
            }
        }

        [FunctionName("CustomerMDMBatchProcessor")]
        public static async Task Run([EventHubTrigger("%EventHub%", Connection = "EventHubConnectionString", ConsumerGroup = "%ConsumerGroup%")] EventData[] events, ILogger log)
        {
            _log = log;
            List<Tuple<string, string, string, int>> errorList = new List<Tuple<string, string, string, int>>();
            eventdataMapping = new ConcurrentDictionary<string, string>();
            try
            {
                if (logEnabled)
                {
                    log.LogInformation($"Count of messages - {events.Count()}");
                }
                StringBuilder messages = new StringBuilder();
                string batchMessages = string.Empty;

                foreach (EventData eventData in events)
                {

                    try
                    {
                        var messageBody = eventData.EventBody;
                        var jsonString = messageBody.ToString();

                        jsonErrorString = jsonString;

                        jsonString = Regex.Replace(jsonString, @"\\r\\n?|\\n", string.Empty);
                        jsonString = jsonString.Replace(@"\", string.Empty);
                        jsonString = jsonString.Replace("\"{", "{");
                        jsonString = jsonString.Replace("}\"", "}");
                        JObject result = ValidateJSON(jsonString, log);

                        if (eventData.Properties.ContainsKey("ReProcessFlag"))
                        {
                            eventdataMapping.TryAdd(result["MessageId"].ToString(), eventData.Properties["ReProcessFlag"].ToString());
                        }

                        var EntityId = result["EntityId"].ToString();
                        var GoldenRecordId = result["GoldenRecordId"].ToString();
                        var MessageId = result["MessageId"].ToString();
                        var MessageDate = result["MessageDate"].ToString();
                        var CustomerSourceCRMDetails = result["CustomerSourceCRMDetails"].ToString();
                        var CustomerSourceECCDetails = result["CustomerSourceECCDetails"].ToString();

                        var ReprocessFlag = result["ReProcessFlag"].ToString();
                        ReprocessFlag = string.IsNullOrEmpty(ReprocessFlag) ? "N" : ReprocessFlag;

                        if (string.IsNullOrEmpty(GoldenRecordId))
                        {
                            errorList.Add(new Tuple<string, string, string, int>(jsonErrorString, "Goldenrecordid is null", ReprocessFlag, 100));
                            continue;
                        }

                        if (string.IsNullOrEmpty(CustomerSourceCRMDetails) && string.IsNullOrEmpty(CustomerSourceECCDetails))
                        {
                            errorList.Add(new Tuple<string, string, string, int>(jsonErrorString, "Customer Source CRM & ECC Target is nul", ReprocessFlag, 101));
                            continue;
                        }

                        if (string.IsNullOrEmpty(EntityId))
                        {
                            errorList.Add(new Tuple<string, string, string, int>(jsonErrorString, "Servicing Branch & Nearest Dealer Not Present in Payload", ReprocessFlag, 102));
                            continue;
                        }

                        if (result["ReProcessFlag"].ToString() != "D" && result["ReProcessFlag"].ToString() != "A")
                        {
                            object outerJson = JsonConvert.DeserializeObject(jsonString);
                            var outerjsonData = JObject.Parse(outerJson.ToString());
                            messages.Append(jsonString);
                            messages.Append(",");
                        }
                        else
                        {
                            errorList.Add(new Tuple<string, string, string, int>(jsonErrorString, "Skipped", ReprocessFlag, 103));
                        }
                    }
                    catch (Exception ex)
                    {
                        log.LogError($"CustomerMDMDataBatchProcessor Recieving json one by one Exception::{ex}");
                        errorList.Add(new Tuple<string, string, string, int>(jsonErrorString, $"CustomerMDMDataBatchProcessor Recieving json one by one Exception::{ex}", "N", 104));
                    }
                }
                batchMessages = messages.ToString();
                if (batchMessages.Length > 0)
                {
                    batchMessages = batchMessages.Substring(0, batchMessages.Length - 1);
                    batchMessages = '[' + batchMessages + ']';
                    Stopwatch sw = new Stopwatch();
                    sw.Start();
                    using (SqlConnection conn = new SqlConnection(connstr))
                    {
                        if (conn.State != ConnectionState.Open)
                        {
                            conn.Open();
                        }

                        using (SqlCommand cmd = conn.CreateCommand())
                        {
                            cmd.CommandText = storedProcedure;
                            cmd.CommandType = CommandType.StoredProcedure;
                            cmd.CommandTimeout = 120;
                            System.Data.SqlClient.SqlParameter sqlParameter;
                            sqlParameter = cmd.Parameters.AddWithValue("@EventMessage", batchMessages);

                            cmd.ExecuteNonQuery();

                            sw.Stop();
                            if (logEnabled)
                            {
                                log.LogInformation($"CustomerMDMBatchProcessor sql connection Data table:: ExecutionTime:{sw.ElapsedMilliseconds}ms");
                            }

                        }
                    }
                }

                if (errorList.Count > 0)
                {
                    try
                    {
                        await PushErrorMessagesToEventHub(errorList, log).ConfigureAwait(false);
                    }
                    catch (AggregateException ex)
                    {
                        log.LogError($"CustomerMDMBatchProcessor PushErrorMessagesToEventHub Catch::error in parallel execution {ex}");
                        List<Tuple<string, string, string, int>> el = new List<Tuple<string, string, string, int>>();
                        el.Add(new Tuple<string, string, string, int>(jsonErrorString, $"CustomerMDMBatchProcessor::PushErrorMessagesToEventHub Catch::error in parallel execution {ex}", "N", 105));
                        await PushErrorMessagesToEventHub(el, log).ConfigureAwait(false);
                    }

                }
            }
            catch (Exception ex)
            {
                log.LogError($"CustomerMDMDataBatchProcessor Exception :{ex.ToString()}");
                List<Tuple<string, string, string, int>> el = new List<Tuple<string, string, string, int>>();
                el.Add(new Tuple<string, string, string, int>(jsonErrorString, $"CustomerMDMDataBatchProcessor Exception :{ex}", "N", 106));
                await PushErrorMessagesToEventHub(el, log).ConfigureAwait(false);
            }
        }

        /// <summary>
        /// Validates whether the input json is in correct format or not
        /// </summary>
        /// <param name="inputJson"></param>
        /// <param name="log"></param>
        /// <returns>true or false</returns>
        public static JObject ValidateJSON(string inputJson, ILogger log)
        {
            dynamic resultSet = new JObject();
            try
            {
                var inputObject = JObject.Parse(inputJson);
                resultSet.MessageId = inputObject["id"];
                resultSet.MessageDate = inputObject["time"];
                resultSet.GoldenRecordId = inputObject["data"]["goldenRecordID"];

                resultSet.EntityId = string.IsNullOrEmpty(inputObject["data"]["nearestDealer"].ToString()) ? (inputObject["data"].Contains("servicingBranch") ? inputObject["data"]["servicingBranch"] : string.Empty) : inputObject["data"]["nearestDealer"];
                resultSet.CustomerSourceCRMDetails = inputObject["data"]["customerSourceSystemReference"][0]["target"].ToString() == "CRM" ? inputObject["data"]["customerSourceSystemReference"] : string.Empty;
                resultSet.CustomerSourceECCDetails = inputObject["data"]["customerSourceSystemReference"][0]["target"].ToString() == "ECC" ? inputObject["data"]["customerSourceSystemReference"] : string.Empty;

                resultSet.ReProcessFlag = "";
                var deactivateFlag = inputObject["data"]["accountDeactivatedInd"] != null ? inputObject["data"]["accountDeactivatedInd"].ToString() : "Y";
                if (deactivateFlag == "Y")
                {
                    resultSet.ReProcessFlag = "D";
                }
                var innerJsonRef = inputObject["data"]["accountType"];
                List<string> accountType = new List<string>();
                foreach (var jobject in innerJsonRef)
                {
                    accountType.Add(jobject["value"].ToString());
                }
                if (!accountType.Contains("Customer"))
                {
                    resultSet.ReProcessFlag = "A";
                }
                return resultSet;
            }
            catch (JsonReaderException ex)
            {
                log.LogError($"CustomerMDMDataBatchProcessor ValidateJSON::Invalid Json Block: {ex}");
                List<Tuple<string, string, string, int>> el = new List<Tuple<string, string, string, int>>();
                el.Add(new Tuple<string, string, string, int>(jsonErrorString, $"CustomerMDMDataBatchProcessor ValidateJSON::Invalid Json Block: {ex}", "N", 107));
                PushErrorMessagesToEventHub(el, log);
                return resultSet;
            }
        }

        private static async Task PushErrorMessagesToEventHub(List<Tuple<string, string, string, int>> tablemsg, ILogger log)
        {
            try
            {
                List<LogMessage> logmessages = new List<LogMessage>();

                var id = string.Empty;
                var entityid = string.Empty;
                var goldenrecordid = string.Empty;
                var msgdate = string.Empty;
                foreach (var item in tablemsg)
                {
                    bool isdataExists = false;
                    if (!string.IsNullOrEmpty(item.Item1))
                    {
                        var jsonObj = item.Item1.ToString();

                        jsonObj = Regex.Replace(jsonObj, @"\\r\\n?|\\n", string.Empty);
                        jsonObj = jsonObj.Replace(@"\", string.Empty);
                        jsonObj = jsonObj.Replace("\"{", "{");
                        jsonObj = jsonObj.Replace("}\"", "}");
                        object outerJson = JsonConvert.DeserializeObject(jsonObj);
                        var outerjsonData = JObject.Parse(outerJson.ToString());
                        id = outerjsonData["id"].ToString();
                        entityid = string.IsNullOrEmpty(outerjsonData["data"]["nearestDealer"].ToString()) ? (outerjsonData["data"].Contains("servicingBranch") ? outerjsonData["data"]["servicingBranch"].ToString() : string.Empty) : outerjsonData["data"]["nearestDealer"].ToString();
                        goldenrecordid = outerjsonData["data"]["goldenRecordID"].ToString();
                        msgdate = outerjsonData["time"].ToString();
                        if (eventdataMapping.ContainsKey(id))
                        {
                            if (eventdataMapping.TryGetValue(id, out var val))
                            {
                                if (val == "Y")
                                {
                                    isdataExists = true;
                                }
                            }
                        }
                    }

                    if (!isdataExists)
                    {

                        try
                        {
                            LogMessage logMessage = new LogMessage()
                            {
                                Message = item.Item2,
                                EntityId = entityid,
                                State = item.Item4,
                                GoldenRecordId = goldenrecordid,
                                MessageId = id,
                                MessageDate = DateTimeOffset.Parse(msgdate),
                                Type = "Error",
                                ReprocessFlag = item.Item3
                            };

                            logmessages.Add(logMessage);
                        }
                        catch (Exception ex)
                        {
                            log.LogError($"CustomerMDMDataBatchProcessor::PushErrorMessagesToEventHub:{ex}");
                        }
                    }
                }

                EventDataBatch eventBatch = await ProducerClient.CreateBatchAsync().ConfigureAwait(false);
                foreach (var message in logmessages)
                {
                    if (!eventBatch.TryAdd(new Azure.Messaging.EventHubs.EventData(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)))))
                    {
                        try
                        {
                            await ProducerClient.SendAsync(eventBatch).ConfigureAwait(false);
                            eventBatch = await ProducerClient.CreateBatchAsync().ConfigureAwait(false);
                            eventBatch.TryAdd(new Azure.Messaging.EventHubs.EventData(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message))));
                        }
                        catch (Exception ex)
                        {
                            log.LogError("Error occured while pushing to Eventhub", ex);
                        }
                    }
                }

                try
                {
                    await ProducerClient.SendAsync(eventBatch).ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    log.LogError($"Error occured while pushing to Eventhub", ex);
                }

            }

            catch (Exception ex)
            {
                log.LogError($"CustomerMDMDataBatchProcessor::PushErrorMessagesToTable:{ex}");
            }
        }

    }
}
Editor is loading...
Leave a Comment