Untitled
unknown
plain_text
a year ago
28 kB
6
Indexable
require("dotenv").config();
const express = require("express");
const router = express.Router();
const Multer = require("multer");
const uuid = require("uuid");
const uuidv1 = uuid.v1;
const { Storage } = require("@google-cloud/storage");
const { default: mongoose } = require("mongoose");
const { config } = require("dotenv");
const sharp = require("sharp");
const Aerospike = require("aerospike");
const AO = Aerospike.operations;
const exp = Aerospike.exp;
const { getAerospikeClient } = require("../../../../databases/aerospike");
const maps = Aerospike.maps;
const { removeStopwords, eng, fra } = require("stopword");
const now = require("nano-time");
const { uploadImage } = require("../../../helper/uploadImage");
var createError = require("http-errors");
const batchType = Aerospike.batchType;
const { sendActivityMessage } = require("../../../../kafka/producerActi");
const { sendNotificationMessage } = require("../../../../kafka/producerNoti");
const redisHelper = require("../../../helper/redisHelper");
//aerospike helper
const AsHelper = require("../../../helper/aerospikeQueriesHelper");
const createHttpError = require("http-errors");
/** Milliseconds in one day; used to express an event's length in days. */
const MS_PER_DAY = 1000 * 60 * 60 * 24;

/**
 * Builds the Aerospike key of an event record in the block-event set.
 * @param {string|number} id - Event identifier (the record's user key).
 * @returns {object} Aerospike key.
 */
function blockEventKey(id) {
  return new Aerospike.Key(
    process.env.CLUSTER_NAME,
    process.env.SET_BLOCK_EVENT,
    id
  );
}

/**
 * Wraps a list of operations into a single batch-write record.
 * @param {object} key - Aerospike key of the record to modify.
 * @param {object[]} ops - Operations to apply to that record.
 * @returns {{type: *, key: object, ops: object[]}} Batch record.
 */
function batchWriteRecord(key, ops) {
  return { type: Aerospike.batchType.BATCH_WRITE, key, ops };
}

/**
 * Builds the notification payload sent to the creator of an event.
 * @param {number} type - Notification type code (`ty`).
 * @param {string|number} eventId - Event the notification refers to.
 * @param {object} user - Acting user (`handleUn`, `fn`, `ln`, `badges`, `p_i`).
 * @param {string} creatorHandle - Handle of the event creator (`re`).
 * @returns {object} Notification payload.
 */
function buildCreatorNotification(type, eventId, user, creatorHandle) {
  return {
    id: eventId,
    ty: type,
    vi: false,
    wo: user.handleUn,
    ti: Date.now(),
    nm: `${user.fn} ${user.ln}`,
    // A user without a badges list must not make the whole request fail.
    badge: user.badges?.[0],
    pi: user.p_i,
    cat: 3,
    re: creatorHandle,
  };
}

/**
 * Data-access model for block (group) events stored in Aerospike.
 *
 * No method relies on `this`, so methods may be passed around detached from
 * the instance. Unless stated otherwise, failures are reported as
 * http-errors `BadRequest` errors.
 */
class BlockPostEventModel {
  /**
   * Lists the events of a group whose end date has not passed yet, ordered by
   * creation time (`c_t`) ascending, and returns one page of them.
   *
   * @param {string} id - Group id matched against the `g_id` bin.
   * @param {number|string} [qPage] - Zero-based page index (default 0).
   * @param {number|string} [qLimit] - Page size (default 10).
   * @returns {Promise<{eventsList: object[]}>} Bins of the events on the page.
   */
  fetchAllEvents = async (id, qPage, qLimit) => {
    const page = Number(qPage) || 0;
    const limit = Number(qLimit) || 10;
    const start = page * limit;
    const end = start + limit;
    const client = await getAerospikeClient();

    // `new Promise` is only used to adapt the record stream's events.
    return new Promise((resolve, reject) => {
      const query = client.query(
        process.env.CLUSTER_NAME,
        process.env.SET_BLOCK_EVENT
      );
      query.where(Aerospike.filter.equal("g_id", id));

      const upcoming = [];
      const stream = query.foreach();
      stream.on("data", (record) => {
        // Skip events that already ended. An unparsable end date gives NaN,
        // which is not < 0, so such events are kept.
        const remainingMs = new Date(record.bins.e_d).getTime() - Date.now();
        if (remainingMs < 0) return;
        upcoming.push(record.bins);
      });
      stream.on("error", reject);
      stream.on("end", () => {
        upcoming.sort((a, b) => a.c_t - b.c_t);
        resolve({ eventsList: upcoming.slice(start, end) });
      });
    });
  };

