Untitled

 avatar
unknown
typescript
a month ago
10 kB
1
Indexable
'use strict';

import { UserActivityModel } from './../models/MongoDB/schemas/userActivityModel';
import Redis from "ioredis";
import WebSocket, { WebSocketServer } from 'ws';
import { decodeJWT } from "../helpers/jwt";
import {
  feedAndSellAnimalNew_socket,
  getDataPlantCareAllNew_V3_socket,
  updateFactoryStatus_socket,
  updateFarmAnimalStatus_socket
} from "../services/farm.service";
import { getListOrder_socket, log_socket } from "../services/order.service";
import { checkAndProcessPendingRewards } from "../services/friendAndHelp.service";
import { v4 as uuidv4 } from 'uuid';
import { configEnv } from '../configs/system-configs';

/**
 * In-memory record of one user's activity: the last time the user was seen
 * and the WebSocket connections registered for that user.
 *
 * Every socket passed to `addSocket` receives a generated id, which is also
 * written onto the socket object as `socket['socketId']` so that
 * `removeSocket` can find the entry again.
 */
class UserActivityCache {
  /** Registered sockets, keyed by the id generated in `addSocket`. */
  private sockets: Map<string, WebSocket> = new Map();

  /**
   * @param userId Identifier of the user this record belongs to.
   * @param lastActiveTime Last activity time as epoch milliseconds.
   */
  constructor(public userId: string, public lastActiveTime: number) { }

  /** Registers a socket under a fresh UUID and tags the socket with that id. */
  public addSocket(socket: WebSocket): void {
    const generatedId = uuidv4();
    this.sockets.set(generatedId, socket);
    socket['socketId'] = generatedId;
  }

  /** Unregisters a socket; a socket that carries no `socketId` tag is ignored. */
  public removeSocket(socket: WebSocket): void {
    const registeredId = socket['socketId'];
    if (!registeredId) {
      return;
    }
    this.sockets.delete(registeredId);
  }

  /** Registered sockets whose `readyState` is currently `WebSocket.OPEN`. */
  public get activeSockets(): WebSocket[] {
    const openSockets: WebSocket[] = [];
    for (const candidate of this.sockets.values()) {
      if (candidate.readyState === WebSocket.OPEN) {
        openSockets.push(candidate);
      }
    }
    return openSockets;
  }

  /** Ids of every registered socket, regardless of its connection state. */
  public get socketIds(): string[] {
    return [...this.sockets.keys()];
  }
}

/** Activity cache for users seen by this process, keyed by user id. */
const userActivities: Map<string | number, UserActivityCache> = new Map();
// NOTE(review): `userSockets` is not read or written anywhere in the visible
// file; confirm it is unused before removing it.
let userSockets = new Map<string | number, WebSocket>();

/**
 * Base Redis connection (database 4). It is used directly for the
 * `user:<id>` hashes and duplicated wherever a separate connection is needed.
 */
const redisClient = new Redis({
  // NOTE(review): the fallback host is a hard-coded address; consider
  // requiring REDIS_HOST instead of shipping an environment-specific default.
  host: configEnv.REDIS_HOST || '10.201.42.74',
  // Explicit radix so the port is always parsed as a decimal number.
  port: parseInt(configEnv.REDIS_PORT, 10) || 6379,
  password: configEnv.REDIS_PASSWORD || undefined,
  db: 4
});

/** Channel carrying client requests as `{ userId, event, payload }` JSON. */
const CHANNEL_NAME = 'websocket_channel';
/** Channel used by `emitToAllUsers` for `{ event, data }` JSON broadcasts. */
const GLOBAL_CHANNEL = 'global_channel';

/** The WebSocket server; assigned by `initSocket`, undefined until then. */
let wss: WebSocketServer;

/**
 * Creates the WebSocket server on top of an existing HTTP server and wires up
 * authentication, activity tracking and Redis forwarding.
 *
 * For every connection:
 *  1. The JWT is taken from the `token=` part of the request URL and decoded.
 *     Connections without a usable token are closed with code 4001.
 *  2. The socket is registered in the in-memory activity cache, and its
 *     generated id is stored in the Redis hash `user:<id>` under `socket_id`.
 *  3. A `ping` event is answered directly with `pong`; any other event is
 *     published to CHANNEL_NAME as `{ userId, event, payload }`.
 *  4. On close the activity is persisted and the Redis registration is
 *     removed, provided it still belongs to the closing socket.
 *
 * @param serverApp HTTP(S) server handed to `WebSocketServer` as `server`.
 * @returns The created server, which is also stored in the module-level `wss`.
 */
