// Paste-site metadata (not part of the program): Untitled · unknown · typescript · 9 months ago · 10 kB · 3 · Indexable
'use strict';
import { UserActivityModel } from './../models/MongoDB/schemas/userActivityModel';
import Redis from "ioredis";
import WebSocket, { WebSocketServer } from 'ws';
import { decodeJWT } from "../helpers/jwt";
import {
feedAndSellAnimalNew_socket,
getDataPlantCareAllNew_V3_socket,
updateFactoryStatus_socket,
updateFarmAnimalStatus_socket
} from "../services/farm.service";
import { getListOrder_socket, log_socket } from "../services/order.service";
import { checkAndProcessPendingRewards } from "../services/friendAndHelp.service";
import { v4 as uuidv4 } from 'uuid';
import { configEnv } from '../configs/system-configs';
/**
 * In-memory record of one user's activity: the last time they were seen and
 * the WebSocket connections registered for them.
 *
 * Each registered socket is tagged with a generated `socketId` property, which
 * is also the key under which the socket is stored here.
 */
class UserActivityCache {
  /** Registered sockets, keyed by the id generated in `addSocket`. */
  private sockets: Map<string, WebSocket> = new Map();

  /**
   * @param userId         Identifier of the user this record belongs to.
   * @param lastActiveTime Epoch milliseconds of the user's latest activity.
   */
  constructor(public userId: string, public lastActiveTime: number) { }

  /** Registers `socket` under a fresh UUID and stamps that UUID on the socket. */
  public addSocket(socket: WebSocket): void {
    const generatedId = uuidv4();
    socket['socketId'] = generatedId;
    this.sockets.set(generatedId, socket);
  }

  /** Unregisters `socket`; a socket that carries no `socketId` tag is ignored. */
  public removeSocket(socket: WebSocket): void {
    const taggedId = socket['socketId'];
    if (!taggedId) {
      return;
    }
    this.sockets.delete(taggedId);
  }

  /** Registered sockets whose `readyState` is currently `OPEN`. */
  public get activeSockets(): WebSocket[] {
    const open: WebSocket[] = [];
    this.sockets.forEach((candidate) => {
      if (candidate.readyState === WebSocket.OPEN) {
        open.push(candidate);
      }
    });
    return open;
  }

  /** Ids of every registered socket, open or not, in registration order. */
  public get socketIds(): string[] {
    const ids: string[] = [];
    this.sockets.forEach((_socket, id) => {
      ids.push(id);
    });
    return ids;
  }
}
/** Activity records for users seen by this process, keyed by user id. */
const userActivities: Map<string | number, UserActivityCache> = new Map();

// NOTE(review): no code visible in this file reads or writes this map (the
// only mention is a commented-out delete) — confirm it can be removed.
const userSockets = new Map<string | number, WebSocket>();

/**
 * Base Redis connection (logical database 4). Other connections in this file
 * are created from it with `duplicate()`.
 *
 * NOTE(review): the fallback host is a hard-coded private address — confirm
 * this default is intended outside the environment it was written for.
 */
const redisClient = new Redis({
  host: configEnv.REDIS_HOST || '10.201.42.74',
  // Explicit radix so the port is always read as a decimal number; an unset
  // or non-numeric value yields NaN and falls back to 6379.
  port: parseInt(configEnv.REDIS_PORT, 10) || 6379,
  password: configEnv.REDIS_PASSWORD || undefined,
  db: 4
});

/** Redis pub/sub channel carrying per-user requests received over WebSocket. */
const CHANNEL_NAME = 'websocket_channel';
/** Redis pub/sub channel used by `emitToAllUsers`. */
const GLOBAL_CHANNEL = 'global_channel';

/** The WebSocket server; assigned by `initSocket`. */
let wss: WebSocketServer;
/** True once the Redis subscriber that drives request processing exists. */
let messageProcessingStarted = false;

/** Lazily created Redis connection shared by every publish made from this module. */
let sharedPublisher: ReturnType<typeof redisClient.duplicate> | undefined;

/** Returns the shared publishing connection, creating it on first use. */
const getPublisher = () => {
  if (!sharedPublisher) {
    sharedPublisher = redisClient.duplicate();
  }
  return sharedPublisher;
};

/**
 * Finds the socket connected to THIS process that carries the given
 * `socketId` tag (the tag is written by `UserActivityCache.addSocket`).
 * `wss.clients` is a Set, so it is copied to an array before searching.
 */
const findLocalSocket = (socketId: string) => {
  if (!wss) {
    return undefined;
  }
  return Array.from(wss.clients).find(client => client['socketId'] === socketId);
};

/**
 * Looks up the socket id registered for `userId` in Redis and, when that
 * socket is connected to this process and open, sends it `message` as JSON.
 * Lookup errors are logged; a missing registration is silently ignored.
 */
