Untitled
unknown
plain_text
a year ago
17 kB
11
Indexable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json;
using System.Text;
using System.Data;
using System.Data.SqlClient;
using Azure.Data.Tables;
using Azure.Messaging.EventHubs;
using System.Text.RegularExpressions;
using Azure;
using System.Net.Sockets;
using System.Diagnostics;
using System.Collections.Concurrent;
using Microsoft.Azure.Amqp.Framing;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.VisualBasic;
namespace CustomerMDMBatchProcessor
{
public class CustomeMDMBatchProcessor
{
private static ILogger _log;
private static string connstr = Environment.GetEnvironmentVariable("GTSAppDB");
private static string storedProcedure = Environment.GetEnvironmentVariable("StoredProcedureName");
private static bool logEnabled = Convert.ToBoolean(Environment.GetEnvironmentVariable("logenabled"));
public static ConcurrentDictionary<string, string> eventdataMapping;
private static string jsonErrorString = string.Empty;
public static string eventhubConnStr_Log = Environment.GetEnvironmentVariable("EventHubConnectionString_Log");
public static string eventhubName_Log = Environment.GetEnvironmentVariable("EventHubName_Log");
private static EventHubProducerClient _producerClient = InitializeEventHubProducerClient();
private static EventHubProducerClient ProducerClient
{
get
{
if (_producerClient == null || _producerClient.IsClosed)
{
_producerClient = InitializeEventHubProducerClient();
}
return _producerClient;
}
}
private static EventHubProducerClient InitializeEventHubProducerClient()
{
try
{
return new EventHubProducerClient(eventhubConnStr_Log, eventhubName_Log);
}
catch
{
Console.WriteLine($"Error on creating EventHubProducerClient for EventHub | ConnString - {eventhubConnStr_Log}");
throw;
}
}
[FunctionName("CustomerMDMBatchProcessor")]
public static async Task Run([EventHubTrigger("%EventHub%", Connection = "EventHubConnectionString", ConsumerGroup = "%ConsumerGroup%")] EventData[] events, ILogger log)
{
_log = log;
List<Tuple<string, string, string, int>> errorList = new List<Tuple<string, string, string, int>>();
eventdataMapping = new ConcurrentDictionary<string, string>();
try
{
if (logEnabled)
{
log.LogInformation($"Count of messages - {events.Count()}");
}
StringBuilder messages = new StringBuilder();
string batchMessages = string.Empty;
foreach (EventData eventData in events)
{
try
{
var messageBody = eventData.EventBody;
var jsonString = messageBody.ToString();
jsonErrorString = jsonString;
jsonString = Regex.Replace(jsonString, @"\\r\\n?|\\n", string.Empty);
jsonString = jsonString.Replace(@"\", string.Empty);
jsonString = jsonString.Replace("\"{", "{");
jsonString = jsonString.Replace("}\"", "}");
JObject result = ValidateJSON(jsonString, log);
if (eventData.Properties.ContainsKey("ReProcessFlag"))
{
eventdataMapping.TryAdd(result["MessageId"].ToString(), eventData.Properties["ReProcessFlag"].ToString());
}
var EntityId = result["EntityId"].ToString();
var GoldenRecordId = result["GoldenRecordId"].ToString();
var MessageId = result["MessageId"].ToString();
var MessageDate = result["MessageDate"].ToString();
var CustomerSourceCRMDetails = result["CustomerSourceCRMDetails"].ToString();
var CustomerSourceECCDetails = result["CustomerSourceECCDetails"].ToString();
var ReprocessFlag = result["ReProcessFlag"].ToString();
ReprocessFlag = string.IsNullOrEmpty(ReprocessFlag) ? "N" : ReprocessFlag;
if (string.IsNullOrEmpty(GoldenRecordId))
{
errorList.Add(new Tuple<string, string, string, int>(jsonErrorString, "Goldenrecordid is null", ReprocessFlag, 100));
continue;
}
if (string.IsNullOrEmpty(CustomerSourceCRMDetails) && string.IsNullOrEmpty(CustomerSourceECCDetails))
{
errorList.Add(new Tuple<string, string, string, int>(jsonErrorString, "Customer Source CRM & ECC Target is nul", ReprocessFlag, 101));
continue;
}
if (string.IsNullOrEmpty(EntityId))
{
errorList.Add(new Tuple<string, string, string, int>(jsonErrorString, "Servicing Branch & Nearest Dealer Not Present in Payload", ReprocessFlag, 102));
continue;
}
if (result["ReProcessFlag"].ToString() != "D" && result["ReProcessFlag"].ToString() != "A")
{
object outerJson = JsonConvert.DeserializeObject(jsonString);
var outerjsonData = JObject.Parse(outerJson.ToString());
messages.Append(jsonString);
messages.Append(",");
}
else
{
errorList.Add(new Tuple<string, string, string, int>(jsonErrorString, "Skipped", ReprocessFlag, 103));
}
}
catch (Exception ex)
{
log.LogError($"CustomerMDMDataBatchProcessor Recieving json one by one Exception::{ex}");
errorList.Add(new Tuple<string, string, string, int>(jsonErrorString, $"CustomerMDMDataBatchProcessor Recieving json one by one Exception::{ex}", "N", 104));
}
}
batchMessages = messages.ToString();
if (batchMessages.Length > 0)
{
batchMessages = batchMessages.Substring(0, batchMessages.Length - 1);
batchMessages = '[' + batchMessages + ']';
Stopwatch sw = new Stopwatch();
sw.Start();
using (SqlConnection conn = new SqlConnection(connstr))
{
if (conn.State != ConnectionState.Open)
{
conn.Open();
}
using (SqlCommand cmd = conn.CreateCommand())
{
cmd.CommandText = storedProcedure;
cmd.CommandType = CommandType.StoredProcedure;
cmd.CommandTimeout = 120;
System.Data.SqlClient.SqlParameter sqlParameter;
sqlParameter = cmd.Parameters.AddWithValue("@EventMessage", batchMessages);
cmd.ExecuteNonQuery();
sw.Stop();
if (logEnabled)
{
log.LogInformation($"CustomerMDMBatchProcessor sql connection Data table:: ExecutionTime:{sw.ElapsedMilliseconds}ms");
}
}
}
}
if (errorList.Count > 0)
{
try
{
await PushErrorMessagesToEventHub(errorList, log).ConfigureAwait(false);
}
catch (AggregateException ex)
{
log.LogError($"CustomerMDMBatchProcessor PushErrorMessagesToEventHub Catch::error in parallel execution {ex}");
List<Tuple<string, string, string, int>> el = new List<Tuple<string, string, string, int>>();
el.Add(new Tuple<string, string, string, int>(jsonErrorString, $"CustomerMDMBatchProcessor::PushErrorMessagesToEventHub Catch::error in parallel execution {ex}", "N", 105));
await PushErrorMessagesToEventHub(el, log).ConfigureAwait(false);
}
}
}
catch (Exception ex)
{
log.LogError($"CustomerMDMDataBatchProcessor Exception :{ex.ToString()}");
List<Tuple<string, string, string, int>> el = new List<Tuple<string, string, string, int>>();
el.Add(new Tuple<string, string, string, int>(jsonErrorString, $"CustomerMDMDataBatchProcessor Exception :{ex}", "N", 106));
await PushErrorMessagesToEventHub(el, log).ConfigureAwait(false);
}
}
/// <summary>
/// Validates whether the input json is in correct format or not
/// </summary>
/// <param name="inputJson"></param>
/// <param name="log"></param>
/// <returns>true or false</returns>
public static JObject ValidateJSON(string inputJson, ILogger log)
{
dynamic resultSet = new JObject();
try
{
var inputObject = JObject.Parse(inputJson);
resultSet.MessageId = inputObject["id"];
resultSet.MessageDate = inputObject["time"];
resultSet.GoldenRecordId = inputObject["data"]["goldenRecordID"];
resultSet.EntityId = string.IsNullOrEmpty(inputObject["data"]["nearestDealer"].ToString()) ? (inputObject["data"].Contains("servicingBranch") ? inputObject["data"]["servicingBranch"] : string.Empty) : inputObject["data"]["nearestDealer"];
resultSet.CustomerSourceCRMDetails = inputObject["data"]["customerSourceSystemReference"][0]["target"].ToString() == "CRM" ? inputObject["data"]["customerSourceSystemReference"] : string.Empty;
resultSet.CustomerSourceECCDetails = inputObject["data"]["customerSourceSystemReference"][0]["target"].ToString() == "ECC" ? inputObject["data"]["customerSourceSystemReference"] : string.Empty;
resultSet.ReProcessFlag = "";
var deactivateFlag = inputObject["data"]["accountDeactivatedInd"] != null ? inputObject["data"]["accountDeactivatedInd"].ToString() : "Y";
if (deactivateFlag == "Y")
{
resultSet.ReProcessFlag = "D";
}
var innerJsonRef = inputObject["data"]["accountType"];
List<string> accountType = new List<string>();
foreach (var jobject in innerJsonRef)
{
accountType.Add(jobject["value"].ToString());
}
if (!accountType.Contains("Customer"))
{
resultSet.ReProcessFlag = "A";
}
return resultSet;
}
catch (JsonReaderException ex)
{
log.LogError($"CustomerMDMDataBatchProcessor ValidateJSON::Invalid Json Block: {ex}");
List<Tuple<string, string, string, int>> el = new List<Tuple<string, string, string, int>>();
el.Add(new Tuple<string, string, string, int>(jsonErrorString, $"CustomerMDMDataBatchProcessor ValidateJSON::Invalid Json Block: {ex}", "N", 107));
PushErrorMessagesToEventHub(el, log);
return resultSet;
}
}
private static async Task PushErrorMessagesToEventHub(List<Tuple<string, string, string, int>> tablemsg, ILogger log)
{
try
{
List<LogMessage> logmessages = new List<LogMessage>();
var id = string.Empty;
var entityid = string.Empty;
var goldenrecordid = string.Empty;
var msgdate = string.Empty;
foreach (var item in tablemsg)
{
bool isdataExists = false;
if (!string.IsNullOrEmpty(item.Item1))
{
var jsonObj = item.Item1.ToString();
jsonObj = Regex.Replace(jsonObj, @"\\r\\n?|\\n", string.Empty);
jsonObj = jsonObj.Replace(@"\", string.Empty);
jsonObj = jsonObj.Replace("\"{", "{");
jsonObj = jsonObj.Replace("}\"", "}");
object outerJson = JsonConvert.DeserializeObject(jsonObj);
var outerjsonData = JObject.Parse(outerJson.ToString());
id = outerjsonData["id"].ToString();
entityid = string.IsNullOrEmpty(outerjsonData["data"]["nearestDealer"].ToString()) ? (outerjsonData["data"].Contains("servicingBranch") ? outerjsonData["data"]["servicingBranch"].ToString() : string.Empty) : outerjsonData["data"]["nearestDealer"].ToString();
goldenrecordid = outerjsonData["data"]["goldenRecordID"].ToString();
msgdate = outerjsonData["time"].ToString();
if (eventdataMapping.ContainsKey(id))
{
if (eventdataMapping.TryGetValue(id, out var val))
{
if (val == "Y")
{
isdataExists = true;
}
}
}
}
if (!isdataExists)
{
try
{
LogMessage logMessage = new LogMessage()
{
Message = item.Item2,
EntityId = entityid,
State = item.Item4,
GoldenRecordId = goldenrecordid,
MessageId = id,
MessageDate = DateTimeOffset.Parse(msgdate),
Type = "Error",
ReprocessFlag = item.Item3
};
logmessages.Add(logMessage);
}
catch (Exception ex)
{
log.LogError($"CustomerMDMDataBatchProcessor::PushErrorMessagesToEventHub:{ex}");
}
}
}
EventDataBatch eventBatch = await ProducerClient.CreateBatchAsync().ConfigureAwait(false);
foreach (var message in logmessages)
{
if (!eventBatch.TryAdd(new Azure.Messaging.EventHubs.EventData(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)))))
{
try
{
await ProducerClient.SendAsync(eventBatch).ConfigureAwait(false);
eventBatch = await ProducerClient.CreateBatchAsync().ConfigureAwait(false);
eventBatch.TryAdd(new Azure.Messaging.EventHubs.EventData(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message))));
}
catch (Exception ex)
{
log.LogError("Error occured while pushing to Eventhub", ex);
}
}
}
try
{
await ProducerClient.SendAsync(eventBatch).ConfigureAwait(false);
}
catch (Exception ex)
{
log.LogError($"Error occured while pushing to Eventhub", ex);
}
}
catch (Exception ex)
{
log.LogError($"CustomerMDMDataBatchProcessor::PushErrorMessagesToTable:{ex}");
}
}
}
}
Editor is loading...
Leave a Comment