Sunday, September 4, 2011

Programming with C# asynchronous sequences

In his latest blog post, titled “Programming with F# asynchronous sequences”, Tomas Petricek presents an F# implementation of asynchronous sequences. In this post I will show how the same concept can be implemented in C#. Let’s look at the sample code below to better understand what an asynchronous sequence is:

IEnumerable<...> AsyncSeq()
{
    // Illustrative only: this does not compile (see below)
    yield return "Hello";
    await TaskEx.Delay(100);
    yield return "world!";
}

An asynchronous sequence is code that produces a sequence of values on demand (which is how the IEnumerable interface can be interpreted) but additionally performs some asynchronous work during evaluation (the await keyword). Every time the client of an asynchronous sequence calls the MoveNext method, the next value is evaluated. The key feature is that the client decides when the next value is produced and when processing stops.
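The on-demand part of this definition is already visible in an ordinary synchronous C# iterator. The sketch below is only an illustration; the names OnDemandDemo, Numbers and TakeThree are hypothetical. It counts how many values an infinite producer generates while the consumer pulls three values and then stops:

```csharp
using System.Collections.Generic;

public static class OnDemandDemo
{
    // How many values the producer has generated so far.
    public static int Produced;

    // An infinite iterator: the loop body advances one step per MoveNext call.
    public static IEnumerable<int> Numbers()
    {
        for (int i = 1; ; i++)
        {
            Produced++;
            yield return i;
        }
    }

    // The consumer pulls three values and then decides to stop.
    public static List<int> TakeThree()
    {
        Produced = 0;
        var result = new List<int>();
        foreach (var n in Numbers())
        {
            result.Add(n);
            if (result.Count == 3) break; // no further MoveNext call is made
        }
        return result;
    }
}
```

Even though Numbers never ends on its own, the loop terminates and Produced stays at 3: the producer never runs ahead of the consumer. An asynchronous sequence keeps this pull model and adds asynchronous waiting between the steps.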

There are two problems with such an implementation of an asynchronous sequence. First, sequences in the .NET world are represented by the IEnumerable interface, which allows only synchronous processing: because MoveNext returns a bool, it has to decide immediately whether the next value can be produced. With asynchronous processing, that information may take minutes or even hours to become available. Second, the Async CTP does not allow the await keyword to be mixed with yield return/yield break in the same method body. My solution addresses both problems, and the sequence above can be implemented as follows:

IEnumerable<AsyncSeqItem<string>> AsyncSeq()
{
    yield return "Hello";             // a value (implicit conversion from string)
    yield return TaskEx.Delay(100);   // a task the consumer waits for before resuming
    yield return "world!";
}

public enum AsyncSeqItemMode
{
    Value, Task, Sequence
}

public class AsyncSeqItem<T>
{
    public AsyncSeqItemMode Mode { get; private set; }
    public T Value { get; private set; }
    public Task Task { get; private set; }
    public IEnumerable<AsyncSeqItem<T>> Seq { get; private set; }

    public AsyncSeqItem(T value)
    {
        Value = value;
        Mode = AsyncSeqItemMode.Value;
    }

    public AsyncSeqItem(Task task)
    {
        Task = task;
        Mode = AsyncSeqItemMode.Task;
    }

    public AsyncSeqItem(IEnumerable<AsyncSeqItem<T>> seq)
    {
        Seq = seq;
        Mode = AsyncSeqItemMode.Sequence;
    }

    public static implicit operator AsyncSeqItem<T>(T value)
    {
        return new AsyncSeqItem<T>(value);
    }

    public static implicit operator AsyncSeqItem<T>(Task task)
    {
        return new AsyncSeqItem<T>(task);
    }
}

An AsyncSeqItem holds one of the following three kinds of items:

  • Value – the next value produced by the sequence
  • Task – asynchronous work that must complete before the sequence continues
  • Sequence – used for recursive calls; it expresses a tail call, so the consumer abandons the current iterator and continues with the nested sequence
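To make the three modes concrete, here is a minimal consumer sketch written with the await keyword, with Task.Delay standing in for the CTP’s TaskEx.Delay. It is not the author’s ToTaskEnumerable, and the names AsyncSeqDriver, CollectAsync, HelloWorld and Countdown are hypothetical. It drains the whole sequence into a list, awaits every Task item before resuming the iterator, and treats a Sequence item as a tail call by switching to the nested enumerator:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public enum AsyncSeqItemMode { Value, Task, Sequence }

// Condensed copy of the AsyncSeqItem<T> class above.
public class AsyncSeqItem<T>
{
    public AsyncSeqItemMode Mode { get; private set; }
    public T Value { get; private set; }
    public Task Task { get; private set; }
    public IEnumerable<AsyncSeqItem<T>> Seq { get; private set; }

    public AsyncSeqItem(T value) { Value = value; Mode = AsyncSeqItemMode.Value; }
    public AsyncSeqItem(Task task) { Task = task; Mode = AsyncSeqItemMode.Task; }
    public AsyncSeqItem(IEnumerable<AsyncSeqItem<T>> seq) { Seq = seq; Mode = AsyncSeqItemMode.Sequence; }

    public static implicit operator AsyncSeqItem<T>(T value) => new AsyncSeqItem<T>(value);
    public static implicit operator AsyncSeqItem<T>(Task task) => new AsyncSeqItem<T>(task);
}

public static class AsyncSeqDriver
{
    // Hypothetical helper: runs the whole sequence and collects its values.
    public static async Task<List<T>> CollectAsync<T>(IEnumerable<AsyncSeqItem<T>> seq)
    {
        var values = new List<T>();
        var enumerator = seq.GetEnumerator();
        try
        {
            while (enumerator.MoveNext())
            {
                var item = enumerator.Current;
                switch (item.Mode)
                {
                    case AsyncSeqItemMode.Value:
                        values.Add(item.Value);
                        break;
                    case AsyncSeqItemMode.Task:
                        await item.Task; // resume the iterator only after the work finishes
                        break;
                    case AsyncSeqItemMode.Sequence:
                        // Tail call: drop the current iterator and continue with the nested one.
                        enumerator.Dispose();
                        enumerator = item.Seq.GetEnumerator();
                        break;
                }
            }
        }
        finally
        {
            enumerator.Dispose();
        }
        return values;
    }

    // The sample from the post, with Task.Delay instead of TaskEx.Delay.
    public static IEnumerable<AsyncSeqItem<string>> HelloWorld()
    {
        yield return "Hello";
        yield return Task.Delay(100);
        yield return "world!";
    }

    // Tail-recursive countdown: the recursive call is yielded as a Sequence item.
    public static IEnumerable<AsyncSeqItem<int>> Countdown(int n)
    {
        if (n == 0) yield break;
        yield return n;
        yield return Task.Delay(10);
        yield return new AsyncSeqItem<int>(Countdown(n - 1));
    }
}
```

Awaiting CollectAsync(Countdown(3)) yields 3, 2 and 1 without nesting one iterator inside another. One design difference worth noting: in this sketch a faulted Task item ends the enumeration with its exception, whereas the ContinueWith-based ToTaskEnumerable shown later resumes the iterator regardless of how the task finished.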

A client can consume such a sequence in two ways:

public static class AsyncSeqExtensions
{
    public static IEnumerable<Task<Option<T>>> ToTaskEnumerable<T>(this IEnumerable<AsyncSeqItem<T>> seq, bool continueOnCapturedContext = true)
    { ... }

    public static IAsyncEnumerable<T> ToAsyncEnumerable<T>(this IEnumerable<AsyncSeqItem<T>> seq, bool continueOnCapturedContext = true)
    { ... }
}

public class Option<T>
{
    public T Value { get; private set; }
    public bool HasValue { get; private set; }

    public Option()
    {
        HasValue = false;
    }

    public Option(T value)
    {
        Value = value;
        HasValue = true;
    }

    public static implicit operator Option<T>(T value)
    {
        return new Option<T>(value);
    }
}

The first approach calls the ToTaskEnumerable extension method, which returns a sequence of tasks. Each task wraps a special type called Option<T>, which can be used much like Nullable<T>, except that it works with both value and reference types. A task whose Option has no value signals that we have reached the end of the sequence. I also provide a few standard LINQ operators built on top of these semantics:
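The end-of-sequence protocol can be shown in isolation. In this sketch, Words is a hypothetical hand-written producer that yields two values and then a task holding an empty Option<string>; because string is a reference type, Nullable<string> could not play this role. ReadAllAsync, also hypothetical, awaits each task in order and stops at the marker:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Condensed copy of the Option<T> class above.
public class Option<T>
{
    public T Value { get; private set; }
    public bool HasValue { get; private set; }
    public Option() { HasValue = false; }
    public Option(T value) { Value = value; HasValue = true; }
    public static implicit operator Option<T>(T value) { return new Option<T>(value); }
}

public static class TaskSequenceDemo
{
    // Hypothetical producer: two values, then a task whose Option is empty.
    // The empty Option marks the end of the sequence.
    public static IEnumerable<Task<Option<string>>> Words()
    {
        yield return Task.FromResult<Option<string>>("Hello");
        yield return DelayedAsync("world!");
        yield return Task.FromResult(new Option<string>());
    }

    private static async Task<Option<string>> DelayedAsync(string word)
    {
        await Task.Delay(50);
        return word; // implicit conversion from string to Option<string>
    }

    // Consumer: await each task in order and stop at the first empty Option.
    public static async Task<List<string>> ReadAllAsync(IEnumerable<Task<Option<string>>> seq)
    {
        var result = new List<string>();
        foreach (var task in seq)
        {
            Option<string> option = await task;
            if (!option.HasValue) break;
            result.Add(option.Value);
        }
        return result;
    }
}
```

Awaiting ReadAllAsync(Words()) returns "Hello" and "world!". The delayed task is created only when the consumer asks for the second element, so the on-demand behaviour of the iterator is preserved.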

public static class AsyncSeqExtensions
{
    private static async Task ForEachTaskImpl<T>(this IEnumerable<Task<Option<T>>> seq, Action<Task<Option<T>>> action)
    {
        foreach (var task in seq)
        {
            await task;
            action(task);
        }
    }

    public static Task ForEachTask<T>(this IEnumerable<Task<Option<T>>> seq, Action<Task<Option<T>>> action)
    {
        return ForEachTaskImpl(seq, action);
    }

    public static Task ForEach<T>(this IEnumerable<Task<Option<T>>> seq, Action<T> action)
    {
        return seq.ForEachTask(task =>
        {
            if (task.Result.HasValue)
                action(task.Result.Value);
        });
    }

    private static async Task<T[]> ToArrayImpl<T>(IEnumerable<Task<Option<T>>> seq)
    {
        var list = new List<T>();
        await seq.ForEach(v => list.Add(v));
        return list.ToArray();
    }

    public static Task<T[]> ToArray<T>(this IEnumerable<Task<Option<T>>> seq)
    {
        return ToArrayImpl(seq);
    }

    public static IEnumerable<Task<Option<TResult>>> Select<T, TResult>(this IEnumerable<Task<Option<T>>> source,
        Func<T, TResult> selector) { ... }

    public static IEnumerable<Task<Option<T>>> Where<T>(this IEnumerable<Task<Option<T>>> source,
        Func<T, bool> predicate) { ... }

    public static IEnumerable<Task<Option<T>>> Take<T>(this IEnumerable<Task<Option<T>>> source,
        int count) { ... }

    ...
}
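The operator bodies are elided above. As an illustration only, not the downloadable implementation, Select is the easiest to sketch because it maps each input task to exactly one output task; an empty Option stays empty, so the end marker survives the mapping. Numbers and ToListAsync are hypothetical helpers for trying it out:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Condensed copy of the Option<T> class above.
public class Option<T>
{
    public T Value { get; private set; }
    public bool HasValue { get; private set; }
    public Option() { HasValue = false; }
    public Option(T value) { Value = value; HasValue = true; }
}

public static class TaskSeqOperators
{
    // Hypothetical Select: lazily maps each task; an empty Option (the end marker)
    // stays empty, so consumers still see where the sequence ends.
    public static IEnumerable<Task<Option<TResult>>> Select<T, TResult>(
        this IEnumerable<Task<Option<T>>> source, Func<T, TResult> selector)
    {
        foreach (var task in source)
            yield return MapAsync(task, selector);
    }

    private static async Task<Option<TResult>> MapAsync<T, TResult>(
        Task<Option<T>> task, Func<T, TResult> selector)
    {
        var option = await task;
        return option.HasValue
            ? new Option<TResult>(selector(option.Value))
            : new Option<TResult>();
    }

    // Hypothetical test source: 1..count, followed by the end marker.
    public static IEnumerable<Task<Option<int>>> Numbers(int count)
    {
        for (int i = 1; i <= count; i++)
            yield return Task.FromResult(new Option<int>(i));
        yield return Task.FromResult(new Option<int>());
    }

    // Awaits the tasks in order and collects values until the end marker.
    public static async Task<List<T>> ToListAsync<T>(this IEnumerable<Task<Option<T>>> source)
    {
        var list = new List<T>();
        foreach (var task in source)
        {
            var option = await task;
            if (!option.HasValue) break;
            list.Add(option.Value);
        }
        return list;
    }
}
```

Where and Take change the number of elements that come out, so they need more bookkeeping than this one-to-one mapping of tasks.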

Returning an additional task with a special empty value at the end of the sequence lets us use the standard IEnumerable<T> interface, but it is a little inconvenient. The second approach uses the IAsyncEnumerable interface from the Reactive Framework (Rx) library, released some time ago.

public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetEnumerator();
}

public interface IAsyncEnumerator<out T> : IDisposable
{
    Task<bool> MoveNext();
    T Current { get; }
}

public static class AsyncEnumerable
{
    public static IAsyncEnumerable<TResult> Select<TSource, TResult>(this IAsyncEnumerable<TSource> source,
        Func<TSource, TResult> selector) { ... }

    public static IAsyncEnumerable<TSource> Where<TSource>(this IAsyncEnumerable<TSource> source,
        Func<TSource, bool> predicate) { ... }

    public static IAsyncEnumerable<TSource> Take<TSource>(this IAsyncEnumerable<TSource> source,
        int n) { ... }
}

This interface captures the semantics of an asynchronous sequence exactly: MoveNext returns a Task<bool>, so the answer to “is there a next element?” can arrive later. The Rx library also provides many standard LINQ operators such as Where, Select, Take, Sum and First, which lets us write almost any LINQ query over an asynchronous sequence.
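The way such an operator and its consumer interact can be sketched against the two interfaces above. This is not Rx’s code: DelayedList, WhereAsyncEnumerable and AsyncEnumerableDemo are hypothetical names, and the interfaces are declared here in the older shape listed in the post, not as the System.Collections.Generic.IAsyncEnumerable<T> of later .NET versions. The Where enumerator keeps awaiting the source until an element passes the predicate, and the consumer loop awaits MoveNext before reading Current:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// The interfaces as listed above.
public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetEnumerator();
}

public interface IAsyncEnumerator<out T> : IDisposable
{
    Task<bool> MoveNext();
    T Current { get; }
}

// Hypothetical source: hands out list items, answering each MoveNext after a delay.
public class DelayedList<T> : IAsyncEnumerable<T>
{
    private readonly IList<T> items;
    public DelayedList(IList<T> items) { this.items = items; }
    public IAsyncEnumerator<T> GetEnumerator() { return new Enumerator(items); }

    private class Enumerator : IAsyncEnumerator<T>
    {
        private readonly IList<T> items;
        private int index = -1;
        public Enumerator(IList<T> items) { this.items = items; }
        public T Current { get { return items[index]; } }
        public async Task<bool> MoveNext()
        {
            await Task.Delay(10); // "is there a next element?" is answered later
            index++;
            return index < items.Count;
        }
        public void Dispose() { }
    }
}

// Hypothetical Where: MoveNext keeps pulling from the source until an
// element passes the predicate or the source is exhausted.
public class WhereAsyncEnumerable<T> : IAsyncEnumerable<T>
{
    private readonly IAsyncEnumerable<T> source;
    private readonly Func<T, bool> predicate;
    public WhereAsyncEnumerable(IAsyncEnumerable<T> source, Func<T, bool> predicate) { this.source = source; this.predicate = predicate; }
    public IAsyncEnumerator<T> GetEnumerator() { return new Enumerator(source.GetEnumerator(), predicate); }

    private class Enumerator : IAsyncEnumerator<T>
    {
        private readonly IAsyncEnumerator<T> inner;
        private readonly Func<T, bool> predicate;
        public Enumerator(IAsyncEnumerator<T> inner, Func<T, bool> predicate) { this.inner = inner; this.predicate = predicate; }
        public T Current { get; private set; }
        public async Task<bool> MoveNext()
        {
            while (await inner.MoveNext())
            {
                if (predicate(inner.Current))
                {
                    Current = inner.Current;
                    return true;
                }
            }
            return false;
        }
        public void Dispose() { inner.Dispose(); }
    }
}

public static class AsyncEnumerableDemo
{
    // The usual consumer loop: await MoveNext, then read Current.
    public static async Task<List<T>> ToListAsync<T>(IAsyncEnumerable<T> source)
    {
        var list = new List<T>();
        using (var e = source.GetEnumerator())
        {
            while (await e.MoveNext())
                list.Add(e.Current);
        }
        return list;
    }

    public static Task<List<int>> EvenNumbers()
    {
        var numbers = new DelayedList<int>(new[] { 1, 2, 3, 4, 5, 6 });
        return ToListAsync(new WhereAsyncEnumerable<int>(numbers, x => x % 2 == 0));
    }
}
```

EvenNumbers completes with 2, 4 and 6. Because MoveNext returns a task, the filter can skip any number of elements asynchronously without ever blocking the caller.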

Let’s summarize what we have achieved so far. We can write imperative code that implements an asynchronous sequence. We can use an extension method to convert it into one of two asynchronous sequence representations. Finally, we can iterate through all items in such a sequence or build a new sequence using LINQ operators.

A C# version of the web crawler presented in Tomas Petricek’s blog post could look like this:

public static class AsyncSeqSample
{
    public static async Task CrawlBingUsingAsyncEnumerable()
    {
        await RandomCrawl("http://news.bing.com")
            .ToAsyncEnumerable()
            .Where(t => !t.Item1.Contains("bing.com"))
            .Select(t => t.Item2)
            .Take(10)
            .ForEach(Console.WriteLine);

        Console.WriteLine("the end...");
    }

    public static async Task CrawlBingUsingTaskEnumerable()
    {
        await RandomCrawl("http://news.bing.com")
            .ToTaskEnumerable()
            .Where(t => !t.Item1.Contains("bing.com"))
            .Select(t => t.Item2)
            .Take(10)
            .ForEach(Console.WriteLine);

        Console.WriteLine("the end...");
    }

    public static IEnumerable<AsyncSeqItem<Tuple<string, string>>> RandomCrawl(string url)
    {
        return RandomCrawlLoop(url, new HashSet<string>());
    }

    // Yields (url, title) pairs; each download is yielded as a task,
    // so the consumer awaits it before the iterator continues.
    private static IEnumerable<AsyncSeqItem<Tuple<string, string>>> RandomCrawlLoop(string url, HashSet<string> visited)
    {
        if (visited.Add(url))
        {
            var downloadTask = DownloadDocument(url);
            yield return downloadTask;
            // The task has completed by now, so Result does not block.
            if (downloadTask.Result.HasValue)
            {
                var doc = downloadTask.Result.Value;
                yield return Tuple.Create(url, GetTitle(doc));
                foreach (var link in ExtractLinks(doc))
                {
                    foreach (var l in RandomCrawlLoop(link, visited))
                    {
                        yield return l;
                    }
                }
            }
        }
    }

    private static string[] ExtractLinks(HtmlDocument doc)
    {
        try
        {
            var q = from a in doc.DocumentNode.SelectNodes("//a")
                    where a.Attributes.Contains("href")
                    let href = a.Attributes["href"].Value
                    where href.StartsWith("http://")
                    let endl = href.IndexOf('?')
                    select endl > 0 ? href.Substring(0, endl) : href;

            return q.ToArray();
        }
        catch
        {
            return new string[0];
        }
    }

    private static async Task<Option<HtmlDocument>> DownloadDocument(string url)
    {
        try
        {
            using (var client = new WebClient())
            {
                var html = await client.DownloadStringTaskAsync(url);
                var doc = new HtmlDocument();
                doc.LoadHtml(html);
                return new Option<HtmlDocument>(doc);
            }
        }
        catch (Exception)
        {
            // Pages that fail to download are skipped (empty Option).
            return new Option<HtmlDocument>();
        }
    }

    private static string GetTitle(HtmlDocument doc)
    {
        var title = doc.DocumentNode.SelectSingleNode("//title");
        return title != null ? title.InnerText.Trim() : "Untitled";
    }
}
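The crawler depends on the network and on HtmlAgilityPack, so as a runnable stand-in here is an offline sketch of the same pattern. Everything in it is hypothetical: a tiny in-memory link graph replaces the web, FakeDownload replaces DownloadDocument, and TakeAsync is a simplified consumer in place of ToTaskEnumerable plus Take. It shows why Take(10) keeps the crawl short: pages are fetched only while the consumer keeps pulling.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public enum AsyncSeqItemMode { Value, Task, Sequence }

// Condensed copy of AsyncSeqItem<T>; the Sequence constructor is left out
// because this example does not use it.
public class AsyncSeqItem<T>
{
    public AsyncSeqItemMode Mode { get; private set; }
    public T Value { get; private set; }
    public Task Task { get; private set; }
    public AsyncSeqItem(T value) { Value = value; Mode = AsyncSeqItemMode.Value; }
    public AsyncSeqItem(Task task) { Task = task; Mode = AsyncSeqItemMode.Task; }
    public static implicit operator AsyncSeqItem<T>(T value) { return new AsyncSeqItem<T>(value); }
    public static implicit operator AsyncSeqItem<T>(Task task) { return new AsyncSeqItem<T>(task); }
}

public static class OfflineCrawler
{
    // Hypothetical in-memory "web": page name -> outgoing links.
    private static readonly Dictionary<string, string[]> Web = new Dictionary<string, string[]>
    {
        { "a", new[] { "b", "c" } },
        { "b", new[] { "a", "d" } },
        { "c", new[] { "d" } },
        { "d", new string[0] },
    };

    // Number of simulated downloads that have been started.
    public static int Downloads;

    // Stand-in for DownloadDocument: returns the page's links after a short delay.
    private static async Task<string[]> FakeDownload(string url)
    {
        Downloads++;
        await Task.Delay(10);
        string[] links;
        return Web.TryGetValue(url, out links) ? links : new string[0];
    }

    // Same shape as RandomCrawlLoop: yield the download task, then the page,
    // then recurse into every link that has not been visited yet.
    public static IEnumerable<AsyncSeqItem<string>> Crawl(string url, HashSet<string> visited)
    {
        if (!visited.Add(url)) yield break;
        var download = FakeDownload(url);
        yield return download; // the consumer awaits this before resuming us
        yield return url;
        foreach (var link in download.Result)
            foreach (var item in Crawl(link, visited))
                yield return item;
    }

    // Minimal pull-based consumer: awaits Task items, collects values,
    // and stops pulling once `count` values have arrived.
    public static async Task<List<string>> TakeAsync(IEnumerable<AsyncSeqItem<string>> seq, int count)
    {
        var values = new List<string>();
        using (var e = seq.GetEnumerator())
        {
            while (values.Count < count && e.MoveNext())
            {
                if (e.Current.Mode == AsyncSeqItemMode.Task)
                    await e.Current.Task;
                else
                    values.Add(e.Current.Value);
            }
        }
        return values;
    }
}
```

Taking three values starting from page a visits a, b and d and starts exactly three downloads; page c is never fetched. Without the limit the crawl visits a, b, d and c, and the visited set breaks the cycle between a and b.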

Now let’s see how the ToAsyncEnumerable and ToTaskEnumerable methods are implemented:

public static class AsyncSeqExtensions
{
    public static IAsyncEnumerable<T> ToAsyncEnumerable<T>(this IEnumerable<AsyncSeqItem<T>> seq, bool continueOnCapturedContext = true)
    {
        if (seq == null) throw new ArgumentNullException("seq");

        return new AnonymousAsyncEnumerable<T>(() =>
        {
            var enumerator = seq.ToTaskEnumerable(continueOnCapturedContext).GetEnumerator();
            seq = null; // holding a reference to seq would leak memory when the asynchronous sequence uses recursive calls; as a consequence, the sequence can be enumerated only once
            TaskCompletionSource<bool> currentTcs = null;
            Task<Option<T>> currentTask = null;

            return new AnonymousAsyncEnumerator<T>(
                () =>
                {
                    currentTcs = new TaskCompletionSource<bool>();

                    if (CheckEndOfSeq(currentTask) == false)
                    {
                        currentTcs.SetResult(false);
                        return currentTcs.Task;
                    }

                    enumerator.MoveNext();

                    enumerator.Current.ContinueWith(t =>
                    {
                        if (t.IsFaulted)
                        {
                            // Pass on the original exceptions instead of nesting the AggregateException.
                            currentTcs.SetException(t.Exception.InnerExceptions);
                        }
                        else
                        {
                            if (!t.Result.HasValue)
                            {
                                currentTcs.SetResult(false);
                            }
                            else
                            {
                                currentTask = t;
                                currentTcs.SetResult(true);
                            }
                        }
                    });

                    return currentTcs.Task;
                },
                () => currentTask.Result.Value
            );
        });
    }

    public static IEnumerable<Task<Option<T>>> ToTaskEnumerable<T>(this IEnumerable<AsyncSeqItem<T>> seq, bool continueOnCapturedContext = true)
    {
        if (seq == null) throw new ArgumentNullException("seq");

        return new AnonymousEnumerable<Task<Option<T>>>(() =>
        {
            var synchronizationContext = continueOnCapturedContext ? SynchronizationContext.Current : null;

            var enumerator = seq.GetEnumerator();
            seq = null; // holding a reference to seq would leak memory when the asynchronous sequence uses recursive calls; as a consequence, the sequence can be enumerated only once

            TaskCompletionSource<Option<T>> currentTcs = null;

            return new AnonymousEnumerator<Task<Option<T>>>(
                () =>
                {
                    if (CheckEndOfSeq(currentTcs) == false)
                        return false;

                    currentTcs = new TaskCompletionSource<Option<T>>();

                    // Advances the iterator until it produces a value, ends or throws;
                    // when it yields a task, moveNext runs again once that task completes.
                    Action moveNext = null;
                    moveNext = () =>
                    {
                    Start:

                        bool b;

                        try
                        {
                            b = enumerator.MoveNext();
                        }
                        catch (Exception exception)
                        {
                            currentTcs.SetException(exception);
                            return;
                        }

                        if (b == false)
                        {
                            currentTcs.SetResult(new Option<T>());
                        }
                        else
                        {
                            var c = enumerator.Current;
                            if (c.Mode == AsyncSeqItemMode.Value)
                            {
                                currentTcs.SetResult(c.Value);
                            }
                            else if (c.Mode == AsyncSeqItemMode.Task)
                            {
                                if (synchronizationContext != null)
                                    c.Task.ContinueWith(_ => synchronizationContext.Post(s => ((Action)s)(), moveNext));
                                else
                                    c.Task.ContinueWith(_ => moveNext());
                            }
                            else if (c.Mode == AsyncSeqItemMode.Sequence)
                            {
                                // Tail call: release the current iterator and continue with the nested one.
                                var previous = enumerator;
                                enumerator = c.Seq.GetEnumerator();
                                previous.Dispose();
                                goto Start;
                            }
                        }
                    };

                    moveNext();

                    return true;
                },
                () => currentTcs.Task
            );
        });
    }
}

As you can see, the implementation is fairly simple, yet the concept of asynchronous sequences is very powerful.


Comments:

Anonymous said...

Pretty nice work :-)! I wrote about asynchronous programming using C# iterators some time ago (http://tomasp.net/blog/csharp-async.aspx) and so it is quite fun to see that the trick may still be useful, even if C# 5.0 supports asynchronous programming directly. I wonder if the same thing could be also done using `await` (e.g. by writing your own awaitable that handles yielding of elements differently). I'm not sure if it would make much difference - maybe the syntax for waiting for things would be nicer (but yielding would be uglier).

I think that combining async with iteration is quite powerful pattern, so it would be nice to see some support for this directly in C#.