const deliverToUser = (userId: string | number, message: Record<string, unknown>): void => {
  redisClient.hget(`user:${String(userId)}`, 'socket_id', (err, socketId) => {
    if (err) {
      console.error(err);
      return;
    }
    if (!socketId) {
      return;
    }
    const socket = findLocalSocket(socketId);
    if (socket && socket.readyState === WebSocket.OPEN) {
      socket.send(JSON.stringify(message));
    }
  });
};

/** Forwards an already-serialised message to every open socket on this process. */
const broadcastToLocalClients = (serialisedMessage: string): void => {
  if (!wss) {
    return;
  }
  wss.clients.forEach((client) => {
    if (client.readyState === WebSocket.OPEN) {
      client.send(serialisedMessage);
    }
  });
};

/**
 * Creates the WebSocket server on top of `serverApp` and wires up
 * authentication, per-connection bookkeeping and request forwarding.
 *
 * Connection flow:
 *  1. The JWT is read from the `token=` part of the request URL; a missing
 *     token closes the socket with code 4001, an undecodable one closes it.
 *  2. The socket is registered in the user's activity record and its
 *     generated id is stored in Redis under `user:<id>` / `socket_id`.
 *  3. `ping` messages are answered locally with `pong`; every other message
 *     is published to `CHANNEL_NAME` for `processWebSocketMessages` to handle.
 *  4. On close the activity record is persisted and the Redis registration
 *     is removed if it still points at this socket.
 *
 * @param serverApp Server instance handed to `WebSocketServer` as `server`.
 * @returns The created `WebSocketServer`.
 */
export const initSocket = (serverApp) => {
  const options = {
    server: serverApp,
    perMessageDeflate: false,
    clientTracking: true,
    maxPayload: 1024 * 1024 * 100,
    // NOTE(review): pingInterval / pingTimeout do not appear among the
    // documented `ws` server options — confirm they have any effect.
    pingInterval: 30000,
    pingTimeout: 60000
  };
  wss = new WebSocketServer(options);
  const pubClient = getPublisher();

  // Start the Redis subscriber once for the whole server. It used to be
  // started on every connection, creating one extra Redis client per socket.
  processWebSocketMessages();

  wss.on('connection', (ws, req) => {
    // Keep only the token itself; drop any query parameters that follow it.
    const token = req.url?.split('token=')[1]?.split('&')[0];
    if (!token) {
      ws.close(4001, 'Unauthorized access');
      return;
    }
    try {
      const decoded = decodeJWT(token) as any;
      if (!decoded || !decoded.id) {
        ws.close();
        return;
      }
      ws['userData'] = decoded;
      const userId = decoded.id;
      const redisKey = `user:${userId}`;

      // Register the socket with the user's activity record. getUserActivity
      // creates the record when it is missing, which is why the former
      // "load from DB when not cached" branch could never run; it was removed.
      const activity = getUserActivity(userId);
      activity.lastActiveTime = Date.now();
      activity.addSocket(ws);
      // addSocket tags the socket with a generated id. The `ws` library does
      // not provide an `id` of its own, so this tag is what gets stored.
      const socketId: string = ws['socketId'];

      // Store the user's current socket id in Redis.
      redisClient.hset(redisKey, 'socket_id', socketId)
        .catch(error => console.error('Failed to register socket in Redis:', error));

      ws.on('message', (message) => {
        updateUserLastActiveTime(userId);
        let data;
        try {
          data = JSON.parse(message.toString());
        } catch (error) {
          // Malformed client input must not throw out of the event handler.
          console.error('Invalid WebSocket message:', error);
          return;
        }
        if (!data || typeof data !== 'object') {
          return;
        }
        // Handle events coming from the client.
        if (data.event === 'ping') {
          ws.send(JSON.stringify({ event: 'pong' }));
          return;
        }
        // Forward the request to the Redis channel.
        pubClient.publish(CHANNEL_NAME, JSON.stringify({
          userId,
          event: data.event,
          payload: data.payload
        })).catch(error => console.error('Failed to publish WebSocket request:', error));
      });

      Promise.resolve(checkAndProcessPendingRewards(userId))
        .catch(error => console.error('Failed to process pending rewards:', error));

      ws.on('close', () => {
        const userActivity = getUserActivity(userId);
        userActivity.removeSocket(ws);
        // saveUserActivityToDB reads the record synchronously before its
        // first await, so it must be called before the record is dropped.
        saveUserActivityToDB(userId)
          .catch(error => console.error('Failed to save user activity:', error));
        if (userActivity.activeSockets.length === 0) {
          userActivities.delete(userId);
        }
        // Remove the Redis registration only if it still belongs to this
        // socket, so a newer connection of the same user stays reachable.
        // (Check-then-delete is not atomic; a reconnect in between is possible.)
        redisClient.hget(redisKey, 'socket_id')
          .then(stored => (stored === socketId ? redisClient.hdel(redisKey, 'socket_id') : 0))
          .catch(error => console.error('Failed to clear socket registration:', error));
      });

      ws.on('error', (error) => {
        console.error('WebSocket error:', error);
      });
    } catch (error) {
      console.error('Token error:', error);
      ws.close();
    }
  });
  return wss;
};

