Untitled
import { UserActivityModel } from './../models/MongoDB/schemas/userActivityModel';
import Redis from "ioredis";
import WebSocket, { WebSocketServer } from 'ws';
import { decodeJWT } from "../helpers/jwt";
import {
    feedAndSellAnimalNew_socket,
    getDataPlantCareAllNew_V3_socket,
    updateFactoryStatus_socket,
    updateFarmAnimalStatus_socket
} from "../services/farm.service";
import { getListOrder_socket, log_socket } from "../services/order.service";
import { checkAndProcessPendingRewards } from "../services/friendAndHelp.service";
import { v4 as uuidv4 } from 'uuid';
import { CustomDebugLoggerDev } from '../loggers/winston/debug.log';

/**
 * Metadata this module attaches directly onto each WebSocket instance.
 * `userData` is whatever `decodeJWT` returned for the connection's token.
 */
type TrackedSocket = WebSocket & {
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- shape is defined by decodeJWT, which is outside this file
    userData?: any;
    socketId?: string;
    requestId?: string | null;
};

/** Redis pub/sub channel used to push `{ userId, event, data }` messages to connected users. */
const GLOBAL_SOCKET_CHANNEL = 'globalSocketChannel';
/** Redis list holding the ids of queued requests; the payload lives under `request:<id>`. */
const REQUESTS_QUEUE_KEY = 'requests_queue';
/** Client events that are routed through the Redis request queue. */
const QUEUED_EVENTS = new Set<string>(['getDataPlantCareAllNew-v3', 'getDataAnimalCareNew']);
/** A user counts as online only if they were active within this many milliseconds. */
const INACTIVE_THRESHOLD_MS = 60 * 1000;

let wss: WebSocketServer;

/** In-memory record of one user's last activity time and the sockets registered for them. */
class UserActivityCache {
    private sockets: Map<string, WebSocket> = new Map();

    constructor(public userId: string, public lastActiveTime: number) { }

    /** Registers a socket under a fresh UUID and stamps `socketId` / `requestId` onto it. */
    public addSocket(socket: WebSocket): void {
        const tracked = socket as TrackedSocket;
        const socketId = uuidv4();
        this.sockets.set(socketId, socket);
        tracked.socketId = socketId;
        tracked.requestId = null;
    }

    /** Unregisters a socket previously passed to `addSocket`; a socket without an id is ignored. */
    public removeSocket(socket: WebSocket): void {
        const socketId = (socket as TrackedSocket).socketId;
        if (socketId) {
            this.sockets.delete(socketId);
        }
    }

    /** Registered sockets whose connection is currently OPEN. */
    public get activeSockets(): WebSocket[] {
        return Array.from(this.sockets.values()).filter(socket => socket.readyState === WebSocket.OPEN);
    }

    /** Ids of all registered sockets, open or not. */
    public get socketIds(): string[] {
        return Array.from(this.sockets.keys());
    }
}

// Keyed by activityKey(userId) so that 12 and "12" address the same entry.
const userActivities: Map<string, UserActivityCache> = new Map();
// Keyed by Number(userId); holds one "primary" socket per user.
const userSockets = new Map<string | number, WebSocket>();

/** Normalises a user id into the key used by `userActivities`. */
function activityKey(userId: string | number): string {
    return String(userId);
}

/** Creates a Redis client for the given logical database using the HOST_REDIS / PORT_REDIS / PASS_REDIS env vars. */
function createRedisClient(db: number) {
    return new Redis({
        host: process.env.HOST_REDIS,
        port: parseInt(process.env.PORT_REDIS ?? '', 10),
        password: process.env.PASS_REDIS,
        db
    });
}

/** Extracts a human-readable message from anything that may have been thrown. */
function describeError(error: unknown): string {
    if (error instanceof Error) {
        return error.message;
    }
    if (typeof error === 'object' && error !== null && 'message' in error) {
        return String((error as { message: unknown }).message);
    }
    return String(error);
}

/**
 * Attaches a WebSocket server to `serverApp` and wires it to Redis.
 *
 * - Messages published on `globalSocketChannel` are forwarded to every open
 *   socket of the addressed user.
 * - `getDataPlantCareAllNew-v3` and `getDataAnimalCareNew` client events are
 *   pushed through a Redis-backed queue and answered with `<event>-response`
 *   or `<event>-error`.
 * - `ping` events are answered with `pong`.
 *
 * @param serverApp HTTP server instance handed to `WebSocketServer` as `server`.
 * @returns The created `WebSocketServer`.
 */
