2 months ago
28 kB
require("dotenv").config();
const express = require("express");
const router = express.Router();
const Multer = require("multer");
const uuid = require("uuid");
const uuidv1 = uuid.v1;
const { Storage } = require("@google-cloud/storage");
const { default: mongoose } = require("mongoose");
const { config } = require("dotenv");
const sharp = require("sharp");
const Aerospike = require("aerospike");
const AO = Aerospike.operations;
const exp = Aerospike.exp;
const { getAerospikeClient } = require("../../../../databases/aerospike");
const maps = Aerospike.maps;
const { removeStopwords, eng, fra } = require("stopword");
const now = require("nano-time");
const { uploadImage } = require("../../../helper/uploadImage");
var createError = require("http-errors");
const batchType = Aerospike.batchType;
const { sendActivityMessage } = require("../../../../kafka/producerActi");
const { sendNotificationMessage } = require("../../../../kafka/producerNoti");
const redisHelper = require("../../../helper/redisHelper");
//aerospike helper
const AsHelper = require("../../../helper/aerospikeQueriesHelper");
const createHttpError = require("http-errors");

/** Builds the Aerospike key of a record in the block-event set. */
const blockEventKey = (id) =>
  new Aerospike.Key(process.env.CLUSTER_NAME, process.env.SET_BLOCK_EVENT, id);

/**
 * Builds the payload sent on the "post_notification" topic.
 *
 * @param {string} id - Event id the notification refers to.
 * @param {number} type - Value stored in the `ty` field (22 and 23 are used below).
 * @param {object} user - Acting user; `handleUn`, `fn`, `ln`, `badges`, `p_i` are read.
 * @param {string} recipient - Value stored in the `re` field (the event creator's handle).
 * @returns {object} Notification payload.
 */
const buildEventNotification = (id, type, user, recipient) => ({
  id,
  ty: type,
  vi: false,
  wo: user.handleUn,
  ti: Date.now(),
  nm: `${user.fn} ${user.ln}`,
  // A user without badges must not make the whole request fail after the
  // database write has already succeeded.
  badge: user.badges?.[0],
  pi: user.p_i,
  cat: 3,
  re: recipient,
});

/** Applies `ops` to one record through a single-entry batch write. */
const batchWriteOne = (client, key, ops) =>
  client.batchWrite(
    [{ type: batchType.BATCH_WRITE, key, ops }],
    new Aerospike.BatchPolicy({})
  );

/**
 * Data-access model for "block events" stored in Aerospike.
 * A single shared instance is exported from this module.
 */
class BlockPostEventModel {
  /**
   * Lists the not-yet-ended events of a group, oldest first, one page at a time.
   *
   * @param {string} id - Group id matched against the `g_id` bin.
   * @param {number|string} [qPage] - Zero-based page index (defaults to 0).
   * @param {number|string} [qLimit] - Page size (defaults to 10).
   * @returns {Promise<{eventsList: object[]}>} Bins of the events on the requested page.
   */
  fetchAllEvents = async (id, qPage, qLimit) => {
    // Clamp so that negative or fractional query parameters cannot produce
    // negative slice bounds.
    const page = Math.max(0, Math.floor(Number(qPage) || 0));
    const limit = Math.max(1, Math.floor(Number(qLimit) || 10));
    const start = page * limit;

    const client = await getAerospikeClient();
    const query = client.query(
      process.env.CLUSTER_NAME,
      process.env.SET_BLOCK_EVENT
    );
    query.where(Aerospike.filter.equal("g_id", id));

    const upcoming = await new Promise((resolve, reject) => {
      const events = [];
      const stream = query.foreach();
      stream.on("data", (record) => {
        // Skip events whose end date (`e_d`) is already in the past.
        const remaining = new Date(record.bins.e_d).getTime() - Date.now();
        if (remaining < 0) return;
        events.push(record.bins);
      });
      stream.on("error", reject);
      stream.on("end", () => resolve(events));
    });

    upcoming.sort((a, b) => a.c_t - b.c_t);
    return { eventsList: upcoming.slice(start, start + limit) };
  };

  /**
   * Reads one event record.
   *
   * @param {string} id - Event id (record key).
   * @returns {Promise<object>} The record's bins.
   * @throws {HttpError} 400 when the read fails.
   */
  async fetchSingleEvent(id) {
    try {
      const client = await getAerospikeClient();
      const { bins } = await client.get(blockEventKey(id));
      return bins;
    } catch (error) {
      throw createError.BadRequest(error.message);
    }
  }