/**
 * Handlers for requests arriving on `CHANNEL_NAME`, keyed by event name.
 * A Map is used so that only the listed names can ever match.
 */
const socketEventHandlers = new Map<string, (...args: any[]) => Promise<any>>([
  ['statusFactoryUpdate', (userId, payload) => updateFactoryStatus_socket(userId, payload)],
  ['statusFarmAnimalUpdate', (userId, payload) => updateFarmAnimalStatus_socket(userId, payload)],
  ['getDataPlantCareAllNew-v3', (userId, payload) => getDataPlantCareAllNew_V3_socket(userId, payload)],
  ['getDataAnimalCareNew', (userId, payload) => feedAndSellAnimalNew_socket(userId, payload)],
  ['getListOrder', (userId) => getListOrder_socket(userId)],
  ['log', (userId, payload) => log_socket(userId, payload)],
]);

/**
 * Starts (once) the Redis subscriber that processes forwarded requests.
 *
 * The subscriber listens on `CHANNEL_NAME` and `GLOBAL_CHANNEL`:
 *  - `CHANNEL_NAME` messages are `{ userId, event, payload }`; the matching
 *    handler runs and its outcome is sent back with `sendResponse` or
 *    `sendError`. Unknown events are logged.
 *  - `GLOBAL_CHANNEL` messages are forwarded unchanged to every open socket
 *    on this process.
 *
 * Calling this more than once is a no-op, so only one subscriber exists.
 *
 * NOTE(review): if several server processes subscribe to `CHANNEL_NAME`,
 * each of them will presumably run the handler for every request — confirm
 * the deployment model.
 */
const processWebSocketMessages = () => {
  if (messageProcessingStarted) {
    return;
  }
  messageProcessingStarted = true;

  const subClient = redisClient.duplicate();
  subClient.subscribe(CHANNEL_NAME, GLOBAL_CHANNEL, (err) => {
    if (err) {
      console.error('Redis subscribe failed:', err);
    }
  });

  subClient.on('message', (channel, message) => {
    if (channel === GLOBAL_CHANNEL) {
      broadcastToLocalClients(message);
      return;
    }
    if (channel !== CHANNEL_NAME) {
      return;
    }
    let request;
    try {
      request = JSON.parse(message);
    } catch (error) {
      console.error('Invalid message on Redis channel:', error);
      return;
    }
    if (!request || typeof request !== 'object') {
      return;
    }
    const { userId, event, payload } = request;
    const handler = socketEventHandlers.get(event);
    if (!handler) {
      console.log(`Unknown event: ${event}`);
      return;
    }
    // Starting from a resolved promise routes synchronous throws from the
    // handler into the same .catch as asynchronous rejections.
    Promise.resolve()
      .then(() => handler(userId, payload))
      .then(result => sendResponse(userId, event, result))
      .catch(error => sendError(userId, event, error));
  });
};

/**
 * Sends `{ event: "<event>-response", data }` to the user's registered socket.
 *
 * @param userId User whose socket should receive the message.
 * @param event  Name of the request event being answered.
 * @param data   Result to send.
 */
const sendResponse = (userId: string, event: string, data: any) => {
  deliverToUser(userId, { event: `${event}-response`, data });
};

/**
 * Sends `{ event: "<event>-error", error }` to the user's registered socket.
 *
 * @param userId User whose socket should receive the message.
 * @param event  Name of the request event that failed.
 * @param error  Failure to report. `Error` instances are reduced to their
 *               message, because `JSON.stringify` turns an Error into `{}`.
 */
const sendError = (userId: string, event: string, error: unknown) => {
  const reported = error instanceof Error ? error.message : error;
  deliverToUser(userId, { event: `${event}-error`, error: reported });
};

/**
 * Returns helpers for pushing events to connected clients.
 *
 * - `emitToUser` sends `{ event, data }` to the socket registered for the
 *   user, when that socket is connected to this process.
 * - `emitToAllUsers` publishes `{ event, data }` on `GLOBAL_CHANNEL`; the
 *   subscriber started by `processWebSocketMessages` relays it to every
 *   open socket.
 *
 * All calls share one publishing Redis connection instead of creating a new
 * one per `getSocket()` call.
 */
