Unregistering sessions properly, more logging, proper shutdown

This commit is contained in:
xeon 2024-01-20 01:39:12 +03:00
parent 1c10032dd9
commit 2a0543e79e
7 changed files with 56 additions and 26 deletions

View file

@ -28,6 +28,7 @@ message CmdBindContainerResult
{
uint32 retcode = 1;
uint64 session_id = 2;
RPGServiceType service_type = 3;
}
message CmdUnbindContainer

View file

@ -9,7 +9,6 @@ internal class ServiceEndPoint
private readonly NetMQSocket _socket;
private CancellationTokenSource? _receiveCancellation;
private Task? _receiveTask;
public delegate Task CommandEventHandler(ServiceCommand command);
public event CommandEventHandler? OnCommand;
@ -23,7 +22,7 @@ internal class ServiceEndPoint
public void Start()
{
_receiveCancellation = new();
_receiveTask = Task.Run(() => Receive(_receiveCancellation.Token));
_ = Task.Run(() => Receive(_receiveCancellation.Token));
}
private async Task Receive(CancellationToken cancellationToken)
@ -41,18 +40,17 @@ internal class ServiceEndPoint
}
}
}
catch (Exception exception) when (exception is not OperationCanceledException)
catch
{
throw;
if (!_socket.IsDisposed) _socket.Close();
}
}
public async Task StopAsync()
{
if (_receiveCancellation != null && _receiveTask != null)
if (_receiveCancellation != null)
{
await _receiveCancellation.CancelAsync();
await _receiveTask;
}
}
}

View file

@ -1,17 +1,20 @@
using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace RPG.Services.Core.Session;
public class SessionManager
{
private readonly ConcurrentDictionary<ulong, RPGSession> _sessions;
private readonly IServiceProvider _serviceProvider;
private readonly ILogger _logger;
public SessionManager(IServiceProvider serviceProvider)
public SessionManager(IServiceProvider serviceProvider, ILogger<SessionManager> logger)
{
_sessions = [];
_serviceProvider = serviceProvider;
_logger = logger;
}
public TSession? Create<TSession>(ulong id) where TSession : RPGSession
@ -21,6 +24,8 @@ public class SessionManager
TSession session = ActivatorUtilities.CreateInstance<TSession>(_serviceProvider, id);
_sessions[id] = session;
_logger.LogInformation("New session (id={id}) registered", id);
return session;
}
@ -41,6 +46,7 @@ public class SessionManager
if (_sessions.TryRemove(session.SessionId, out _))
{
session.Dispose();
_logger.LogInformation("Session with id {id} was unregistered", session.SessionId);
}
}
}

View file

@ -26,7 +26,8 @@ internal class GameserverCommandHandler : ServiceCommandHandler
Send(ServiceCommandType.BindContainerResult, new CmdBindContainerResult
{
Retcode = 1,
SessionId = cmdBindContainer.SessionId
SessionId = cmdBindContainer.SessionId,
ServiceType = RPGServiceType.Gameserver
}, command.SenderType);
return Task.CompletedTask;
}
@ -35,7 +36,8 @@ internal class GameserverCommandHandler : ServiceCommandHandler
Send(ServiceCommandType.BindContainerResult, new CmdBindContainerResult
{
Retcode = 0,
SessionId = cmdBindContainer.SessionId
SessionId = cmdBindContainer.SessionId,
ServiceType = RPGServiceType.Gameserver
}, command.SenderType);
return Task.CompletedTask;

View file

@ -22,6 +22,9 @@ internal class GateserverCommandHandler : ServiceCommandHandler
CmdBindContainerResult result = CmdBindContainerResult.Parser.ParseFrom(command.Body.Span);
if (_sessionManager.TryGet(result.SessionId, out NetworkSession? session))
{
session.ServiceBound(result.ServiceType);
if (result.ServiceType == RPGServiceType.Gameserver)
{
PlayerGetTokenScRsp rsp;
if (result.Retcode != 0)
@ -41,6 +44,7 @@ internal class GateserverCommandHandler : ServiceCommandHandler
await session.SendAsync((ushort)CmdType.CmdPlayerGetTokenScRsp, rsp);
}
}
}
[ServiceCommand(ServiceCommandType.ForwardGameMessage)]
public async Task OnForwardGameMessage(ServiceCommand command)

View file

@ -59,6 +59,8 @@ internal class TcpGateway
while (!cancellationToken.IsCancellationRequested)
{
Socket clientSocket = await _socket!.AcceptAsync(cancellationToken);
_logger.LogInformation("New TCP connection from {remoteEndPoint}", clientSocket.RemoteEndPoint);
_ = RunSessionAsync(clientSocket);
}
}
@ -66,22 +68,27 @@ internal class TcpGateway
{
throw;
}
catch { /* Operation canceled */ }
}
private async Task RunSessionAsync(Socket socket)
{
try
{
NetworkSession? session = _sessionManager.Create<NetworkSession>(Interlocked.Increment(ref _sessionIdCounter));
if (session == null) return;
try
{
session.Socket = socket;
await session.RunAsync();
}
catch (Exception exception) when (exception is not OperationCanceledException)
{
_logger.LogError("Unhandled exception occurred: {exception}", exception);
}
catch { /* Operation canceled */ }
finally
{
_sessionManager.Remove(session);
}
}
}

View file

@ -12,12 +12,14 @@ internal class NetworkSession : RPGSession
private const int ReceiveBufferSize = 16384;
private readonly byte[] _recvBuffer;
private readonly List<RPGServiceType> _boundServices;
public Socket? Socket { private get; set; }
public PlayerGetTokenCsReq? GetTokenCsReq { get; private set; }
public NetworkSession(ulong sessionId, ServiceBox serviceBox) : base(sessionId, serviceBox)
{
_boundServices = [];
_recvBuffer = GC.AllocateUninitializedArray<byte>(ReceiveBufferSize);
}
@ -48,6 +50,11 @@ internal class NetworkSession : RPGSession
}
}
public void ServiceBound(RPGServiceType serviceType)
{
_boundServices.Add(serviceType);
}
public async Task SendAsync<TBody>(ushort cmdType, TBody body) where TBody : IMessage<TBody>
{
await SendAsync(new(cmdType, ReadOnlyMemory<byte>.Empty, body.ToByteArray()));
@ -105,5 +112,10 @@ internal class NetworkSession : RPGSession
public override void Dispose()
{
Socket?.Close();
foreach (RPGServiceType serviceType in _boundServices)
{
UnbindService(serviceType);
}
}
}