  /**
   * Soft-deletes an event by writing `true` to its `delete` bin.
   *
   * @param {string} id - Event id (record key).
   * @returns {Promise<string>} "Event deleted" once the write has completed.
   * @throws {HttpError} 400 when the write fails.
   */
  async blockEventDelete(id) {
    try {
      const client = await getAerospikeClient();
      await client.operate(blockEventKey(id), [AO.write("delete", true)]);
      return "Event deleted";
    } catch (error) {
      console.log(error.message);
      throw createError.BadRequest("Error");
    }
  }

  /**
   * Overwrites the editable bins of an event and returns the updated record.
   *
   * @param {object} data - `{ title, description, terms, eventimage, link, price, id }`.
   * @returns {Promise<object>} The record's bins after the update.
   * @throws {HttpError} 400 when the update or the re-read fails.
   */
  async blockeditEvent(data) {
    try {
      const { title, description, terms, eventimage, link, price, id } = data;
      const client = await getAerospikeClient();
      const key = blockEventKey(id);

      await client.operate(key, [
        AO.write("tit", title),
        AO.write("des", description),
        AO.write("tm", terms),
        AO.write("eventimage", eventimage),
        AO.write("link", link),
        AO.write("price", price),
      ]);

      const { bins } = await client.get(key);
      return bins;
    } catch (err) {
      console.log(err.message);
      throw createHttpError.BadRequest("ERROR");
    }
  }

  /**
   * Increments the share counter (`sc` bin) of an event.
   *
   * @param {object} data - Object carrying the event `id`.
   * @returns {Promise<string>} "shared".
   * @throws {HttpError} 400 when the write fails.
   */
  async shareEventModel(data) {
    try {
      const { id } = data;
      const client = await getAerospikeClient();
      await client.operate(blockEventKey(id), [AO.add("sc", 1)]); //share counter
      return "shared";
    } catch (err) {
      console.log(err.message);
      throw createHttpError.BadRequest("ERROR");
    }
  }

  /**
   * Joins a user to an event: records the event on the user's meta record,
   * bumps the event's `joined` counter / `j_u` list and, for paid joins,
   * stores a transaction record. Notifies the event creator on a first join.
   *
   * @param {object} user - Acting user (`handleUn`, `fn`, `ln`, `badges`, `p_i`).
   * @param {object} eventData - `{ id, coupon, tax, netPayable }`.
   * @returns {Promise<{msg: string, notificationData?: object}>}
   * @throws {HttpError} 400 when any read, write or notification fails.
   */
  async blockEventJoin(user, eventData) {
    try {
      const client = await getAerospikeClient();
      const { id, coupon, tax, netPayable } = eventData;
      const handleUn = user.handleUn;
      const eventKey = blockEventKey(id);

      //fetch event details
      const { bins: eDetails } = await client.get(eventKey);
      const startDate = eDetails.s_d;
      const endDate = eDetails.e_d;
      // Event length in days, stored as the transaction's validity.
      const duration =
        (new Date(endDate) - new Date(startDate)) / (1000 * 60 * 60 * 24);
      const joinedBefore = (eDetails.j_u || []).includes(handleUn);

      const batchRecords = [
        {
          type: batchType.BATCH_WRITE,
          key: new Aerospike.Key(
            process.env.CLUSTER_NAME,
            process.env.SET_USER_META,
            handleUn
          ),
          ops: [Aerospike.lists.append("joined_events", id)],
        },
      ];

      // Only a paid join produces a transaction record. The original
      // condition was inverted and recorded transactions for free joins only.
      if (netPayable > 0) {
        batchRecords.push({
          type: batchType.BATCH_WRITE,
          // The transaction is keyed by the join timestamp.
          key: new Aerospike.Key(
            process.env.CLUSTER_NAME,
            process.env.SET_TRANSACTIONS,
            Date.now()
          ),
          ops: [
            AO.write("id", id),
            AO.write("validity", duration),
            AO.write("amount", netPayable),
            AO.write("start_date", startDate),
            AO.write("end_date", endDate),
            AO.write("handleUn", handleUn),
            AO.write("g_id", eDetails.g_id),
            AO.write("coupon", coupon),
            AO.write("tax", tax),
            AO.write("type", "block_event"),
            AO.write("status", "completed"),
          ],
        });
      }

      // NOTE(review): a user who already joined is appended to `j_u` and
      // counted in `joined` again — confirm whether repeat joins should be
      // rejected or toggled.
      batchRecords.push({
        type: batchType.BATCH_WRITE,
        key: eventKey,
        ops: [AO.incr("joined", 1), Aerospike.lists.append("j_u", handleUn)],
      });

      await client.batchWrite(batchRecords, new Aerospike.BatchPolicy({}));

      // Notify the creator only when somebody else joins for the first time.
      if (eDetails.e_c_dun === handleUn || joinedBefore) {
        return { msg: "User joined in event." };
      }

      const notificationData = buildEventNotification(
        id,
        23,
        user,
        eDetails.e_c_dun
      );
      await sendNotificationMessage("post_notification", notificationData);
      return { msg: "User joined in event.", notificationData };
    } catch (error) {
      throw createError.BadRequest(error.message);
    }
  }

