using System.Buffers;
using GameServer.Extensions;
using GameServer.Network.Messages;
using KcpSharp;

namespace GameServer.Network.Kcp;

/// <summary>
/// <see cref="IConnection"/> implementation that exchanges length-prefixed
/// <see cref="BaseMessage"/> frames over a single <see cref="KcpConversation"/>.
/// </summary>
internal class KcpConnection : IConnection
{
    /// <summary>Size in bytes of the buffer handed to the conversation for each receive.</summary>
    private const int ReceiveBufferSize = 8192;

    private readonly byte[] _recvBuffer;
    private readonly KcpConversation _conv;

    // Async-friendly mutex: at most one SendAsync call is in flight on the conversation at a time.
    private readonly SemaphoreSlim _sendLock;

    private uint _upStreamSeqNo;
    private uint _downStreamSeqNo;

    /// <summary>Creates a connection that reads from and writes to <paramref name="conv"/>.</summary>
    /// <param name="conv">The KCP conversation used for all traffic of this connection.</param>
    public KcpConnection(KcpConversation conv)
    {
        _conv = conv;
        _recvBuffer = GC.AllocateUninitializedArray<byte>(ReceiveBufferSize);
        _sendLock = new SemaphoreSlim(1, 1);
    }

    /// <summary><c>true</c> while the underlying conversation's transport has not been closed.</summary>
    public bool Active => !_conv.TransportClosed;

    /// <summary>
    /// Waits for the next frame from the conversation and decodes it.
    /// </summary>
    /// <returns>
    /// The decoded message, or <c>null</c> when the transport was closed, when the frame's
    /// length prefix does not fit the bytes actually received, or when the message's sequence
    /// number is lower than the last accepted one.
    /// </returns>
    public async ValueTask<BaseMessage?> ReceiveMessageAsync()
    {
        KcpConversationReceiveResult result = await _conv.ReceiveAsync(_recvBuffer.AsMemory(), CancellationToken.None);
        if (result.TransportClosed)
        {
            return null;
        }

        ReadOnlyMemory<byte> buffer = _recvBuffer.AsMemory(0, result.BytesReceived);

        // The length prefix comes from the peer: validate it before slicing so a short or
        // corrupt frame is dropped instead of throwing out of the receive path.
        if (buffer.Length < BaseMessage.LengthFieldSize)
        {
            return null;
        }

        int payloadLength = buffer.Span.ReadInt24LittleEndian();
        if (payloadLength < 0 || payloadLength > buffer.Length - BaseMessage.LengthFieldSize)
        {
            return null;
        }

        BaseMessage message = MessageManager.DecodeMessage(buffer.Slice(BaseMessage.LengthFieldSize, payloadLength));

        // Drop messages that are older than the last one accepted.
        if (message.SeqNo < _downStreamSeqNo)
        {
            return null;
        }

        _downStreamSeqNo = message.SeqNo;
        return message;
    }

    /// <summary>
    /// Stamps <paramref name="message"/> with the next upstream sequence number and sends it.
    /// </summary>
    /// <param name="message">The outgoing message; its <c>SeqNo</c> is overwritten.</param>
    /// <returns>A task that completes when the message has been handed to the conversation.</returns>
    public Task OnServerMessageAvailable(BaseMessage message)
    {
        // NOTE(review): the sequence number is assigned before the send lock is taken, so
        // concurrent callers could transmit in a different order than their sequence
        // numbers - confirm whether callers already serialize sends.
        message.SeqNo = NextUpStreamSeqNo();
        return SendAsyncImpl(message);
    }

    /// <summary>
    /// Encodes <paramref name="message"/> into a pooled buffer (length prefix followed by the
    /// encoded message) and sends it, serialized against other senders.
    /// </summary>
    /// <exception cref="InvalidOperationException">The conversation is <c>null</c>.</exception>
    private async Task SendAsyncImpl(BaseMessage message)
    {
        if (_conv is null)
        {
            throw new InvalidOperationException("Trying to send message when conv is null");
        }

        int networkSize = message.NetworkSize;
        using IMemoryOwner<byte> memoryOwner = MemoryPool<byte>.Shared.Rent(networkSize);
        Memory<byte> memory = memoryOwner.Memory;

        // The prefix holds the size of everything that follows it.
        memory.Span.WriteInt24LittleEndian(networkSize - BaseMessage.LengthFieldSize);
        MessageManager.EncodeMessage(memory[BaseMessage.LengthFieldSize..], message);

        await _sendLock.WaitAsync();
        try
        {
            // The pool may hand back more than requested; send only the bytes that were written.
            await _conv.SendAsync(memory[..networkSize]);
        }
        finally
        {
            // Always release, otherwise a failed send would block every later sender.
            _sendLock.Release();
        }
    }

    /// <summary>Atomically increments and returns the upstream sequence counter.</summary>
    private uint NextUpStreamSeqNo()
    {
        return Interlocked.Increment(ref _upStreamSeqNo);
    }
}