// Paste-site page metadata (not part of the program):
// Untitled · unknown · typescript · 10 months ago · 14 kB · 4 · Indexable
import { UserActivityModel } from './../models/MongoDB/schemas/userActivityModel';
import Redis from "ioredis";
import WebSocket, { WebSocketServer } from 'ws';
import { decodeJWT } from "../helpers/jwt";
import { feedAndSellAnimalNew_socket, getDataPlantCareAllNew_V3_socket, updateFactoryStatus_socket, updateFarmAnimalStatus_socket } from "../services/farm.service";
import { getListOrder_socket, log_socket } from "../services/order.service";
import { checkAndProcessPendingRewards } from "../services/friendAndHelp.service";
import { v4 as uuidv4 } from 'uuid';
import { CustomDebugLoggerDev } from '../loggers/winston/debug.log';
// Module-level handle to the WebSocket server; it is assigned inside initSocket.
let wss: WebSocketServer;
/**
 * In-memory presence record for a single user: the time of their last
 * activity plus every WebSocket registered for them.
 *
 * Registering a socket tags it with two ad-hoc properties: `socketId`
 * (a fresh UUID that is also the key in the internal map) and `requestId`
 * (reset to null).
 */
class UserActivityCache {
  /** Registered sockets, keyed by the UUID assigned in addSocket. */
  private sockets: Map<string, WebSocket> = new Map();

  constructor(public userId: string, public lastActiveTime: number) { }

  /** Registers `socket` under a newly generated id and tags the socket with it. */
  public addSocket(socket: WebSocket): void {
    const generatedId = uuidv4();
    socket['socketId'] = generatedId;
    socket['requestId'] = null;
    this.sockets.set(generatedId, socket);
  }

  /** Unregisters `socket`; does nothing when the socket carries no `socketId`. */
  public removeSocket(socket: WebSocket): void {
    const knownId = socket['socketId'];
    if (!knownId) {
      return;
    }
    this.sockets.delete(knownId);
  }

  /** Registered sockets whose readyState is OPEN, in registration order. */
  public get activeSockets(): WebSocket[] {
    const openSockets: WebSocket[] = [];
    this.sockets.forEach((candidate) => {
      if (candidate.readyState === WebSocket.OPEN) {
        openSockets.push(candidate);
      }
    });
    return openSockets;
  }

  /** Ids of every registered socket (open or not), in registration order. */
  public get socketIds(): string[] {
    const ids: string[] = [];
    this.sockets.forEach((_socket, id) => {
      ids.push(id);
    });
    return ids;
  }
}
// Per-user activity cache. Entries are created lazily by getUserActivity and
// removed by the socket 'close' handler once the user has no open sockets left.
const userActivities: Map<string | number, UserActivityCache> = new Map();
// One socket per user, keyed by Number(decoded.id) when a connection is
// accepted; a later connection for the same user replaces the earlier entry.
let userSockets = new Map<string | number, WebSocket>();
/**
 * Creates the WebSocket server on top of `serverApp` and wires it to Redis.
 *
 * - Redis db 4: subscribes to `globalSocketChannel` and forwards each
 *   `{ userId, event, data }` message to every connected socket of that user.
 * - Redis db 5: holds the `requests_queue` list plus one `request:<id>` key
 *   per queued request. Requests are handled strictly one at a time.
 *
 * Each connection must carry `token=<jwt>` in its URL. The decoded token is
 * stored on the socket as `userData` and the socket is registered in
 * `userSockets` and in the user's `UserActivityCache`.
 *
 * NOTE(review): the queue lives in Redis, but the requester's socket is only
 * known to the process that accepted it. A request this process did not
 * enqueue is dropped rather than executed under another user's identity.
 * If several processes share this queue, confirm that is the desired policy.
 *
 * @param serverApp HTTP(S) server the WebSocket server attaches to.
 * @returns The created WebSocketServer (also stored in the module-level `wss`).
 */
