Untitled
unknown
javascript
2 years ago
3.9 kB
12
Indexable
gstreamer chains: https://pastecode.io/s/my03aheg
publisher:
import udp from "dgram";
import { WebPubSubServiceClient } from "@azure/web-pubsub";
import { EventHubProducerClient } from "@azure/event-hubs";
import { DefaultAzureCredential } from "@azure/identity";
// Event hubs
const eventHubsResourceName = "isr-video";
const fullyQualifiedNamespace = `${eventHubsResourceName}.servicebus.windows.net`;
const eventHubName = "test-20";
// Azure Identity - passwordless authentication
const credential = new DefaultAzureCredential();
const server = udp.createSocket({ type: 'udp4' });
const serviceClient = new EventHubProducerClient(fullyQualifiedNamespace,
eventHubName,
credential)
// const groupClient = serviceClient.group(process.env.PUBSUB_GROUP_NAME || 'video');
server.on('message', async (message) => {
// console.log(`Recevive msg`)
// console.log(message.length)
const batch = await serviceClient.createBatch();
batch.tryAdd({ body: message });
await serviceClient.sendBatch(batch);
});
server.on('error', (err) => {
console.error(`UDP server error:\n${err.stack}`);
server.close();
});
server.on('listening', () => {
const address = server.address();
console.log(`UDP server listening on ${address.address}:${address.port}`);
});
server.bind(1234);
consumer:
import { EventHubConsumerClient } from "@azure/event-hubs";
import { DefaultAzureCredential } from "@azure/identity";
import udp from "dgram";
import { connect } from "net";
import { PassThrough } from "stream";
// import { StreamUDP } from "stream-udp";
const udpClient = new udp.createSocket('udp4');
// udpClient.bind(7776, "localhost")
const credential = new DefaultAzureCredential();
// Event hubs
const eventHubsResourceName = "test-video";
const fullyQualifiedNamespace = `${eventHubsResourceName}.servicebus.windows.net`;
const eventHubName = "test-video";
const consumerGroup = "video";
const consumerClient = new EventHubConsumerClient(
consumerGroup,
fullyQualifiedNamespace,
eventHubName,
credential
);
const stream = new PassThrough();
//udpClient.connect(7777, 'localhost');
// stream.pipe(udpClient);
// connect('localhost'm)
// udpClient.send(Buffer.from(event.body), 7777, "localhost", (error) => {
// // console.log(`Received message ${group} ${data}}`);
// if (error) {
// console.log(error);
// udpClient.close();
// } else {
// // console.log("Data sent !!!");
// }
// });
// console.timeStamp('event')
console.log('event')
const subscription = consumerClient.subscribe(
{
processEvents: async (events, context) => {
console.log(events)
if (events.length === 0) {
console.log(
`No events received within wait time. Waiting for next interval`
);
return;
}
// console.log(`event recivied: ${Date.now()}`)
for (const event of events) {
// console.log(context.partitionId)
// console.log(
// `Received event: '${event.body.length}' from partition: '${context.partitionId}' and consumer group: '${context.consumerGroup}'`
// );
// console.log(Buffer.from(event.body.toString(), 'base64'));
udpClient.send(Buffer.from(event.body), 7777, "localhost", (error) => {
console.log(`${event.body.length}`);
if (error) {
console.log(error);
udpClient.close();
} else {
// console.log("Data sent !!!");
}
});
// stream.push(event.body);
}
// console.log(`event sent: ${Date.now()}`)
// Update the checkpoint.
},
processError: async (err, context) => {
console.log(`Error : ${err}`);
},
},
{
startPosition: {
offset: "@latest",
},
skipParsingBodyAsJson: true,
maxWaitTimeInSeconds: 1,
}
);
Editor is loading...