Untitled
unknown
plain_text
3 years ago
5.4 kB
11
Indexable
using DittoService.Protos;
using Grpc.Core;
using System.Collections.Concurrent;
using System.Security.Cryptography.X509Certificates;
namespace DittoService.Services
{
public class DittoService : Ditto.DittoBase
{
private readonly ConcurrentDictionary<string, ClientContext> clients = new ConcurrentDictionary<string, ClientContext>();
public event EventHandler<ClientEventArgs> ClientAdded;
public event EventHandler<MessageReceivedEventArgs> MessageReceived;
public event EventHandler<ClientEventArgs> ClientDisconnected;
public override async Task Speak(
IAsyncStreamReader<Msg> requestStream,
IServerStreamWriter<Msg> responseStream,
ServerCallContext context)
{
try
{
string clientId = Guid.NewGuid().ToString();
var clientContext = new ClientContext(clientId, requestStream, responseStream, context);
clients.TryAdd(clientId, clientContext);
ClientAdded?.Invoke(this, new ClientEventArgs(clientId));
await clientContext.HandleMessagesAsync(MessageReceived);
clients.TryRemove(clientId, out _);
ClientDisconnected?.Invoke(this, new ClientEventArgs(clientId));
//while (await requestStream.MoveNext()
// && !context.CancellationToken.IsCancellationRequested)
//{
// // read incoming message
// var current = requestStream.Current;
// Console.WriteLine($"Message from Client: {current.Text}");
// // write outgoing message
// await SendResponseMessage(current, responseStream);
// if (constants.streamWriterList.Count() == 0)
// {
// constants.streamWriterList.Add(responseStream);
// }
// else
// {
// current.Text += "This message was sent by another client";
// await SendResponseMessage(current, constants.streamWriterList[0]);
// }
//}
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
Console.WriteLine("Operation Cancelled");
}
Console.WriteLine("Operation Complete.");
}
private async Task SendResponseMessage(Msg current,
IServerStreamWriter<Msg> responseStream)
{
await responseStream.WriteAsync(new Msg
{
Text = $"Ditto from Server: {current.Text}"
});
}
public class ClientContext
{
public string ClientId { get; }
private readonly IAsyncStreamReader<Msg> requestStream;
private readonly IServerStreamWriter<Msg> responseStream;
private readonly ServerCallContext context;
public ClientContext(string clientId, IAsyncStreamReader<Msg> requestStream, IServerStreamWriter<Msg> responseStream, ServerCallContext context)
{
ClientId = clientId;
this.requestStream = requestStream;
this.responseStream = responseStream;
this.context = context;
}
public async Task HandleMessagesAsync(EventHandler<MessageReceivedEventArgs> messageReceivedHandler)
{
try
{
await foreach (var message in requestStream.ReadAllAsync())
{
messageReceivedHandler?.Invoke(this, new MessageReceivedEventArgs(ClientId, message));
Console.WriteLine("hello :" +message.Text );
}
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
// Ignore cancelled exception
}
}
public async Task SendAsync(Msg message)
{
try
{
await responseStream.WriteAsync(message);
}
catch (Exception ex)
{
Console.WriteLine($"Error sending message to client {ClientId}: {ex.Message}");
}
}
}
public class ClientEventArgs : EventArgs
{
public string ClientId { get; }
public ClientEventArgs(string clientId)
{
ClientId = clientId;
}
}
public class MessageReceivedEventArgs : EventArgs
{
public string ClientId { get; }
public Msg Message { get; }
public MessageReceivedEventArgs(string clientId, Msg message)
{
ClientId = clientId;
Message = message;
printing(Message);
}
public void printing(Msg msg)
{
Console.WriteLine("This is printed in the Server :" + msg.Text);
}
}Editor is loading...