58 lines
1.7 KiB
C#
58 lines
1.7 KiB
C#
using NetMQ;
|
|
using NetMQ.Sockets;
|
|
using RPG.Services.Core.Network.Command;
|
|
using RPG.Services.Core.Options;
|
|
|
|
namespace RPG.Services.Core.Network;
|
|
/// <summary>
/// Inbound endpoint of a service node: owns a NetMQ <see cref="PullSocket"/> and
/// forwards every received frame, decoded as a <see cref="ServiceCommand"/>, to
/// <see cref="OnCommand"/> from a background receive loop.
/// </summary>
internal class ServiceEndPoint
{
    /// <summary>
    /// Upper bound on how long a single receive attempt blocks before the loop
    /// re-checks its cancellation token. Keeps <see cref="StopAsync"/> responsive
    /// even when no traffic is arriving.
    /// </summary>
    private static readonly TimeSpan ReceivePollInterval = TimeSpan.FromMilliseconds(100);

    private readonly NetMQSocket _socket;

    private CancellationTokenSource? _receiveCancellation;
    private Task? _receiveTask;

    /// <summary>Asynchronous handler invoked for each decoded command.</summary>
    /// <param name="command">The command decoded from one received frame.</param>
    public delegate Task CommandEventHandler(ServiceCommand command);

    /// <summary>
    /// Raised once per received frame. The receive loop awaits the returned task
    /// before processing the next frame.
    /// </summary>
    /// <remarks>
    /// NOTE(review): this is a multicast delegate, so with several subscribers only
    /// the task returned by the last one is awaited.
    /// </remarks>
    public event CommandEventHandler? OnCommand;

    /// <summary>
    /// Creates the pull socket for the given node entry.
    /// </summary>
    /// <param name="optionsEntry">Supplies the host and port used to build the TCP address.</param>
    public ServiceEndPoint(ServiceNodeOptions.Entry optionsEntry)
    {
        // The leading '@' in a NetMQ address string requests a bind (as opposed to '>' for connect).
        _socket = new PullSocket($"@tcp://{optionsEntry.Host}:{optionsEntry.Port}");
        _socket.Options.ReceiveHighWatermark = 10000;
    }

    /// <summary>
    /// Starts the background receive loop on the thread pool.
    /// </summary>
    public void Start()
    {
        _receiveCancellation = new();

        // Capture the token now so the lambda does not re-read a field that
        // StopAsync may already have cleared.
        CancellationToken token = _receiveCancellation.Token;
        _receiveTask = Task.Run(() => Receive(token));
    }

    /// <summary>
    /// Receive loop: pulls multipart messages and dispatches each frame to
    /// <see cref="OnCommand"/> until cancellation is requested.
    /// </summary>
    /// <param name="cancellationToken">Signals the loop to exit.</param>
    private async Task Receive(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            // A plain blocking receive would never observe cancellation on an idle
            // socket, so wait with a timeout and loop back to re-check the token.
            NetMQMessage? netMessage = null;
            if (!_socket.TryReceiveMultipartMessage(ReceivePollInterval, ref netMessage) || netMessage is null)
            {
                continue;
            }

            while (!netMessage.IsEmpty)
            {
                byte[] buffer = netMessage.Pop().Buffer;

                // Snapshot the event so an unsubscribe between the null check and
                // the invocation cannot cause a NullReferenceException.
                CommandEventHandler? handler = OnCommand;
                if (handler != null)
                {
                    await handler(ServiceCommandEncoder.DecodeCommand(buffer));
                }
            }
        }
    }

    /// <summary>
    /// Requests cancellation of the receive loop and waits for it to finish.
    /// Does nothing if <see cref="Start"/> has not been called.
    /// </summary>
    /// <returns>
    /// A task that completes when the loop has exited. Any exception that
    /// terminated the loop (for example one thrown by a handler) is rethrown here.
    /// </returns>
    public async Task StopAsync()
    {
        CancellationTokenSource? cancellation = _receiveCancellation;
        Task? receiveTask = _receiveTask;
        if (cancellation == null || receiveTask == null)
        {
            return;
        }

        await cancellation.CancelAsync();
        try
        {
            await receiveTask;
        }
        finally
        {
            // Release the token source and reset state so a repeated StopAsync is a no-op.
            cancellation.Dispose();
            _receiveCancellation = null;
            _receiveTask = null;
        }
    }
}
|