Untitled
unknown
javascript
2 years ago
8.4 kB
5
Indexable
module.exports = (ws, spawn, ffmpegLogStream, fs, path, s3) => {
// console.log(ws);
// Message handler
let ffmpeg_processes = [];
let clients = {};
let rtmpDestination = [];
let streamId = '';
let feedStream = null;
let isSaveRecord = false;
ws.on('message', (message) => {
// Custom message handler functions
const handleConfigRtmpDestination = (data) => {
console.log('Configuring RTMP destination:', data);
// Your logic to handle configuring RTMP destination
rtmpDestination = data.rtmp_addresses;
streamId = data.streamId.toString();
const folderPath = path.join(__dirname, 'videos', streamId);
fs.mkdirSync(folderPath, { recursive: true });
clients[streamId] = {
folderPath: folderPath,
videoChunks: [],
};
}
const handleStart = (data) => {
console.log('Start command received using IOS:', data.isIOS);
// Your logic to handle start command
isSaveRecord = data.isSaveRecord;
var teeOutputs = rtmpDestination.map(function (destination) {
return '[f=flv:onfail=ignore]' + destination
}).join('|');
if (data.isIOS) {
var opts = [
'-hide_banner',
'-thread_queue_size', '1024',
'-loglevel', 'warning',
'-re', '-i', '-',
'-i', 'watermark.png', '-filter_complex', 'overlay=W-w-5:H-h-5',
'-s', '1280x720',
'-map', '0:v',
'-c:v', 'libx264', '-preset', 'ultrafast', '-b:v', '2000k', '-maxrate', '2200k', '-bufsize', '6000k', '-g', '240',
'-tune', 'zerolatency',
'-map', '0:a',
'-c:a', 'aac', '-b:a', '128k',
'-y',
'-use_wallclock_as_timestamps', '1',
'-async', '1',
'-r', '30',
'-flvflags', 'no_duration_filesize',
'-flags', '+global_header',
'-f', 'tee',
teeOutputs
];
} else {
var opts = [
'-hide_banner',
'-thread_queue_size', '1024',
'-loglevel', 'warning',
'-re', '-i', '-',
'-i', 'watermark.png', '-filter_complex', 'overlay=W-w-5:H-h-5',
'-s', '1280x720',
'-map', '0:v',
'-c:v', 'libx264', '-preset', 'ultrafast', '-b:v', '2000k', '-maxrate', '2200k', '-bufsize', '6000k', '-g', '240',
'-tune', 'zerolatency',
'-map', '0:a',
'-c:a', 'aac', '-b:a', '128k',
'-y',
'-use_wallclock_as_timestamps', '1',
'-async', '1',
'-r', '30',
'-flvflags', 'no_duration_filesize',
'-flags', '+global_header',
'-f', 'tee',
teeOutputs
];
}
console.log(opts);
function addTimestamp(line) {
const timestampedLine = `${new Date().toISOString()} - ${line}`;
return timestampedLine;
}
//only spawn ffmpeg if there's rtmp destination
if (rtmpDestination.length) {
console.log("spawning ffmpeg");
let item = spawn("ffmpeg", opts, { stdio: ['pipe', 'pipe', 'pipe'] });
// item.socketId = socket.id;
item.stderr.on('data', (data) => {
const dataWithTimestamp = addTimestamp(data.toString());
ffmpegLogStream.write(dataWithTimestamp)
})
item.on("uncaughtException", async (error) => {
console.log("uncaughtException", error);
handleEndProcess();
});
ffmpeg_processes.push(item);
ffmpeg_processes.map((ffmpeg) => {
ffmpeg.on("error", function (e) {
if (e !== null && e > 0) {
console.log(`child process ${ffmpeg.pid} error` + e);
// socket.emit("error", "ffmpeg error!" + e);
handleEndProcess();
}
});
});
ffmpeg_processes.map((ffmpeg) => {
ffmpeg.on("exit", function (e) {
if (e !== null && e > 0) {
console.log(`child process ${ffmpeg.pid} exit` + e);
// socket.emit("fatal", "ffmpeg exit!" + e);
handleEndProcess();
}
});
});
}
feedStream = (data) => {
//only spawn ffmpeg if there's rtmp destination
if (rtmpDestination.length) {
ffmpeg_processes.map((ffmpeg) => {
ffmpeg.stdin.write(data);
});
}
if (data) {
clients[streamId].videoChunks.push(Buffer.from(data, 'base64'));
}
};
}
const handleBinaryStream = (data) => {
if (!feedStream) {
console.log('handleBinaryStream feedStream empty')
ws.send("fatal", "RTMP not set yet.");
ws.close();
}
if (data) {
console.log('handleBinaryStream feedStream not empty')
feedStream(data);
}
}
const handleEndProcess = (data) => {
console.log('End process command received:', data);
// Your logic to handle end process command
if (!isSaveRecord) {
save_video_file(); // Save video file to disk
};
if (ffmpeg_processes.length > 0) { // Check if ffmpeg_processes array is not empty
try {
feedStream = null; // Reset feedStream to null
ffmpeg_processes.map((ffmpeg, index) => {
console.log("#", index, " ", ffmpeg.pid);
spawn("kill", ["-9", ffmpeg.pid]);
});
console.log('kill ffmpeg processes')
} catch (error) {
console.log(error);
console.log("fatal error: killing ffmpeg process attempt failed");
} finally {
ffmpeg_processes = [];
console.log("ffmpeg processes killed");
ws.close();
}
}
}
const save_video_file = () => {
console.time('save_video_file')
const videoFilePath = path.join(path.join(__dirname, 'videos', streamId), 'before_encode_video.mp4');
const inputPath = path.join(path.join(__dirname, 'videos', streamId), 'before_encode_video.mp4');
const outputPath = path.join(path.join(__dirname, 'videos', streamId), 'video.mp4');
fs.writeFileSync(videoFilePath, Buffer.concat(clients[streamId].videoChunks));
// const streamId = "WDChZYuSrh"; // Example streamId
const ffmpegSaveProcess = spawn('ffmpeg', ['-i', inputPath, '-c:v', 'copy', '-c:a', 'aac', '-strict', 'experimental', outputPath]);
ffmpegSaveProcess.on('error', (error) => {
console.error(`Error in ffmpeg process: ${error}`);
});
ffmpegSaveProcess.stdout.on('data', (data) => {
// console.log(`ffmpeg stdout: ${data}`);
});
ffmpegSaveProcess.stderr.on('data', (data) => {
console.error(`ffmpeg stderr: ${data}`);
});
ffmpegSaveProcess.on('close', (code) => {
if (code === 0) {
//delete input file
fs.unlinkSync(inputPath)
console.log('Video re-encoded and potentially fixed for seeking.');
console.log('Video file saved to: ', outputPath);
// Upload to DigitalOcean Spaces
uploadToSpaces(outputPath);
console.timeEnd('save_video_file')
} else {
console.error(`ffmpeg process exited with code ${code}`);
}
});
};
// Parse incoming message
let parsedMessage;
try {
if (!isBlob(message)) {
parsedMessage = JSON.parse(message);
if (parsedMessage.event) {
// Determine message event and call appropriate handler
switch (parsedMessage.event) {
case 'config_rtmpDestination':
handleConfigRtmpDestination(parsedMessage.data);
break;
case 'start':
handleStart(parsedMessage.data);
break;
case 'end_process':
handleEndProcess(parsedMessage.data);
break;
case 'error':
handleError(parsedMessage.data);
break;
default:
return;
// console.warn('Unknown message type:', parsedMessage.type);
}
}
}
} catch (error) {
handleBinaryStream(message);
}
});
function isBlob(data) {
// Check if the data is of Blob-like structure
// This is a simplified example, you might need to adjust this based on your specific requirements
return (
typeof data === 'object' &&
typeof data.size === 'number' &&
typeof data.type === 'string' &&
typeof data.arrayBuffer === 'function'
);
}
const uploadToSpaces = (filePath) => {
const fileStream = fs.createReadStream(filePath);
const uploadParams = {
Bucket: process.env.DO_SPACES_BUCKET,
Key: `${process.env.DO_STORAGE_FOLDER}/videos/${streamId}.mp4`, // Set the key as per your requirement
Body: fileStream,
contentType: 'video/mp4',
};
s3.upload(uploadParams, (err, data) => {
if (err) {
console.error("Error uploading to DigitalOcean Spaces:", err);
} else {
console.log("File uploaded successfully to DigitalOcean Spaces:", data.Location);
}
});
};
}Editor is loading...
Leave a Comment