export const initSocket = (serverApp) => {
    // NOTE(review): pingInterval / pingTimeout are not among the options listed in the
    // ws WebSocketServer documentation — confirm they have any effect.
    const options = {
        server: serverApp,
        perMessageDeflate: false,
        clientTracking: true,
        maxPayload: 1024 * 1024 * 100,
        pingInterval: 30000,
        pingTimeout: 60000
    };

    wss = new WebSocketServer(options);

    const pubClient = createRedisClient(4);
    const subClient = pubClient.duplicate();

    subClient.subscribe(GLOBAL_SOCKET_CHANNEL).catch((error) => {
        console.error(`Failed to subscribe to ${GLOBAL_SOCKET_CHANNEL}:`, error);
    });

    subClient.on('message', (channel, message) => {
        if (channel !== GLOBAL_SOCKET_CHANNEL) {
            return;
        }

        // A malformed publication must not take the process down.
        let parsedMessage;
        try {
            parsedMessage = JSON.parse(message);
        } catch (error) {
            console.error(`Ignoring malformed message on ${GLOBAL_SOCKET_CHANNEL}:`, error);
            return;
        }
        if (!parsedMessage || typeof parsedMessage !== 'object' || parsedMessage.userId == null) {
            return;
        }

        const { userId, event, data } = parsedMessage;
        const targetKey = activityKey(userId);
        const frame = JSON.stringify({ event, data });
        wss.clients.forEach((client) => {
            const tracked = client as TrackedSocket;
            // Compare ids as strings: the rest of this module already treats 12 and "12" as the same user.
            if (
                tracked.userData &&
                tracked.userData.id != null &&
                activityKey(tracked.userData.id) === targetKey &&
                client.readyState === WebSocket.OPEN
            ) {
                client.send(frame);
            }
        });
    });

    // Redis is used to store and synchronise the request queue state.
    const requestQueue = createRedisClient(5);

    // requestId -> socket that issued the request. Used to route the response back,
    // even when the same socket has several requests in flight.
    const pendingRequests = new Map<string, TrackedSocket>();

    wss.on('connection', (socket, req) => {
        const ws = socket as TrackedSocket;
        try {
            // Cut at the first '&' so trailing query parameters do not end up inside the token.
            const token = req.url?.split('token=')[1]?.split('&')[0];
            if (!token) {
                ws.close(4001, 'Unauthorized access');
                return;
            }

            // NOTE(review): the helper is named decodeJWT — confirm it also verifies the signature.
            const decoded = decodeJWT(token) as any;
            if (!decoded || decoded.id == null) {
                ws.close(4001, 'Unauthorized access');
                return;
            }

            const userId: string | number = decoded.id;
            const socketKey = Number(userId);

            userSockets.set(socketKey, ws);
            ws.userData = decoded;

            updateUserLastActiveTime(userId);

            // Register the socket in the user's activity cache.
            getUserActivity(userId).addSocket(ws);
            console.log(`Socket added for userId ${userId}:`, ws.socketId);

            ws.on('message', async (message) => {
                try {
                    updateUserLastActiveTime(userId);

                    const data = JSON.parse(message.toString());
                    if (!data || typeof data !== 'object') {
                        return;
                    }

                    // Ping-pong keep-alive; never goes through the Redis queue.
                    if (data.event === 'ping') {
                        ws.send(JSON.stringify({ event: 'pong' }));
                        return;
                    }

                    if (!QUEUED_EVENTS.has(data.event)) {
                        return;
                    }

                    const requestId = uuidv4();
                    ws.requestId = requestId;
                    pendingRequests.set(requestId, ws);
                    console.log(`Sending request with requestId: ${requestId} for userId: ${userId}`);

                    try {
                        // userData travels with the request so it is always executed on behalf
                        // of the user who sent it, whichever caller pops it from the queue.
                        await requestQueue.set(`request:${requestId}`, JSON.stringify({
                            id: requestId,
                            event: data.event,
                            payload: data.payload,
                            userData: ws.userData
                        }));
                        await requestQueue.rpush(REQUESTS_QUEUE_KEY, requestId);
                    } catch (error) {
                        pendingRequests.delete(requestId);
                        throw error;
                    }

                    // Process the request.
                    await processNextRequest(requestQueue, ws);
                } catch (error) {
                    console.error(`Failed to handle message for userId ${userId}:`, error);
                }
            });

            ws.on('close', () => {
                const userActivity = getUserActivity(userId);
                userActivity.removeSocket(ws);

                saveUserActivityToDB(userId).catch((error) => {
                    console.error(`Failed to save activity for userId ${userId}:`, error);
                });

                const remainingSockets = userActivity.activeSockets;

                // Only touch the primary-socket entry if it still points at this connection;
                // otherwise closing an old tab would evict the user's newer connection.
                if (userSockets.get(socketKey) === ws) {
                    if (remainingSockets.length > 0) {
                        userSockets.set(socketKey, remainingSockets[0]);
                    } else {
                        userSockets.delete(socketKey);
                    }
                }

                if (remainingSockets.length === 0) {
                    userActivities.delete(activityKey(userId));
                }

                // Responses for this socket can no longer be delivered.
                pendingRequests.forEach((owner, requestId) => {
                    if (owner === ws) {
                        pendingRequests.delete(requestId);
                    }
                });
            });

            ws.on('error', (error) => {
                console.error('WebSocket error:', error);
            });
        } catch (error) {
            console.error('Token error:', error);
            ws.close();
        }
    });

    /**
     * Pops the next request id from the Redis queue, runs its handler and sends the
     * result (or an error frame) to the socket that issued it. After a failed request
     * the next queued request is processed as well.
     *
     * @param queue Redis client holding the queue and the `request:<id>` records.
     * @param ws    Socket that triggered processing; its `userData` is only used for
     *              records that carry no `userData` of their own.
     */
    async function processNextRequest(queue: ReturnType<typeof createRedisClient>, ws: TrackedSocket): Promise<void> {
        const requestId = await queue.lpop(REQUESTS_QUEUE_KEY);
        CustomDebugLoggerDev.debug('requestQueue.lpop(requests_queue):::::::::::::::::::::::::', requestId);
        if (!requestId) {
            return;
        }

        const request = await queue.get(`request:${requestId}`);
        CustomDebugLoggerDev.debug('request:::::::::::::::::::::::::', request);
        if (!request) {
            console.log(`No request found for requestId: ${requestId}`);
            pendingRequests.delete(requestId);
            return;
        }

        let failed = false;
        let id: string = requestId;
        let event: string | undefined;
        try {
            const parsed = JSON.parse(request);
            id = parsed.id;
            event = parsed.event;

            const handler = getEventHandler(event);
            const result = await handler(parsed.userData ?? ws.userData, parsed.payload);
            sendResponse(id, event, result);
        } catch (error) {
            failed = true;
            console.error(`Error processing event: ${event} for requestId: ${id}`, error);
            sendError(id, event, describeError(error));
        } finally {
            // Always release the record, on success and on failure alike.
            pendingRequests.delete(id);
            try {
                await queue.del(`request:${requestId}`);
            } catch (error) {
                console.error(`Failed to delete request:${requestId}`, error);
            }
        }

        if (failed) {
            await processNextRequest(queue, ws);
        }
    }

    /**
     * Maps an event name to its service handler.
     * @throws Error when no handler is registered for `event`.
     */
    function getEventHandler(event) {
        switch (event) {
            case 'getDataPlantCareAllNew-v3':
                return getDataPlantCareAllNew_V3_socket;
            case 'getDataAnimalCareNew':
                return feedAndSellAnimalNew_socket;
            case 'statusFactoryUpdate':
                return updateFactoryStatus_socket;
            case 'statusFarmAnimalUpdate':
                return updateFarmAnimalStatus_socket;
            case 'getListOrder':
                return getListOrder_socket;
            case 'log':
                return log_socket;
            default:
                throw new Error(`Handler not found for event: ${event}`);
        }
    }

    /** Sends `<event>-response` to the socket that issued `requestId`, if it is still open. */
    function sendResponse(requestId, event, result) {
        const ws = getWebSocket(requestId);
        if (!ws) {
            console.log('No socket found for requestId:', requestId);
            return;
        }
        if (ws.readyState !== WebSocket.OPEN) {
            console.log('Socket is not open for requestId:', requestId);
            return;
        }

        ws.send(JSON.stringify({
            event: `${event}-response`,
            errorCode: result.errorCode || 0,
            data: result.data || null,
            message: result.message || '',
            timestamp: result.timestamp || null
        }));
        // The queued record is removed by processNextRequest using the shared Redis
        // client; opening a new connection per response here leaked connections.
    }

    /** Sends `<event>-error` to the socket that issued `requestId`, if it is still open. */
    function sendError(requestId, event, errorMessage) {
        const ws = getWebSocket(requestId);
        if (ws && ws.readyState === WebSocket.OPEN) {
            ws.send(JSON.stringify({ event: `${event}-error`, error: errorMessage }));
        }
    }

    /** Returns the socket that issued `requestId`, or `null` when it is unknown or already closed. */
    function getWebSocket(requestId): TrackedSocket | null {
        return pendingRequests.get(requestId) ?? null;
    }

    return wss;
};

