Как я могу предотвратить Синхронное продолжение задачи?



у меня есть некоторые библиотеки (сокет сети) код, который обеспечивает Task-основанный API для ожидающих ответов на запросы, основанный на TaskCompletionSource<T>. Однако в TPL есть раздражение в том, что, похоже, невозможно предотвратить синхронные продолжения. Что бы я как чтобы быть в состоянии сделать это:




  • рассказать TaskCompletionSource<T> то есть не должно позволять абонентам присоединяться с TaskContinuationOptions.ExecuteSynchronously или

  • установить результат (SetResult/TrySetResult) таким образом, что указывает, что TaskContinuationOptions.ExecuteSynchronously следует игнорировать, используя пул вместо


в частности, проблема у меня заключается в том, что входящие данные обрабатываются специальным читателем, и если вызывающий абонент может подключиться с TaskContinuationOptions.ExecuteSynchronously они могут задержать читателя (который влияет не только на них). Ранее я работал над этим с помощью некоторых хакеров, которые определяют, является ли любой продолжения присутствуют, и если они есть, это толкает завершение на ThreadPool, однако это значительное влияние, если вызывающий абонент насыщает свою рабочую очередь, так как завершение не будет обработано своевременно. Если они используют Task.Wait() (или аналогичный), они тогда будут по существу тупик себя. Кроме того, именно поэтому читатель находится в выделенном потоке, а не с помощью рабочих.



Итак; прежде чем я попытаюсь пилить команду TPL: я пропустил вариант?



ключевые моменты:




  • я не хочу, чтобы внешние абоненты могли захватить мои нить

  • я не могу использовать ThreadPool как реализация, так как она должна работать, когда пул насыщен


пример ниже производит выход (заказ может варьироваться в зависимости от времени):



Continuation on: Main thread
Press [return]
Continuation on: Thread pool


проблема в том, что случайный абонент сумел получить продолжение на "основной поток". В реальном коде это было бы прерыванием основного читателя; плохие вещи!



код:



using System;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
static void Identify()
{
var thread = Thread.CurrentThread;
string name = thread.IsThreadPoolThread
? "Thread pool" : thread.Name;
if (string.IsNullOrEmpty(name))
name = "#" + thread.ManagedThreadId;
Console.WriteLine("Continuation on: " + name);
}
static void Main()
{
Thread.CurrentThread.Name = "Main thread";
var source = new TaskCompletionSource<int>();
var task = source.Task;
task.ContinueWith(delegate {
Identify();
});
task.ContinueWith(delegate {
Identify();
}, TaskContinuationOptions.ExecuteSynchronously);
source.TrySetResult(123);
Console.WriteLine("Press [return]");
Console.ReadLine();
}
}
623   6  

6 ответов:

новое в .NET 4.6:

.NET 4.6 содержит новый TaskCreationOptions:RunContinuationsAsynchronously.


Так вы готовы использовать отражение для доступа к закрытым полям...

вы можете отметить задачу TCS с помощью TASK_STATE_THREAD_WAS_ABORTED флаг, который приведет к тому, что все продолжения не будут встроены.

const int TASK_STATE_THREAD_WAS_ABORTED = 134217728;

var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance);
stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);

Edit:

вместо использования отражения испускают, я предлагаю вам использовать выражения. Это гораздо более читабельно и имеет преимущество быть PCL-совместимым:

var taskParameter = Expression.Parameter(typeof (Task));
const string stateFlagsFieldName = "m_stateFlags";
var setter =
    Expression.Lambda<Action<Task>>(
        Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName),
            Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName),
                Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();

без использования отражения:

если кто-то заинтересован, я придумал способ сделать это без размышлений, но он тоже немного "грязный" и, конечно же, несет в себе несущественный штраф perf:

try
{
    Thread.CurrentThread.Abort();
}
catch (ThreadAbortException)
{
    source.TrySetResult(123);
    Thread.ResetAbort();
}

я не думаю, что в TPL есть что-то, что обеспечивает явно API контроль над TaskCompletionSource.SetResult продолжений. Я решил оставить себе первоначальный ответ для управления этим поведением для async/await сценарии.