export const getSocket = () => {
  return {
    emitToUser: (userId: string | number, event: string, data: any) => {
      deliverToUser(userId, { event, data });
    },
    emitToAllUsers: (event: string, data: any) => {
      getPublisher().publish(GLOBAL_CHANNEL, JSON.stringify({ event, data }))
        .catch(error => console.error('Failed to publish global event:', error));
    }
  };
};
/**
 * Reports whether a socket id is currently registered for the user in Redis.
 *
 * @param userId User to check; converted to a string to build the Redis key.
 * @returns Resolves to `true` when the `socket_id` field exists on
 *          `user:<userId>`, `false` otherwise. Rejects with the Redis error
 *          when the lookup fails.
 */
export const checkUserOnlineStatus = async (userId: string | number): Promise<boolean> => {
  const fieldCount = await redisClient.hexists(`user:${String(userId)}`, 'socket_id');
  return fieldCount === 1;
};
/**
 * Fetches the stored activity document for a user.
 *
 * @param userId User id to match against the document's `userId` field.
 * @returns Whatever `UserActivityModel.findOne({ userId }).exec()` resolves to.
 */
async function loadUserActivityFromDB(userId: string | number) {
  const lookup = UserActivityModel.findOne({ userId });
  const storedActivity = await lookup.exec();
  return storedActivity;
}
/**
 * Persists the user's cached activity (last-active time and registered
 * socket ids), inserting the document when none exists yet.
 *
 * The cached values are read synchronously, before the first `await`, so the
 * write reflects the cache as it was at call time.
 *
 * @param userId User whose activity record should be written.
 */
async function saveUserActivityToDB(userId: string | number) {
  const { lastActiveTime, socketIds } = getUserActivity(userId);
  const filter = { userId };
  const changes = {
    $set: {
      lastActiveTime: new Date(lastActiveTime),
      socketIds
    }
  };
  await UserActivityModel.updateOne(filter, changes, { upsert: true });
}
/**
 * Stamps the user's activity record with the current time, creating the
 * record first when it does not exist (via `getUserActivity`).
 *
 * @param userId User whose last-active time should be refreshed.
 */
function updateUserLastActiveTime(userId: string | number) {
  getUserActivity(userId).lastActiveTime = Date.now();
}
/**
 * Returns the cached activity record for a user, creating and caching a new
 * one (last active "now") when none is present.
 *
 * @param userId Key into the cache; the new record stores it as a string.
 * @returns The existing or newly created record — never `undefined`.
 */
function getUserActivity(userId: string | number): UserActivityCache {
  const cached = userActivities.get(userId);
  if (cached) {
    return cached;
  }
  const created = new UserActivityCache(userId.toString(), Date.now());
  userActivities.set(userId, created);
  return created;
}
/**
 * Runs the handler registered for `data.event` directly on this socket and
 * replies with `{ event: "<event>-response", result }`, or with
 * `{ event: "<event>-error", error: 'An error occurred' }` when the handler
 * throws or rejects. Messages with an unregistered event are ignored.
 *
 * Handlers are looked up in a Map, so only the names listed below can match.
 * The previous `in` check on a plain object also accepted inherited names
 * such as "toString" or "constructor".
 *
 * NOTE(review): the handler receives the whole `ws['userData']` object as its
 * first argument, whereas `processWebSocketMessages` passes only the user id
 * to the same service functions — confirm which one they expect.
 *
 * @param ws   Socket the message arrived on; `ws['userData']` must be set.
 * @param data Parsed client message, expected to look like `{ event, payload }`.
 */
function handleWebSocketMessage(ws: WebSocket, data: any) {
  const eventHandlers = new Map<string, (...args: any[]) => Promise<any>>([
    ['getDataPlantCareAllNew-v3', getDataPlantCareAllNew_V3_socket],
    ['getDataAnimalCareNew', feedAndSellAnimalNew_socket],
    ['getListOrder', getListOrder_socket],
    ['log', log_socket]
  ]);

  // `data` comes from the client and may be null or lack an event name.
  const eventName = data?.event;
  const handler = typeof eventName === 'string' ? eventHandlers.get(eventName) : undefined;
  if (!handler) {
    return;
  }

  const reportFailure = (error: unknown) => {
    console.error(error);
    ws.send(JSON.stringify({ event: `${eventName}-error`, error: 'An error occurred' }));
  };

  try {
    handler(ws['userData'], data.payload).then(result => {
      ws.send(JSON.stringify({
        event: `${eventName}-response`,
        result
      }));
    }).catch(reportFailure);
  } catch (error) {
    // Covers handlers that throw synchronously instead of rejecting.
    reportFailure(error);
  }
}
// Paste-site footer (not part of the program): Leave a Comment