using System.Buffers.Binary;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
namespace NahidaImpact.Kcp
{
///
/// Multiplex many channels or conversations over the same transport.
///
/// The state of the channel.
public sealed class KcpMultiplexConnection : IKcpTransport, IKcpConversation, IKcpMultiplexConnection
{
private readonly IKcpTransport _transport;
private readonly ConcurrentDictionary _conversations = new();
private bool _transportClosed;
private bool _disposed;
private readonly Action _disposeAction;
///
/// Construct a multiplexed connection over a transport.
///
/// The underlying transport.
public KcpMultiplexConnection(IKcpTransport transport)
{
_transport = transport ?? throw new ArgumentNullException(nameof(transport));
_disposeAction = null;
}
///
/// Construct a multiplexed connection over a transport.
///
/// The underlying transport.
/// The action to invoke when state object is removed.
public KcpMultiplexConnection(IKcpTransport transport, Action disposeAction)
{
_transport = transport ?? throw new ArgumentNullException(nameof(transport));
_disposeAction = disposeAction;
}
private void CheckDispose()
{
if (_disposed)
{
ThrowObjectDisposedException();
}
}
private static void ThrowObjectDisposedException()
{
throw new ObjectDisposedException(nameof(KcpMultiplexConnection));
}
///
/// Process a newly received packet from the transport.
///
/// The content of the packet with conversation ID.
/// A token to cancel this operation.
/// A that completes when the packet is handled by the corresponding channel or conversation.
public ValueTask InputPakcetAsync(UdpReceiveResult packet, CancellationToken cancellationToken = default)
{
ReadOnlySpan span = packet.Buffer.AsSpan();
if (span.Length < KcpGlobalVars.CONVID_LENGTH)
{
return default;
}
if (_transportClosed || _disposed)
{
return default;
}
var id = BinaryPrimitives.ReadInt64BigEndian(span);
if (_conversations.TryGetValue(id, out (IKcpConversation Conversation, T? State) value))
{
return value.Conversation.InputPakcetAsync(packet, cancellationToken);
}
return default;
}
///
/// Determine whether the multiplex connection contains a conversation with the specified id.
///
/// The conversation ID.
/// True if the multiplex connection contains the specified conversation. Otherwise false.
public bool Contains(long id)
{
CheckDispose();
return _conversations.ContainsKey(id);
}
///
/// Create a raw channel with the specified conversation ID.
///
/// The conversation ID.
/// The remote Endpoint
/// The options of the .
/// The raw channel created.
/// The current instance is disposed.
/// Another channel or conversation with the same ID was already registered.
public KcpRawChannel CreateRawChannel(long id, IPEndPoint remoteEndpoint, KcpRawChannelOptions options = null)
{
KcpRawChannel channel = new KcpRawChannel(remoteEndpoint, this, id, options);
try
{
RegisterConversation(channel, id, default);
if (_transportClosed)
{
channel.SetTransportClosed();
}
return Interlocked.Exchange(ref channel, null)!;
}
finally
{
if (channel is not null)
{
channel.Dispose();
}
}
}
///
/// Create a raw channel with the specified conversation ID.
///
/// The conversation ID.
/// The remote Endpoint
/// The user state of this channel.
/// The options of the .
/// The raw channel created.
/// The current instance is disposed.
/// Another channel or conversation with the same ID was already registered.
public KcpRawChannel CreateRawChannel(long id, IPEndPoint remoteEndpoint, T state, KcpRawChannelOptions options = null)
{
var channel = new KcpRawChannel(remoteEndpoint, this, id, options);
try
{
RegisterConversation(channel, id, state);
if (_transportClosed)
{
channel.SetTransportClosed();
}
return Interlocked.Exchange(ref channel, null)!;
}
finally
{
if (channel is not null)
{
channel.Dispose();
}
}
}
///
/// Create a conversation with the specified conversation ID.
///
/// The conversation ID.
/// The remote Endpoint
/// The options of the .
/// The KCP conversation created.
/// The current instance is disposed.
/// Another channel or conversation with the same ID was already registered.
public KcpConversation CreateConversation(long id, IPEndPoint remoteEndpoint, KcpConversationOptions options = null)
{
var conversation = new KcpConversation(remoteEndpoint, this, id, options);
try
{
RegisterConversation(conversation, id, default);
if (_transportClosed)
{
conversation.SetTransportClosed();
}
return Interlocked.Exchange(ref conversation, null)!;
}
finally
{
if (conversation is not null)
{
conversation.Dispose();
}
}
}
///
/// Create a conversation with the specified conversation ID.
///
/// The conversation ID.
/// The remote Endpoint
/// The user state of this conversation.
/// The options of the .
/// The KCP conversation created.
/// The current instance is disposed.
/// Another channel or conversation with the same ID was already registered.
public KcpConversation CreateConversation(long id, IPEndPoint remoteEndpoint, T state, KcpConversationOptions options = null)
{
var conversation = new KcpConversation(remoteEndpoint, this, id, options);
try
{
RegisterConversation(conversation, id, state);
if (_transportClosed)
{
conversation.SetTransportClosed();
}
return Interlocked.Exchange(ref conversation, null)!;
}
finally
{
if (conversation is not null)
{
conversation.Dispose();
}
}
}
///
/// Register a conversation or channel with the specified conversation ID and user state.
///
/// The conversation or channel to register.
/// The conversation ID.
/// is not provided.
/// The current instance is disposed.
/// Another channel or conversation with the same ID was already registered.
public void RegisterConversation(IKcpConversation conversation, long id)
=> RegisterConversation(conversation, id, default);
///
/// Register a conversation or channel with the specified conversation ID and user state.
///
/// The conversation or channel to register.
/// The conversation ID.
/// The user state
/// is not provided.
/// The current instance is disposed.
/// Another channel or conversation with the same ID was already registered.
public void RegisterConversation(IKcpConversation conversation, long id, T? state)
{
if (conversation is null)
{
throw new ArgumentNullException(nameof(conversation));
}
CheckDispose();
(IKcpConversation addedConversation, T? _) = _conversations.GetOrAdd(id, (conversation, state));
if (!ReferenceEquals(addedConversation, conversation))
{
throw new InvalidOperationException("Duplicated conversation.");
}
if (_disposed)
{
if (_conversations.TryRemove(id, out (IKcpConversation Conversation, T? State) value) && _disposeAction is not null)
{
_disposeAction.Invoke(value.State);
}
ThrowObjectDisposedException();
}
}
///
/// Unregister a conversation or channel with the specified conversation ID.
///
/// The conversation ID.
/// The conversation unregistered. Returns null when the conversation with the specified ID is not found.
public IKcpConversation UnregisterConversation(long id)
{
return UnregisterConversation(id, out _);
}
///
/// Unregister a conversation or channel with the specified conversation ID.
///
/// The conversation ID.
/// The user state.
/// The conversation unregistered. Returns null when the conversation with the specified ID is not found.
public IKcpConversation UnregisterConversation(long id, out T? state)
{
if (!_transportClosed && !_disposed && _conversations.TryRemove(id, out (IKcpConversation Conversation, T? State) value))
{
value.Conversation.SetTransportClosed();
state = value.State;
if (_disposeAction is not null)
{
_disposeAction.Invoke(state);
}
return value.Conversation;
}
state = default;
return default;
}
///
public ValueTask SendPacketAsync(Memory packet, IPEndPoint remoteEndpoint, CancellationToken cancellationToken = default)
{
if (_transportClosed || _disposed)
{
return default;
}
return _transport.SendPacketAsync(packet, remoteEndpoint, cancellationToken);
}
///
public void SetTransportClosed()
{
_transportClosed = true;
foreach ((IKcpConversation conversation, T? _) in _conversations.Values)
{
conversation.SetTransportClosed();
}
}
///
public void Dispose()
{
if (_disposed)
{
return;
}
_transportClosed = true;
_disposed = true;
while (!_conversations.IsEmpty)
{
foreach (var id in _conversations.Keys)
{
if (_conversations.TryRemove(id, out (IKcpConversation Conversation, T? State) value))
{
value.Conversation.Dispose();
if (_disposeAction is not null)
{
_disposeAction.Invoke(value.State);
}
}
}
}
}
public void SetCallbacks(int handshakeSize, Func handshakeHandler)
{
throw new NotSupportedException();
}
}
}