/**
 * Legacy emitter that sends to every open socket registered for a user.
 * Does nothing when the user has no activity entry.
 */
export const getSocket_BKKK123 = () => {
    return {
        emitToUser: (userId: string | number, event: string, data: any) => {
            // Plain lookup: emitting to an offline user must not create a cache entry.
            const userActivity = userActivities.get(activityKey(userId));
            if (userActivity) {
                userActivity.activeSockets.forEach((ws) => {
                    ws.send(JSON.stringify({ event, data }));
                });
            }
        }
    };
};

/**
 * Returns emitters that write `{ event, data }` frames to connected users.
 * `emitToUser` targets the user's primary socket; `emitToAllUsers` targets every primary socket.
 * Sockets that are not OPEN are skipped.
 */
export const getSocket = () => {
    return {
        emitToUser: (userId: string | number, event: string, data: any) => {
            const ws = userSockets.get(Number(userId));
            if (ws && ws.readyState === WebSocket.OPEN) {
                ws.send(JSON.stringify({ event, data }));
            }
        },
        emitToAllUsers: (event: string, data: any) => {
            userSockets.forEach((ws) => {
                if (ws.readyState === WebSocket.OPEN) {
                    ws.send(JSON.stringify({ event, data }));
                }
            });
        }
    };
};