export const initSocket = (serverApp) => {
  const QUEUE_KEY = 'requests_queue';
  const GLOBAL_CHANNEL = 'globalSocketChannel';

  const options = {
    server: serverApp,
    perMessageDeflate: false,
    clientTracking: true,
    maxPayload: 1024 * 1024 * 100,
    // NOTE(review): pingInterval / pingTimeout do not appear among the options
    // documented by `ws`; kept as-is, presumably ignored — confirm.
    pingInterval: 30000,
    pingTimeout: 60000
  };
  wss = new WebSocketServer(options);

  // Connection settings shared by every Redis client created here.
  const redisOptions = (db: number) => ({
    host: process.env.HOST_REDIS,
    port: parseInt(process.env.PORT_REDIS, 10),
    password: process.env.PASS_REDIS,
    db
  });

  const pubClient = new Redis(redisOptions(4));
  const subClient = pubClient.duplicate();
  subClient.subscribe(GLOBAL_CHANNEL).catch((error) => {
    console.error(`Failed to subscribe to ${GLOBAL_CHANNEL}:`, error);
  });
  subClient.on('message', (channel, message) => {
    if (channel !== GLOBAL_CHANNEL) {
      return;
    }
    let parsedMessage;
    try {
      parsedMessage = JSON.parse(message);
    } catch (error) {
      // A malformed broadcast must not throw out of the listener.
      console.error(`Ignoring malformed ${GLOBAL_CHANNEL} message:`, error);
      return;
    }
    if (!parsedMessage || typeof parsedMessage !== 'object') {
      return;
    }
    const { userId, event, data } = parsedMessage;
    const frame = JSON.stringify({ event, data });
    wss.clients.forEach((client) => {
      const isTarget = client['userData'] && client['userData'].id === userId;
      if (isTarget && client.readyState === WebSocket.OPEN) {
        client.send(frame);
      }
    });
  });

  // Single shared connection for the request queue (Redis db 5).
  const requestQueue = new Redis(redisOptions(5));

  // Socket that issued each queued request, keyed by request id. Using this
  // instead of the socket's mutable `requestId` tag means a second request on
  // the same socket cannot orphan the first one.
  const pendingSockets = new Map<string, WebSocket>();
  // True while this process is draining the queue, so only one drain runs.
  let draining = false;

  wss.on('connection', (ws, req) => {
    try {
      // Take everything after "token=" up to the next query parameter.
      const token = req.url?.split('token=')[1]?.split('&')[0];
      if (!token) {
        ws.close(4001, 'Unauthorized access');
        return;
      }
      const decoded = decodeJWT(token) as any;
      if (!decoded || decoded.id == null) {
        ws.close(4001, 'Unauthorized access');
        return;
      }
      const userId = decoded.id;
      const socketKey = Number(userId);

      ws['userData'] = decoded;
      userSockets.set(socketKey, ws);
      // Also creates the cache entry when the user has none yet. The earlier
      // "load from DB if not cached" branch ran after this call and therefore
      // could never execute.
      updateUserLastActiveTime(userId);
      getUserActivity(userId).addSocket(ws);
      console.log(`Socket added for userId ${userId}:`, ws['socketId']);

      ws.on('message', async (message) => {
        updateUserLastActiveTime(userId);

        let data;
        try {
          data = JSON.parse(message.toString());
        } catch (error) {
          console.error(`Ignoring malformed message from userId ${userId}:`, error);
          return;
        }
        if (!data || typeof data !== 'object') {
          return;
        }

        // Ping-pong is answered directly and never queued.
        if (data.event === 'ping') {
          ws.send(JSON.stringify({ event: 'pong' }));
          return;
        }
        if (data.event !== 'getDataPlantCareAllNew-v3' && data.event !== 'getDataAnimalCareNew') {
          return;
        }

        const requestId = uuidv4();
        ws['requestId'] = requestId;
        pendingSockets.set(requestId, ws);
        console.log(`Sending request with requestId: ${requestId} for userId: ${userId}`);
        try {
          await requestQueue.set(
            `request:${requestId}`,
            JSON.stringify({ id: requestId, event: data.event, payload: data.payload })
          );
          await requestQueue.rpush(QUEUE_KEY, requestId);
          const queueLength = await requestQueue.llen(QUEUE_KEY);
          CustomDebugLoggerDev.debug('requestQueueId:::::::::::::::::::::xxxxxxx::::xxxxx', queueLength);
        } catch (error) {
          console.error(`Failed to enqueue requestId: ${requestId}`, error);
          sendError(requestId, data.event, error instanceof Error ? error.message : String(error));
          pendingSockets.delete(requestId);
          return;
        }
        // Starts a drain unless one is already running in this process.
        void processNextRequest();
      });

      ws.on('close', () => {
        const activity = getUserActivity(userId);
        activity.removeSocket(ws);
        saveUserActivityToDB(userId).catch((error) => {
          console.error(`Failed to persist activity for userId ${userId}:`, error);
        });

        // Only touch the userSockets entry if it still points at this socket;
        // hand it to another open socket of the same user when one exists.
        const remaining = activity.activeSockets;
        if (userSockets.get(socketKey) === ws) {
          if (remaining.length > 0) {
            userSockets.set(socketKey, remaining[remaining.length - 1]);
          } else {
            userSockets.delete(socketKey);
          }
        }
        if (remaining.length === 0) {
          userActivities.delete(userId);
        }
      });

      ws.on('error', (error) => {
        console.error('WebSocket error:', error);
      });
    } catch (error) {
      console.error('Token error:', error);
      ws.close();
    }
  });

  /**
   * Drains `requests_queue` one request at a time. Re-entrant calls return
   * immediately while a drain is in progress. Every head item is removed
   * after handling, whether or not handling succeeded, so one bad request
   * cannot block the queue.
   */
  async function processNextRequest() {
    if (draining) {
      return;
    }
    draining = true;
    try {
      for (;;) {
        const requestId = await requestQueue.lindex(QUEUE_KEY, 0);
        CustomDebugLoggerDev.debug('requestQueue.lpop(requests_queue):::::::::::::::::::::::::', requestId);
        if (!requestId) {
          break;
        }
        try {
          await handleQueuedRequest(requestId);
        } finally {
          pendingSockets.delete(requestId);
          await requestQueue.del(`request:${requestId}`);
          // Remove exactly the id that was handled rather than "whatever is first".
          await requestQueue.lrem(QUEUE_KEY, 1, requestId);
        }
      }
    } catch (error) {
      console.error('Request queue processing failed:', error);
    } finally {
      draining = false;
    }
  }

  /**
   * Runs the handler for one queued request using the identity of the socket
   * that enqueued it, then sends the response or the error back to that socket.
   */
  async function handleQueuedRequest(requestId: string) {
    const request = await requestQueue.get(`request:${requestId}`);
    CustomDebugLoggerDev.debug('request:::::::::::::::::::::::::', request);
    if (!request) {
      console.log(`No request found for requestId: ${requestId}`);
      return;
    }
    const requester = getWebSocket(requestId);
    if (!requester) {
      // Unknown requester: running the handler would need some other user's identity.
      console.log('No socket found for requestId:', requestId);
      return;
    }

    let event;
    try {
      const parsed = JSON.parse(request);
      event = parsed.event;
      const handler = getEventHandler(event);
      const result = await handler(requester['userData'], parsed.payload);
      sendResponse(requestId, event, result);
    } catch (error) {
      console.error(`Error processing event: ${event} for requestId: ${requestId}`, error);
      sendError(requestId, event, error instanceof Error ? error.message : String(error));
    }
  }

  /** Maps an event name to its service handler; throws for unknown events. */
  function getEventHandler(event) {
    switch (event) {
      case 'getDataPlantCareAllNew-v3':
        return getDataPlantCareAllNew_V3_socket;
      case 'getDataAnimalCareNew':
        return feedAndSellAnimalNew_socket;
      case 'statusFactoryUpdate':
        return updateFactoryStatus_socket;
      case 'statusFarmAnimalUpdate':
        return updateFarmAnimalStatus_socket;
      case 'getListOrder':
        return getListOrder_socket;
      case 'log':
        return log_socket;
      default:
        throw new Error(`Handler not found for event: ${event}`);
    }
  }

  /** Sends `<event>-response` to the requester if its socket is still open. */
  function sendResponse(requestId, event, result) {
    const socket = getWebSocket(requestId);
    if (!socket) {
      console.log('No socket found for requestId:', requestId);
      return;
    }
    if (socket.readyState !== WebSocket.OPEN) {
      console.log('Socket is not open for requestId:', requestId);
      return;
    }
    socket.send(JSON.stringify({
      event: `${event}-response`,
      errorCode: result.errorCode || 0,
      data: result.data || null,
      message: result.message || '',
      timestamp: result.timestamp || null
    }));
    // The stored request key is deleted by processNextRequest on the shared
    // connection; no per-response Redis client is opened here.
  }

  /** Sends `<event>-error` to the requester if its socket is still open. */
  function sendError(requestId, event, errorMessage) {
    const socket = getWebSocket(requestId);
    if (socket && socket.readyState === WebSocket.OPEN) {
      socket.send(JSON.stringify({ event: `${event}-error`, error: errorMessage }));
    }
  }

  /** Returns the socket that enqueued `requestId`, or null when unknown. */
  function getWebSocket(requestId) {
    return pendingSockets.get(requestId) ?? null;
  }

  return wss;
};
/**
 * Variant of getSocket whose emitter fans out through the activity cache:
 * `emitToUser` sends `{ event, data }` to every open socket registered for
 * the user. The lookup goes through getUserActivity, which creates a cache
 * entry when the user has none.
 */
