using System;
using System.Buffers.Binary;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

namespace KcpSharp
{
    // NOTE(review): generic type-argument lists appear to have been stripped from this copy of the
    // file: `T` is used throughout but never declared, and ConcurrentDictionary, ReadOnlyMemory,
    // ReadOnlySpan, Memory, Action and Func are written without type arguments. Restore them from
    // the upstream source before compiling; the code tokens below are left exactly as found.

    /// <summary>
    /// Multiplex many channels or conversations over the same transport.
    /// Incoming packets are routed to a registered conversation by the conversation ID stored in
    /// their first four bytes; outgoing packets are forwarded to the underlying transport.
    /// </summary>
    /// <typeparam name="T">The type of the user state stored alongside each channel or conversation.</typeparam>
    public sealed class KcpMultiplexConnection : IKcpTransport, IKcpConversation, IKcpMultiplexConnection
    {
        // The transport that SendPacketAsync forwards outgoing packets to.
        private readonly IKcpTransport _transport;

        // Registered conversations keyed by conversation ID; each value is used as a
        // (Conversation, State) tuple by the methods below.
        private readonly ConcurrentDictionary _conversations = new();

        // Set by SetTransportClosed() and Dispose(); once set, packets are dropped silently.
        private bool _transportClosed;

        // Set by Dispose(); checked by CheckDispose() and the packet paths.
        private bool _disposed;

        // Optional callback invoked with the user state whenever an entry is removed from _conversations.
        private readonly Action? _disposeAction;

        /// <summary>
        /// Construct a multiplexed connection over a transport, without a state dispose action.
        /// </summary>
        /// <param name="transport">The underlying transport.</param>
        /// <exception cref="ArgumentNullException"><paramref name="transport"/> is null.</exception>
        public KcpMultiplexConnection(IKcpTransport transport)
        {
            _transport = transport ?? throw new ArgumentNullException(nameof(transport));
            _disposeAction = null;
        }

        /// <summary>
        /// Construct a multiplexed connection over a transport.
        /// </summary>
        /// <param name="transport">The underlying transport.</param>
        /// <param name="disposeAction">The action to invoke when a state object is removed. May be null.</param>
        /// <exception cref="ArgumentNullException"><paramref name="transport"/> is null.</exception>
        public KcpMultiplexConnection(IKcpTransport transport, Action? disposeAction)
        {
            _transport = transport ?? throw new ArgumentNullException(nameof(transport));
            _disposeAction = disposeAction;
        }

        /// <summary>
        /// Throws <see cref="ObjectDisposedException"/> when this instance has been disposed.
        /// </summary>
        private void CheckDispose()
        {
            if (_disposed)
            {
                ThrowObjectDisposedException();
            }
        }

        /// <summary>
        /// Throws an <see cref="ObjectDisposedException"/> named after this type.
        /// </summary>
        private static void ThrowObjectDisposedException()
        {
            throw new ObjectDisposedException(nameof(KcpMultiplexConnection));
        }

        /// <summary>
        /// Process a newly received packet from the transport.
        /// </summary>
        /// <remarks>
        /// The packet is dropped (a default <see cref="ValueTask"/> is returned) when it is shorter
        /// than four bytes, when the transport is closed or this instance is disposed, or when no
        /// conversation is registered under the packet's ID.
        /// </remarks>
        /// <param name="packet">The content of the packet with conversation ID.</param>
        /// <param name="cancellationToken">A token to cancel this operation.</param>
        /// <returns>A <see cref="ValueTask"/> that completes when the packet is handled by the corresponding channel or conversation.</returns>
        public ValueTask InputPakcetAsync(ReadOnlyMemory packet, CancellationToken cancellationToken = default)
        {
            ReadOnlySpan span = packet.Span;
            // Too short to carry the 4-byte conversation ID.
            if (span.Length < 4)
            {
                return default;
            }
            if (_transportClosed || _disposed)
            {
                return default;
            }
            // The ID is read as a little-endian unsigned 32-bit value and cast to int.
            int id = (int)BinaryPrimitives.ReadUInt32LittleEndian(span);
            if (_conversations.TryGetValue(id, out (IKcpConversation Conversation, T? State) value))
            {
                // The whole packet, ID included, is handed to the conversation.
                return value.Conversation.InputPakcetAsync(packet, cancellationToken);
            }
            return default;
        }

        /// <summary>
        /// Determine whether the multiplex connection contains a conversation with the specified id.
        /// </summary>
        /// <param name="id">The conversation ID.</param>
        /// <returns>True if the multiplex connection contains the specified conversation. Otherwise false.</returns>
        /// <exception cref="ObjectDisposedException">The current instance is disposed.</exception>
        public bool Contains(int id)
        {
            CheckDispose();
            return _conversations.ContainsKey(id);
        }

