Fixes 15661. Replace BlockingCollection with Channel in LimitedConcurrencyLibraryScheduler to prevent blocking in an asynchronous context.

This commit is contained in:
Noah Potash
2025-11-28 11:08:53 -05:00
parent ca33bcebf0
commit c5147341e3

View File

@@ -4,6 +4,7 @@ using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks; using System.Threading.Tasks;
using MediaBrowser.Controller.Configuration; using MediaBrowser.Controller.Configuration;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
@@ -29,7 +30,7 @@ public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibr
/// </summary> /// </summary>
private readonly Lock _taskLock = new(); private readonly Lock _taskLock = new();
private readonly BlockingCollection<TaskQueueItem> _tasks = new(); private readonly Channel<TaskQueueItem> _tasks = Channel.CreateUnbounded<TaskQueueItem>();
private volatile int _workCounter; private volatile int _workCounter;
private Task? _cleanupTask; private Task? _cleanupTask;
@@ -77,7 +78,7 @@ public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibr
lock (_taskLock) lock (_taskLock)
{ {
if (_tasks.Count > 0 || _workCounter > 0) if (_tasks.Reader.Count > 0 || _workCounter > 0)
{ {
_logger.LogDebug("Delay cleanup task, operations still running."); _logger.LogDebug("Delay cleanup task, operations still running.");
// tasks are still there so its still in use. Reschedule cleanup task. // tasks are still there so its still in use. Reschedule cleanup task.
@@ -144,9 +145,9 @@ public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibr
_deadlockDetector.Value = stopToken.TaskStop; _deadlockDetector.Value = stopToken.TaskStop;
try try
{ {
foreach (var item in _tasks.GetConsumingEnumerable(stopToken.GlobalStop.Token)) while (!stopToken.GlobalStop.Token.IsCancellationRequested)
{ {
stopToken.GlobalStop.Token.ThrowIfCancellationRequested(); var item = await _tasks.Reader.ReadAsync(stopToken.GlobalStop.Token).ConfigureAwait(false);
try try
{ {
var newWorkerLimit = Interlocked.Increment(ref _workCounter) > 0; var newWorkerLimit = Interlocked.Increment(ref _workCounter) > 0;
@@ -264,7 +265,7 @@ public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibr
for (var i = 0; i < workItems.Length; i++) for (var i = 0; i < workItems.Length; i++)
{ {
var item = workItems[i]!; var item = workItems[i]!;
_tasks.Add(item, CancellationToken.None); await _tasks.Writer.WriteAsync(item, CancellationToken.None).ConfigureAwait(false);
} }
Worker(); Worker();
@@ -283,13 +284,12 @@ public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibr
} }
_disposed = true; _disposed = true;
_tasks.CompleteAdding(); _tasks.Writer.Complete();
foreach (var item in _taskRunners) foreach (var item in _taskRunners)
{ {
await item.Key.CancelAsync().ConfigureAwait(false); await item.Key.CancelAsync().ConfigureAwait(false);
} }
_tasks.Dispose();
if (_cleanupTask is not null) if (_cleanupTask is not null)
{ {
await _cleanupTask.ConfigureAwait(false); await _cleanupTask.ConfigureAwait(false);