Untitled
unknown
javascript
a year ago
8.4 kB
4
Indexable
module.exports = (ws, spawn, ffmpegLogStream, fs, path, s3) => { // console.log(ws); // Message handler let ffmpeg_processes = []; let clients = {}; let rtmpDestination = []; let streamId = ''; let feedStream = null; let isSaveRecord = false; ws.on('message', (message) => { // Custom message handler functions const handleConfigRtmpDestination = (data) => { console.log('Configuring RTMP destination:', data); // Your logic to handle configuring RTMP destination rtmpDestination = data.rtmp_addresses; streamId = data.streamId.toString(); const folderPath = path.join(__dirname, 'videos', streamId); fs.mkdirSync(folderPath, { recursive: true }); clients[streamId] = { folderPath: folderPath, videoChunks: [], }; } const handleStart = (data) => { console.log('Start command received using IOS:', data.isIOS); // Your logic to handle start command isSaveRecord = data.isSaveRecord; var teeOutputs = rtmpDestination.map(function (destination) { return '[f=flv:onfail=ignore]' + destination }).join('|'); if (data.isIOS) { var opts = [ '-hide_banner', '-thread_queue_size', '1024', '-loglevel', 'warning', '-re', '-i', '-', '-i', 'watermark.png', '-filter_complex', 'overlay=W-w-5:H-h-5', '-s', '1280x720', '-map', '0:v', '-c:v', 'libx264', '-preset', 'ultrafast', '-b:v', '2000k', '-maxrate', '2200k', '-bufsize', '6000k', '-g', '240', '-tune', 'zerolatency', '-map', '0:a', '-c:a', 'aac', '-b:a', '128k', '-y', '-use_wallclock_as_timestamps', '1', '-async', '1', '-r', '30', '-flvflags', 'no_duration_filesize', '-flags', '+global_header', '-f', 'tee', teeOutputs ]; } else { var opts = [ '-hide_banner', '-thread_queue_size', '1024', '-loglevel', 'warning', '-re', '-i', '-', '-i', 'watermark.png', '-filter_complex', 'overlay=W-w-5:H-h-5', '-s', '1280x720', '-map', '0:v', '-c:v', 'libx264', '-preset', 'ultrafast', '-b:v', '2000k', '-maxrate', '2200k', '-bufsize', '6000k', '-g', '240', '-tune', 'zerolatency', '-map', '0:a', '-c:a', 'aac', '-b:a', '128k', '-y', '-use_wallclock_as_timestamps', '1', '-async', '1', '-r', '30', '-flvflags', 'no_duration_filesize', '-flags', '+global_header', '-f', 'tee', teeOutputs ]; } console.log(opts); function addTimestamp(line) { const timestampedLine = `${new Date().toISOString()} - ${line}`; return timestampedLine; } //only spawn ffmpeg if there's rtmp destination if (rtmpDestination.length) { console.log("spawning ffmpeg"); let item = spawn("ffmpeg", opts, { stdio: ['pipe', 'pipe', 'pipe'] }); // item.socketId = socket.id; item.stderr.on('data', (data) => { const dataWithTimestamp = addTimestamp(data.toString()); ffmpegLogStream.write(dataWithTimestamp) }) item.on("uncaughtException", async (error) => { console.log("uncaughtException", error); handleEndProcess(); }); ffmpeg_processes.push(item); ffmpeg_processes.map((ffmpeg) => { ffmpeg.on("error", function (e) { if (e !== null && e > 0) { console.log(`child process ${ffmpeg.pid} error` + e); // socket.emit("error", "ffmpeg error!" + e); handleEndProcess(); } }); }); ffmpeg_processes.map((ffmpeg) => { ffmpeg.on("exit", function (e) { if (e !== null && e > 0) { console.log(`child process ${ffmpeg.pid} exit` + e); // socket.emit("fatal", "ffmpeg exit!" + e); handleEndProcess(); } }); }); } feedStream = (data) => { //only spawn ffmpeg if there's rtmp destination if (rtmpDestination.length) { ffmpeg_processes.map((ffmpeg) => { ffmpeg.stdin.write(data); }); } if (data) { clients[streamId].videoChunks.push(Buffer.from(data, 'base64')); } }; } const handleBinaryStream = (data) => { if (!feedStream) { console.log('handleBinaryStream feedStream empty') ws.send("fatal", "RTMP not set yet."); ws.close(); } if (data) { console.log('handleBinaryStream feedStream not empty') feedStream(data); } } const handleEndProcess = (data) => { console.log('End process command received:', data); // Your logic to handle end process command if (!isSaveRecord) { save_video_file(); // Save video file to disk }; if (ffmpeg_processes.length > 0) { // Check if ffmpeg_processes array is not empty try { feedStream = null; // Reset feedStream to null ffmpeg_processes.map((ffmpeg, index) => { console.log("#", index, " ", ffmpeg.pid); spawn("kill", ["-9", ffmpeg.pid]); }); console.log('kill ffmpeg processes') } catch (error) { console.log(error); console.log("fatal error: killing ffmpeg process attempt failed"); } finally { ffmpeg_processes = []; console.log("ffmpeg processes killed"); ws.close(); } } } const save_video_file = () => { console.time('save_video_file') const videoFilePath = path.join(path.join(__dirname, 'videos', streamId), 'before_encode_video.mp4'); const inputPath = path.join(path.join(__dirname, 'videos', streamId), 'before_encode_video.mp4'); const outputPath = path.join(path.join(__dirname, 'videos', streamId), 'video.mp4'); fs.writeFileSync(videoFilePath, Buffer.concat(clients[streamId].videoChunks)); // const streamId = "WDChZYuSrh"; // Example streamId const ffmpegSaveProcess = spawn('ffmpeg', ['-i', inputPath, '-c:v', 'copy', '-c:a', 'aac', '-strict', 'experimental', outputPath]); ffmpegSaveProcess.on('error', (error) => { console.error(`Error in ffmpeg process: ${error}`); }); ffmpegSaveProcess.stdout.on('data', (data) => { // console.log(`ffmpeg stdout: ${data}`); }); ffmpegSaveProcess.stderr.on('data', (data) => { console.error(`ffmpeg stderr: ${data}`); }); ffmpegSaveProcess.on('close', (code) => { if (code === 0) { //delete input file fs.unlinkSync(inputPath) console.log('Video re-encoded and potentially fixed for seeking.'); console.log('Video file saved to: ', outputPath); // Upload to DigitalOcean Spaces uploadToSpaces(outputPath); console.timeEnd('save_video_file') } else { console.error(`ffmpeg process exited with code ${code}`); } }); }; // Parse incoming message let parsedMessage; try { if (!isBlob(message)) { parsedMessage = JSON.parse(message); if (parsedMessage.event) { // Determine message event and call appropriate handler switch (parsedMessage.event) { case 'config_rtmpDestination': handleConfigRtmpDestination(parsedMessage.data); break; case 'start': handleStart(parsedMessage.data); break; case 'end_process': handleEndProcess(parsedMessage.data); break; case 'error': handleError(parsedMessage.data); break; default: return; // console.warn('Unknown message type:', parsedMessage.type); } } } } catch (error) { handleBinaryStream(message); } }); function isBlob(data) { // Check if the data is of Blob-like structure // This is a simplified example, you might need to adjust this based on your specific requirements return ( typeof data === 'object' && typeof data.size === 'number' && typeof data.type === 'string' && typeof data.arrayBuffer === 'function' ); } const uploadToSpaces = (filePath) => { const fileStream = fs.createReadStream(filePath); const uploadParams = { Bucket: process.env.DO_SPACES_BUCKET, Key: `${process.env.DO_STORAGE_FOLDER}/videos/${streamId}.mp4`, // Set the key as per your requirement Body: fileStream, contentType: 'video/mp4', }; s3.upload(uploadParams, (err, data) => { if (err) { console.error("Error uploading to DigitalOcean Spaces:", err); } else { console.log("File uploaded successfully to DigitalOcean Spaces:", data.Location); } }); }; }
Editor is loading...
Leave a Comment