Untitled

 avatar
unknown
javascript
a year ago
8.4 kB
4
Indexable
module.exports = (ws, spawn, ffmpegLogStream, fs, path, s3) => {
	// console.log(ws);
	// Message handler
	let ffmpeg_processes = [];
	let clients = {};
	let rtmpDestination = [];
	let streamId = '';
	let feedStream = null;
	let isSaveRecord = false;

	ws.on('message', (message) => {
		// Custom message handler functions
		const handleConfigRtmpDestination = (data) => {
			console.log('Configuring RTMP destination:', data);
			// Your logic to handle configuring RTMP destination
			rtmpDestination = data.rtmp_addresses;
			streamId = data.streamId.toString();
			const folderPath = path.join(__dirname, 'videos', streamId);
			fs.mkdirSync(folderPath, { recursive: true });

			clients[streamId] = {
				folderPath: folderPath,
				videoChunks: [],
			};
		}

		const handleStart = (data) => {
			console.log('Start command received using IOS:', data.isIOS);
			// Your logic to handle start command
			isSaveRecord = data.isSaveRecord;
			var teeOutputs = rtmpDestination.map(function (destination) {
				return '[f=flv:onfail=ignore]' + destination
			}).join('|');

			if (data.isIOS) {
				var opts = [
					'-hide_banner',
					'-thread_queue_size', '1024',
					'-loglevel', 'warning',
					'-re', '-i', '-',
					'-i', 'watermark.png', '-filter_complex', 'overlay=W-w-5:H-h-5',
					'-s', '1280x720',
					'-map', '0:v',
					'-c:v', 'libx264', '-preset', 'ultrafast', '-b:v', '2000k', '-maxrate', '2200k', '-bufsize', '6000k', '-g', '240',
					'-tune', 'zerolatency',
					'-map', '0:a',
					'-c:a', 'aac', '-b:a', '128k',
					'-y',
					'-use_wallclock_as_timestamps', '1',
					'-async', '1',
					'-r', '30',
					'-flvflags', 'no_duration_filesize',
					'-flags', '+global_header',
					'-f', 'tee',
					teeOutputs
				];
			} else {
				var opts = [
					'-hide_banner',
					'-thread_queue_size', '1024',
					'-loglevel', 'warning',
					'-re', '-i', '-',
					'-i', 'watermark.png', '-filter_complex', 'overlay=W-w-5:H-h-5',
					'-s', '1280x720',
					'-map', '0:v',
					'-c:v', 'libx264', '-preset', 'ultrafast', '-b:v', '2000k', '-maxrate', '2200k', '-bufsize', '6000k', '-g', '240',
					'-tune', 'zerolatency',
					'-map', '0:a',
					'-c:a', 'aac', '-b:a', '128k',
					'-y',
					'-use_wallclock_as_timestamps', '1',
					'-async', '1',
					'-r', '30',
					'-flvflags', 'no_duration_filesize',
					'-flags', '+global_header',
					'-f', 'tee',
					teeOutputs
				];
			}

			console.log(opts);

			function addTimestamp(line) {
				const timestampedLine = `${new Date().toISOString()} - ${line}`;
				return timestampedLine;
			}

			//only spawn ffmpeg if there's rtmp destination
			if (rtmpDestination.length) {
				console.log("spawning ffmpeg");

				let item = spawn("ffmpeg", opts, { stdio: ['pipe', 'pipe', 'pipe'] });
				// item.socketId = socket.id;

				item.stderr.on('data', (data) => {
					const dataWithTimestamp = addTimestamp(data.toString());
					ffmpegLogStream.write(dataWithTimestamp)
				})

				item.on("uncaughtException", async (error) => {
					console.log("uncaughtException", error);
					handleEndProcess();
				});
				ffmpeg_processes.push(item);

				ffmpeg_processes.map((ffmpeg) => {
					ffmpeg.on("error", function (e) {
						if (e !== null && e > 0) {
							console.log(`child process ${ffmpeg.pid} error` + e);
							// socket.emit("error", "ffmpeg error!" + e);
							handleEndProcess();
						}
					});
				});

				ffmpeg_processes.map((ffmpeg) => {
					ffmpeg.on("exit", function (e) {
						if (e !== null && e > 0) {
							console.log(`child process ${ffmpeg.pid} exit` + e);
							// socket.emit("fatal", "ffmpeg exit!" + e);
							handleEndProcess();
						}
					});
				});
			}

			feedStream = (data) => {
				//only spawn ffmpeg if there's rtmp destination
				if (rtmpDestination.length) {
					ffmpeg_processes.map((ffmpeg) => {
						ffmpeg.stdin.write(data);
					});
				}

				if (data) {
					clients[streamId].videoChunks.push(Buffer.from(data, 'base64'));
				}
			};
		}

		const handleBinaryStream = (data) => {
			if (!feedStream) {
				console.log('handleBinaryStream feedStream empty')
				ws.send("fatal", "RTMP not set yet.");
				ws.close();
			}

			if (data) {
				console.log('handleBinaryStream feedStream not empty')
				feedStream(data);
			}
		}

		const handleEndProcess = (data) => {
			console.log('End process command received:', data);
			// Your logic to handle end process command
			if (!isSaveRecord) {
				save_video_file(); // Save video file to disk
			};
			if (ffmpeg_processes.length > 0) { // Check if ffmpeg_processes array is not empty
				try {
					feedStream = null; // Reset feedStream to null
					ffmpeg_processes.map((ffmpeg, index) => {
						console.log("#", index, " ", ffmpeg.pid);
						spawn("kill", ["-9", ffmpeg.pid]);
					});

					console.log('kill ffmpeg processes')

				} catch (error) {
					console.log(error);
					console.log("fatal error: killing ffmpeg process attempt failed");

				} finally {
					ffmpeg_processes = [];
					console.log("ffmpeg processes killed");
					ws.close();
				}
			}
		}

		const save_video_file = () => {
			console.time('save_video_file')
			const videoFilePath = path.join(path.join(__dirname, 'videos', streamId), 'before_encode_video.mp4');
			const inputPath = path.join(path.join(__dirname, 'videos', streamId), 'before_encode_video.mp4');
			const outputPath = path.join(path.join(__dirname, 'videos', streamId), 'video.mp4');

			fs.writeFileSync(videoFilePath, Buffer.concat(clients[streamId].videoChunks));
			
			// const streamId = "WDChZYuSrh"; // Example streamId
			const ffmpegSaveProcess = spawn('ffmpeg', ['-i', inputPath, '-c:v', 'copy', '-c:a', 'aac', '-strict', 'experimental', outputPath]);

			ffmpegSaveProcess.on('error', (error) => {
				console.error(`Error in ffmpeg process: ${error}`);
			});

			ffmpegSaveProcess.stdout.on('data', (data) => {
				// console.log(`ffmpeg stdout: ${data}`);
			});

			ffmpegSaveProcess.stderr.on('data', (data) => {
				console.error(`ffmpeg stderr: ${data}`);
			});

			ffmpegSaveProcess.on('close', (code) => {
				if (code === 0) {

					//delete input file
					fs.unlinkSync(inputPath)

					console.log('Video re-encoded and potentially fixed for seeking.');
					console.log('Video file saved to: ', outputPath);

					// Upload to DigitalOcean Spaces
					uploadToSpaces(outputPath);
					console.timeEnd('save_video_file')
				} else {
					console.error(`ffmpeg process exited with code ${code}`);
				}
			});
		};

		// Parse incoming message
		let parsedMessage;
		try {
			if (!isBlob(message)) {
				parsedMessage = JSON.parse(message);
				if (parsedMessage.event) {
					// Determine message event and call appropriate handler
					switch (parsedMessage.event) {
						case 'config_rtmpDestination':
							handleConfigRtmpDestination(parsedMessage.data);
							break;
						case 'start':
							handleStart(parsedMessage.data);
							break;
						case 'end_process':
							handleEndProcess(parsedMessage.data);
							break;
						case 'error':
							handleError(parsedMessage.data);
							break;
						default:
							return;
						// console.warn('Unknown message type:', parsedMessage.type);
					}
				}
			}

		} catch (error) {
			handleBinaryStream(message);
		}
	});

	function isBlob(data) {
		// Check if the data is of Blob-like structure
		// This is a simplified example, you might need to adjust this based on your specific requirements
		return (
			typeof data === 'object' &&
			typeof data.size === 'number' &&
			typeof data.type === 'string' &&
			typeof data.arrayBuffer === 'function'
		);
	}

	const uploadToSpaces = (filePath) => {
		const fileStream = fs.createReadStream(filePath);

		const uploadParams = {
			Bucket: process.env.DO_SPACES_BUCKET,
			Key: `${process.env.DO_STORAGE_FOLDER}/videos/${streamId}.mp4`, // Set the key as per your requirement
			Body: fileStream,
			contentType: 'video/mp4',
		};

		s3.upload(uploadParams, (err, data) => {
			if (err) {
				console.error("Error uploading to DigitalOcean Spaces:", err);
			} else {
				console.log("File uploaded successfully to DigitalOcean Spaces:", data.Location);
			}
		});
	};
}
Editor is loading...
Leave a Comment