  /**
   * Reads one event record.
   * @param {string|number} id - Event id.
   * @returns {Promise<object>} The event's bins.
   * @throws BadRequest when the record cannot be read.
   */
  async fetchSingleEvent(id) {
    try {
      const client = await getAerospikeClient();
      const { bins } = await client.get(blockEventKey(id));
      return bins;
    } catch (error) {
      throw createError.BadRequest(error.message);
    }
  }

  /**
   * Soft-deletes an event by setting its `delete` bin to true.
   * @param {string|number} id - Event id.
   * @returns {Promise<string>} "Event deleted" once the write succeeded.
   * @throws BadRequest("Error") when the write fails.
   */
  async blockEventDelete(id) {
    const client = await getAerospikeClient();
    try {
      // Awaited (instead of a callback) so the outcome reaches the caller.
      await client.operate(blockEventKey(id), [
        Aerospike.operations.write("delete", true),
      ]);
    } catch (err) {
      console.log(err.message);
      throw createError.BadRequest("Error");
    }
    return "Event deleted";
  }

  /**
   * Overwrites the editable bins of an event and returns the updated record.
   * @param {object} data - `{ title, description, terms, eventimage, link, price, id }`.
   * @returns {Promise<object>} Bins of the event after the update.
   * @throws BadRequest("ERROR") on any failure.
   */
  async blockeditEvent(data) {
    try {
      const { title, description, terms, eventimage, link, price, id } = data;
      const client = await getAerospikeClient();
      const key = blockEventKey(id);
      await client.operate(key, [
        Aerospike.operations.write("tit", title),
        Aerospike.operations.write("des", description),
        Aerospike.operations.write("tm", terms),
        Aerospike.operations.write("eventimage", eventimage),
        Aerospike.operations.write("link", link),
        Aerospike.operations.write("price", price),
      ]);
      const { bins } = await client.get(key);
      return bins;
    } catch (err) {
      console.log(err.message);
      throw createHttpError.BadRequest("ERROR");
    }
  }

  /**
   * Increments the share counter (`sc`) of an event.
   * @param {{id: string|number}} data - Object carrying the event id.
   * @returns {Promise<string>} "shared".
   * @throws BadRequest("ERROR") on any failure.
   */
  async shareEventModel(data) {
    try {
      const { id } = data;
      const client = await getAerospikeClient();
      await client.operate(blockEventKey(id), [
        Aerospike.operations.add("sc", 1),
      ]);
      return "shared";
    } catch (err) {
      console.log(err.message);
      throw createHttpError.BadRequest("ERROR");
    }
  }

  /**
   * Joins a user to an event: records the event on the user's meta record,
   * stores a transaction when something was paid, and updates the event's
   * join counter and joined-user list. The creator is notified the first time
   * another user joins.
   *
   * @param {object} user - Acting user (`handleUn`, `fn`, `ln`, `badges`, `p_i`).
   * @param {object} eventData - `{ id, coupon, tax, netPayable }`.
   * @returns {Promise<{msg: string, notificationData?: object}>}
   * @throws BadRequest on any failure.
   */
  async blockEventJoin(user, eventData) {
    try {
      const client = await getAerospikeClient();
      const { id, coupon, tax, netPayable } = eventData;
      const { handleUn } = user;

      const eventKey = blockEventKey(id);
      const { bins: eDetails } = await client.get(eventKey);
      const { g_id: groupId, s_d: startDate, e_d: endDate } = eDetails;
      const durationDays =
        (new Date(endDate) - new Date(startDate)) / MS_PER_DAY;
      // Captured before the write so the notification decision reflects the
      // state the user joined from.
      const alreadyJoined = (eDetails.j_u ?? []).includes(handleUn);

      const userMetaKey = new Aerospike.Key(
        process.env.CLUSTER_NAME,
        process.env.SET_USER_META,
        handleUn
      );
      const batchRecords = [
        batchWriteRecord(userMetaKey, [
          Aerospike.lists.append("joined_events", id),
        ]),
      ];

      // A transaction is stored only when the user actually paid something.
      if (netPayable > 0) {
        const transactionKey = new Aerospike.Key(
          process.env.CLUSTER_NAME,
          process.env.SET_TRANSACTIONS,
          Date.now()
        );
        batchRecords.push(
          batchWriteRecord(transactionKey, [
            Aerospike.operations.write("id", id),
            Aerospike.operations.write("validity", durationDays),
            Aerospike.operations.write("amount", netPayable),
            Aerospike.operations.write("start_date", startDate),
            Aerospike.operations.write("end_date", endDate),
            Aerospike.operations.write("handleUn", handleUn),
            Aerospike.operations.write("g_id", groupId),
            Aerospike.operations.write("coupon", coupon),
            Aerospike.operations.write("tax", tax),
            Aerospike.operations.write("type", "block_event"),
            Aerospike.operations.write("status", "completed"),
          ])
        );
      }

      batchRecords.push(
        batchWriteRecord(eventKey, [
          Aerospike.operations.incr("joined", 1),
          Aerospike.lists.append("j_u", handleUn),
        ])
      );
      await client.batchWrite(batchRecords, new Aerospike.BatchPolicy({}));

      // No notification for the creator's own join or for a repeated join.
      if (eDetails.e_c_dun === handleUn || alreadyJoined) {
        return { msg: "User joined in event." };
      }
      const notificationData = buildCreatorNotification(
        23,
        id,
        user,
        eDetails.e_c_dun
      );
      await sendNotificationMessage("post_notification", notificationData);
      return { msg: "User joined in event.", notificationData };
    } catch (error) {
      throw createError.BadRequest(error.message);
    }
  }

