Untitled

 avatar
user_8163913
plain_text
a year ago
6.4 kB
1
Indexable
Never
const fs = require("fs");
const https = require("https");
const WebSocket = require("ws");
const express = require("express");
const bodyParser = require("body-parser");
const portFinder = require("portfinder");
const server = express();
const url = require("url");
const fetch = require("node-fetch");

// Startup configuration. Everything here is read synchronously, so an
// unreadable file or malformed env.json throws before the server starts.
const readUtf8 = (filePath) => fs.readFileSync(filePath, "utf8");

const privateKey = readUtf8("config/privkey.pem");
const certificate = readUtf8("config/fullchain.pem");
const envConfig = JSON.parse(readUtf8("config/env.json"));

// TLS options passed to https.createServer by startStream.
const credentials = {
  key: privateKey,
  cert: certificate,
};

// Parse JSON request bodies on every route.
server.use(bodyParser.json());

// Base URL used for all calls to the cloud site (verify/close stream, login).
const cloudServer = envConfig.chickenboy_site.endpoint;

// The control API itself listens on a fixed port.
server.listen(3000, () => {
  console.log("Listening on port 3000");
});

/**
 * GET /api/start-stream?robotId=...&streamId=...
 *
 * Asks the cloud server to verify the (robotId, streamId) pair and, when the
 * first element of the verification response is truthy, starts a dedicated
 * stream server on a free port and returns `{ port }`.
 *
 * Responses:
 *   - 200 `{ port }`                              stream server started
 *   - 200 `{ error: "Can't handle the request" }` verification was rejected
 *   - 400 `{ error: "Can't handle the request" }` robotId/streamId missing or malformed
 *   - 500 `{ error: "An error occurred" }`        verification call or port lookup failed
 */
server.get("/api/start-stream", async (req, res) => {
  // Set CORS headers up front so cross-origin browser callers can read error
  // responses too, not only the success payload.
  res.header("Access-Control-Allow-Origin", "*");
  res.header("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept");

  try {
    const { robotId, streamId } = req.query;

    // Express yields arrays/objects for repeated or bracketed query keys and
    // undefined for missing ones. An empty streamId would also become an
    // empty stream secret, so reject anything that is not a non-empty string.
    const isNonEmptyString = (value) => typeof value === "string" && value !== "";
    if (!isNonEmptyString(robotId) || !isNonEmptyString(streamId)) {
      res.status(400).json({ error: "Can't handle the request" });
      return;
    }

    // Encode the caller-supplied ids so they cannot inject extra query
    // parameters into the request sent to the cloud server.
    const streamVerificationUrl =
      `${cloudServer}/api/v2/robots/verify-video-stream` +
      `?robotId=${encodeURIComponent(robotId)}&streamId=${encodeURIComponent(streamId)}`;
    const verificationResponse = await fetch(streamVerificationUrl);
    const verificationData = await verificationResponse.json();

    console.log("verificationData", verificationData);

    if (!verificationData[0]) {
      res.json({ error: "Can't handle the request" });
      return;
    }

    const port = await portFinder.getPortPromise();
    startStream(port, streamId, robotId);
    res.json({ port });
  } catch (err) {
    console.log("Error:", err);
    res.status(500).json({ error: "An error occurred" });
  }
});

/**
 * GET /healthcheck
 *
 * Reports whether the cloud endpoint is present in the loaded environment
 * config: 200 `{ status: "Ok" }` when it is, otherwise logs the problem and
 * answers 500 `{ status: "Error" }`. No network call is made.
 */
server.get("/healthcheck", (req, res) => {
  const endpointMissing = !envConfig.chickenboy_site.endpoint;

  if (endpointMissing) {
    console.log("chickenboy_site.endpoint is not defined in environment config");
    res.status(500).json({ status: "Error" });
    return;
  }

  res.json({ status: "Ok" });
});

/**
 * GET /connectivity
 *
 * Checks that the cloud site is reachable by requesting `${cloudServer}/login`.
 * Answers 200 `{ status: "Ok" }` when that request returns HTTP 200, otherwise
 * 500 `{ status: "Error" }` (the same payload shape as /healthcheck).
 *
 * The underlying error is logged rather than placed in the response body: an
 * Error instance does not serialise to useful JSON (a bare `new Error()`
 * becomes `{}`) and fetch errors can expose internal details.
 */
server.get("/connectivity", async (req, resp) => {
  try {
    const res = await fetch(`${cloudServer}/login`);
    if (res.status !== 200) {
      throw new Error(`Unexpected HTTP status ${res.status} from ${cloudServer}/login`);
    }
    resp.json({ status: "Ok" });
  } catch (err) {
    console.log("Could not connect to ChickenBoy Website. Check endpoint and credentials in environment config");
    console.log("Connectivity check failed with:", err);
    resp.status(500).json({ status: "Error" });
  }
});

/**
 * Starts a dedicated HTTPS + WebSocket relay for a single video stream.
 *
 * An HTTPS request whose first path segment equals `streamId` is treated as
 * the stream source (the log messages refer to it as "the robot"); every body
 * chunk it sends is forwarded to all open WebSocket clients. WebSocket clients
 * must connect with `?streamId=<streamId>`; others are closed immediately and
 * never counted as viewers.
 *
 * The relay shuts itself down (notifying the cloud server via
 * `close-video-stream`) when the last viewer disconnects, when the source
 * finishes while nobody is watching, or when data arrives while nobody is
 * watching.
 *
 * @param {number} port - TCP port the relay listens on.
 * @param {string} streamId - Stream identifier; doubles as the shared secret.
 * @param {string} robotId - Robot identifier reported back to the cloud server.
 * @returns {void}
 */