export const getSocket_BKKK123 = () => {
  const emitToUser = (userId: string | number, event: string, data: any) => {
    const activity = getUserActivity(userId);
    if (!activity) {
      return;
    }
    for (const socket of activity.activeSockets) {
      socket.send(JSON.stringify({ event, data }));
    }
  };
  return { emitToUser };
};
/**
 * Returns emit helpers backed by `userSockets` (one socket per user).
 *
 * - `emitToUser` sends `{ event, data }` to the user's socket when it is open.
 * - `emitToAllUsers` sends `{ event, data }` to every open socket in the map.
 */
export const getSocket = () => {
  // Serialises and sends one frame, skipping sockets that are missing or not open.
  const sendIfOpen = (target: WebSocket | undefined, event: string, data: any) => {
    if (!target || target.readyState !== WebSocket.OPEN) {
      return;
    }
    target.send(JSON.stringify({ event, data }));
  };

  const emitToUser = (userId: string | number, event: string, data: any) => {
    sendIfOpen(userSockets.get(Number(userId)), event, data);
  };

  const emitToAllUsers = (event: string, data: any) => {
    userSockets.forEach((target) => {
      sendIfOpen(target, event, data);
    });
  };

  return { emitToUser, emitToAllUsers };
};
/** Fetches the stored activity document for `userId` via UserActivityModel.findOne. */
async function loadUserActivityFromDB(userId: string | number) {
  const storedActivity = await UserActivityModel.findOne({ userId }).exec();
  return storedActivity;
}
/**
 * Upserts the user's cached last-active time and registered socket ids into
 * UserActivityModel. The cache is read synchronously, before the first await.
 */
