KafkaSR/RPG.Services.Core/Network/ServiceEndPoint.cs
2024-01-30 02:49:21 +03:00

56 lines
1.5 KiB
C#

using NetMQ;
using NetMQ.Sockets;
using RPG.Services.Core.Network.Command;
using RPG.Services.Core.Options;
namespace RPG.Services.Core.Network;
internal class ServiceEndPoint
{
private readonly NetMQSocket _socket;
private CancellationTokenSource? _receiveCancellation;
public delegate Task CommandEventHandler(ServiceCommand command);
public event CommandEventHandler? OnCommand;
public ServiceEndPoint(ServiceNodeOptions.Entry optionsEntry)
{
_socket = new PullSocket($"@tcp://{optionsEntry.Host}:{optionsEntry.Port}");
_socket.Options.ReceiveHighWatermark = 10000;
}
public void Start()
{
_receiveCancellation = new();
_ = Task.Run(() => Receive(_receiveCancellation.Token));
}
private async Task Receive(CancellationToken cancellationToken)
{
try
{
while (!cancellationToken.IsCancellationRequested)
{
NetMQMessage netMessage = _socket.ReceiveMultipartMessage();
while (!netMessage.IsEmpty)
{
byte[] buffer = netMessage.Pop().Buffer;
if (OnCommand != null)
await OnCommand(ServiceCommandEncoder.DecodeCommand(buffer));
}
}
}
catch
{
if (!_socket.IsDisposed) _socket.Close();
}
}
public async Task StopAsync()
{
if (_receiveCancellation != null)
{
await _receiveCancellation.CancelAsync();
}
}
}