Untitled
'use strict'; import { UserActivityModel } from './../models/MongoDB/schemas/userActivityModel'; import Redis from "ioredis"; import WebSocket, { WebSocketServer } from 'ws'; import { decodeJWT } from "../helpers/jwt"; import { feedAndSellAnimalNew_socket, getDataPlantCareAllNew_V3_socket, updateFactoryStatus_socket, updateFarmAnimalStatus_socket } from "../services/farm.service"; import { getListOrder_socket, log_socket } from "../services/order.service"; import { checkAndProcessPendingRewards } from "../services/friendAndHelp.service"; import { v4 as uuidv4 } from 'uuid'; import { configEnv } from '../configs/system-configs'; class UserActivityCache { constructor(public userId: string, public lastActiveTime: number) { } private sockets: Map<string, WebSocket> = new Map(); public addSocket(socket: WebSocket): void { const socketId = uuidv4(); this.sockets.set(socketId, socket); socket['socketId'] = socketId; } public removeSocket(socket: WebSocket): void { const socketId = socket['socketId']; if (socketId) { this.sockets.delete(socketId); } } public get activeSockets(): WebSocket[] { return Array.from(this.sockets.values()).filter(socket => socket.readyState === WebSocket.OPEN); } public get socketIds(): string[] { return Array.from(this.sockets.keys()); } } const userActivities: Map<string | number, UserActivityCache> = new Map(); let userSockets = new Map<string | number, WebSocket>(); const redisClient = new Redis({ host: configEnv.REDIS_HOST || '10.201.42.74', port: parseInt(configEnv.REDIS_PORT) || 6379, password: configEnv.REDIS_PASSWORD || undefined, db: 4 }); const CHANNEL_NAME = 'websocket_channel'; const GLOBAL_CHANNEL = 'global_channel'; let wss: WebSocketServer; export const initSocket = (serverApp) => { const options = { server: serverApp, perMessageDeflate: false, clientTracking: true, maxPayload: 1024 * 1024 * 100, pingInterval: 30000, pingTimeout: 60000 }; wss = new WebSocketServer(options); const pubClient = redisClient.duplicate(); const subClient = redisClient.duplicate(); subClient.subscribe(CHANNEL_NAME); wss.on('connection', (ws, req) => { const token = req.url?.split('token=')[1]; if (!token) { ws.close(4001, 'Unauthorized access'); return; } try { const decoded = decodeJWT(token) as any; if (!decoded || !decoded.id) { ws.close(); return; } ws['userData'] = decoded; updateUserLastActiveTime(ws['userData'].id); if (!userActivities.has(ws['userData'].id)) { loadUserActivityFromDB(ws['userData'].id).then((userData) => { if (userData) { userActivities.set(ws['userData'].id, new UserActivityCache(userData.userId, userData.lastActiveTime.getTime())); } }); } // Lưu trữ thông tin user vào Redis redisClient.hset(`user:${decoded.id}`, 'socket_id', ws.id); ws.on('message', (message) => { updateUserLastActiveTime(ws['userData'].id); const data = JSON.parse(message.toString()); // Xử lý các sự kiện từ client if (data.event === 'ping') { ws.send(JSON.stringify({ event: 'pong' })); } else { // Gửi yêu cầu đến Redis channel pubClient.publish(CHANNEL_NAME, JSON.stringify({ userId: decoded.id, event: data.event, payload: data.payload })); } }); processWebSocketMessages(); checkAndProcessPendingRewards(ws['userData'].id); ws.on('close', () => { ////////// getUserActivity(ws['userData'].id).removeSocket(ws); saveUserActivityToDB(ws['userData'].id); // userSockets.delete(Number(decoded.id)); const userId = ws['userData'].id; const userActivity = getUserActivity(userId); if (userActivity.activeSockets.length === 0) { userActivities.delete(userId); } ///////// // Xóa thông tin user khỏi Redis khi kết nối đóng redisClient.hdel(`user:${decoded.id}`, 'socket_id'); }); ws.on('error', (error) => { console.error('WebSocket error:', error); }); } catch (error) { console.error('Token error:', error); ws.close(); } }); return wss; }; const processWebSocketMessages = () => { const subClient = redisClient.duplicate(); subClient.on('message', (channel, message) => { if (channel === CHANNEL_NAME) { const { userId, event, payload } = JSON.parse(message); switch (event) { case 'statusFactoryUpdate': updateFactoryStatus_socket(userId, payload) .then(result => sendResponse(userId, event, result)) .catch(error => sendError(userId, event, error)); break; case 'statusFarmAnimalUpdate': updateFarmAnimalStatus_socket(userId, payload) .then(result => sendResponse(userId, event, result)) .catch(error => sendError(userId, event, error)); break; case 'getDataPlantCareAllNew-v3': getDataPlantCareAllNew_V3_socket(userId, payload) .then(result => sendResponse(userId, event, result)) .catch(error => sendError(userId, event, error)); break; case 'getDataAnimalCareNew': feedAndSellAnimalNew_socket(userId, payload) .then(result => sendResponse(userId, event, result)) .catch(error => sendError(userId, event, error)); break; case 'getListOrder': getListOrder_socket(userId) .then(result => sendResponse(userId, event, result)) .catch(error => sendError(userId, event, error)); break; case 'log': log_socket(userId, payload) .then(result => sendResponse(userId, event, result)) .catch(error => sendError(userId, event, error)); break; default: console.log(`Unknown event: ${event}`); } } }); }; const sendResponse = (userId: string, event: string, data: any) => { redisClient.hgetall(`user:${userId}`, (err, user) => { if (err) { console.error(err); return; } if (user && user.socket_id) { const socket = wss.clients.find(client => client.id === user.socket_id); if (socket && socket.readyState === WebSocket.OPEN) { socket.send(JSON.stringify({ event: `${event}-response`, data })); } } }); }; const sendError = (userId: string, event: string, error: string) => { redisClient.hgetall(`user:${userId}`, (err, user) => { if (err) { console.error(err); return; } if (user && user.socket_id) { const socket = wss.clients.find(client => client.id === user.socket_id); if (socket && socket.readyState === WebSocket.OPEN) { socket.send(JSON.stringify({ event: `${event}-error`, error })); } } }); }; export const getSocket = () => { const pubClient = redisClient.duplicate(); return { emitToUser: (userId: string | number, event: string, data: any) => { redisClient.hgetall(`user:${String(userId)}`, (err, user) => { if (err) { console.error(err); return; } if (user && user.socket_id) { const socket = wss.clients.find(client => client.id === user.socket_id); if (socket && socket.readyState === WebSocket.OPEN) { socket.send(JSON.stringify({ event, data })); } } }); }, emitToAllUsers: (event: string, data: any) => { pubClient.publish(GLOBAL_CHANNEL, JSON.stringify({ event, data })); } }; }; export const checkUserOnlineStatus = (userId: string | number): Promise<boolean> => { return new Promise((resolve, reject) => { redisClient.hexists(`user:${String(userId)}`, 'socket_id', (err, count) => { if (err) { reject(err); } else { resolve(count === 1); } }); }); }; async function loadUserActivityFromDB(userId: string | number) { return await UserActivityModel.findOne({ userId }).exec(); } async function saveUserActivityToDB(userId: string | number) { const userActivity = getUserActivity(userId); await UserActivityModel.updateOne( { userId }, { $set: { lastActiveTime: new Date(userActivity.lastActiveTime), socketIds: userActivity.socketIds } }, { upsert: true } ); } function updateUserLastActiveTime(userId: string | number) { const userActivity = getUserActivity(userId); userActivity.lastActiveTime = Date.now(); } function getUserActivity(userId: string | number): UserActivityCache { let userActivity = userActivities.get(userId); if (!userActivity) { userActivity = new UserActivityCache(userId.toString(), Date.now()); userActivities.set(userId, userActivity); } return userActivity; } function handleWebSocketMessage(ws: WebSocket, data: any) { const eventHandlers: Record<string, (...args: any[]) => Promise<any>> = { 'getDataPlantCareAllNew-v3': getDataPlantCareAllNew_V3_socket, 'getDataAnimalCareNew': feedAndSellAnimalNew_socket, 'getListOrder': getListOrder_socket, 'log': log_socket }; if (data.event in eventHandlers) { try { eventHandlers[data.event](ws['userData'], data.payload).then(result => { ws.send(JSON.stringify({ event: `${data.event}-response`, result })); }).catch(error => { console.error(error); ws.send(JSON.stringify({ event: `${data.event}-error`, error: 'An error occurred' })); }); } catch (error) { console.error(error); ws.send(JSON.stringify({ event: `${data.event}-error`, error: 'An error occurred' })); } } }
Leave a Comment