Untitled
user_8163913
plain_text
2 years ago
6.4 kB
6
Indexable
const fs = require("fs");
const https = require("https");
const WebSocket = require("ws");
const express = require("express");
const bodyParser = require("body-parser");
const portFinder = require("portfinder");
const server = express();
const url = require("url");
const fetch = require("node-fetch");
// ---------------------------------------------------------------------------
// Startup configuration
// ---------------------------------------------------------------------------

// Synchronous UTF-8 read; only used while the process is starting up.
const readUtf8 = (path) => fs.readFileSync(path, "utf8");

// TLS key/certificate pair handed to https.createServer() by startStream().
const privateKey = readUtf8("config/privkey.pem");
const certificate = readUtf8("config/fullchain.pem");

// Environment settings; envConfig.chickenboy_site.endpoint is the cloud base URL.
const envConfig = JSON.parse(readUtf8("config/env.json"));
const credentials = { key: privateKey, cert: certificate };

// Parse JSON request bodies on the control API.
server.use(bodyParser.json());

// Base URL of the cloud server used for stream verification/close calls.
const cloudServer = envConfig.chickenboy_site.endpoint;

// The control API (start-stream, healthcheck, connectivity) listens on a fixed port.
const CONTROL_API_PORT = 3000;
server.listen(CONTROL_API_PORT, function onControlApiListening() {
  console.log("Listening on port 3000");
});
/**
 * GET /api/start-stream?robotId=<id>&streamId=<id>
 *
 * Asks the cloud server to verify the (robotId, streamId) pair and, when the
 * verification succeeds, starts a relay on a free port via startStream().
 *
 * Responses:
 *   200 { port }                              - relay started on `port`
 *   200 { error: "Can't handle the request" } - cloud server rejected the pair
 *   400 { error }                             - robotId/streamId missing or not a single string
 *   500 { error: "An error occurred" }        - verification call or port lookup failed
 */
server.get("/api/start-stream", async (req, res) => {
  // Set the CORS headers up front so browser clients can read error responses
  // too, not only the success payload.
  res.header("Access-Control-Allow-Origin", "*");
  res.header("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept");

  try {
    const { robotId, streamId } = req.query;

    // Reject missing values and repeated parameters (which Express parses as
    // arrays): streamId is later compared with === as the stream secret, so it
    // must be one plain string.
    const isNonEmptyString = (value) => typeof value === "string" && value !== "";
    if (!isNonEmptyString(robotId) || !isNonEmptyString(streamId)) {
      res.status(400).json({ error: "robotId and streamId query parameters are required" });
      return;
    }

    // Encode the caller-supplied values so characters such as "&" or "#"
    // cannot inject or truncate query parameters in the upstream request.
    const streamVerificationUrl =
      `${cloudServer}/api/v2/robots/verify-video-stream` +
      `?robotId=${encodeURIComponent(robotId)}&streamId=${encodeURIComponent(streamId)}`;

    const verificationResponse = await fetch(streamVerificationUrl);
    const verificationData = await verificationResponse.json();
    console.log("verificationData", verificationData);

    // The stream is accepted when the first element of the payload is truthy;
    // a null/empty payload counts as a rejection.
    if (!verificationData || !verificationData[0]) {
      res.json({ error: "Can't handle the request" });
      return;
    }

    const port = await portFinder.getPortPromise();
    startStream(port, streamId, robotId);
    res.json({ port });
  } catch (err) {
    console.log("Error:", err);
    res.status(500).json({ error: "An error occurred" });
  }
});
/**
 * GET /healthcheck
 *
 * Reports whether the cloud endpoint is present in the environment config:
 * 200 { status: "Ok" } when envConfig.chickenboy_site.endpoint is truthy,
 * otherwise logs the problem and answers 500 { status: "Error" }.
 */
server.get("/healthcheck", (req, res) => {
  const hasEndpoint = Boolean(envConfig.chickenboy_site.endpoint);

  if (!hasEndpoint) {
    console.log("chickenboy_site.endpoint is not defined in environment config");
    res.status(500).json({ status: "Error" });
    return;
  }

  res.json({ status: "Ok" });
});
/**
 * GET /connectivity
 *
 * Probes `${cloudServer}/login` and reports whether it answered with HTTP 200:
 * 200 { status: "Ok" } on success, 500 { status: "Error" } otherwise.
 *
 * The failure body previously carried the caught Error object itself, which
 * does not serialise to a stable status value (a plain Error becomes `{}`).
 * It now uses the same "Error" string as /healthcheck, and the underlying
 * error is written to the log instead.
 */
server.get("/connectivity", async (req, resp) => {
  try {
    const res = await fetch(`${cloudServer}/login`);
    if (res.status !== 200) {
      throw new Error(`Unexpected HTTP status ${res.status} from ${cloudServer}/login`);
    }
    resp.json({
      status: "Ok",
    });
  } catch (err) {
    console.log(
      "Could not connect to ChickenBoy Website. Check endpoint and credentials in environment config",
      err
    );
    resp.status(500).json({
      status: "Error",
    });
  }
});
/**
 * Start a dedicated HTTPS + WebSocket relay for a single robot video stream.
 *
 * The robot sends stream data in an HTTPS request whose first path segment
 * must equal `streamId`; every chunk received is forwarded to all open
 * WebSocket viewers. Viewers must connect with `?streamId=<streamId>`.
 * The relay notifies the cloud server and shuts itself down when
 *   - the last viewer disconnects,
 *   - the robot sends data while no viewer is connected, or
 *   - the robot's request ends while no viewer is connected.
 *
 * @param {number} port - Port the relay listens on.
 * @param {string} streamId - Stream identifier; also used as the shared secret.
 * @param {string} robotId - Robot identifier, reported to the cloud server on close.
 */