async function saveUserActivityToDB(userId: string | number) {
  const { lastActiveTime, socketIds } = getUserActivity(userId);
  const filter = { userId };
  const update = {
    $set: {
      lastActiveTime: new Date(lastActiveTime),
      socketIds
    }
  };
  await UserActivityModel.updateOne(filter, update, { upsert: true });
}
/** Stamps the user's cache entry (created on demand) with the current time. */
function updateUserLastActiveTime(userId: string | number) {
  getUserActivity(userId).lastActiveTime = Date.now();
}
/**
 * Returns the cache entry for `userId`, creating and storing a new one
 * (last-active time = now) when none exists.
 */
function getUserActivity(userId: string | number): UserActivityCache {
  const cached = userActivities.get(userId);
  if (cached) {
    return cached;
  }
  const created = new UserActivityCache(userId.toString(), Date.now());
  userActivities.set(userId, created);
  return created;
}
/**
 * Reports whether a user is currently online.
 *
 * A user counts as online when their socket in `userSockets` is OPEN and
 * their cached last-active time is less than 60 seconds old.
 *
 * `userSockets` is keyed by `Number(id)`, so the id is normalised the same
 * way here; a string id previously never matched. The activity cache is read
 * without creating an entry, because a freshly created entry is stamped "now"
 * and would make an unknown user look recently active.
 *
 * @param userId User id as a string or number.
 * @returns true when the user has an open socket and recent activity.
 */
export const checkUserOnlineStatus = (userId: string | number): boolean => {
  const userSocket = userSockets.get(Number(userId));
  if (!userSocket || userSocket.readyState !== WebSocket.OPEN) return false;

  // The cache key is whatever type the token's id had, so try each form.
  const userActivity =
    userActivities.get(userId) ??
    userActivities.get(Number(userId)) ??
    userActivities.get(String(userId));
  if (!userActivity) return false;

  const inactiveThreshold = 60 * 1000; // 60 seconds
  return Date.now() - userActivity.lastActiveTime < inactiveThreshold;
};
// function checkAllUsersOnlineStatus() {
// const currentTime = Date.now();
// const inactiveThreshold = 60 * 1000; // 60 giây
// userActivities.forEach((activity, userId) => {
// const lastActiveTimeMs = new Date(activity.lastActiveTime).getTime();
// if (currentTime - lastActiveTimeMs > inactiveThreshold) {
// // Xóa người dùng khỏi các map nếu họ không còn hoạt động
// userActivities.delete(userId);
// userSockets.delete(userId);
// console.log(`User ${userId} marked as offline`);
// }
// });
// setTimeout(checkAllUsersOnlineStatus, 60000);
// }
// checkAllUsersOnlineStatus();