Untitled
unknown
plain_text
2 years ago
5.4 kB
0
Indexable
using DittoService.Protos; using Grpc.Core; using System.Collections.Concurrent; using System.Security.Cryptography.X509Certificates; namespace DittoService.Services { public class DittoService : Ditto.DittoBase { private readonly ConcurrentDictionary<string, ClientContext> clients = new ConcurrentDictionary<string, ClientContext>(); public event EventHandler<ClientEventArgs> ClientAdded; public event EventHandler<MessageReceivedEventArgs> MessageReceived; public event EventHandler<ClientEventArgs> ClientDisconnected; public override async Task Speak( IAsyncStreamReader<Msg> requestStream, IServerStreamWriter<Msg> responseStream, ServerCallContext context) { try { string clientId = Guid.NewGuid().ToString(); var clientContext = new ClientContext(clientId, requestStream, responseStream, context); clients.TryAdd(clientId, clientContext); ClientAdded?.Invoke(this, new ClientEventArgs(clientId)); await clientContext.HandleMessagesAsync(MessageReceived); clients.TryRemove(clientId, out _); ClientDisconnected?.Invoke(this, new ClientEventArgs(clientId)); //while (await requestStream.MoveNext() // && !context.CancellationToken.IsCancellationRequested) //{ // // read incoming message // var current = requestStream.Current; // Console.WriteLine($"Message from Client: {current.Text}"); // // write outgoing message // await SendResponseMessage(current, responseStream); // if (constants.streamWriterList.Count() == 0) // { // constants.streamWriterList.Add(responseStream); // } // else // { // current.Text += "This message was sent by another client"; // await SendResponseMessage(current, constants.streamWriterList[0]); // } //} } catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled) { Console.WriteLine("Operation Cancelled"); } Console.WriteLine("Operation Complete."); } private async Task SendResponseMessage(Msg current, IServerStreamWriter<Msg> responseStream) { await responseStream.WriteAsync(new Msg { Text = $"Ditto from Server: {current.Text}" }); } public class ClientContext { public string ClientId { get; } private readonly IAsyncStreamReader<Msg> requestStream; private readonly IServerStreamWriter<Msg> responseStream; private readonly ServerCallContext context; public ClientContext(string clientId, IAsyncStreamReader<Msg> requestStream, IServerStreamWriter<Msg> responseStream, ServerCallContext context) { ClientId = clientId; this.requestStream = requestStream; this.responseStream = responseStream; this.context = context; } public async Task HandleMessagesAsync(EventHandler<MessageReceivedEventArgs> messageReceivedHandler) { try { await foreach (var message in requestStream.ReadAllAsync()) { messageReceivedHandler?.Invoke(this, new MessageReceivedEventArgs(ClientId, message)); Console.WriteLine("hello :" +message.Text ); } } catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled) { // Ignore cancelled exception } } public async Task SendAsync(Msg message) { try { await responseStream.WriteAsync(message); } catch (Exception ex) { Console.WriteLine($"Error sending message to client {ClientId}: {ex.Message}"); } } } public class ClientEventArgs : EventArgs { public string ClientId { get; } public ClientEventArgs(string clientId) { ClientId = clientId; } } public class MessageReceivedEventArgs : EventArgs { public string ClientId { get; } public Msg Message { get; } public MessageReceivedEventArgs(string clientId, Msg message) { ClientId = clientId; Message = message; printing(Message); } public void printing(Msg msg) { Console.WriteLine("This is printed in the Server :" + msg.Text); } }
Editor is loading...