  /**
   * Validates an event coupon and returns its discount.
   *
   * @param {object} params - Object carrying `couponCode` (record key in the coupon set).
   * @returns {Promise<{type: string, dispcountInPercentage: number}>}
   * @throws {HttpError} 400 when the coupon is missing or is not of type "event".
   */
  async applyEventCouponModel({ couponCode }) {
    try {
      const client = await getAerospikeClient();
      const couponKey = new Aerospike.Key(
        process.env.CLUSTER_NAME,
        process.env.SET_COUPON,
        couponCode
      );

      if (!(await client.exists(couponKey))) {
        throw new Error("Invalid Coupon!!!");
      }

      //tp - type
      //dp - discount in percentage
      const {
        bins: { tp, dp },
      } = await client.get(couponKey);
      if (tp !== "event") {
        throw new Error("Invalid Coupon!!!");
      }

      return { type: tp, dispcountInPercentage: dp };
    } catch (error) {
      console.log(error.message);
      throw createError.BadRequest(error.message);
    }
  }

  /**
   * Toggles the user's "interested" mark on an event and notifies the event
   * creator when the mark is newly added by somebody else.
   *
   * @param {string} id - Event id.
   * @param {object} user - Acting user.
   * @param {*} value - Unused; kept for interface compatibility.
   * @param {*} array - Unused; kept for interface compatibility.
   * @returns {Promise<{msg: string, notificationData?: object}>}
   * @throws {HttpError} 400 on failure.
   */
  async addEventInterest(id, user, value, array) {
    try {
      const client = await getAerospikeClient();
      const eventKey = blockEventKey(id);
      const { bins } = await client.get(eventKey);
      const handleUn = user.handleUn;
      const wasInterested = bins.i_u.includes(handleUn);

      const ops = wasInterested
        ? [
            Aerospike.lists.removeByValue("i_u", handleUn),
            Aerospike.lists.removeByValue("n_i_u", handleUn),
          ]
        : [
            Aerospike.lists.removeByValue("n_i_u", handleUn),
            Aerospike.lists.append("i_u", handleUn),
          ];
      await batchWriteOne(client, eventKey, ops);

      if (bins.e_c_dun === handleUn || wasInterested) {
        return { msg: "Event interested added" };
      }

      const notificationData = buildEventNotification(
        id,
        22,
        user,
        bins.e_c_dun
      );
      await sendNotificationMessage("post_notification", notificationData);
      return { msg: "Event interested added", notificationData };
    } catch (error) {
      throw createError.BadRequest(error.message);
    }
  }

  /**
   * Toggles the user's "not interested" mark on an event.
   *
   * @param {string} id - Event id.
   * @param {object} user - Acting user.
   * @param {*} value - Unused; kept for interface compatibility.
   * @param {*} array - Unused; kept for interface compatibility.
   * @returns {Promise<{msg: string}>}
   * @throws {HttpError} 400 on failure.
   */
  async addEventNotInterest(id, user, value, array) {
    // The try block now covers the database calls; previously it wrapped only
    // the return statement and could never catch anything.
    try {
      const client = await getAerospikeClient();
      const eventKey = blockEventKey(id);
      const { bins } = await client.get(eventKey);
      const handleUn = user.handleUn;

      const ops = bins.n_i_u.includes(handleUn)
        ? [Aerospike.lists.removeByValue("n_i_u", handleUn)]
        : [
            Aerospike.lists.removeByValue("i_u", handleUn),
            Aerospike.lists.append("n_i_u", handleUn),
          ];
      await batchWriteOne(client, eventKey, ops);

      return { msg: "Event not interested added" };
    } catch (err) {
      throw createError.BadRequest(err.message);
    }
  }