        /// <summary>
        /// Create a raw channel with the specified conversation ID and register it with the default user state.
        /// </summary>
        /// <param name="id">The conversation ID.</param>
        /// <param name="remoteEndPoint">The remote endpoint passed to the <see cref="KcpRawChannel"/> constructor.</param>
        /// <param name="options">The options of the <see cref="KcpRawChannel"/>.</param>
        /// <returns>The raw channel created.</returns>
        /// <exception cref="ObjectDisposedException">The current instance is disposed.</exception>
        /// <exception cref="InvalidOperationException">Another channel or conversation with the same ID was already registered.</exception>
        public KcpRawChannel CreateRawChannel(int id, IPEndPoint remoteEndPoint, KcpRawChannelOptions? options = null)
        {
            KcpRawChannel? channel = new KcpRawChannel(remoteEndPoint, this, id, options);
            try
            {
                RegisterConversation(channel, id, default);
                if (_transportClosed)
                {
                    channel.SetTransportClosed();
                }
                // Clearing the local on success makes the finally block skip Dispose().
                return Interlocked.Exchange(ref channel, null)!;
            }
            finally
            {
                // Still non-null only when an exception escaped the try block above.
                if (channel is not null)
                {
                    channel.Dispose();
                }
            }
        }

        /// <summary>
        /// Create a raw channel with the specified conversation ID and register it with a user state.
        /// </summary>
        /// <param name="id">The conversation ID.</param>
        /// <param name="remoteEndPoint">The remote endpoint passed to the <see cref="KcpRawChannel"/> constructor.</param>
        /// <param name="state">The user state of this channel.</param>
        /// <param name="options">The options of the <see cref="KcpRawChannel"/>.</param>
        /// <returns>The raw channel created.</returns>
        /// <exception cref="ObjectDisposedException">The current instance is disposed.</exception>
        /// <exception cref="InvalidOperationException">Another channel or conversation with the same ID was already registered.</exception>
        public KcpRawChannel CreateRawChannel(int id, IPEndPoint remoteEndPoint, T state, KcpRawChannelOptions? options = null)
        {
            var channel = new KcpRawChannel(remoteEndPoint, this, id, options);
            try
            {
                RegisterConversation(channel, id, state);
                if (_transportClosed)
                {
                    channel.SetTransportClosed();
                }
                // Clearing the local on success makes the finally block skip Dispose().
                return Interlocked.Exchange(ref channel, null)!;
            }
            finally
            {
                // Still non-null only when an exception escaped the try block above.
                if (channel is not null)
                {
                    channel.Dispose();
                }
            }
        }

        /// <summary>
        /// Create a conversation with the specified conversation ID and register it with the default user state.
        /// </summary>
        /// <param name="id">The conversation ID.</param>
        /// <param name="remoteEndPoint">The remote endpoint passed to the <see cref="KcpConversation"/> constructor.</param>
        /// <param name="options">The options of the <see cref="KcpConversation"/>.</param>
        /// <returns>The KCP conversation created.</returns>
        /// <exception cref="ObjectDisposedException">The current instance is disposed.</exception>
        /// <exception cref="InvalidOperationException">Another channel or conversation with the same ID was already registered.</exception>
        public KcpConversation CreateConversation(int id, IPEndPoint remoteEndPoint, KcpConversationOptions? options = null)
        {
            var conversation = new KcpConversation(remoteEndPoint, this, id, options);
            try
            {
                RegisterConversation(conversation, id, default);
                if (_transportClosed)
                {
                    conversation.SetTransportClosed();
                }
                // Clearing the local on success makes the finally block skip Dispose().
                return Interlocked.Exchange(ref conversation, null)!;
            }
            finally
            {
                // Still non-null only when an exception escaped the try block above.
                if (conversation is not null)
                {
                    conversation.Dispose();
                }
            }
        }

        /// <summary>
        /// Create a conversation with the specified conversation ID and register it with a user state.
        /// </summary>
        /// <param name="id">The conversation ID.</param>
        /// <param name="remoteEndPoint">The remote endpoint passed to the <see cref="KcpConversation"/> constructor.</param>
        /// <param name="state">The user state of this conversation.</param>
        /// <param name="options">The options of the <see cref="KcpConversation"/>.</param>
        /// <returns>The KCP conversation created.</returns>
        /// <exception cref="ObjectDisposedException">The current instance is disposed.</exception>
        /// <exception cref="InvalidOperationException">Another channel or conversation with the same ID was already registered.</exception>
        public KcpConversation CreateConversation(int id, IPEndPoint remoteEndPoint, T state, KcpConversationOptions? options = null)
        {
            var conversation = new KcpConversation(remoteEndPoint, this, id, options);
            try
            {
                RegisterConversation(conversation, id, state);
                if (_transportClosed)
                {
                    conversation.SetTransportClosed();
                }
                // Clearing the local on success makes the finally block skip Dispose().
                return Interlocked.Exchange(ref conversation, null)!;
            }
            finally
            {
                // Still non-null only when an exception escaped the try block above.
                if (conversation is not null)
                {
                    conversation.Dispose();
                }
            }
        }

