Как я могу предотвратить Синхронное продолжение задачи?
у меня есть некоторые библиотеки (сокет сети) код, который обеспечивает Task-основанный API для ожидающих ответов на запросы, основанный на TaskCompletionSource<T>. Однако в TPL есть раздражение в том, что, похоже, невозможно предотвратить синхронные продолжения. Что бы я как чтобы быть в состоянии сделать это:
- рассказать
TaskCompletionSource<T>то есть не должно позволять абонентам присоединяться сTaskContinuationOptions.ExecuteSynchronouslyили - установить результат (
SetResult/TrySetResult) таким образом, что указывает, чтоTaskContinuationOptions.ExecuteSynchronouslyследует игнорировать, используя пул вместо
в частности, проблема у меня заключается в том, что входящие данные обрабатываются специальным читателем, и если вызывающий абонент может подключиться с TaskContinuationOptions.ExecuteSynchronously они могут задержать читателя (который влияет не только на них). Ранее я работал над этим с помощью некоторых хакеров, которые определяют, является ли любой продолжения присутствуют, и если они есть, это толкает завершение на ThreadPool, однако это значительное влияние, если вызывающий абонент насыщает свою рабочую очередь, так как завершение не будет обработано своевременно. Если они используют Task.Wait() (или аналогичный), они тогда будут по существу тупик себя. Кроме того, именно поэтому читатель находится в выделенном потоке, а не с помощью рабочих.
Итак; прежде чем я попытаюсь пилить команду TPL: я пропустил вариант?
ключевые моменты:
- я не хочу, чтобы внешние абоненты могли захватить мои нить
- я не могу использовать
ThreadPoolкак реализация, так как она должна работать, когда пул насыщен
пример ниже производит выход (заказ может варьироваться в зависимости от времени):
Continuation on: Main thread
Press [return]
Continuation on: Thread pool
проблема в том, что случайный абонент сумел получить продолжение на "основной поток". В реальном коде это было бы прерыванием основного читателя; плохие вещи!
код:
using System;
using System.Threading;
using System.Threading.Tasks;
static class Program
{
static void Identify()
{
var thread = Thread.CurrentThread;
string name = thread.IsThreadPoolThread
? "Thread pool" : thread.Name;
if (string.IsNullOrEmpty(name))
name = "#" + thread.ManagedThreadId;
Console.WriteLine("Continuation on: " + name);
}
static void Main()
{
Thread.CurrentThread.Name = "Main thread";
var source = new TaskCompletionSource<int>();
var task = source.Task;
task.ContinueWith(delegate {
Identify();
});
task.ContinueWith(delegate {
Identify();
}, TaskContinuationOptions.ExecuteSynchronously);
source.TrySetResult(123);
Console.WriteLine("Press [return]");
Console.ReadLine();
}
}
6 ответов:
новое в .NET 4.6:
.NET 4.6 содержит новый
TaskCreationOptions:RunContinuationsAsynchronously.
Так вы готовы использовать отражение для доступа к закрытым полям...
вы можете отметить задачу TCS с помощью
TASK_STATE_THREAD_WAS_ABORTEDфлаг, который приведет к тому, что все продолжения не будут встроены.const int TASK_STATE_THREAD_WAS_ABORTED = 134217728; var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance); stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);Edit:
вместо использования отражения испускают, я предлагаю вам использовать выражения. Это гораздо более читабельно и имеет преимущество быть PCL-совместимым:
var taskParameter = Expression.Parameter(typeof (Task)); const string stateFlagsFieldName = "m_stateFlags"; var setter = Expression.Lambda<Action<Task>>( Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName), Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName), Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();без использования отражения:
если кто-то заинтересован, я придумал способ сделать это без размышлений, но он тоже немного "грязный" и, конечно же, несет в себе несущественный штраф perf:
try { Thread.CurrentThread.Abort(); } catch (ThreadAbortException) { source.TrySetResult(123); Thread.ResetAbort(); }
я не думаю, что в TPL есть что-то, что обеспечивает явно API контроль над
TaskCompletionSource.SetResultпродолжений. Я решил оставить себе первоначальный ответ для управления этим поведением дляasync/awaitсценарии.вот еще одно решение, которое налагает на асинхронные
ContinueWith, еслиtcs.SetResult-срабатывает продолжение происходит в том же потокеSetResultбыло:public static class TaskExt { static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks = new ConcurrentDictionary<Task, Thread>(); // SetResultAsync static public void SetResultAsync<TResult>( this TaskCompletionSource<TResult> @this, TResult result) { s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread); try { @this.SetResult(result); } finally { Thread thread; s_tcsTasks.TryRemove(@this.Task, out thread); } } // ContinueWithAsync, TODO: more overrides static public Task ContinueWithAsync<TResult>( this Task<TResult> @this, Action<Task<TResult>> action, TaskContinuationOptions continuationOptions = TaskContinuationOptions.None) { return @this.ContinueWith((Func<Task<TResult>, Task>)(t => { Thread thread = null; s_tcsTasks.TryGetValue(t, out thread); if (Thread.CurrentThread == thread) { // same thread which called SetResultAsync, avoid potential deadlocks // using thread pool return Task.Run(() => action(t)); // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread) // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning); } else { // continue on the same thread var task = new Task(() => action(t)); task.RunSynchronously(); return Task.FromResult(task); } }), continuationOptions).Unwrap(); } }обновлено для решения комментарий:
я не контролирую вызывающего абонента - я не могу заставить их использовать определенный продолжить-с вариантом: если бы я мог, проблема не существовала бы в первое место
я не знал, что вы не контролируете абонента. Тем не менее, если вы не контролируете его, вы, вероятно, не проходите
TaskCompletionSourceобъект напрямую к звонящему, либо. Логически, вы бы передали маркер часть его, т. е.tcs.Task. В в этом случае решение может быть еще проще, добавив другой метод расширения к приведенному выше:// ImposeAsync, TODO: more overrides static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this) { return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent => { Thread thread = null; s_tcsTasks.TryGetValue(antecedent, out thread); if (Thread.CurrentThread == thread) { // continue on a pool thread return antecedent.ContinueWith(t => t, TaskContinuationOptions.None).Unwrap(); } else { return antecedent; } }), TaskContinuationOptions.ExecuteSynchronously).Unwrap(); }использование:
// library code var source = new TaskCompletionSource<int>(); var task = source.Task.ImposeAsync(); // ... // client code task.ContinueWith(delegate { Identify(); }, TaskContinuationOptions.ExecuteSynchronously); // ... // library code source.SetResultAsync(123);это на самом деле работы как
awaitиContinueWith(скрипка) и не имеет отражения хаки.
Как насчет вместо того, чтобы делать
var task = source.Task;вы делаете это вместо
var task = source.Task.ContinueWith<Int32>( x => x.Result );
Если вы можете и готовы использовать отражение, это должно сделать это;
public static class MakeItAsync { static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result) { var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance); var continuations = (List<object>)continuation.GetValue(source.Task); foreach (object c in continuations) { var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance); var options = (TaskContinuationOptions)option.GetValue(c); options &= ~TaskContinuationOptions.ExecuteSynchronously; option.SetValue(c, options); } source.TrySetResult(result); } }
Обновлено, я написал отдельный ответ заниматься
ContinueWithв противоположностьawait(посколькуContinueWithне заботится о текущем контексте синхронизации).вы можете использовать немой контекст синхронизации, чтобы наложить асинхронность на продолжение, вызванное вызовом
SetResult/SetCancelled/SetExceptiononTaskCompletionSource. Я считаю, что текущий контекст синхронизации (в точкеawait tcs.Task) - это критерии, которые TPL использует, чтобы решить, следует ли делать такое продолжение синхронный или асинхронный.следующие работы для меня:
if (notifyAsync) { tcs.SetResultAsync(null); } else { tcs.SetResult(null); }
SetResultAsyncреализуется следующим образом:public static class TaskExt { static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result) { FakeSynchronizationContext.Execute(() => tcs.SetResult(result)); } // FakeSynchronizationContext class FakeSynchronizationContext : SynchronizationContext { private static readonly ThreadLocal<FakeSynchronizationContext> s_context = new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext()); private FakeSynchronizationContext() { } public static FakeSynchronizationContext Instance { get { return s_context.Value; } } public static void Execute(Action action) { var savedContext = SynchronizationContext.Current; SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance); try { action(); } finally { SynchronizationContext.SetSynchronizationContext(savedContext); } } // SynchronizationContext methods public override SynchronizationContext CreateCopy() { return this; } public override void OperationStarted() { throw new NotImplementedException("OperationStarted"); } public override void OperationCompleted() { throw new NotImplementedException("OperationCompleted"); } public override void Post(SendOrPostCallback d, object state) { throw new NotImplementedException("Post"); } public override void Send(SendOrPostCallback d, object state) { throw new NotImplementedException("Send"); } } }
SynchronizationContext.SetSynchronizationContextочень дешево С точки зрения накладных расходов он добавляет. На самом деле, очень похожий подход используется реализация WPFDispatcher.BeginInvoke.TPL сравнивает целевой контекст синхронизации в точке
awaitдо точкиtcs.SetResult. Если синхронизация контекст один и тот же (или нет контекста синхронизации в обоих местах), продолжение вызывается непосредственно, синхронно. В противном случае он помещается в очередь с помощьюSynchronizationContext.Postна целевой контекст синхронизации, т. е. нормальныйawaitповедение. То, что делает этот подход, всегда накладываетSynchronizationContext.Postповедение (или продолжение потока пула, если нет целевого контекста синхронизации).Обновлено, это не будет работать для
task.ContinueWith, потому чтоContinueWithне волнует текущий контекст синхронизации. Однако это работает дляawait task(скрипка). Он также работает дляawait task.ConfigureAwait(false).ото этот подход работает
ContinueWith.
The имитировать прерывание подход выглядел действительно хорошо, но привел к захвату потоков TPL в некоторых случаях.
у меня тогда была реализация, которая была похожа на проверка объекта продолжение, но просто проверка на любой продолжение, поскольку на самом деле слишком много сценариев для данного кода, чтобы хорошо работать, но это означало, что даже такие вещи, как
Task.Waitв результате получился пул потоков уважать.в конечном счете, после осмотра много-много Ил, только безопасное и полезное сценарий
SetOnInvokeMresсценарий (ручной сброс события-тонкое продолжение). Есть много других сценариев:
- некоторые из них не безопасны, и привести к потоку угона
- остальные не полезны, так как они в конечном итоге приводят к пулу потоков
Итак, в конце концов, я решил проверить ненулевой объект продолжения; если он равен нулю, штраф (без продолжений); если он не равен нулю, проверьте специальный случай для
SetOnInvokeMres- если это так: отлично (безопасно вызывать); в противном случае пусть пул потоков выполняетTrySetComplete, не говоря задача сделать что-нибудь особенное, как спуфинг прервать.Task.WaitиспользуетSetOnInvokeMresподход, который является конкретным сценарием, который мы хотим попробовать действительно трудно не тупик.Type taskType = typeof(Task); FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic); Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic); if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null) { var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true); var il = method.GetILGenerator(); var hasContinuation = il.DefineLabel(); il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Ldfld, continuationField); Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel(); // check if null il.Emit(OpCodes.Brtrue_S, nonNull); il.MarkLabel(goodReturn); il.Emit(OpCodes.Ldc_I4_1); il.Emit(OpCodes.Ret); // check if is a SetOnInvokeMres - if so, we're OK il.MarkLabel(nonNull); il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Ldfld, continuationField); il.Emit(OpCodes.Isinst, safeScenario); il.Emit(OpCodes.Brtrue_S, goodReturn); il.Emit(OpCodes.Ldc_I4_0); il.Emit(OpCodes.Ret); IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));
Comments