using System; using System.Collections.Generic; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Sources; namespace KcpSharp { internal sealed class KcpConversationUpdateActivation : IValueTaskSource, IDisposable { private readonly Timer _timer; private ManualResetValueTaskSourceCore _mrvtsc; private bool _disposed; private bool _notificationPending; private bool _signaled; private bool _activeWait; private CancellationToken _cancellationToken; private CancellationTokenRegistration _cancellationRegistration; private readonly WaitList _waitList; ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => _mrvtsc.GetStatus(token); void IValueTaskSource.OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _mrvtsc.OnCompleted(continuation, state, token, flags); KcpConversationUpdateNotification IValueTaskSource.GetResult(short token) { _cancellationRegistration.Dispose(); try { return _mrvtsc.GetResult(token); } finally { _mrvtsc.Reset(); lock (this) { _signaled = false; _activeWait = false; _cancellationRegistration = default; } } } public KcpConversationUpdateActivation(int interval) { _timer = new Timer(state => { var reference = (WeakReference?)state!; if (reference.TryGetTarget(out KcpConversationUpdateActivation? target)) { target.Notify(); } }, new WeakReference(this), interval, interval); _mrvtsc = new ManualResetValueTaskSourceCore { RunContinuationsAsynchronously = true }; _waitList = new WaitList(this); } public void Notify() { if (_disposed) { return; } lock (this) { if (_disposed || _notificationPending) { return; } if (_activeWait && !_signaled) { _signaled = true; _cancellationToken = default; _mrvtsc.SetResult(default); } else { _notificationPending = true; } } } private void NotifyPacketReceived() { lock (this) { if (_disposed) { return; } if (_activeWait && !_signaled) { if (_waitList.Occupy(out KcpConversationUpdateNotification notification)) { _signaled = true; _cancellationToken = default; bool timerNotification = _notificationPending; _notificationPending = false; _mrvtsc.SetResult(notification.WithTimerNotification(timerNotification)); } } } } public ValueTask WaitAsync(CancellationToken cancellationToken) { short token; lock (this) { if (_disposed) { return default; } if (cancellationToken.IsCancellationRequested) { return new ValueTask(Task.FromCanceled(cancellationToken)); } if (_activeWait) { throw new InvalidOperationException(); } if (_waitList.Occupy(out KcpConversationUpdateNotification notification)) { bool timerNotification = _notificationPending; _notificationPending = false; return new ValueTask(notification.WithTimerNotification(timerNotification)); } if (_notificationPending) { _notificationPending = false; return default; } _activeWait = true; Debug.Assert(!_signaled); _cancellationToken = cancellationToken; token = _mrvtsc.Version; } _cancellationRegistration = cancellationToken.UnsafeRegister(state => ((KcpConversationUpdateActivation?)state)!.CancelWaiting(), this); return new ValueTask(this, token); } private void CancelWaiting() { lock (this) { if (_activeWait && !_signaled) { CancellationToken cancellationToken = _cancellationToken; _signaled = true; _cancellationToken = default; _mrvtsc.SetException(new OperationCanceledException(cancellationToken)); } } } public ValueTask InputPacketAsync(ReadOnlyMemory packet, CancellationToken cancellationToken) { if (_disposed) { return default; } return _waitList.InputPacketAsync(packet, cancellationToken); } public void Dispose() { lock (this) { if (_disposed) { return; } _disposed = true; if (_activeWait && !_signaled) { _signaled = true; _cancellationToken = default; _mrvtsc.SetResult(default); } } _timer.Dispose(); _waitList.Dispose(); } class WaitList : IValueTaskSource, IKcpConversationUpdateNotificationSource, IDisposable { private readonly KcpConversationUpdateActivation _parent; private LinkedList? _list; private ManualResetValueTaskSourceCore _mrvtsc; private bool _available; // activeWait private bool _occupied; private bool _signaled; private bool _disposed; private ReadOnlyMemory _packet; private CancellationToken _cancellationToken; private CancellationTokenRegistration _cancellationRegistration; public ReadOnlyMemory Packet { get { lock (this) { if (_available && _occupied && !_signaled) { return _packet; } } return default; } } ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => _mrvtsc.GetStatus(token); void IValueTaskSource.OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _mrvtsc.OnCompleted(continuation, state, token, flags); void IValueTaskSource.GetResult(short token) { _cancellationRegistration.Dispose(); try { _mrvtsc.GetResult(token); } finally { _mrvtsc.Reset(); lock (this) { _available = false; _occupied = false; _signaled = false; _cancellationRegistration = default; } } } public WaitList(KcpConversationUpdateActivation parent) { _parent = parent; _mrvtsc = new ManualResetValueTaskSourceCore { RunContinuationsAsynchronously = true }; } public ValueTask InputPacketAsync(ReadOnlyMemory packet, CancellationToken cancellationToken) { WaitItem? waitItem = null; short token = 0; lock (this) { if (_disposed) { return default; } if (cancellationToken.IsCancellationRequested) { return new ValueTask(Task.FromCanceled(cancellationToken)); } if (_available) { waitItem = new WaitItem(this, packet, cancellationToken); _list ??= new LinkedList(); _list.AddLast(waitItem.Node); } else { token = _mrvtsc.Version; _available = true; Debug.Assert(!_occupied); Debug.Assert(!_signaled); _packet = packet; _cancellationToken = cancellationToken; } } ValueTask task; if (waitItem is null) { _cancellationRegistration = cancellationToken.UnsafeRegister(state => ((WaitList?)state)!.CancelWaiting(), this); task = new ValueTask(this, token); } else { waitItem.RegisterCancellationToken(); task = new ValueTask(waitItem.Task); } _parent.NotifyPacketReceived(); return task; } private void CancelWaiting() { lock (this) { if (_available && !_occupied && !_signaled) { _signaled = true; CancellationToken cancellationToken = _cancellationToken; _packet = default; _cancellationToken = default; _mrvtsc.SetException(new OperationCanceledException(cancellationToken)); } } } public bool Occupy(out KcpConversationUpdateNotification notification) { lock (this) { if (_disposed) { notification = default; return false; } if (_available && !_occupied && !_signaled) { _occupied = true; notification = new KcpConversationUpdateNotification(this, true); return true; } if (_list is null) { notification = default; return false; } LinkedListNode? node = _list.First; if (node is not null) { _list.Remove(node); notification = new KcpConversationUpdateNotification(node.Value, true); return true; } } notification = default; return false; } public void Release() { lock (this) { if (_available && _occupied && !_signaled) { _signaled = true; _packet = default; _cancellationToken = default; _mrvtsc.SetResult(true); } } } internal bool TryRemove(WaitItem item) { lock (this) { LinkedList? list = _list; if (list is null) { return false; } LinkedListNode node = item.Node; if (node.Previous is null && node.Next is null) { return false; } list.Remove(node); return true; } } public void Dispose() { if (_disposed) { return; } lock (this) { _disposed = true; if (_available && !_occupied && !_signaled) { _signaled = true; _packet = default; _cancellationToken = default; _mrvtsc.SetResult(false); } LinkedList? list = _list; if (list is not null) { _list = null; LinkedListNode? node = list.First; LinkedListNode? next = node?.Next; while (node is not null) { node.Value.Release(); list.Remove(node); node = next; next = node?.Next; } } } } } class WaitItem : TaskCompletionSource, IKcpConversationUpdateNotificationSource { private readonly WaitList _parent; private ReadOnlyMemory _packet; private CancellationToken _cancellationToken; private CancellationTokenRegistration _cancellationRegistration; private bool _released; public LinkedListNode Node { get; } public ReadOnlyMemory Packet { get { lock (this) { if (!_released) { return _packet; } } return default; } } public WaitItem(WaitList parent, ReadOnlyMemory packet, CancellationToken cancellationToken) { _parent = parent; _packet = packet; _cancellationToken = cancellationToken; Node = new LinkedListNode(this); } public void RegisterCancellationToken() { _cancellationRegistration = _cancellationToken.UnsafeRegister(state => ((WaitItem?)state)!.CancelWaiting(), this); } private void CancelWaiting() { CancellationTokenRegistration cancellationRegistration; if (_parent.TryRemove(this)) { CancellationToken cancellationToken; lock (this) { _released = true; cancellationToken = _cancellationToken; cancellationRegistration = _cancellationRegistration; _packet = default; _cancellationToken = default; _cancellationRegistration = default; } TrySetCanceled(cancellationToken); } _cancellationRegistration.Dispose(); } public void Release() { CancellationTokenRegistration cancellationRegistration; lock (this) { _released = true; cancellationRegistration = _cancellationRegistration; _packet = default; _cancellationToken = default; _cancellationRegistration = default; } TrySetResult(); cancellationRegistration.Dispose(); } } } }