  /**
   * Validates an event coupon and returns its discount.
   * @param {{couponCode: string}} param0 - Coupon code to look up.
   * @returns {Promise<{type: string, dispcountInPercentage: number}>}
   *   NOTE(review): the misspelt key `dispcountInPercentage` is kept because
   *   it is part of the response shape callers receive.
   * @throws BadRequest when the coupon is unknown or not of type "event".
   */
  async applyEventCouponModel({ couponCode }) {
    try {
      const client = await getAerospikeClient();
      const couponKey = new Aerospike.Key(
        process.env.CLUSTER_NAME,
        process.env.SET_COUPON,
        couponCode
      );
      if (!(await client.exists(couponKey))) {
        throw new Error("Invalid Coupon!!!");
      }
      // tp - coupon type, dp - discount in percentage
      const {
        bins: { tp, dp },
      } = await client.get(couponKey);
      if (tp !== "event") {
        throw new Error("Invalid Coupon!!!");
      }
      return { type: tp, dispcountInPercentage: dp };
    } catch (error) {
      console.log(error.message);
      throw createError.BadRequest(error.message);
    }
  }

  /**
   * Toggles the user's "interested" mark on an event. Marking interest also
   * clears a previous "not interested" mark; the creator is notified when
   * another user newly marks interest.
   *
   * @param {string|number} id - Event id.
   * @param {object} user - Acting user.
   * @param {*} [value] - Unused; kept for call compatibility.
   * @param {*} [array] - Unused; kept for call compatibility.
   * @returns {Promise<{msg: string, notificationData?: object}>}
   * @throws BadRequest on any failure.
   */
  async addEventInterest(id, user, value, array) {
    try {
      const client = await getAerospikeClient();
      const eventKey = blockEventKey(id);
      const { bins } = await client.get(eventKey);
      const { handleUn } = user;
      const wasInterested = (bins.i_u ?? []).includes(handleUn);

      const ops = wasInterested
        ? [
            Aerospike.lists.removeByValue("i_u", handleUn),
            Aerospike.lists.removeByValue("n_i_u", handleUn),
          ]
        : [
            Aerospike.lists.removeByValue("n_i_u", handleUn),
            Aerospike.lists.append("i_u", handleUn),
          ];
      await client.batchWrite(
        [batchWriteRecord(eventKey, ops)],
        new Aerospike.BatchPolicy({})
      );

      if (bins.e_c_dun === handleUn || wasInterested) {
        return { msg: "Event interested added" };
      }
      const notificationData = buildCreatorNotification(
        22,
        id,
        user,
        bins.e_c_dun
      );
      await sendNotificationMessage("post_notification", notificationData);
      return { msg: "Event interested added", notificationData };
    } catch (error) {
      throw createError.BadRequest(error.message);
    }
  }

  /**
   * Toggles the user's "not interested" mark on an event. Setting the mark
   * also clears a previous "interested" mark.
   *
   * @param {string|number} id - Event id.
   * @param {object} user - Acting user.
   * @param {*} [value] - Unused; kept for call compatibility.
   * @param {*} [array] - Unused; kept for call compatibility.
   * @returns {Promise<{msg: string}>}
   * @throws BadRequest on any failure.
   */
  async addEventNotInterest(id, user, value, array) {
    try {
      const client = await getAerospikeClient();
      const eventKey = blockEventKey(id);
      const { bins } = await client.get(eventKey);
      const { handleUn } = user;

      const ops = (bins.n_i_u ?? []).includes(handleUn)
        ? [Aerospike.lists.removeByValue("n_i_u", handleUn)]
        : [
            Aerospike.lists.removeByValue("i_u", handleUn),
            Aerospike.lists.append("n_i_u", handleUn),
          ];
      await client.batchWrite(
        [batchWriteRecord(eventKey, ops)],
        new Aerospike.BatchPolicy({})
      );
      return { msg: "Event not interested added" };
    } catch (err) {
      throw createError.BadRequest(err.message);
    }
  }