function startStream(port, streamId, robotId) {
  console.log("Starting stream on port:", port);
  const RECORD_STREAM = false;
  const STREAM_SECRET = streamId;
  const HEARTBEAT_INTERVAL_MS = 1000 * 60;

  // Set once shutdown has begun so closeCloudServer() runs at most once.
  let isShuttingDown = false;

  const httpsServer = https.createServer(credentials, (request, response) => {
    console.log("Request received");
    const params = request.url.slice(1).split("/");
    // NOTE(review): plain string comparison of the secret; consider
    // crypto.timingSafeEqual if timing attacks are a concern here.
    if (params[0] !== STREAM_SECRET) {
      console.log(`Failed Stream Connection: ${request.socket.remoteAddress}:${request.socket.remotePort} - wrong secret.`);
      response.end();
      // Stop here: without this return an unauthenticated sender's data would
      // still be broadcast to the viewers.
      return;
    }
    console.log(`Stream Connected: ${request.socket.remoteAddress}:${request.socket.remotePort}`);

    request.on("data", (data) => {
      socketServer.broadcast(data);
      if (request.socket.recording) {
        try {
          request.socket.recording.write(data);
        } catch (err) {
          console.log("Broadcasting data error:", err);
        }
      }
    });

    request.on("end", () => {
      if (socketServer.connectionCount === 0) {
        console.log("The robot has stopped streaming to the node server, and there is no one else connected");
        closeCloudServer();
      }
      if (request.socket.recording) {
        try {
          request.socket.recording.close();
        } catch (err) {
          console.log("Ending the stream caused the following error:", err);
        }
      }
    });

    request.on("error", (err) => {
      console.log("The node server is erroring out with message:", err);
    });

    if (RECORD_STREAM) {
      const path = `recordings/${Date.now()}.ts`;
      request.socket.recording = fs.createWriteStream(path);
    }
  });

  const socketServer = new WebSocket.Server({ server: httpsServer });
  socketServer.connectionCount = 0;
  socketServer.stopStreamServer = false;
  socketServer.streamId = streamId;
  socketServer.robotId = robotId;

  // Without an "error" listener, a failed listen() (e.g. the port was taken
  // in the meantime) is thrown as an uncaught exception and kills the whole
  // process. Registered on both servers because the error may surface on either.
  function onServerError(err) {
    console.log(`Stream server error on port ${port}:`, err);
    clearInterval(heartbeatTimer);
  }
  httpsServer.on("error", onServerError);
  socketServer.on("error", onServerError);

  httpsServer.listen(port);

  function noop() {}

  // Bound to the client socket by the "pong" listener below.
  function heartbeat() {
    this.isAlive = true;
  }

  /**
   * Tell the cloud server the stream is closed, then tear down the relay.
   * Idempotent: broadcast() may request shutdown on every incoming chunk and
   * several viewers may disconnect at once, but only the first call acts.
   */
  function closeCloudServer() {
    if (isShuttingDown) {
      return;
    }
    isShuttingDown = true;

    const closeStreamUrl =
      `${cloudServer}/api/v2/robots/close-video-stream` +
      `?robotId=${encodeURIComponent(socketServer.robotId)}` +
      `&streamId=${encodeURIComponent(socketServer.streamId)}`;

    fetch(closeStreamUrl)
      .then((res) => res.json())
      .then((json) => {
        console.log(json.result);
      })
      .catch((err) => {
        console.log("Failed to notify the cloud server that the stream closed:", err);
      })
      // Runs after both success and failure so the local servers and the
      // heartbeat timer are always released.
      .then(() => {
        clearInterval(heartbeatTimer);
        socketServer.stopStreamServer = true;
        socketServer.close();
        httpsServer.close();
      });
  }

  socketServer.on("connection", (socket, upgradeReq) => {
    // Older ws releases expose the upgrade request on the socket itself.
    const handshake = upgradeReq || socket.upgradeReq;

    socket.on("error", (err) => {
      console.log("Node server is erroring out:");
      console.log(err.stack);
    });

    const location = url.parse(handshake.url, true);
    if (location.query.streamId !== socketServer.streamId) {
      socket.close();
      // Do not count or track unauthorised clients; otherwise their
      // disconnect could shut the stream down for everyone.
      return;
    }

    socketServer.connectionCount++;
    console.log(
      "New WebSocket Connection:",
      handshake.socket.remoteAddress,
      handshake.headers["user-agent"],
      `(${socketServer.connectionCount} total)`
    );

    socket.isAlive = true;
    socket.on("pong", heartbeat);
    socket.on("close", () => {
      socketServer.connectionCount--;
      console.log("Disconnected WebSocket:", `(${socketServer.connectionCount} total)`);
      if (socketServer.connectionCount === 0) {
        closeCloudServer();
      }
    });
  });

  socketServer.broadcast = (data) => {
    if (socketServer.connectionCount === 0) {
      console.log("When no more clients are connected, close the stream, and the robot is streaming");
      closeCloudServer();
    }
    socketServer.clients.forEach((client) => {
      if (client.readyState === WebSocket.OPEN) {
        client.send(data);
      }
    });
  };

  // Each round terminates clients that did not answer the previous ping,
  // then pings the rest.
  const heartbeatTimer = setInterval(() => {
    socketServer.clients.forEach((socket) => {
      if (socket.isAlive === false) return socket.terminate();
      socket.isAlive = false;
      socket.ping(noop);
    });
  }, HEARTBEAT_INTERVAL_MS);
}
Editor is loading...