Untitled

 avatar
unknown
typescript
a month ago
14 kB
2
Indexable
import { UserActivityModel } from './../models/MongoDB/schemas/userActivityModel';
import Redis from "ioredis";
import WebSocket, { WebSocketServer } from 'ws';
import { decodeJWT } from "../helpers/jwt";
import { feedAndSellAnimalNew_socket, getDataPlantCareAllNew_V3_socket, updateFactoryStatus_socket, updateFarmAnimalStatus_socket } from "../services/farm.service";
import { getListOrder_socket, log_socket } from "../services/order.service";
import { checkAndProcessPendingRewards } from "../services/friendAndHelp.service";
import { v4 as uuidv4 } from 'uuid';
import { CustomDebugLoggerDev } from '../loggers/winston/debug.log';

let wss: WebSocketServer;

class UserActivityCache {
  constructor(public userId: string, public lastActiveTime: number) { }

  private sockets: Map<string, WebSocket> = new Map();

  public addSocket(socket: WebSocket): void {
    const socketId = uuidv4();
    this.sockets.set(socketId, socket);
    socket['socketId'] = socketId;
    socket['requestId'] = null;
  }

  public removeSocket(socket: WebSocket): void {
    const socketId = socket['socketId'];
    if (socketId) {
      this.sockets.delete(socketId);
    }
  }

  public get activeSockets(): WebSocket[] {
    return Array.from(this.sockets.values()).filter(socket => socket.readyState === WebSocket.OPEN);
  }

  public get socketIds(): string[] {
    return Array.from(this.sockets.keys());
  }
}

const userActivities: Map<string | number, UserActivityCache> = new Map();
let userSockets = new Map<string | number, WebSocket>();