  /**
   * Toggles the user's membership in an event's joined list (`j_u`). Joining
   * clears any interested / not-interested mark; the creator is notified when
   * another user who was not yet in the stored list joins.
   *
   * @param {string|number} id - Event id.
   * @param {object} user - Acting user.
   * @param {*} [value] - Unused; kept for call compatibility.
   * @param {string[]} [array] - Joined handles as known by the caller; when it
   *   is not an array the stored `j_u` list is used instead.
   * @returns {Promise<{msg: string, notificationData?: object}>}
   * @throws BadRequest on any failure.
   */
  async addEventJoin(id, user, value, array) {
    try {
      const client = await getAerospikeClient();
      const eventKey = blockEventKey(id);
      const { bins } = await client.get(eventKey);
      const { handleUn } = user;
      const storedJoined = bins.j_u ?? [];
      const knownJoined = Array.isArray(array) ? array : storedJoined;

      // Always start from a clean slate, then re-add when this is a join.
      const ops = [
        Aerospike.lists.removeByValue("i_u", handleUn),
        Aerospike.lists.removeByValue("n_i_u", handleUn),
        Aerospike.lists.removeByValue("j_u", handleUn),
      ];
      if (!knownJoined.includes(handleUn)) {
        ops.push(Aerospike.lists.append("j_u", handleUn));
      }
      await client.batchWrite(
        [batchWriteRecord(eventKey, ops)],
        new Aerospike.BatchPolicy({})
      );

      if (bins.e_c_dun === handleUn || storedJoined.includes(handleUn)) {
        return { msg: "Event join added" };
      }
      const notificationData = buildCreatorNotification(
        23,
        id,
        user,
        bins.e_c_dun
      );
      await sendNotificationMessage("post_notification", notificationData);
      return { msg: "Event join added", notificationData };
    } catch (err) {
      throw createError.BadRequest(err.message);
    }
  }

  /**
   * Toggles whether an event is hidden for a user (`hide` list).
   * @param {string} username - Handle of the acting user.
   * @param {string|number} id - Event id.
   * @returns {Promise<{msg: string}>} "Hide event" or "Unhide event".
   * @throws BadRequest on any failure.
   */
  async hideEvent(username, id) {
    try {
      const client = await getAerospikeClient();
      const key = blockEventKey(id);
      const { bins } = await client.get(key);
      const wasHidden = (bins.hide ?? []).includes(username);
      const op = wasHidden
        ? Aerospike.lists.removeByValue("hide", username)
        : Aerospike.lists.append("hide", username);
      await client.operate(key, [op]);
      return { msg: wasHidden ? "Unhide event" : "Hide event" };
    } catch (err) {
      throw createError.BadRequest(err.message);
    }
  }

  /**
   * Files a report against an event: appends the reporter to the reason bin
   * of the event's report record and to the event's own `report` list.
   *
   * NOTE(review): this appends to the `report` bin while createBlockEvent
   * initialises a bin called `reports`; confirm which name readers expect.
   *
   * @param {string|number} id - Event id (also the report record's key).
   * @param {string} username - Handle of the reporting user.
   * @param {{value: *}} body - `value` names the reason bin to append to.
   * @returns {Promise<{msg: string}>}
   * @throws BadRequest on any failure.
   */
  async handleReportEvent(id, username, body) {
    try {
      const client = await getAerospikeClient();
      const eventKey = blockEventKey(id);
      const reportKey = new Aerospike.Key(
        process.env.CLUSTER_NAME,
        process.env.SET_REPORT,
        id
      );
      const reasonBin = body.value.toString();

      // Both records are read first, as before, so that a read failure
      // aborts the report before anything is written.
      await Promise.all([client.get(eventKey), client.get(reportKey)]);

      await client.operate(reportKey, [
        Aerospike.lists.append(reasonBin, {
          u_id: username,
          time: now.micro(),
        }),
      ]);
      await client.operate(eventKey, [
        Aerospike.lists.append("report", username),
      ]);
      return { msg: "You reported this event" };
    } catch (err) {
      throw createError.BadRequest(err.message);
    }
  }

