59 lines
1.7 KiB
C#
59 lines
1.7 KiB
C#
|
using NetMQ;
|
|||
|
using NetMQ.Sockets;
|
|||
|
using RPG.Services.Core.Network.Command;
|
|||
|
using RPG.Services.Core.Options;
|
|||
|
|
|||
|
namespace RPG.Services.Core.Network;
|
|||
|
internal class ServiceEndPoint
|
|||
|
{
|
|||
|
private readonly NetMQSocket _socket;
|
|||
|
|
|||
|
private CancellationTokenSource? _receiveCancellation;
|
|||
|
private Task? _receiveTask;
|
|||
|
|
|||
|
public delegate Task CommandEventHandler(ServiceCommand command);
|
|||
|
public event CommandEventHandler? OnCommand;
|
|||
|
|
|||
|
public ServiceEndPoint(ServiceNodeOptions.Entry optionsEntry)
|
|||
|
{
|
|||
|
_socket = new PullSocket($"@tcp://{optionsEntry.Host}:{optionsEntry.Port}");
|
|||
|
_socket.Options.ReceiveHighWatermark = 10000;
|
|||
|
}
|
|||
|
|
|||
|
public void Start()
|
|||
|
{
|
|||
|
_receiveCancellation = new();
|
|||
|
_receiveTask = Task.Run(() => Receive(_receiveCancellation.Token));
|
|||
|
}
|
|||
|
|
|||
|
private async Task Receive(CancellationToken cancellationToken)
|
|||
|
{
|
|||
|
try
|
|||
|
{
|
|||
|
while (!cancellationToken.IsCancellationRequested)
|
|||
|
{
|
|||
|
NetMQMessage netMessage = _socket.ReceiveMultipartMessage();
|
|||
|
while (!netMessage.IsEmpty)
|
|||
|
{
|
|||
|
byte[] buffer = netMessage.Pop().Buffer;
|
|||
|
if (OnCommand != null)
|
|||
|
await OnCommand(ServiceCommandEncoder.DecodeCommand(buffer));
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
catch (Exception exception) when (exception is not OperationCanceledException)
|
|||
|
{
|
|||
|
throw;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
public async Task StopAsync()
|
|||
|
{
|
|||
|
if (_receiveCancellation != null && _receiveTask != null)
|
|||
|
{
|
|||
|
await _receiveCancellation.CancelAsync();
|
|||
|
await _receiveTask;
|
|||
|
}
|
|||
|
}
|
|||
|
}
|