вот еще одно решение, которое налагает на асинхронные ContinueWith, если tcs.SetResult-срабатывает продолжение происходит в том же потоке SetResult было:

public static class TaskExt
{
    static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks =
        new ConcurrentDictionary<Task, Thread>();

    // SetResultAsync
    static public void SetResultAsync<TResult>(
        this TaskCompletionSource<TResult> @this,
        TResult result)
    {
        s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread);
        try
        {
            @this.SetResult(result);
        }
        finally
        {
            Thread thread;
            s_tcsTasks.TryRemove(@this.Task, out thread);
        }
    }

    // ContinueWithAsync, TODO: more overrides
    static public Task ContinueWithAsync<TResult>(
        this Task<TResult> @this,
        Action<Task<TResult>> action,
        TaskContinuationOptions continuationOptions = TaskContinuationOptions.None)
    {
        return @this.ContinueWith((Func<Task<TResult>, Task>)(t =>
        {
            Thread thread = null;
            s_tcsTasks.TryGetValue(t, out thread);
            if (Thread.CurrentThread == thread)
            {
                // same thread which called SetResultAsync, avoid potential deadlocks

                // using thread pool
                return Task.Run(() => action(t));

                // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread)
                // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning);
            }
            else
            {
                // continue on the same thread
                var task = new Task(() => action(t));
                task.RunSynchronously();
                return Task.FromResult(task);
            }
        }), continuationOptions).Unwrap();
    }
}

обновлено для решения комментарий:

я не контролирую вызывающего абонента - я не могу заставить их использовать определенный продолжить-с вариантом: если бы я мог, проблема не существовала бы в первое место

я не знал, что вы не контролируете абонента. Тем не менее, если вы не контролируете его, вы, вероятно, не проходите TaskCompletionSource объект напрямую к звонящему, либо. Логически, вы бы передали маркер часть его, т. е. tcs.Task. В в этом случае решение может быть еще проще, добавив другой метод расширения к приведенному выше:

// ImposeAsync, TODO: more overrides
static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this)
{
    return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent =>
    {
        Thread thread = null;
        s_tcsTasks.TryGetValue(antecedent, out thread);
        if (Thread.CurrentThread == thread)
        {
            // continue on a pool thread
            return antecedent.ContinueWith(t => t, 
                TaskContinuationOptions.None).Unwrap();
        }
        else
        {
            return antecedent;
        }
    }), TaskContinuationOptions.ExecuteSynchronously).Unwrap();
}

использование:

// library code
var source = new TaskCompletionSource<int>();
var task = source.Task.ImposeAsync();
// ... 

// client code
task.ContinueWith(delegate
{
    Identify();
}, TaskContinuationOptions.ExecuteSynchronously);

// ...
// library code
source.SetResultAsync(123);

это на самом деле работы как await и ContinueWith (скрипка) и не имеет отражения хаки.

Как насчет вместо того, чтобы делать

var task = source.Task;

вы делаете это вместо

var task = source.Task.ContinueWith<Int32>( x => x.Result );

Если вы можете и готовы использовать отражение, это должно сделать это;

public static class MakeItAsync
{
    static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result)
    {
        var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
        var continuations = (List<object>)continuation.GetValue(source.Task);

        foreach (object c in continuations)
        {
            var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
            var options = (TaskContinuationOptions)option.GetValue(c);

            options &= ~TaskContinuationOptions.ExecuteSynchronously;
            option.SetValue(c, options);
        }

        source.TrySetResult(result);
    }        
}

Обновлено, я написал отдельный ответ заниматься ContinueWith в противоположность await (поскольку ContinueWith не заботится о текущем контексте синхронизации).

вы можете использовать немой контекст синхронизации, чтобы наложить асинхронность на продолжение, вызванное вызовом SetResult/SetCancelled/SetException on TaskCompletionSource. Я считаю, что текущий контекст синхронизации (в точке await tcs.Task) - это критерии, которые TPL использует, чтобы решить, следует ли делать такое продолжение синхронный или асинхронный.

следующие работы для меня:

