Есть ли что-нибудь вроде асинхронного BlockingCollection?
Я хочу await в результате BlockingCollection<T>.Take() асинхронно, поэтому я не блокирую поток. Ищем что-нибудь вроде этого:
var item = await blockingCollection.TakeAsync();
Я знаю, что мог бы сделать это:
var item = await Task.Run(() => blockingCollection.Take());
но это как-то убивает всю идею, потому что другой поток (о ThreadPool) блокируется вместо этого.
есть ли альтернатива?
2 ответов:
есть три альтернативы, которые я знаю.
первое-это
BufferBlock<T>с поток данных TPL. Если у вас есть только один потребитель, вы можете использоватьOutputAvailableAsyncилиReceiveAsync, или просто связать его вActionBlock<T>. Для получения дополнительной информации смотрите мой блог.два других типа, которые я создал, доступны в моем AsyncEx библиотека.
AsyncCollection<T>- этоasyncрядом-эквивалентBlockingCollection<T>, способных обертывания параллельной коллекции производителя / потребителя, такой какConcurrentQueue<T>илиConcurrentBag<T>. Вы можете использоватьTakeAsyncасинхронно потреблять элементы из коллекции. Для получения дополнительной информации смотрите мой блог.
AsyncProducerConsumerQueue<T>является более портативнымasync-совместимая очередь производителя / потребителя. Вы можете использоватьDequeueAsyncасинхронно потреблять элементы из очереди. Для получения дополнительной информации смотрите мой блог.все три из этих альтернатив позвольте синхронных и асинхронных ставит и принимает.
...или вы можете сделать это:
using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; public class AsyncQueue<T> { private readonly SemaphoreSlim _sem; private readonly ConcurrentQueue<T> _que; public AsyncQueue() { _sem = new SemaphoreSlim(0); _que = new ConcurrentQueue<T>(); } public void Enqueue(T item) { _que.Enqueue(item); _sem.Release(); } public void EnqueueRange(IEnumerable<T> source) { var n = 0; foreach (var item in source) { _que.Enqueue(item); n++; } _sem.Release(n); } public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken)) { for (; ; ) { await _sem.WaitAsync(cancellationToken); T item; if (_que.TryDequeue(out item)) { return item; } } } }простая, полностью функциональная асинхронная очередь FIFO.
Примечание:
SemaphoreSlim.WaitAsyncбыл добавлен в .NET 4.5 до этого, это было не так просто.
Comments