export const initSocket = (serverApp) => {
  const options = {
    server: serverApp,
    perMessageDeflate: false,
    clientTracking: true,
    maxPayload: 1024 * 1024 * 100,
    pingInterval: 30000,
    pingTimeout: 60000
  };

  wss = new WebSocketServer(options);

  // const redisClient = new Redis({
  //   host: process.env.HOST_REDIS,
  //   port: parseInt(process.env.PORT_REDIS),
  //   password: process.env.PASS_REDIS,
  //   db: 4
  // });

  const pubClient = new Redis({
    host: process.env.HOST_REDIS,
    port: parseInt(process.env.PORT_REDIS),
    password: process.env.PASS_REDIS,
    db: 4
  });

  const subClient = pubClient.duplicate();
  subClient.subscribe('globalSocketChannel');
  subClient.on('message', (channel, message) => {
    if (channel === 'globalSocketChannel') {
      const parsedMessage = JSON.parse(message);
      const { userId, event, data } = parsedMessage;
      wss.clients.forEach((client) => {
        if (client['userData'] && client['userData'].id === userId) {
          client.send(JSON.stringify({ event, data }));
        }
      });
    }
  });

  // Sử dụng Redis để lưu trữ và đồng bộ hóa trạng thái
  const requestQueue = new Redis({
    host: process.env.HOST_REDIS,
    port: parseInt(process.env.PORT_REDIS),
    password: process.env.PASS_REDIS,
    db: 5
  });

  // Kiểm tra nội dung của hàng đợi
  const requests = requestQueue.lrange('requests_queue', 0, -1);
  // console.log('Current requests in queue:', requests);

  wss.on('connection', async (ws, req) => {
    try {
      const token = req.url?.split('token=')[1];
      if (!token) {
        ws.close(4001, 'Unauthorized access');
        return;
      }

      const decoded = decodeJWT(token) as any;
      userSockets.set(Number(decoded.id), ws);
      ws['userData'] = decoded;

      updateUserLastActiveTime(ws['userData'].id);

      if (!userActivities.has(ws['userData'].id)) {
        loadUserActivityFromDB(ws['userData'].id).then((userData) => {
          if (userData) {
            userActivities.set(ws['userData'].id, new UserActivityCache(userData.userId, userData.lastActiveTime.getTime()));
          } else {
            userActivities.set(ws['userData'].id, new UserActivityCache(ws['userData'].id, Date.now()));
          }
        });
      }

      // Thêm socket vào UserActivityCache
      const userActivity = getUserActivity(ws['userData'].id);
      userActivity.addSocket(ws); // Thêm socket vào userActivity
      console.log(`Socket added for userId ${ws['userData'].id}:`, ws['socketId']);

      ws.on('message', async (message) => {
        updateUserLastActiveTime(ws['userData'].id);
        const data = JSON.parse(message.toString());

        // // Kiểm tra nếu event là 'ping'
        // if (data.event === 'ping') {
        //   ws.send(JSON.stringify({ event: 'pong' }));
        //   return; // Không đẩy vào hàng đợi Redis
        // }

        if (data.event === 'getDataPlantCareAllNew-v3' || data.event === 'getDataAnimalCareNew') {
          const requestId = uuidv4();
          ws['requestId'] = requestId;
          console.log(`Sending request with requestId: ${requestId} for userId: ${ws['userData'].id}`);

          await requestQueue.set(`request:${requestId}`, JSON.stringify({ id: requestId, event: data.event, payload: data.payload }));
          await requestQueue.rpush('requests_queue', requestId);

          // Xử lý yêu cầu
          const requestQueueId = await requestQueue.llen('requests_queue');
          CustomDebugLoggerDev.debug('requestQueueId:::::::::::::::::::::xxxxxxx::::xxxxx', requestQueueId);

          if (requestQueueId === 1) {
            processNextRequest(requestQueue, ws);
          }
        }

      });

      // Ping-Pong
      ws.on('message', (message) => {
        const data = JSON.parse(message.toString());
        if (data.event === 'ping') {
          ws.send(JSON.stringify({ event: 'pong' }));
        }
      });

      ws.on('close', () => {
        getUserActivity(ws['userData'].id).removeSocket(ws);
        saveUserActivityToDB(ws['userData'].id);
        userSockets.delete(Number(decoded.id));

        const userId = ws['userData'].id;
        const userActivity = getUserActivity(userId);
        if (userActivity.activeSockets.length === 0) {
          userActivities.delete(userId);
        }
      });

      ws.on('error', (error) => {
        console.error('WebSocket error:', error);
      });

    } catch (error) {
      console.error('Token error:', error);
      ws.close();
    }
  });

  async function processNextRequest(requestQueue, ws) {
    const requestId = await requestQueue.lindex('requests_queue', 0);
    CustomDebugLoggerDev.debug('requestQueue.lpop(requests_queue):::::::::::::::::::::::::', requestId);

    if (requestId) {
      const request = await requestQueue.get(`request:${requestId}`);
      CustomDebugLoggerDev.debug('request:::::::::::::::::::::::::', request);
      if (request) {
        const { id, event, payload } = JSON.parse(request);
        const handler = getEventHandler(event);

        try {
          const result = await handler(ws['userData'], payload);
          sendResponse(id, event, result);
          // CustomDebugLoggerDev.debug('xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx:::::::::::::::::::::::::::::::::::::::::::::::::::::::', `Processed event: ${event} for requestId: ${id}`);
          await requestQueue.del(`request:${requestId}`);
        } catch (error) {
          console.error(`Error processing event: ${event} for requestId: ${id}`, error);
          sendError(id, event, error.message);
        }
        await requestQueue.lpop('requests_queue');
        processNextRequest(requestQueue, ws);
      } else {
        console.log(`No request found for requestId: ${requestId}`);
      }
    }
  }

  function getEventHandler(event) {
    switch (event) {
      case 'getDataPlantCareAllNew-v3':
        return getDataPlantCareAllNew_V3_socket;
      case 'getDataAnimalCareNew':
        return feedAndSellAnimalNew_socket;
      case 'statusFactoryUpdate':
        return updateFactoryStatus_socket;
      case 'statusFarmAnimalUpdate':
        return updateFarmAnimalStatus_socket;
      case 'getListOrder':
        return getListOrder_socket;
      case 'log':
        return log_socket;
      default:
        throw new Error(`Handler not found for event: ${event}`);
    }
  }

  function sendResponse(requestId, event, result) {
    const ws = getWebSocket(requestId);
    if (ws) {
      if (ws.readyState === WebSocket.OPEN) {
        // console.log(`Sending response for requestId: ${requestId}, event: ${event}`);
        // if (event === 'ping') {
        //   ws.send(JSON.stringify({ event: 'pong' }));
        // } else {
        ws.send(JSON.stringify({
          event: `${event}-response`,
          errorCode: result.errorCode || 0,
          data: result.data || null,
          message: result.message || '',
          timestamp: result.timestamp || null
        }));
        // }

        // Xóa event đã lưu trong hàng đợi Redis
        const requestQueue = new Redis({
          host: process.env.HOST_REDIS,
          port: parseInt(process.env.PORT_REDIS),
          password: process.env.PASS_REDIS,
          db: 5
        });
        requestQueue.del(`request:${requestId}`); // Xóa yêu cầu khỏi hàng đợi
      } else {
        console.log('Socket is not open for requestId:', requestId);
      }
    } else {
      console.log('No socket found for requestId:', requestId);
    }
  }

  function sendError(requestId, event, errorMessage) {
    const ws = getWebSocket(requestId);
    if (ws && ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify({ event: `${event}-error`, error: errorMessage }));
      // ws.send(JSON.stringify({ id: requestId, event: `${event}-error`, error: errorMessage }));
    }
  }

  function getWebSocket(requestId) {
    // console.log('No socket found for userActivity:0000', Array.from(userActivities.entries()));
    console.log('No socket found for userActivity:1111', Array.from(userActivities.entries()).map(([userId, activity]) => ({
      userId,
      lastActiveTime: activity.lastActiveTime,
      sockets: activity.activeSockets
    })));
    // const ws = userSockets.get(Number(userId));

    for (const [userId, userActivity] of Array.from(userActivities.entries())) {
      for (const socket of userActivity.activeSockets) {
        console.log('Checking socket:', socket['requestId']);
        if (socket['requestId'] === requestId) {
          return socket;
        }
      }
    }
    console.log('No socket found for requestId:', requestId);
    return null;
  }

  return wss;
};