export const initSocket = (serverApp) => {
  const options = {
    server: serverApp,
    perMessageDeflate: false,
    clientTracking: true,
    maxPayload: 1024 * 1024 * 100, // 100 MiB per message
    // NOTE(review): pingInterval / pingTimeout look like socket.io-style
    // settings; confirm that `ws` honours them, otherwise a heartbeat has to
    // be implemented explicitly.
    pingInterval: 30000,
    pingTimeout: 60000
  };

  wss = new WebSocketServer(options);

  const pubClient = redisClient.duplicate();

  // Start the Redis consumer once for the whole server. Calling it from the
  // connection handler opened one more Redis connection per client.
  processWebSocketMessages();

  wss.on('connection', (ws, req) => {
    // Keep only the token value; anything after the next `&` or `#` belongs
    // to another part of the URL.
    const token = req.url?.split('token=')[1]?.split(/[&#]/)[0];
    if (!token) {
      ws.close(4001, 'Unauthorized access');
      return;
    }

    let decoded: any;
    try {
      decoded = decodeJWT(token);
    } catch (error) {
      console.error('Token error:', error);
      ws.close(4001, 'Unauthorized access');
      return;
    }

    if (!decoded || !decoded.id) {
      ws.close(4001, 'Unauthorized access');
      return;
    }

    const userId = decoded.id;
    const redisKey = `user:${userId}`;

    try {
      ws['userData'] = decoded;

      // Track the socket in the activity cache. `addSocket` tags the socket
      // with a generated `socketId`, which is reused as the `id` that the
      // send helpers compare against the value stored in Redis.
      updateUserLastActiveTime(userId);
      getUserActivity(userId).addSocket(ws);
      const socketId: string = ws['socketId'];
      ws['id'] = socketId;

      ws.on('error', (error) => {
        console.error('WebSocket error:', error);
      });

      ws.on('message', (message) => {
        updateUserLastActiveTime(userId);

        let data: any;
        try {
          data = JSON.parse(message.toString());
        } catch (error) {
          // A malformed frame must not throw out of the event listener.
          console.error('Invalid WebSocket message:', error);
          return;
        }
        if (!data || typeof data.event !== 'string') {
          return;
        }

        // Handle events coming from the client.
        if (data.event === 'ping') {
          ws.send(JSON.stringify({ event: 'pong' }));
          return;
        }

        // Forward the request to the Redis channel.
        pubClient.publish(CHANNEL_NAME, JSON.stringify({
          userId,
          event: data.event,
          payload: data.payload
        })).catch((error) => console.error('Redis publish error:', error));
      });

      ws.on('close', () => {
        const activity = getUserActivity(userId);
        activity.removeSocket(ws);
        // Persist before the cache entry may be dropped below.
        saveUserActivityToDB(userId)
          .catch((error) => console.error('Failed to save user activity:', error));
        if (activity.activeSockets.length === 0) {
          userActivities.delete(userId);
        }

        // Remove the user's registration from Redis, but only while it still
        // points at this socket: a newer connection of the same user may have
        // overwritten it already.
        redisClient.hget(redisKey, 'socket_id')
          .then((stored) => (stored === socketId ? redisClient.hdel(redisKey, 'socket_id') : 0))
          .catch((error) => console.error('Failed to clear socket registration:', error));
      });

      // Store the socket registration for this user in Redis.
      redisClient.hset(redisKey, 'socket_id', socketId)
        .catch((error) => console.error('Failed to register socket:', error));

      Promise.resolve(checkAndProcessPendingRewards(userId))
        .catch((error) => console.error('Pending rewards error:', error));
    } catch (error) {
      console.error('WebSocket setup error:', error);
      ws.close(1011, 'Internal error');
    }
  });

  return wss;
};

/** True once the Redis consumer has been started; makes the setup idempotent. */
let websocketChannelConsumerStarted = false;

/**
 * Starts the consumer for CHANNEL_NAME. Each message is expected to be JSON
 * of the form `{ userId, event, payload }`; the event name selects a service
 * call whose result is delivered with `sendResponse` and whose failure is
 * delivered with `sendError`.
 *
 * The function may be called any number of times: only the first call opens
 * a subscriber connection, attaches the listener and subscribes.
 *
 * NOTE(review): every process that runs this consumer handles every published
 * message. If several server instances share the same Redis, each request
 * will be executed once per instance — confirm that this is intended.
 */
const processWebSocketMessages = () => {
  if (websocketChannelConsumerStarted) {
    return;
  }
  websocketChannelConsumerStarted = true;

  type ChannelHandler = (userId: any, payload: any) => Promise<any>;

  // A Map is used so that names such as "toString" never resolve to a handler.
  const handlers = new Map<string, ChannelHandler>([
    ['statusFactoryUpdate', (userId, payload) => updateFactoryStatus_socket(userId, payload)],
    ['statusFarmAnimalUpdate', (userId, payload) => updateFarmAnimalStatus_socket(userId, payload)],
    ['getDataPlantCareAllNew-v3', (userId, payload) => getDataPlantCareAllNew_V3_socket(userId, payload)],
    ['getDataAnimalCareNew', (userId, payload) => feedAndSellAnimalNew_socket(userId, payload)],
    ['getListOrder', (userId) => getListOrder_socket(userId)],
    ['log', (userId, payload) => log_socket(userId, payload)]
  ]);

  const subClient = redisClient.duplicate();

  subClient.on('message', (channel, message) => {
    if (channel !== CHANNEL_NAME) {
      return;
    }

    let parsed: any;
    try {
      parsed = JSON.parse(message);
    } catch (error) {
      // A malformed message must not throw out of the event listener.
      console.error('Invalid message on Redis channel:', error);
      return;
    }

    const { userId, event, payload } = parsed ?? {};
    const handler = handlers.get(event);
    if (!handler) {
      console.log(`Unknown event: ${event}`);
      return;
    }

    // Start from a resolved promise so that a handler throwing synchronously
    // is reported through sendError as well.
    Promise.resolve()
      .then(() => handler(userId, payload))
      .then(result => sendResponse(userId, event, result))
      .catch(error => sendError(userId, event, error));
  });

  // Without this subscription the listener above never receives anything.
  subClient.subscribe(CHANNEL_NAME)
    .catch((error) => console.error('Redis subscribe error:', error));
};

/**
 * Sends `{ event: "<event>-response", data }` to the socket registered for a
 * user. The socket id is read from the `socket_id` field of the Redis hash
 * `user:<userId>` and matched against the `id` property of the clients
 * connected to this process. Nothing is sent when no open client matches.
 *
 * @param userId User whose registered socket should receive the message.
 * @param event Name of the request event; `-response` is appended to it.
 * @param data Payload to deliver; it must be JSON-serialisable.
 */
const sendResponse = (userId: string, event: string, data: any) => {
  redisClient.hgetall(`user:${userId}`, (err, user) => {
    if (err) {
      console.error(err);
      return;
    }

    const socketId = user?.socket_id;
    if (!socketId || !wss) {
      return;
    }

    // `wss.clients` is a Set and has no `find`, so it is iterated directly.
    for (const client of wss.clients) {
      if (client['id'] !== socketId) {
        continue;
      }
      if (client.readyState === WebSocket.OPEN) {
        client.send(JSON.stringify({
          event: `${event}-response`,
          data
        }));
      }
      return;
    }
  });
};

/**
 * Sends `{ event: "<event>-error", error }` to the socket registered for a
 * user. The socket id is read from the `socket_id` field of the Redis hash
 * `user:<userId>` and matched against the `id` property of the clients
 * connected to this process. Nothing is sent when no open client matches.
 *
 * @param userId User whose registered socket should receive the message.
 * @param event Name of the request event; `-error` is appended to it.
 * @param error Failure to report. An `Error` instance is reduced to its
 *   message, because `JSON.stringify` turns an `Error` into `{}`; any other
 *   value is sent unchanged.
 */
const sendError = (userId: string, event: string, error: unknown) => {
  const errorBody = error instanceof Error ? error.message : error;

  redisClient.hgetall(`user:${userId}`, (err, user) => {
    if (err) {
      console.error(err);
      return;
    }

    const socketId = user?.socket_id;
    if (!socketId || !wss) {
      return;
    }

    // `wss.clients` is a Set and has no `find`, so it is iterated directly.
    for (const client of wss.clients) {
      if (client['id'] !== socketId) {
        continue;
      }
      if (client.readyState === WebSocket.OPEN) {
        client.send(JSON.stringify({
          event: `${event}-error`,
          error: errorBody
        }));
      }
      return;
    }
  });
};

/**
 * Returns helpers for pushing events to connected clients.
 *
 * - `emitToUser(userId, event, data)` reads `socket_id` from the Redis hash
 *   `user:<userId>` and sends `{ event, data }` to the matching open client
 *   of this process, if there is one.
 * - `emitToAllUsers(event, data)` publishes `{ event, data }` on
 *   GLOBAL_CHANNEL. NOTE(review): no subscriber for GLOBAL_CHANNEL is visible
 *   in this file — confirm that something consumes it.
 *
 * Both helpers use the shared `redisClient`. Duplicating the client on every
 * call opened a Redis connection that was never closed.
 */
export const getSocket = () => {
  return {
    emitToUser: (userId: string | number, event: string, data: any) => {
      redisClient.hgetall(`user:${String(userId)}`, (err, user) => {
        if (err) {
          console.error(err);
          return;
        }

        const socketId = user?.socket_id;
        if (!socketId || !wss) {
          return;
        }

        // `wss.clients` is a Set and has no `find`, so it is iterated directly.
        for (const client of wss.clients) {
          if (client['id'] !== socketId) {
            continue;
          }
          if (client.readyState === WebSocket.OPEN) {
            client.send(JSON.stringify({ event, data }));
          }
          return;
        }
      });
    },
    emitToAllUsers: (event: string, data: any) => {
      redisClient.publish(GLOBAL_CHANNEL, JSON.stringify({ event, data }))
        .catch((error) => console.error('Redis publish error:', error));
    }
  };
};

/**
 * Reports whether a socket registration exists for the user, i.e. whether the
 * Redis hash `user:<userId>` contains the field `socket_id`.
 *
 * @param userId User to look up; it is converted to a string for the key.
 * @returns Promise resolving to `true` when the field exists; it rejects with
 *   the Redis error when the lookup fails.
 */
export const checkUserOnlineStatus = (userId: string | number): Promise<boolean> => {
  const redisKey = `user:${String(userId)}`;

  return new Promise((resolve, reject) => {
    redisClient.hexists(redisKey, 'socket_id', (err, count) => {
      if (err) {
        reject(err);
        return;
      }
      resolve(count === 1);
    });
  });
};

/**
 * Looks up the stored activity document for a user.
 *
 * @param userId Value matched against the document's `userId` field.
 * @returns Whatever `UserActivityModel.findOne({ userId }).exec()` resolves to.
 */
async function loadUserActivityFromDB(userId: string | number) {
  const query = UserActivityModel.findOne({ userId });
  return query.exec();
}

/**
 * Writes the cached activity of a user to the database, creating the document
 * when none exists (upsert). The cache is read before the first `await`, so
 * the values stored are those present at call time.
 *
 * @param userId User whose cached `lastActiveTime` and `socketIds` are saved.
 */
async function saveUserActivityToDB(userId: string | number) {
  const { lastActiveTime, socketIds } = getUserActivity(userId);

  const filter = { userId };
  const update = {
    $set: {
      lastActiveTime: new Date(lastActiveTime),
      socketIds
    }
  };

  await UserActivityModel.updateOne(filter, update, { upsert: true });
}

/**
 * Marks the user as active now by stamping the cached record with the current
 * time. The record is obtained through `getUserActivity`, which creates it
 * when it is missing.
 *
 * @param userId User whose cached `lastActiveTime` is refreshed.
 */
function updateUserLastActiveTime(userId: string | number) {
  getUserActivity(userId).lastActiveTime = Date.now();
}

/**
 * Returns the cached activity record of a user. When none is cached, a new
 * record stamped with the current time is created and stored first.
 *
 * NOTE(review): the cache key is the id exactly as passed in, so the number 1
 * and the string "1" address different entries — confirm that callers always
 * pass the same type.
 *
 * @param userId Cache key; its string form becomes the record's `userId`.
 * @returns The existing or newly created record.
 */
function getUserActivity(userId: string | number): UserActivityCache {
  const cached = userActivities.get(userId);
  if (cached) {
    return cached;
  }

  const created = new UserActivityCache(userId.toString(), Date.now());
  userActivities.set(userId, created);
  return created;
}

/**
 * Dispatches one parsed client message straight to its service handler and
 * answers on the same socket. Success is sent as
 * `{ event: "<event>-response", result }`; failure is sent as
 * `{ event: "<event>-error", error: 'An error occurred' }`. A message whose
 * event has no handler is ignored.
 *
 * NOTE(review): the handlers receive the whole `ws['userData']` object here,
 * while the Redis consumer in this file passes only the user id — confirm
 * which one the service functions expect.
 *
 * @param ws Socket the message arrived on; `ws['userData']` is passed on.
 * @param data Parsed message, expected to look like `{ event, payload }`.
 */
function handleWebSocketMessage(ws: WebSocket, data: any) {
  const eventHandlers: Record<string, (...args: any[]) => Promise<any>> = {
    'getDataPlantCareAllNew-v3': getDataPlantCareAllNew_V3_socket,
    'getDataAnimalCareNew': feedAndSellAnimalNew_socket,
    'getListOrder': getListOrder_socket,
    'log': log_socket
  };

  // Own-property lookup: `in` would also accept inherited names such as
  // "toString" and treat them as handlers.
  const eventName = data?.event;
  if (typeof eventName !== 'string'
    || !Object.prototype.hasOwnProperty.call(eventHandlers, eventName)) {
    return;
  }

  const reportFailure = (error: unknown) => {
    console.error(error);
    ws.send(JSON.stringify({ event: `${eventName}-error`, error: 'An error occurred' }));
  };

  try {
    eventHandlers[eventName](ws['userData'], data.payload)
      .then(result => {
        ws.send(JSON.stringify({
          event: `${eventName}-response`,
          result
        }));
      })
      .catch(reportFailure);
  } catch (error) {
    // The handler threw synchronously instead of returning a rejected promise.
    reportFailure(error);
  }
}
Leave a Comment