Untitled
unknown
plain_text
10 months ago
17 kB
7
Indexable
using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Microsoft.Azure.WebJobs; using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; using Newtonsoft.Json; using System.Text; using System.Data; using System.Data.SqlClient; using Azure.Data.Tables; using Azure.Messaging.EventHubs; using System.Text.RegularExpressions; using Azure; using System.Net.Sockets; using System.Diagnostics; using System.Collections.Concurrent; using Microsoft.Azure.Amqp.Framing; using Azure.Messaging.EventHubs.Producer; using Microsoft.VisualBasic; namespace CustomerMDMBatchProcessor { public class CustomeMDMBatchProcessor { private static ILogger _log; private static string connstr = Environment.GetEnvironmentVariable("GTSAppDB"); private static string storedProcedure = Environment.GetEnvironmentVariable("StoredProcedureName"); private static bool logEnabled = Convert.ToBoolean(Environment.GetEnvironmentVariable("logenabled")); public static ConcurrentDictionary<string, string> eventdataMapping; private static string jsonErrorString = string.Empty; public static string eventhubConnStr_Log = Environment.GetEnvironmentVariable("EventHubConnectionString_Log"); public static string eventhubName_Log = Environment.GetEnvironmentVariable("EventHubName_Log"); private static EventHubProducerClient _producerClient = InitializeEventHubProducerClient(); private static EventHubProducerClient ProducerClient { get { if (_producerClient == null || _producerClient.IsClosed) { _producerClient = InitializeEventHubProducerClient(); } return _producerClient; } } private static EventHubProducerClient InitializeEventHubProducerClient() { try { return new EventHubProducerClient(eventhubConnStr_Log, eventhubName_Log); } catch { Console.WriteLine($"Error on creating EventHubProducerClient for EventHub | ConnString - {eventhubConnStr_Log}"); throw; } } [FunctionName("CustomerMDMBatchProcessor")] public static async Task Run([EventHubTrigger("%EventHub%", Connection = "EventHubConnectionString", ConsumerGroup = "%ConsumerGroup%")] EventData[] events, ILogger log) { _log = log; List<Tuple<string, string, string, int>> errorList = new List<Tuple<string, string, string, int>>(); eventdataMapping = new ConcurrentDictionary<string, string>(); try { if (logEnabled) { log.LogInformation($"Count of messages - {events.Count()}"); } StringBuilder messages = new StringBuilder(); string batchMessages = string.Empty; foreach (EventData eventData in events) { try { var messageBody = eventData.EventBody; var jsonString = messageBody.ToString(); jsonErrorString = jsonString; jsonString = Regex.Replace(jsonString, @"\\r\\n?|\\n", string.Empty); jsonString = jsonString.Replace(@"\", string.Empty); jsonString = jsonString.Replace("\"{", "{"); jsonString = jsonString.Replace("}\"", "}"); JObject result = ValidateJSON(jsonString, log); if (eventData.Properties.ContainsKey("ReProcessFlag")) { eventdataMapping.TryAdd(result["MessageId"].ToString(), eventData.Properties["ReProcessFlag"].ToString()); } var EntityId = result["EntityId"].ToString(); var GoldenRecordId = result["GoldenRecordId"].ToString(); var MessageId = result["MessageId"].ToString(); var MessageDate = result["MessageDate"].ToString(); var CustomerSourceCRMDetails = result["CustomerSourceCRMDetails"].ToString(); var CustomerSourceECCDetails = result["CustomerSourceECCDetails"].ToString(); var ReprocessFlag = result["ReProcessFlag"].ToString(); ReprocessFlag = string.IsNullOrEmpty(ReprocessFlag) ? "N" : ReprocessFlag; if (string.IsNullOrEmpty(GoldenRecordId)) { errorList.Add(new Tuple<string, string, string, int>(jsonErrorString, "Goldenrecordid is null", ReprocessFlag, 100)); continue; } if (string.IsNullOrEmpty(CustomerSourceCRMDetails) && string.IsNullOrEmpty(CustomerSourceECCDetails)) { errorList.Add(new Tuple<string, string, string, int>(jsonErrorString, "Customer Source CRM & ECC Target is nul", ReprocessFlag, 101)); continue; } if (string.IsNullOrEmpty(EntityId)) { errorList.Add(new Tuple<string, string, string, int>(jsonErrorString, "Servicing Branch & Nearest Dealer Not Present in Payload", ReprocessFlag, 102)); continue; } if (result["ReProcessFlag"].ToString() != "D" && result["ReProcessFlag"].ToString() != "A") { object outerJson = JsonConvert.DeserializeObject(jsonString); var outerjsonData = JObject.Parse(outerJson.ToString()); messages.Append(jsonString); messages.Append(","); } else { errorList.Add(new Tuple<string, string, string, int>(jsonErrorString, "Skipped", ReprocessFlag, 103)); } } catch (Exception ex) { log.LogError($"CustomerMDMDataBatchProcessor Recieving json one by one Exception::{ex}"); errorList.Add(new Tuple<string, string, string, int>(jsonErrorString, $"CustomerMDMDataBatchProcessor Recieving json one by one Exception::{ex}", "N", 104)); } } batchMessages = messages.ToString(); if (batchMessages.Length > 0) { batchMessages = batchMessages.Substring(0, batchMessages.Length - 1); batchMessages = '[' + batchMessages + ']'; Stopwatch sw = new Stopwatch(); sw.Start(); using (SqlConnection conn = new SqlConnection(connstr)) { if (conn.State != ConnectionState.Open) { conn.Open(); } using (SqlCommand cmd = conn.CreateCommand()) { cmd.CommandText = storedProcedure; cmd.CommandType = CommandType.StoredProcedure; cmd.CommandTimeout = 120; System.Data.SqlClient.SqlParameter sqlParameter; sqlParameter = cmd.Parameters.AddWithValue("@EventMessage", batchMessages); cmd.ExecuteNonQuery(); sw.Stop(); if (logEnabled) { log.LogInformation($"CustomerMDMBatchProcessor sql connection Data table:: ExecutionTime:{sw.ElapsedMilliseconds}ms"); } } } } if (errorList.Count > 0) { try { await PushErrorMessagesToEventHub(errorList, log).ConfigureAwait(false); } catch (AggregateException ex) { log.LogError($"CustomerMDMBatchProcessor PushErrorMessagesToEventHub Catch::error in parallel execution {ex}"); List<Tuple<string, string, string, int>> el = new List<Tuple<string, string, string, int>>(); el.Add(new Tuple<string, string, string, int>(jsonErrorString, $"CustomerMDMBatchProcessor::PushErrorMessagesToEventHub Catch::error in parallel execution {ex}", "N", 105)); await PushErrorMessagesToEventHub(el, log).ConfigureAwait(false); } } } catch (Exception ex) { log.LogError($"CustomerMDMDataBatchProcessor Exception :{ex.ToString()}"); List<Tuple<string, string, string, int>> el = new List<Tuple<string, string, string, int>>(); el.Add(new Tuple<string, string, string, int>(jsonErrorString, $"CustomerMDMDataBatchProcessor Exception :{ex}", "N", 106)); await PushErrorMessagesToEventHub(el, log).ConfigureAwait(false); } } /// <summary> /// Validates whether the input json is in correct format or not /// </summary> /// <param name="inputJson"></param> /// <param name="log"></param> /// <returns>true or false</returns> public static JObject ValidateJSON(string inputJson, ILogger log) { dynamic resultSet = new JObject(); try { var inputObject = JObject.Parse(inputJson); resultSet.MessageId = inputObject["id"]; resultSet.MessageDate = inputObject["time"]; resultSet.GoldenRecordId = inputObject["data"]["goldenRecordID"]; resultSet.EntityId = string.IsNullOrEmpty(inputObject["data"]["nearestDealer"].ToString()) ? (inputObject["data"].Contains("servicingBranch") ? inputObject["data"]["servicingBranch"] : string.Empty) : inputObject["data"]["nearestDealer"]; resultSet.CustomerSourceCRMDetails = inputObject["data"]["customerSourceSystemReference"][0]["target"].ToString() == "CRM" ? inputObject["data"]["customerSourceSystemReference"] : string.Empty; resultSet.CustomerSourceECCDetails = inputObject["data"]["customerSourceSystemReference"][0]["target"].ToString() == "ECC" ? inputObject["data"]["customerSourceSystemReference"] : string.Empty; resultSet.ReProcessFlag = ""; var deactivateFlag = inputObject["data"]["accountDeactivatedInd"] != null ? inputObject["data"]["accountDeactivatedInd"].ToString() : "Y"; if (deactivateFlag == "Y") { resultSet.ReProcessFlag = "D"; } var innerJsonRef = inputObject["data"]["accountType"]; List<string> accountType = new List<string>(); foreach (var jobject in innerJsonRef) { accountType.Add(jobject["value"].ToString()); } if (!accountType.Contains("Customer")) { resultSet.ReProcessFlag = "A"; } return resultSet; } catch (JsonReaderException ex) { log.LogError($"CustomerMDMDataBatchProcessor ValidateJSON::Invalid Json Block: {ex}"); List<Tuple<string, string, string, int>> el = new List<Tuple<string, string, string, int>>(); el.Add(new Tuple<string, string, string, int>(jsonErrorString, $"CustomerMDMDataBatchProcessor ValidateJSON::Invalid Json Block: {ex}", "N", 107)); PushErrorMessagesToEventHub(el, log); return resultSet; } } private static async Task PushErrorMessagesToEventHub(List<Tuple<string, string, string, int>> tablemsg, ILogger log) { try { List<LogMessage> logmessages = new List<LogMessage>(); var id = string.Empty; var entityid = string.Empty; var goldenrecordid = string.Empty; var msgdate = string.Empty; foreach (var item in tablemsg) { bool isdataExists = false; if (!string.IsNullOrEmpty(item.Item1)) { var jsonObj = item.Item1.ToString(); jsonObj = Regex.Replace(jsonObj, @"\\r\\n?|\\n", string.Empty); jsonObj = jsonObj.Replace(@"\", string.Empty); jsonObj = jsonObj.Replace("\"{", "{"); jsonObj = jsonObj.Replace("}\"", "}"); object outerJson = JsonConvert.DeserializeObject(jsonObj); var outerjsonData = JObject.Parse(outerJson.ToString()); id = outerjsonData["id"].ToString(); entityid = string.IsNullOrEmpty(outerjsonData["data"]["nearestDealer"].ToString()) ? (outerjsonData["data"].Contains("servicingBranch") ? outerjsonData["data"]["servicingBranch"].ToString() : string.Empty) : outerjsonData["data"]["nearestDealer"].ToString(); goldenrecordid = outerjsonData["data"]["goldenRecordID"].ToString(); msgdate = outerjsonData["time"].ToString(); if (eventdataMapping.ContainsKey(id)) { if (eventdataMapping.TryGetValue(id, out var val)) { if (val == "Y") { isdataExists = true; } } } } if (!isdataExists) { try { LogMessage logMessage = new LogMessage() { Message = item.Item2, EntityId = entityid, State = item.Item4, GoldenRecordId = goldenrecordid, MessageId = id, MessageDate = DateTimeOffset.Parse(msgdate), Type = "Error", ReprocessFlag = item.Item3 }; logmessages.Add(logMessage); } catch (Exception ex) { log.LogError($"CustomerMDMDataBatchProcessor::PushErrorMessagesToEventHub:{ex}"); } } } EventDataBatch eventBatch = await ProducerClient.CreateBatchAsync().ConfigureAwait(false); foreach (var message in logmessages) { if (!eventBatch.TryAdd(new Azure.Messaging.EventHubs.EventData(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message))))) { try { await ProducerClient.SendAsync(eventBatch).ConfigureAwait(false); eventBatch = await ProducerClient.CreateBatchAsync().ConfigureAwait(false); eventBatch.TryAdd(new Azure.Messaging.EventHubs.EventData(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)))); } catch (Exception ex) { log.LogError("Error occured while pushing to Eventhub", ex); } } } try { await ProducerClient.SendAsync(eventBatch).ConfigureAwait(false); } catch (Exception ex) { log.LogError($"Error occured while pushing to Eventhub", ex); } } catch (Exception ex) { log.LogError($"CustomerMDMDataBatchProcessor::PushErrorMessagesToTable:{ex}"); } } } }
Editor is loading...
Leave a Comment