Untitled
unknown
javascript
2 years ago
3.9 kB
3
Indexable
gstreamer chains: https://pastecode.io/s/my03aheg publisher: import udp from "dgram"; import { WebPubSubServiceClient } from "@azure/web-pubsub"; import { EventHubProducerClient } from "@azure/event-hubs"; import { DefaultAzureCredential } from "@azure/identity"; // Event hubs const eventHubsResourceName = "isr-video"; const fullyQualifiedNamespace = `${eventHubsResourceName}.servicebus.windows.net`; const eventHubName = "test-20"; // Azure Identity - passwordless authentication const credential = new DefaultAzureCredential(); const server = udp.createSocket({ type: 'udp4' }); const serviceClient = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential) // const groupClient = serviceClient.group(process.env.PUBSUB_GROUP_NAME || 'video'); server.on('message', async (message) => { // console.log(`Recevive msg`) // console.log(message.length) const batch = await serviceClient.createBatch(); batch.tryAdd({ body: message }); await serviceClient.sendBatch(batch); }); server.on('error', (err) => { console.error(`UDP server error:\n${err.stack}`); server.close(); }); server.on('listening', () => { const address = server.address(); console.log(`UDP server listening on ${address.address}:${address.port}`); }); server.bind(1234); consumer: import { EventHubConsumerClient } from "@azure/event-hubs"; import { DefaultAzureCredential } from "@azure/identity"; import udp from "dgram"; import { connect } from "net"; import { PassThrough } from "stream"; // import { StreamUDP } from "stream-udp"; const udpClient = new udp.createSocket('udp4'); // udpClient.bind(7776, "localhost") const credential = new DefaultAzureCredential(); // Event hubs const eventHubsResourceName = "test-video"; const fullyQualifiedNamespace = `${eventHubsResourceName}.servicebus.windows.net`; const eventHubName = "test-video"; const consumerGroup = "video"; const consumerClient = new EventHubConsumerClient( consumerGroup, fullyQualifiedNamespace, eventHubName, credential ); const stream = new PassThrough(); //udpClient.connect(7777, 'localhost'); // stream.pipe(udpClient); // connect('localhost'm) // udpClient.send(Buffer.from(event.body), 7777, "localhost", (error) => { // // console.log(`Received message ${group} ${data}}`); // if (error) { // console.log(error); // udpClient.close(); // } else { // // console.log("Data sent !!!"); // } // }); // console.timeStamp('event') console.log('event') const subscription = consumerClient.subscribe( { processEvents: async (events, context) => { console.log(events) if (events.length === 0) { console.log( `No events received within wait time. Waiting for next interval` ); return; } // console.log(`event recivied: ${Date.now()}`) for (const event of events) { // console.log(context.partitionId) // console.log( // `Received event: '${event.body.length}' from partition: '${context.partitionId}' and consumer group: '${context.consumerGroup}'` // ); // console.log(Buffer.from(event.body.toString(), 'base64')); udpClient.send(Buffer.from(event.body), 7777, "localhost", (error) => { console.log(`${event.body.length}`); if (error) { console.log(error); udpClient.close(); } else { // console.log("Data sent !!!"); } }); // stream.push(event.body); } // console.log(`event sent: ${Date.now()}`) // Update the checkpoint. }, processError: async (err, context) => { console.log(`Error : ${err}`); }, }, { startPosition: { offset: "@latest", }, skipParsingBodyAsJson: true, maxWaitTimeInSeconds: 1, } );
Editor is loading...