export const getSocket_BKKK123 = () => {
  return {
    emitToUser: (userId: string | number, event: string, data: any) => {
      const userActivity = getUserActivity(userId);
      if (userActivity) {
        userActivity.activeSockets.forEach((ws) => {
          ws.send(JSON.stringify({ event, data }));
        });
      }
    }
  };
};

export const getSocket = () => {
  return {
    emitToUser: (userId: string | number, event: string, data: any) => {
      const ws = userSockets.get(Number(userId));
      if (ws && ws.readyState === WebSocket.OPEN) {
        ws.send(JSON.stringify({ event, data }));
      }
    },

    emitToAllUsers: (event: string, data: any) => {
      userSockets.forEach((ws) => {
        if (ws.readyState === WebSocket.OPEN) {
          ws.send(JSON.stringify({ event, data }));
        }
      });
    }
  };
};

async function loadUserActivityFromDB(userId: string | number) {
  return await UserActivityModel.findOne({ userId }).exec();
}

async function saveUserActivityToDB(userId: string | number) {
  const userActivity = getUserActivity(userId);
  await UserActivityModel.updateOne(
    { userId },
    {
      $set: {
        lastActiveTime: new Date(userActivity.lastActiveTime),
        socketIds: userActivity.socketIds
      }
    },
    { upsert: true }
  );
}

function updateUserLastActiveTime(userId: string | number) {
  const userActivity = getUserActivity(userId);
  userActivity.lastActiveTime = Date.now();
}

function getUserActivity(userId: string | number): UserActivityCache {
  let userActivity = userActivities.get(userId);
  if (!userActivity) {
    userActivity = new UserActivityCache(userId.toString(), Date.now());
    userActivities.set(userId, userActivity);
  }
  return userActivity;
}

export const checkUserOnlineStatus = (userId: string | number): boolean => {
  const userActivity = getUserActivity(userId);
  const userSocket = userSockets.get(userId);

  if (!userActivity || !userSocket) return false;

  const lastActiveTimeMs = new Date(userActivity.lastActiveTime).getTime();
  const currentTime = Date.now();
  const inactiveThreshold = 60 * 1000; // 60 giây

  return currentTime - lastActiveTimeMs < inactiveThreshold && userSocket.readyState === WebSocket.OPEN;
};

// function checkAllUsersOnlineStatus() {
//   const currentTime = Date.now();
//   const inactiveThreshold = 60 * 1000; // 60 giây

//   userActivities.forEach((activity, userId) => {
//     const lastActiveTimeMs = new Date(activity.lastActiveTime).getTime();
//     if (currentTime - lastActiveTimeMs > inactiveThreshold) {
//       // Xóa người dùng khỏi các map nếu họ không còn hoạt động
//       userActivities.delete(userId);
//       userSockets.delete(userId);
//       console.log(`User ${userId} marked as offline`);
//     }
//   });

//   setTimeout(checkAllUsersOnlineStatus, 60000);
// }

// checkAllUsersOnlineStatus();
Leave a Comment