/** Loads the persisted activity document for a user, if any. */
async function loadUserActivityFromDB(userId: string | number) {
    return await UserActivityModel.findOne({ userId }).exec();
}

/** Upserts the user's current `lastActiveTime` and socket ids into MongoDB. */
async function saveUserActivityToDB(userId: string | number) {
    const userActivity = getUserActivity(userId);
    await UserActivityModel.updateOne(
        { userId },
        {
            $set: {
                lastActiveTime: new Date(userActivity.lastActiveTime),
                socketIds: userActivity.socketIds
            }
        },
        { upsert: true }
    );
}

/** Marks the user as active right now (creating the cache entry if needed). */
function updateUserLastActiveTime(userId: string | number) {
    const userActivity = getUserActivity(userId);
    userActivity.lastActiveTime = Date.now();
}

/** Returns the user's activity cache entry, creating it when it does not exist yet. */
function getUserActivity(userId: string | number): UserActivityCache {
    const key = activityKey(userId);
    let userActivity = userActivities.get(key);
    if (!userActivity) {
        userActivity = new UserActivityCache(key, Date.now());
        userActivities.set(key, userActivity);
    }
    return userActivity;
}

/**
 * Reports whether a user is online: they must have an activity entry, an OPEN
 * primary socket, and activity within the last 60 seconds.
 */
export const checkUserOnlineStatus = (userId: string | number): boolean => {
    // Plain lookups: a status check must not create entries, and userSockets is keyed by Number(id).
    const userActivity = userActivities.get(activityKey(userId));
    const userSocket = userSockets.get(Number(userId));

    if (!userActivity || !userSocket) return false;

    const idleMs = Date.now() - userActivity.lastActiveTime;
    return idleMs < INACTIVE_THRESHOLD_MS && userSocket.readyState === WebSocket.OPEN;
};

// function checkAllUsersOnlineStatus() {
//     const currentTime = Date.now();
//     const inactiveThreshold = 60 * 1000; // 60 seconds
//     userActivities.forEach((activity, userId) => {
//         const lastActiveTimeMs = new Date(activity.lastActiveTime).getTime();
//         if (currentTime - lastActiveTimeMs > inactiveThreshold) {
//             // Remove the user from the maps once they are no longer active
//             userActivities.delete(userId);
//             userSockets.delete(userId);
//             console.log(`User ${userId} marked as offline`);
//         }
//     });
//     setTimeout(checkAllUsersOnlineStatus, 60000);
// }
// checkAllUsersOnlineStatus();
Leave a Comment