  /**
   * Creates an event together with its report record, registers it on the
   * group's meta record, bumps the creator's post counter and pushes the id
   * into the Redis events feed.
   *
   * NOTE(review): the group meta record is keyed by `body.blockId` while the
   * event's `g_id` bin comes from `body.g_id`; confirm both carry the same id.
   *
   * @param {object} [file] - Uploaded image; uploaded via `uploadImage` when set.
   * @param {object} body - Request body with the event fields.
   * @param {object} user - Creator (`handleUn`, `fn`, `ln`, `p_i`).
   * @returns {Promise<{msg: string, event: object}>} The stored event bins.
   * @throws Conflict when the created event cannot be read back; upload and
   *   write errors propagate unchanged.
   */
  async createBlockEvent(file, body, user) {
    let publicURL;
    if (file) {
      publicURL = await uploadImage(file);
    }

    const client = await getAerospikeClient();
    const event_id = now.micro();
    // Active unless the start date is already in the past; an unparsable
    // start date compares as not-past and therefore counts as active.
    const startsInMs = new Date(body.startdate).getTime() - Date.now();
    const isActive = !(startsInMs < 0);
    const tagsArray = body?.tags?.split(",") || [];
    const write = Aerospike.operations.write;

    const eventKey = blockEventKey(event_id.toString());
    const groupMetaKey = new Aerospike.Key(
      process.env.CLUSTER_NAME,
      process.env.SET_GROUP_META,
      body.blockId
    );
    const reportKey = new Aerospike.Key(
      process.env.CLUSTER_NAME,
      process.env.SET_REPORT,
      event_id.toString()
    );
    const userKey = new Aerospike.Key(
      process.env.CLUSTER_NAME,
      process.env.SET_USERS,
      user.handleUn
    );

    const eventOps = [
      write("id", event_id),
      write("tags", tagsArray),
      write("tit", body.title || ""),
      write("eventimage", publicURL || ""),
      write("time", body.time || ""),
      write("timezone", body.timezone || ""),
      write("des", body.description || ""),
      write("tm", body.terms || ""), // terms & conditions
      write("type", body.type || ""),
      write("link", body.link || ""),
      write("s_d", body.startdate || ""),
      write("e_d", body.enddate || ""),
      write("city", body.city || ""),
      write("country", body.country || ""),
      write("i_u", []), // interested users
      write("n_i_u", []), // not interested users
      write("j_u", []), // joined users
      write("price", Number(body.price) || ""),
      write("is_paid", Number(body.isPaid) > 0),
      write("active", isActive),
      write("e_c_img", user.p_i || ""), // event creator
      write("e_c_fn", user.fn || ""),
      write("e_c_ln", user.ln || ""),
      write("e_c_dun", user.handleUn || ""),
      write("delete", false),
      write("isEvent", true),
      write("g_id", body.g_id || ""),
      write("g_name", body.g_n || ""),
      write("c_id", body.coin_id || ""), // coin id
      write("pin", body.pin || ""),
      write("slot", body.slotValue || ""),
      write("joined", 0),
      write("hide", []),
      write("reports", []),
      write("c_t", Date.now()),
      // Defaulted like its sibling bins so a missing image never writes
      // `undefined`.
      write("g_p_img", body.g_p_img || ""),
    ];

    const reportOps = [
      write("_id", event_id),
      write("type", "social"),
      write("cat", "block event"),
      // One empty list per report reason, bins "1" .. "10".
      ...Array.from({ length: 10 }, (_, i) => write(String(i + 1), [])),
      write("msg", []),
    ];

    await client.batchWrite(
      [
        batchWriteRecord(eventKey, eventOps),
        batchWriteRecord(groupMetaKey, [
          Aerospike.lists.append("events", event_id),
        ]),
        batchWriteRecord(reportKey, reportOps),
        batchWriteRecord(userKey, [Aerospike.operations.incr("post_c", 1)]),
      ],
      new Aerospike.BatchPolicy({})
    );

    // Update the Redis feed: add the new id and drop the oldest one.
    await redisHelper.addNewRecordAndRemoveOldest(
      ["eventsFeed:eventIds"],
      event_id,
      Date.now()
    );

    try {
      const { bins } = await client.get(eventKey);
      return { msg: "Event created", event: bins };
    } catch (err) {
      throw createError.Conflict(err.message);
    }
  }
}
// Export one shared instance of the model rather than the class itself.
module.exports = new BlockPostEventModel();
Editor is loading...
Leave a Comment