using NetMQ; using NetMQ.Sockets; using RPG.Services.Core.Network.Command; using RPG.Services.Core.Options; namespace RPG.Services.Core.Network; internal class ServiceEndPoint { private readonly NetMQSocket _socket; private CancellationTokenSource? _receiveCancellation; public delegate Task CommandEventHandler(ServiceCommand command); public event CommandEventHandler? OnCommand; public ServiceEndPoint(ServiceNodeOptions.Entry optionsEntry) { _socket = new PullSocket($"@tcp://{optionsEntry.Host}:{optionsEntry.Port}"); _socket.Options.ReceiveHighWatermark = 10000; } public void Start() { _receiveCancellation = new(); _ = Task.Run(() => Receive(_receiveCancellation.Token)); } private async Task Receive(CancellationToken cancellationToken) { try { while (!cancellationToken.IsCancellationRequested) { NetMQMessage netMessage = _socket.ReceiveMultipartMessage(); while (!netMessage.IsEmpty) { byte[] buffer = netMessage.Pop().Buffer; if (OnCommand != null) await OnCommand(ServiceCommandEncoder.DecodeCommand(buffer)); } } } catch { if (!_socket.IsDisposed) _socket.Close(); } } public async Task StopAsync() { if (_receiveCancellation != null) { await _receiveCancellation.CancelAsync(); } } }