Недавно на ресурсе Medium были опубликованы две статьи от одного и того же автора, затрагивающие функциональность C# async/await.
Основными выводами были:
- рекурсивный вызов асинхронного метода в C# невозможен
- goroutine’ы лучше задач (тасков) в .NET в плане производительности
Но главная проблема вышеприведенных публикаций - абсолютное непонимание модели кооперативной многозадачности в C# и бессмысленные примеры бенчмарков.
Далее в статье я попытаюсь раскрыть суть проблемы более подробно с примерами решения.
TL;DR
После небольшой правки кода исходных примеров, .NET оказывается быстрее Go channels. Рекурсивный вызов асинхронных методов также становится возможен.
NB: использоваться будут свежевыпущенный .NET Core 2.0 и Go 1.8.3.
Stack overflow & async
Перейдем сразу к рассмотрению примера #1:
using System;
using System.Threading;
using System.Threading.Tasks;
namespace CSharpAsyncRecursion
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Counted to {0}.", CountRecursivelyAsync(10000).Result);
}
static async Task<int> CountRecursivelyAsync(int count)
{
if (count <= 0)
return count;
return 1 + await CountRecursivelyAsync(count - 1);
}
}
}
Консоль упадет со StackOverflowException
. Печаль!
Вариант реализации tail-call оптимизации здесь не подходит, т.к. мы не собираемся править компилятор, переписывать байт-код и т.п.
Поэтому решение должно подходить для максимально общего случая.
Обычно рекурсивный алгоритм заменяют на итерационный. Но так сделать для примера с C# будет слишком просто.
Нам на помощь приходит механизм отложенного выполнения.
Реализуем простой метод Defer
:
Task<T> Defer<T, TState>(Func<TState, Task<T>> task, TState state) =>
Task.Factory.StartNew(async s => await task((TState)s), state).Unwrap();
Для того, чтобы поставить задачу в очередь необходимо указание планировщика.
Методы Task.Run
и Task.Factory.StartNew
позволяют его использовать (По-умолчанию - TaskScheduler.Default
, который для данного примера и так подойдет), а последний позволяет передать объект-состояние в делегат.
На даный момент
Task.Factory.StartNew
не подерживает обобщенные перегрузки и вряд ли будет. Если необходимо передать состояние, то либоAction<object>
, либоFunc<object, TResult>
.
Перепишем пример, используя новый метод Defer
:
static async Task Main(string[] args)
{
Task<T> Defer<T, TState>(Func<TState, Task<T>> task, TState state) =>
Task.Factory.StartNew(async s => await task((TState)s), state).Unwrap();
Task<int> CountRecursivelyAsync(int count)
{
if (count <= 0)
return Task.FromResult(count);
return Defer(seed => CountRecursivelyAsync(seed - 1).ContinueWith(rec => rec.Result + 1), count);
}
Console.WriteLine($"Counted to {await CountRecursivelyAsync(100000)}.");
}
Оно не то, чем кажется
Для начала ознакомимся с кодом бенчмарков из этой статьи.
package main
import (
"flag";
"fmt";
"time"
)
func measure(start time.Time, name string) {
elapsed := time.Since(start)
fmt.Printf("%s took %s", name, elapsed)
fmt.Println()
}
var maxCount = flag.Int("n", 1000000, "how many")
func f(output, input chan int) {
output <- 1 + <-input
}
func test() {
fmt.Printf("Started, sending %d messages.", *maxCount)
fmt.Println()
flag.Parse()
defer measure(time.Now(), fmt.Sprintf("Sending %d messages", *maxCount))
finalOutput := make(chan int)
var left, right chan int = nil, finalOutput
for i := 0; i < *maxCount; i++ {
left, right = right, make(chan int)
go f(left, right)
}
right <- 0
x := <-finalOutput
fmt.Println(x)
}
func main() {
test()
test()
}
C#-код:
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
namespace ChannelsTest
{
class Program
{
public static void Measure(string title, Action<int, bool> test, int count, int warmupCount = 1)
{
test(warmupCount, true); // Warmup
var sw = new Stopwatch();
GC.Collect();
sw.Start();
test(count, false);
sw.Stop();
Console.WriteLine($"{title}: {sw.Elapsed.TotalMilliseconds:0.000}ms");
}
static async void AddOne(WritableChannel<int> output, ReadableChannel<int> input)
{
await output.WriteAsync(1 + await input.ReadAsync());
}
static async Task<int> AddOne(Task<int> input)
{
var result = 1 + await input;
await Task.Yield();
return result;
}
static void Main(string[] args)
{
if (!int.TryParse(args.FirstOrDefault(), out var maxCount))
maxCount = 1000000;
Measure($"Sending {maxCount} messages (channels)", (count, isWarmup) => {
var firstChannel = Channel.CreateUnbuffered<int>();
var output = firstChannel;
for (var i = 0; i < count; i++) {
var input = Channel.CreateUnbuffered<int>();
AddOne(output.Out, input.In);
output = input;
}
output.Out.WriteAsync(0);
if (!isWarmup)
Console.WriteLine(firstChannel.In.ReadAsync().Result);
}, maxCount);
Measure($"Sending {maxCount} messages (Task<int>)", (count, isWarmup) => {
var tcs = new TaskCompletionSource<int>();
var firstTask = AddOne(tcs.Task);
var output = firstTask;
for (var i = 0; i < count; i++) {
var input = AddOne(output);
output = input;
}
tcs.SetResult(-1);
if (!isWarmup)
Console.WriteLine(output.Result);
}, maxCount);
}
}
}
Что брасается сразу в глаза:
- Сам пример (что для Go, что для C#) весьма странен. Все сводится к эмуляции цепочки действий и их лавинообразном ‘спуске’. Более того в Go создается chan int на каждую итерацию из 1 млн. Это вообще best-practice??
- автор использует
Task.Yield()
, оправдывая это тем, что иначе пример упадет с StackOverflowException. С таким же успехом мог бы и Task.Delay задействовать. Зачем мелочиться-то?! Но, как увидели ранее, все проистекает из-за ‘неудачного’ опыта с рекурсивными вызовами асинхронных методов. - Изначально в примерах также фигурирует бета-версия System.Threading.Tasks.Channels для сравнения с каналами в Go. Я решил оставить только пример с тасками, т.к. библиотека System.Threading.Tasks.Channels еще не выпущена официально.
- Вызов GC.Collect() после прогрева. Боюсь, я откажусь от такого сомнительного преимущества.
Go использует понятие goroutine - легковесных потоков. Соответственно каждая горутина имеет свой стек. На данный момент размер стека равен 2KB. Поэтому при запуске бенчмарков будьте осторожны (более 4GB понадобиться)!
С одной стороны, это может быть полезно CLR JIT’у, а с другой - Go переиспользует уже созданные горутины, что позволяет исключить замеры трат на выделение памяти системой.
Результаты до оптимизации
Среда тестирования:
- Core i7 6700HQ (3.5 GHz)
- 8 GB DDR4 (2133 MHz)
- Win 10 x64 (Creators Update)
Ну что ж, у меня получились похожие результаты (берем в расчет лучшее время).
NB: Т.к. пример реализует просто цепочку вызовов, то ни GOMAXPROCS, ни размер канала не влияют на результат (уже проверено опытным путем).
Да, действительно: Go опережает C# на ~30%. Challange accepted!
Используй TaskScheduler, Luke!
Если не использовать что-то наподобие Task.Yield
, то снова будет StackOverflowException.
На этот раз не будем использовать Defer
!
Мысль рализации проста: запускаем доп. поток, который слушает/обрабатывает задачи по очереди.
По-моему, легче реализовать собственный планировщик, чем контекст синхронизации.
Сам класс TaskScheduler
выглядит так:
// Represents an object that handles the low-level work of queuing tasks onto threads.
public abstract class TaskScheduler
{
/* остальные методы */
public virtual int MaximumConcurrencyLevel { get; }
public static TaskScheduler FromCurrentSynchronizationContext();
protected abstract IEnumerable<Task> GetScheduledTasks();
protected bool TryExecuteTask(Task task);
protected abstract bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued);
protected internal abstract void QueueTask(Task task);
protected internal virtual bool TryDequeue(Task task);
}
Как мы видим, TaskScheduler
уж реализует подобие очереди: QueueTask
и TryDequeue
.
Дабы не изобретать велосипед, воспользуемся уже готовыми планировщиками от команды .NET.
Внимание! Камера! Мотор!
Перепишем это дело на C# 7, делая его максимально приближенным к Go:
static void Main(string[] args)
{
// т.к. мы делаем методы async, то и лямбды становятся асинхронными
// Action<int, bool> превращаем в Func<int, bool, Task>, чтобы избежать проблемы async void
async Task Measure(string title, Func<int, bool, Task> test, int count, int warmupCount = 1)
{
await test(warmupCount, true); // Warmup
var sw = new Stopwatch();
sw.Start();
await test(count, false);
sw.Stop();
Console.WriteLine($"{title}: {sw.Elapsed.TotalMilliseconds:0.000}ms");
}
async Task<int> f(Task<int> input)
{
return 1 + await input; // return output
}
Task.Factory.StartNew(async () =>
{
if (!int.TryParse(args.FirstOrDefault(), out var maxCount))
maxCount = 1000000;
await Measure($"Sending {maxCount} messages (Task<int>)", async (count, isWarmup) => {
var tcs = new TaskCompletionSource<int>();
(var left, var right) = ((Task<int>)null, f(tcs.Task));
for (var i = 0; i < count; i++)
{
left = f(right);
right = left;
}
tcs.SetResult(-1);
if (!isWarmup)
Console.WriteLine(right.Result);
}, maxCount);
}, CancellationToken.None, TaskCreationOptions.None, new StaTaskScheduler(2)).Wait();
}
Здесь необходимо сделать пару ремарок:
- Т.к. мы делаем методы async, то и лямбды становятся асинхронными. Соответственно,
Action<int, bool>
заменяем наFunc<int, bool, Task>
, чтобы избежать проблемы async void. GC.Collect()
убираем как и говорилось выше- Используем
StaTaskScheduler
с двумя вспомогательными потоками, чтобы избежать блокировки: один ждет результата, а др. обрабатывает задачи. Можно использовать только одним доп. потоком, однако это потребует корректировки нашего планировщика, да и хочется обойтись минимальными усилиями.
Проблема рекурсивных вызовов исчезает автоматически. Поэтому смело убираем из метода f(input)
вызов Task.Yield()
.
Теперь публикуем релизную сборку:
dotnet publish -c release -r win10-x64
И запускаем… Внезапно получаем около 600 ms вместо прежних 1300 ms. Not bad! Go, напомню, отрабатывал на уровне 1000 ms. Но меня не покидает чувство неуместности использования каналов как средство кооперативной многозадачности в исходных примерах.
p.s. Я не делал огромного количества прогонов тестов с полноценной статистикой распределения значений замеров специально. Цель статьи заключалась в освещении определеного use-case’a async/await и попутного развенчания мифа о невозможности рекурсивного вызова асинхронных методов в C#.
p.p.s.
Причиной изначального отставания C# было использование Task.Yield()
. Постоянное переключение контекста - не есть гуд!