  /**
   * Toggles the user's membership in an event's joined list (`j_u`) and
   * notifies the event creator when somebody else joins for the first time.
   *
   * @param {string} id - Event id.
   * @param {object} user - Acting user.
   * @param {*} value - Unused; kept for interface compatibility.
   * @param {string[]} [array] - Handles treated as already joined; when it is
   *   not an array the event's stored `j_u` list is used instead.
   * @returns {Promise<{msg: string, notificationData?: object}>}
   * @throws {HttpError} 400 on failure.
   */
  async addEventJoin(id, user, value, array) {
    try {
      const client = await getAerospikeClient();
      const eventKey = blockEventKey(id);
      const { bins } = await client.get(eventKey);
      const handleUn = user.handleUn;
      const storedJoined = bins.j_u || [];
      const joinedUsers = Array.isArray(array) ? array : storedJoined;

      // Always clear the user from every list first, then re-add to `j_u`
      // when this call is a join rather than a leave.
      const ops = [
        Aerospike.lists.removeByValue("i_u", handleUn),
        Aerospike.lists.removeByValue("n_i_u", handleUn),
        Aerospike.lists.removeByValue("j_u", handleUn),
      ];
      if (!joinedUsers.includes(handleUn)) {
        // Previously appended to "n_i_u" (not interested), which never
        // recorded the join.
        ops.push(Aerospike.lists.append("j_u", handleUn));
      }
      await batchWriteOne(client, eventKey, ops);

      if (bins.e_c_dun === handleUn || storedJoined.includes(handleUn)) {
        return { msg: "Event join added" };
      }

      const notificationData = buildEventNotification(
        id,
        23,
        user,
        bins.e_c_dun
      );
      await sendNotificationMessage("post_notification", notificationData);
      return { msg: "Event join added", notificationData };
    } catch (err) {
      throw createError.BadRequest(err.message);
    }
  }

  /**
   * Toggles whether an event is hidden for a user (`hide` list bin).
   *
   * @param {string} username - Handle of the acting user.
   * @param {string} id - Event id.
   * @returns {Promise<{msg: string}>} "Unhide event" when the user had the
   *   event hidden before the call, otherwise "Hide event".
   * @throws {HttpError} 400 on failure.
   */
  async hideEvent(username, id) {
    try {
      const client = await getAerospikeClient();
      const key = blockEventKey(id);
      const { bins } = await client.get(key);
      const wasHidden = (bins.hide || []).includes(username);

      const op = wasHidden
        ? Aerospike.lists.removeByValue("hide", username)
        : Aerospike.lists.append("hide", username);
      await client.operate(key, [op]);

      return { msg: wasHidden ? "Unhide event" : "Hide event" };
    } catch (err) {
      throw createError.BadRequest(err.message);
    }
  }

  /**
   * Files a report against an event: appends `{ u_id, time }` to the report
   * record's bin named after `body.value`, and appends the reporter to the
   * event record.
   *
   * @param {string} id - Event id (also the key of its report record).
   * @param {string} username - Handle of the reporting user.
   * @param {object} body - Request body; `body.value` selects the report bin.
   * @returns {Promise<{msg: string}>}
   * @throws {HttpError} 400 when either record is missing or a write fails.
   */
  async handleReportEvent(id, username, body) {
    try {
      const client = await getAerospikeClient();
      const eventKey = blockEventKey(id);
      const reportKey = new Aerospike.Key(
        process.env.CLUSTER_NAME,
        process.env.SET_REPORT,
        id
      );

      // Refuse to report unknown events instead of creating stray records.
      const [eventExists, reportExists] = await Promise.all([
        client.exists(eventKey),
        client.exists(reportKey),
      ]);
      if (!eventExists || !reportExists) {
        throw new Error("Event not found");
      }

      await client.operate(reportKey, [
        Aerospike.lists.append(body.value.toString(), {
          u_id: username,
          time: now.micro(),
        }),
      ]);

      // NOTE(review): createBlockEvent initialises a bin named "reports" but
      // this writes to "report" — confirm which name readers expect.
      await client.operate(eventKey, [
        Aerospike.lists.append("report", username),
      ]);

      return { msg: "You reported this event" };
    } catch (err) {
      throw createError.BadRequest(err.message);
    }
  }

