KafkaSR/RPG.Services.Gateserver/Session/NetworkSession.cs

135 lines
4.3 KiB
C#

using System.Net.Sockets;
using Google.Protobuf;
using RPG.Network.Proto;
using RPG.Services.Core.Network;
using RPG.Services.Core.Session;
using RPG.Services.Gateserver.Network;
namespace RPG.Services.Gateserver.Session;
internal class NetworkSession : RPGSession
{
private const int ReadTimeoutMs = 30000;
private const int ReceiveBufferSize = 16384;
private readonly byte[] _recvBuffer;
private readonly List<RPGServiceType> _boundServices;
public Socket? Socket { private get; set; }
public PlayerGetTokenCsReq? GetTokenCsReq { get; private set; }
public NetworkSession(ulong sessionId, ServiceBox serviceBox) : base(sessionId, serviceBox)
{
_boundServices = [];
_recvBuffer = GC.AllocateUninitializedArray<byte>(ReceiveBufferSize);
}
public async Task RunAsync()
{
if (Socket == null) throw new InvalidOperationException("RunAsync called but socket was not set!");
int recvBufferIdx = 0;
Memory<byte> recvBufferMem = _recvBuffer.AsMemory();
while (true)
{
int readAmount = await ReadWithTimeoutAsync(Socket, recvBufferMem[recvBufferIdx..], ReadTimeoutMs);
if (readAmount == 0) break;
recvBufferIdx += readAmount;
int consumed = ConsumeData(recvBufferMem, recvBufferIdx);
if (consumed > 0)
Buffer.BlockCopy(_recvBuffer, consumed, _recvBuffer, 0, recvBufferIdx -= consumed);
}
}
private int ConsumeData(ReadOnlyMemory<byte> recvBufferMem, int recvBufferIdx)
{
int consumedBytes = 0;
do
{
switch (NetPacket.TryDeserialize(recvBufferMem[consumedBytes..recvBufferIdx], out NetPacket? packet, out int bytesRead))
{
case NetPacket.DeserializationResult.Success:
HandlePacket(packet!);
consumedBytes += bytesRead;
break;
case NetPacket.DeserializationResult.BufferExceeded:
return consumedBytes;
case NetPacket.DeserializationResult.Corrupted:
throw new InvalidDataException("The network stream was corrupted");
}
}
while (recvBufferIdx - consumedBytes >= NetPacket.Overhead);
return consumedBytes;
}
public void ServiceBound(RPGServiceType serviceType)
{
_boundServices.Add(serviceType);
}
public async Task SendAsync<TBody>(ushort cmdType, TBody body) where TBody : IMessage<TBody> =>
await SendAsync(new(cmdType, ReadOnlyMemory<byte>.Empty, body.ToByteArray()));
public async Task SendAsync(NetPacket packet)
{
if (Socket == null) return;
byte[] buffer = GC.AllocateUninitializedArray<byte>(packet.Size);
packet.Serialize(buffer);
await Socket!.SendAsync(buffer);
}
private void HandlePacket(NetPacket packet)
{
switch ((CmdType)packet.CmdType)
{
case CmdType.CmdPlayerGetTokenCsReq:
HandlePlayerGetTokenCsReq(PlayerGetTokenCsReq.Parser.ParseFrom(packet.Body.Span));
break;
case CmdType.CmdPlayerKeepAliveNotify:
break;
default:
ForwardToGameserver(packet);
break;
}
}
private void HandlePlayerGetTokenCsReq(PlayerGetTokenCsReq req)
{
GetTokenCsReq = req;
PlayerUid = uint.Parse(req.AccountUid);
BindService(RPGServiceType.Gameserver);
}
private void ForwardToGameserver(NetPacket packet)
{
SendToService(RPGServiceType.Gameserver, ServiceCommandType.ForwardGameMessage, new CmdForwardGameMessage
{
SessionId = SessionId,
CmdType = packet.CmdType,
Payload = ByteString.CopyFrom(packet.Body.Span)
});
}
private static async ValueTask<int> ReadWithTimeoutAsync(Socket socket, Memory<byte> buffer, int timeoutMs)
{
CancellationTokenSource cts = new(TimeSpan.FromMilliseconds(timeoutMs));
return await socket.ReceiveAsync(buffer, cts.Token);
}
public override void Dispose()
{
Socket?.Close();
foreach (RPGServiceType serviceType in _boundServices)
{
UnbindService(serviceType);
}
}
}