Untitled

mail@pastecode.io avatar
unknown
javascript
8 months ago
3.9 kB
1
Indexable
Never
gstreamer chains: https://pastecode.io/s/my03aheg


publisher:
import udp from "dgram";
import { WebPubSubServiceClient } from "@azure/web-pubsub";
import { EventHubProducerClient } from "@azure/event-hubs";
import { DefaultAzureCredential } from "@azure/identity";

// Event Hubs target. The namespace host name is derived from the resource name.
const eventHubName = "test-20";
const eventHubsResourceName = "isr-video";
const fullyQualifiedNamespace = `${eventHubsResourceName}.servicebus.windows.net`;

// UDP (IPv4) socket; the 'message', 'error' and 'listening' handlers are
// attached to it further down in this script.
const server = udp.createSocket({ type: 'udp4' });

// Azure Identity - passwordless authentication
const credential = new DefaultAzureCredential();

// Producer client used to publish to the Event Hub named above.
const serviceClient = new EventHubProducerClient(
    fullyQualifiedNamespace,
    eventHubName,
    credential,
);
// const groupClient = serviceClient.group(process.env.PUBSUB_GROUP_NAME || 'video');

/**
 * Forwards each received UDP datagram to the Event Hub as a single-event batch.
 *
 * Failures are logged instead of thrown: this is an async event-emitter
 * callback, so a rejected promise would otherwise surface as an unhandled
 * rejection rather than being reported to any caller.
 *
 * @param {Buffer} message - Raw datagram payload received on the UDP socket.
 */
server.on('message', async (message) => {
    try {
        const batch = await serviceClient.createBatch();

        // tryAdd reports via its boolean result whether the event fit into the
        // batch; when it did not, there is nothing to send for this datagram.
        const wasAdded = batch.tryAdd({ body: message });
        if (!wasAdded) {
            console.error(`Dropping datagram of ${message.length} bytes: it does not fit into an Event Hubs batch`);
            return;
        }

        await serviceClient.sendBatch(batch);
    } catch (err) {
        console.error('Failed to forward datagram to Event Hubs:', err);
    }
});

// On a socket error, print the stack trace to stderr and close the socket.
server.on('error', ({ stack }) => {
    console.error(`UDP server error:\n${stack}`);
    server.close();
});

// Once the socket is listening, log the address and port it reports.
server.on('listening', () => {
    const { address: host, port } = server.address();
    console.log(`UDP server listening on ${host}:${port}`);
});

// Bind to UDP port 1234.
server.bind(1234);


consumer:
import { EventHubConsumerClient } from "@azure/event-hubs";
import { DefaultAzureCredential } from "@azure/identity";
import udp from "dgram";
import { connect } from "net";

import { PassThrough } from "stream";
// import { StreamUDP } from "stream-udp";

// UDP (IPv4) socket used for outgoing datagrams.
// dgram.createSocket is a factory function, so it is called without `new`.
const udpClient = udp.createSocket('udp4');
// udpClient.bind(7776, "localhost")

// Azure Identity - passwordless authentication
const credential = new DefaultAzureCredential();

// Event hubs: the namespace host name is derived from the resource name.
const eventHubsResourceName = "test-video";
const fullyQualifiedNamespace = `${eventHubsResourceName}.servicebus.windows.net`;
const eventHubName = "test-video";
const consumerGroup = "video";
const consumerClient = new EventHubConsumerClient(
  consumerGroup,
  fullyQualifiedNamespace,
  eventHubName,
  credential
);

// NOTE(review): `stream` is only referenced by commented-out code in this
// script; kept so that any remaining use elsewhere keeps working.
const stream = new PassThrough();
//udpClient.connect(7777, 'localhost');
// stream.pipe(udpClient);
// connect('localhost'm)
// udpClient.send(Buffer.from(event.body), 7777, "localhost", (error) => {
//     // console.log(`Received message ${group} ${data}}`);
//     if (error) {
//       console.log(error);
//       udpClient.close();
//     } else {
//       //   console.log("Data sent !!!");
//     }
//   });

// console.timeStamp('event')
// Startup marker printed before the subscription is opened.
console.log('event');

// Set once forwarding has been stopped, so that the socket and the
// subscription are closed exactly once even when several in-flight sends fail.
let isForwardingStopped = false;

/**
 * Stops forwarding after a UDP send failure.
 *
 * Logs the error, then (only on the first call) closes the UDP socket, the
 * subscription and the consumer client. Closing the socket a second time
 * would throw, and leaving the subscription open would make every later
 * batch try to send on a closed socket.
 *
 * `subscription` is declared below; this function is only invoked from send
 * callbacks, which run after that declaration has been initialised.
 *
 * @param {Error} error - The error reported by `udpClient.send`.
 */
const stopForwarding = (error) => {
  console.log(error);
  if (isForwardingStopped) {
    return;
  }
  isForwardingStopped = true;

  udpClient.close();
  subscription
    .close()
    .then(() => consumerClient.close())
    .catch((closeError) => {
      console.log(`Error : ${closeError}`);
    });
};

/**
 * Subscribes to the Event Hub and sends the body of every received event as
 * one UDP datagram to localhost:7777.
 */
const subscription = consumerClient.subscribe(
  {
    /**
     * Handles one batch of events delivered by the subscription.
     *
     * @param {Array} events - Events received in this batch; may be empty.
     * @param context - Partition context supplied by the client (unused).
     */
    processEvents: async (events, context) => {
      // Forwarding was stopped by an earlier send failure; the socket is closed.
      if (isForwardingStopped) {
        return;
      }

      console.log(events);
      if (events.length === 0) {
        console.log(
          `No events received within wait time. Waiting for next interval`
        );
        return;
      }

      for (const event of events) {
        udpClient.send(Buffer.from(event.body), 7777, "localhost", (error) => {
          console.log(`${event.body.length}`);
          if (error) {
            stopForwarding(error);
          }
        });
      }
    },

    /**
     * Logs errors reported by the subscription.
     *
     * @param err - The reported error.
     * @param context - Partition context supplied by the client (unused).
     */
    processError: async (err, context) => {
      console.log(`Error : ${err}`);
    },
  },
  {
    // Start reading at the "@latest" offset.
    startPosition: {
      offset: "@latest",
    },
    // Hand event bodies through without JSON parsing; they are forwarded as
    // bytes via Buffer.from above.
    skipParsingBodyAsJson: true,
    // Wait time per batch; an empty batch is logged by processEvents above.
    maxWaitTimeInSeconds: 1,
  }
);