        /// <summary>
        /// Register a conversation or channel with the specified conversation ID and the default user state.
        /// </summary>
        /// <param name="conversation">The conversation or channel to register.</param>
        /// <param name="id">The conversation ID.</param>
        /// <exception cref="ArgumentNullException"><paramref name="conversation"/> is not provided.</exception>
        /// <exception cref="ObjectDisposedException">The current instance is disposed.</exception>
        /// <exception cref="InvalidOperationException">Another channel or conversation with the same ID was already registered.</exception>
        public void RegisterConversation(IKcpConversation conversation, int id)
            => RegisterConversation(conversation, id, default);

        /// <summary>
        /// Register a conversation or channel with the specified conversation ID and user state.
        /// </summary>
        /// <remarks>
        /// Registering the same conversation instance again under the same ID does not throw; the
        /// entry that is already stored (including its state) is kept.
        /// </remarks>
        /// <param name="conversation">The conversation or channel to register.</param>
        /// <param name="id">The conversation ID.</param>
        /// <param name="state">The user state.</param>
        /// <exception cref="ArgumentNullException"><paramref name="conversation"/> is not provided.</exception>
        /// <exception cref="ObjectDisposedException">The current instance is disposed.</exception>
        /// <exception cref="InvalidOperationException">Another channel or conversation with the same ID was already registered.</exception>
        public void RegisterConversation(IKcpConversation conversation, int id, T? state)
        {
            if (conversation is null)
            {
                throw new ArgumentNullException(nameof(conversation));
            }
            CheckDispose();
            // GetOrAdd returns the existing entry when the ID is taken; a different instance means a duplicate ID.
            (IKcpConversation addedConversation, T? _) = _conversations.GetOrAdd(id, (conversation, state));
            if (!ReferenceEquals(addedConversation, conversation))
            {
                throw new InvalidOperationException("Duplicated conversation.");
            }
            // Re-check after the insert: if Dispose() ran in the meantime, undo the registration,
            // hand the removed state to the dispose action, and report the disposal to the caller.
            if (_disposed)
            {
                if (_conversations.TryRemove(id, out (IKcpConversation Conversation, T? State) value) && _disposeAction is not null)
                {
                    _disposeAction.Invoke(value.State);
                }
                ThrowObjectDisposedException();
            }
        }

        /// <summary>
        /// Unregister a conversation or channel with the specified conversation ID.
        /// </summary>
        /// <param name="id">The conversation ID.</param>
        /// <returns>The conversation unregistered. Returns null when the conversation with the specified ID is not found.</returns>
        public IKcpConversation? UnregisterConversation(int id)
        {
            return UnregisterConversation(id, out _);
        }

        /// <summary>
        /// Unregister a conversation or channel with the specified conversation ID.
        /// </summary>
        /// <remarks>
        /// Nothing is removed (and null is returned) when the transport is closed or this instance
        /// is disposed. On success the removed conversation is notified through
        /// <c>SetTransportClosed()</c> and the dispose action, if any, is invoked with its state.
        /// </remarks>
        /// <param name="id">The conversation ID.</param>
        /// <param name="state">The user state of the removed entry, or the default value when nothing was removed.</param>
        /// <returns>The conversation unregistered. Returns null when the conversation with the specified ID is not found.</returns>
        public IKcpConversation? UnregisterConversation(int id, out T? state)
        {
            if (!_transportClosed && !_disposed && _conversations.TryRemove(id, out (IKcpConversation Conversation, T? State) value))
            {
                value.Conversation.SetTransportClosed();
                state = value.State;
                if (_disposeAction is not null)
                {
                    _disposeAction.Invoke(state);
                }
                return value.Conversation;
            }
            state = default;
            return default;
        }

        /// <inheritdoc />
        public ValueTask SendPacketAsync(Memory packet, IPEndPoint endpoint, CancellationToken cancellationToken = default)
        {
            // Outgoing packets are dropped silently once the transport is closed or this instance is disposed.
            if (_transportClosed || _disposed)
            {
                return default;
            }
            return _transport.SendPacketAsync(packet, endpoint, cancellationToken);
        }

        /// <inheritdoc />
        public void SetTransportClosed()
        {
            _transportClosed = true;
            // Propagate the closed state to every conversation currently registered.
            foreach ((IKcpConversation conversation, T? _) in _conversations.Values)
            {
                conversation.SetTransportClosed();
            }
        }

        /// <inheritdoc />
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }
            _transportClosed = true;
            _disposed = true;
            // Loop until the dictionary is empty so entries added while draining are also removed.
            while (!_conversations.IsEmpty)
            {
                foreach (int id in _conversations.Keys)
                {
                    if (_conversations.TryRemove(id, out (IKcpConversation Conversation, T? State) value))
                    {
                        value.Conversation.Dispose();
                        if (_disposeAction is not null)
                        {
                            _disposeAction.Invoke(value.State);
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Not supported by this type; always throws.
        /// </summary>
        /// <param name="size">Unused.</param>
        /// <param name="handshakeHandler">Unused.</param>
        /// <exception cref="NotImplementedException">Always thrown.</exception>
        public void SetHandshakeHandler(int size, Func handshakeHandler)
        {
            throw new NotImplementedException("SetHandshakeHandler not designed for this type of transport.");
        }
    }
}