function startStream(port, streamId, robotId) {
  console.log("Starting stream on port:", port);
  const RECORD_STREAM = false;
  const STREAM_SECRET = streamId;
  const HEARTBEAT_INTERVAL_MS = 1000 * 60;

  const httpsServer = https.createServer(credentials, handleSourceRequest);
  // Without a listener, a listen failure (e.g. the port was taken between the
  // port lookup and now) would be an uncaught 'error' event and kill the process.
  httpsServer.on("error", (err) => {
    console.log("The node server is erroring out with message:", err);
  });
  httpsServer.listen(port);

  const socketServer = new WebSocket.Server({ server: httpsServer });
  socketServer.connectionCount = 0;
  socketServer.stopStreamServer = false;
  socketServer.streamId = streamId;
  socketServer.robotId = robotId;

  function noop() {}

  // Bound to the client socket by the 'pong' event; marks it as responsive.
  function heartbeat() {
    this.isAlive = true;
  }

  /**
   * Handles the incoming HTTPS request that carries the stream data.
   * Requests with the wrong secret are rejected before any handler is attached.
   */
  function handleSourceRequest(request, response) {
    console.log("Request received");
    const params = request.url.slice(1).split("/");
    const remote = `${request.socket.remoteAddress}:${request.socket.remotePort}`;

    if (params[0] !== STREAM_SECRET) {
      console.log(`Failed Stream Connection: ${remote} - wrong secret.`);
      response.statusCode = 403;
      response.end();
      // Must stop here: otherwise the unauthenticated body would still be
      // broadcast to every viewer.
      return;
    }
    console.log(`Stream Connected: ${remote}`);

    if (RECORD_STREAM) {
      const path = `recordings/${Date.now()}.ts`;
      request.socket.recording = fs.createWriteStream(path);
    }

    request.on("data", (data) => {
      socketServer.broadcast(data);
      if (request.socket.recording) {
        try {
          request.socket.recording.write(data);
        } catch (err) {
          console.log("Broadcasting data error:", err);
        }
      }
    });

    request.on("end", () => {
      // Complete the HTTP exchange so the source's connection is not left hanging.
      response.end();
      if (socketServer.connectionCount === 0) {
        console.log("The robot has stopped streaming to the node server, and there is no one else connected");
        closeCloudServer();
      }
      if (request.socket.recording) {
        try {
          request.socket.recording.close();
        } catch (err) {
          console.log("Ending the stream caused the following error:", err);
        }
      }
    });

    request.on("error", (err) => {
      console.log("The node server is erroring out with message:", err);
    });
  }

  /**
   * Tells the cloud server the stream is closed and releases local resources.
   * Idempotent: several triggers can fire for the same shutdown (broadcast
   * calls this for every chunk received while no viewer is connected).
   * Local teardown runs even when the cloud notification fails.
   */
  async function closeCloudServer() {
    if (socketServer.stopStreamServer) return;
    socketServer.stopStreamServer = true;
    clearInterval(heartbeatTimer);

    const closeStreamUrl =
      `${cloudServer}/api/v2/robots/close-video-stream` +
      `?robotId=${encodeURIComponent(socketServer.robotId)}` +
      `&streamId=${encodeURIComponent(socketServer.streamId)}`;
    try {
      const res = await fetch(closeStreamUrl);
      const json = await res.json();
      console.log(json.result);
    } catch (err) {
      console.log("Could not notify the cloud server that the stream was closed:", err);
    } finally {
      socketServer.close();
      httpsServer.close();
    }
  }

  socketServer.on("connection", (socket, upgradeReq) => {
    // Older ws versions expose the upgrade request on the socket itself.
    const req = upgradeReq || socket.upgradeReq;
    const location = url.parse(req.url, true);
    if (location.query.streamId !== socketServer.streamId) {
      socket.close();
      // Rejected clients are not viewers: do not count them and do not let
      // their disconnect shut the stream down.
      return;
    }

    socketServer.connectionCount++;
    console.log(
      "New WebSocket Connection:",
      req.socket.remoteAddress,
      req.headers["user-agent"],
      `(${socketServer.connectionCount} total)`
    );
    socket.isAlive = true;
    socket.on("pong", heartbeat);
    socket.on("error", (err) => {
      console.log("Node server is erroring out:");
      console.log(err.stack);
    });
    socket.on("close", () => {
      socketServer.connectionCount--;
      console.log("Disconnected WebSocket:", `(${socketServer.connectionCount} total)`);
      if (socketServer.connectionCount === 0) {
        closeCloudServer();
      }
    });
  });

  // Forwards one chunk of stream data to every open WebSocket client.
  socketServer.broadcast = (data) => {
    if (socketServer.connectionCount === 0) {
      console.log("When no more clients are connected, close the stream, and the robot is streaming");
      closeCloudServer();
    }
    socketServer.clients.forEach((client) => {
      if (client.readyState === WebSocket.OPEN) {
        client.send(data);
      }
    });
  };

  // Heartbeat: a client that has not answered the previous ping with a pong by
  // the next tick is terminated. Cleared in closeCloudServer.
  const heartbeatTimer = setInterval(() => {
    socketServer.clients.forEach((socket) => {
      if (socket.isAlive === false) {
        socket.terminate();
        return;
      }
      socket.isAlive = false;
      socket.ping(noop);
    });
  }, HEARTBEAT_INTERVAL_MS);
}