  /**
   * Creates an event: uploads the optional image, writes the event record and
   * its report record, appends the event to the group meta record, bumps the
   * creator's `post_c` counter and registers the event in the Redis feed.
   *
   * @param {object} [file] - Uploaded image passed to `uploadImage`.
   * @param {object} body - Request body carrying the event fields.
   * @param {object} user - Creator (`handleUn`, `fn`, `ln`, `p_i`).
   * @returns {Promise<{msg: string, event: object}>} The stored event bins.
   * @throws {HttpError} 409 when any step fails.
   */
  async createBlockEvent(file, body, user) {
    try {
      const publicURL = file ? await uploadImage(file) : undefined;
      const client = await getAerospikeClient();
      const event_id = now.micro();
      const createdAt = new Date().getTime();
      // Active unless the start date is already in the past.
      const isActive = !(new Date(body.startdate).getTime() - createdAt < 0);
      const tagsArray = body?.tags?.split(",") || [];

      const eventKey = blockEventKey(event_id.toString());
      // NOTE(review): the group meta record is keyed by body.blockId while the
      // event's g_id bin comes from body.g_id — confirm both are intended.
      const groupMetaKey = new Aerospike.Key(
        process.env.CLUSTER_NAME,
        process.env.SET_GROUP_META,
        body.blockId
      );
      const reportKey = new Aerospike.Key(
        process.env.CLUSTER_NAME,
        process.env.SET_REPORT,
        event_id.toString()
      );
      const userKey = new Aerospike.Key(
        process.env.CLUSTER_NAME,
        process.env.SET_USERS,
        user.handleUn
      );

      const eventOps = [
        AO.write("id", event_id),
        AO.write("tags", tagsArray),
        AO.write("tit", body.title || ""),
        AO.write("eventimage", publicURL || ""),
        AO.write("time", body.time || ""),
        AO.write("timezone", body.timezone || ""),
        AO.write("des", body.description || ""),
        AO.write("tm", body.terms || ""), //terms & condition
        AO.write("type", body.type || ""),
        AO.write("link", body.link || ""),
        AO.write("s_d", body.startdate || ""),
        AO.write("e_d", body.enddate || ""),
        AO.write("city", body.city || ""),
        AO.write("country", body.country || ""),
        AO.write("i_u", []), //interested_user
        AO.write("n_i_u", []), //not_interested_user
        AO.write("j_u", []), //joined_user
        AO.write("price", Number(body.price) || ""),
        AO.write("is_paid", Number(body.isPaid) > 0),
        AO.write("active", isActive),
        AO.write("e_c_img", user.p_i || ""), //event_creator
        AO.write("e_c_fn", user.fn || ""),
        AO.write("e_c_ln", user.ln || ""),
        AO.write("e_c_dun", user.handleUn || ""),
        AO.write("delete", false),
        AO.write("isEvent", true),
        AO.write("g_id", body.g_id || ""),
        AO.write("g_name", body.g_n || ""),
        AO.write("c_id", body.coin_id || ""), //coin id
        AO.write("pin", body.pin || ""),
        AO.write("slot", body.slotValue || ""),
        AO.write("joined", 0),
        AO.write("hide", []),
        AO.write("reports", []),
        AO.write("c_t", createdAt),
        // Same empty-string fallback as the other optional bins.
        AO.write("g_p_img", body.g_p_img || ""),
      ];

      // One empty list bin per report reason ("1" … "10") plus "msg".
      const reportOps = [
        AO.write("_id", event_id),
        AO.write("type", "social"),
        AO.write("cat", "block event"),
        ...Array.from({ length: 10 }, (_, i) => AO.write(String(i + 1), [])),
        AO.write("msg", []),
      ];

      const batchRecords = [
        { type: batchType.BATCH_WRITE, key: eventKey, ops: eventOps },
        {
          type: batchType.BATCH_WRITE,
          key: groupMetaKey,
          ops: [Aerospike.lists.append("events", event_id)],
        },
        { type: batchType.BATCH_WRITE, key: reportKey, ops: reportOps },
        {
          type: batchType.BATCH_WRITE,
          key: userKey,
          ops: [AO.incr("post_c", 1)],
        },
      ];
      await client.batchWrite(batchRecords, new Aerospike.BatchPolicy({}));

      //update redis records to add new and remove last
      await redisHelper.addNewRecordAndRemoveOldest(
        ["eventsFeed:eventIds"],
        event_id,
        Date.now()
      );

      const { bins } = await client.get(eventKey);
      return { msg: "Event created", event: bins };
    } catch (err) {
      // http-errors expects a message string, not an object.
      throw createError.Conflict(err.message);
    }
  }
}

module.exports = new BlockPostEventModel();
Editor is loading...
Leave a Comment