Есть ли что-нибудь вроде асинхронного BlockingCollection?



Я хочу await в результате BlockingCollection<T>.Take() асинхронно, поэтому я не блокирую поток. Ищем что-нибудь вроде этого:



var item = await blockingCollection.TakeAsync();


Я знаю, что мог бы сделать это:



var item = await Task.Run(() => blockingCollection.Take());


но это как-то убивает всю идею, потому что другой поток (о ThreadPool) блокируется вместо этого.



есть ли альтернатива?

591   2  

2 ответов:

есть три альтернативы, которые я знаю.

первое-это BufferBlock<T> с поток данных TPL. Если у вас есть только один потребитель, вы можете использовать OutputAvailableAsync или ReceiveAsync, или просто связать его в ActionBlock<T>. Для получения дополнительной информации смотрите мой блог.

два других типа, которые я создал, доступны в моем AsyncEx библиотека.

AsyncCollection<T> - это async рядом-эквивалент BlockingCollection<T>, способных обертывания параллельной коллекции производителя / потребителя, такой как ConcurrentQueue<T> или ConcurrentBag<T>. Вы можете использовать TakeAsync асинхронно потреблять элементы из коллекции. Для получения дополнительной информации смотрите мой блог.

AsyncProducerConsumerQueue<T> является более портативным async-совместимая очередь производителя / потребителя. Вы можете использовать DequeueAsync асинхронно потреблять элементы из очереди. Для получения дополнительной информации смотрите мой блог.

все три из этих альтернатив позвольте синхронных и асинхронных ставит и принимает.

...или вы можете сделать это:

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class AsyncQueue<T>
{
    private readonly SemaphoreSlim _sem;
    private readonly ConcurrentQueue<T> _que;

    public AsyncQueue()
    {
        _sem = new SemaphoreSlim(0);
        _que = new ConcurrentQueue<T>();
    }

    public void Enqueue(T item)
    {
        _que.Enqueue(item);
        _sem.Release();
    }

    public void EnqueueRange(IEnumerable<T> source)
    {
        var n = 0;
        foreach (var item in source)
        {
            _que.Enqueue(item);
            n++;
        }
        _sem.Release(n);
    }

    public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken))
    {
        for (; ; )
        {
            await _sem.WaitAsync(cancellationToken);

            T item;
            if (_que.TryDequeue(out item))
            {
                return item;
            }
        }
    }
}

простая, полностью функциональная асинхронная очередь FIFO.

Примечание: SemaphoreSlim.WaitAsync был добавлен в .NET 4.5 до этого, это было не так просто.

Comments

    Ничего не найдено.