if (notifyAsync)
{
    tcs.SetResultAsync(null);
}
else
{
    tcs.SetResult(null);
}

SetResultAsync реализуется следующим образом:

public static class TaskExt
{
    static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result)
    {
        FakeSynchronizationContext.Execute(() => tcs.SetResult(result));
    }

    // FakeSynchronizationContext
    class FakeSynchronizationContext : SynchronizationContext
    {
        private static readonly ThreadLocal<FakeSynchronizationContext> s_context =
            new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext());

        private FakeSynchronizationContext() { }

        public static FakeSynchronizationContext Instance { get { return s_context.Value; } }

        public static void Execute(Action action)
        {
            var savedContext = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance);
            try
            {
                action();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(savedContext);
            }
        }

        // SynchronizationContext methods

        public override SynchronizationContext CreateCopy()
        {
            return this;
        }

        public override void OperationStarted()
        {
            throw new NotImplementedException("OperationStarted");
        }

        public override void OperationCompleted()
        {
            throw new NotImplementedException("OperationCompleted");
        }

        public override void Post(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Post");
        }

        public override void Send(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Send");
        }
    }
}

SynchronizationContext.SetSynchronizationContextочень дешево С точки зрения накладных расходов он добавляет. На самом деле, очень похожий подход используется реализация WPF Dispatcher.BeginInvoke.

TPL сравнивает целевой контекст синхронизации в точке await до точки tcs.SetResult. Если синхронизация контекст один и тот же (или нет контекста синхронизации в обоих местах), продолжение вызывается непосредственно, синхронно. В противном случае он помещается в очередь с помощью SynchronizationContext.Post на целевой контекст синхронизации, т. е. нормальный await поведение. То, что делает этот подход, всегда накладывает SynchronizationContext.Post поведение (или продолжение потока пула, если нет целевого контекста синхронизации).

Обновлено, это не будет работать для task.ContinueWith, потому что ContinueWith не волнует текущий контекст синхронизации. Однако это работает для await task (скрипка). Он также работает для await task.ConfigureAwait(false).

ото этот подход работает ContinueWith.

The имитировать прерывание подход выглядел действительно хорошо, но привел к захвату потоков TPL в некоторых случаях.

у меня тогда была реализация, которая была похожа на проверка объекта продолжение, но просто проверка на любой продолжение, поскольку на самом деле слишком много сценариев для данного кода, чтобы хорошо работать, но это означало, что даже такие вещи, как Task.Wait в результате получился пул потоков уважать.

в конечном счете, после осмотра много-много Ил, только безопасное и полезное сценарий SetOnInvokeMres сценарий (ручной сброс события-тонкое продолжение). Есть много других сценариев:

  • некоторые из них не безопасны, и привести к потоку угона
  • остальные не полезны, так как они в конечном итоге приводят к пулу потоков

Итак, в конце концов, я решил проверить ненулевой объект продолжения; если он равен нулю, штраф (без продолжений); если он не равен нулю, проверьте специальный случай для SetOnInvokeMres - если это так: отлично (безопасно вызывать); в противном случае пусть пул потоков выполняет TrySetComplete, не говоря задача сделать что-нибудь особенное, как спуфинг прервать. Task.Wait использует SetOnInvokeMres подход, который является конкретным сценарием, который мы хотим попробовать действительно трудно не тупик.

Type taskType = typeof(Task);
FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic);
Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic);
if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null)
{
    var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true);
    var il = method.GetILGenerator();
    var hasContinuation = il.DefineLabel();
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel();
    // check if null
    il.Emit(OpCodes.Brtrue_S, nonNull);
    il.MarkLabel(goodReturn);
    il.Emit(OpCodes.Ldc_I4_1);
    il.Emit(OpCodes.Ret);

    // check if is a SetOnInvokeMres - if so, we're OK
    il.MarkLabel(nonNull);
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    il.Emit(OpCodes.Isinst, safeScenario);
    il.Emit(OpCodes.Brtrue_S, goodReturn);

    il.Emit(OpCodes.Ldc_I4_0);
    il.Emit(OpCodes.Ret);

    IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));

Comments

    Ничего не найдено.