Evolution of Producer/consumer pipelines with System.Threading.Channels


A channel is simply a data structure that stores data produced by a producer until a consumer retrieves it. It is combined with the synchronization needed to make that safe and the notifications needed in both directions (for example, waking a consumer that is waiting for data). Building one involves several design decisions, such as whether its storage is bounded or unbounded.

Let's take a simple example of a channel.

Example 1

public sealed class Channel<T>
{
    // Outline only: the method bodies are implemented below.
    public void Write(T value);
    public ValueTask<T> ReadAsync(CancellationToken cancellationToken = default);
}

Here, the Write method lets us produce data into the channel, and the ReadAsync method lets us consume data from it.

Because we decided that our channel is unbounded, writing to it always succeeds and completes synchronously. That is why Write is synchronous and returns void. In contrast, ReadAsync is asynchronous, because the data we want to consume may not be available yet.

If nothing is available to consume when we try, we need to wait for data to arrive. And while in this getting-started design we are not overly concerned with performance, we also don't want lots of unnecessary overhead.

Since we expect to read frequently, and often when data is already available, ReadAsync returns a ValueTask<T> rather than a Task<T>, so that it can be allocation-free when it completes synchronously.
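To make the allocation argument concrete, here is a minimal sketch, not the article's implementation, of a ReadAsync that returns an already-completed ValueTask<T> when an item is waiting and only falls back to real asynchronous code otherwise. It assumes the same two fields the article introduces next (a ConcurrentQueue<T> and a SemaphoreSlim); the type name FastPathChannel<T> and the ReadSlowAsync helper are hypothetical. SemaphoreSlim.Wait(0) tries to take a count without blocking and returns false if none is available.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

var channel = new FastPathChannel<int>();
channel.Write(42);

// An item is already queued, so the ValueTask wraps the value directly.
ValueTask<int> ready = channel.ReadAsync();
Console.WriteLine(ready.IsCompletedSuccessfully); // True
Console.WriteLine(await ready);                   // 42

// Nothing is queued, so this read takes the asynchronous slow path.
ValueTask<int> pending = channel.ReadAsync();
Console.WriteLine(pending.IsCompleted);           // False
channel.Write(7);
Console.WriteLine(await pending);                 // 7

// Hypothetical type for illustration only.
public sealed class FastPathChannel<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0);

    public void Write(T value)
    {
        _queue.Enqueue(value); // store the item first...
        _semaphore.Release();  // ...then announce that one more item is available
    }

    public ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)
    {
        // Fast path: take a count without blocking if one is available.
        if (_semaphore.Wait(0))
        {
            _queue.TryDequeue(out T item);
            return new ValueTask<T>(item); // completed synchronously: no Task<T> allocated
        }
        return ReadSlowAsync(cancellationToken);
    }

    // Slow path: genuinely wait for a writer.
    private async ValueTask<T> ReadSlowAsync(CancellationToken cancellationToken)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        _queue.TryDequeue(out T item);
        return item;
    }
}
```

The split into a synchronous check plus a separate async method is a common way to keep the async state machine off the hot path; whether it pays off depends on how often reads find data already waiting.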

Now we need to implement these two methods. We will add two fields to our type: one serves as the storage mechanism, and the other coordinates between producers and consumers.

private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>(); // stores the items
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0);      // counts items available to read

We use a ConcurrentQueue<T> to store the data. ConcurrentQueue<T> is already safe for any number of producers and consumers to access concurrently, which frees us from having to do our own locking to protect the buffer.



We use a SemaphoreSlim to coordinate between producers and consumers, and to notify consumers that may be waiting for more data to arrive.
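The coordination relies on SemaphoreSlim behaving as a counting semaphore: Release adds one to its count, and WaitAsync completes only when it can take one away. The short sketch below is only an illustration of that behavior, independent of the channel: a waiter stays pending until Release is called.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Start at 0: nothing is available yet.
var semaphore = new SemaphoreSlim(0);

Task waiter = semaphore.WaitAsync();       // nothing to take, so it stays pending
Console.WriteLine(waiter.IsCompleted);     // False

semaphore.Release();                       // count 0 -> 1, which is handed to the waiter
await waiter;
Console.WriteLine(semaphore.CurrentCount); // 0: the waiter consumed the count

semaphore.Release(2);                      // announce two more "items"
Console.WriteLine(semaphore.CurrentCount); // 2
Console.WriteLine(semaphore.Wait(0));      // True: count 2 -> 1 without blocking
Console.WriteLine(semaphore.CurrentCount); // 1
```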

Our Write method is simple: we store the data in the queue and increment the semaphore's count by releasing it.

public void Write(T value)
{
    _queue.Enqueue(value); // store the data
    _semaphore.Release();  // signal that one more item is available
}

Enqueue stores the data, and Release notifies any waiting consumer that more data is available.

public async ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)
{
    // Wait until at least one item has been written.
    await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
    bool gotOne = _queue.TryDequeue(out T item);
    Debug.Assert(gotOne); // every successful wait is backed by an earlier Enqueue
    return item;
}

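ReadAsync first waits on the semaphore until at least one item is available, that is, until the semaphore's count is positive, and then dequeues it. Because Write always enqueues before it releases, and each successful wait consumes exactly one release, TryDequeue is guaranteed to find an item, which is what the Debug.Assert checks.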
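Putting the pieces together, the whole getting-started channel fits in a few lines. The sketch below assembles the fields and methods shown above into one class, renamed UnboundedChannel<T> here (a hypothetical name) so it does not clash with the real System.Threading.Channels.Channel<T>, and adds a small producer/consumer pair to exercise it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

var channel = new UnboundedChannel<int>();

// Consumer: reads exactly five items, awaiting whenever the queue is empty.
Task<int> consumer = Task.Run(async () =>
{
    int sum = 0;
    for (int i = 0; i < 5; i++)
        sum += await channel.ReadAsync();
    return sum;
});

// Producer: writes never wait because the channel is unbounded.
for (int i = 1; i <= 5; i++)
{
    channel.Write(i);
    await Task.Delay(10); // simulate work between items
}

Console.WriteLine(await consumer); // 15

public sealed class UnboundedChannel<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0);

    public void Write(T value)
    {
        _queue.Enqueue(value); // store the item
        _semaphore.Release();  // count now reflects one more readable item
    }

    public async ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        bool gotOne = _queue.TryDequeue(out T item);
        Debug.Assert(gotOne); // each successful wait is backed by an Enqueue
        return item;
    }
}
```

Because Write never waits, a producer that outpaces its consumer can grow the queue without limit; that is the trade-off bounded channels address.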

System.Threading.Channels

The System.Threading.Channels namespace provides the constructs needed to make building producer/consumer pipelines easier, without having to handle concurrency and locking ourselves. It supports both bounded and unbounded channels.

In .NET Core, this namespace is always available; on .NET Framework, we have to install the System.Threading.Channels NuGet package.

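The System.Threading.Channels namespace contains the static Channel class, which provides factory methods for creating channels, such as Channel.CreateUnbounded<T>() and Channel.CreateBounded<T>(capacity). The channels it returns derive from Channel<T>, which exposes the read and write operations we saw above through a ChannelWriter<T> and a ChannelReader<T>. The writer looks like this: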

// Abridged public surface (signatures only).
public abstract class ChannelWriter<T>
{
    public abstract bool TryWrite(T item);
    public virtual ValueTask WriteAsync(T item, CancellationToken cancellationToken = default);
    public abstract ValueTask<bool> WaitToWriteAsync(CancellationToken cancellationToken = default);
    public void Complete(Exception? error = null);
    public virtual bool TryComplete(Exception? error = null);
}

And the reader looks like this:

// Abridged public surface (signatures only).
public abstract class ChannelReader<T>
{
    public abstract bool TryRead(out T item);
    public virtual ValueTask<T> ReadAsync(CancellationToken cancellationToken = default);
    public abstract ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default);
    public virtual IAsyncEnumerable<T> ReadAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default);
    public virtual Task Completion { get; }
}
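As a quick illustration of this surface, the sketch below creates an unbounded channel with the Channel.CreateUnbounded factory, writes with TryWrite, signals the end of the data with Complete, and drains the buffer with TryRead. It only uses members listed above.

```csharp
using System;
using System.Threading.Channels;

Channel<string> channel = Channel.CreateUnbounded<string>();

// An unbounded channel accepts every write until it is completed.
Console.WriteLine(channel.Writer.TryWrite("first"));  // True
Console.WriteLine(channel.Writer.TryWrite("second")); // True

// Tell readers that no more data will arrive.
channel.Writer.Complete();
Console.WriteLine(channel.Writer.TryWrite("third"));  // False: the writer is completed

// Items buffered before completion can still be read.
while (channel.Reader.TryRead(out var item))
    Console.WriteLine(item);                          // first, then second

// Completion finishes once the writer is completed and the buffer is empty.
await channel.Reader.Completion;
Console.WriteLine(channel.Reader.Completion.IsCompletedSuccessfully); // True
```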

ChannelWriter<T> provides a TryWrite method that is very similar to our Write method. However, it is abstract, and it is a Try method that returns a bool, to account for the fact that some implementations may be bounded in how many items they can store. If the channel is full, so that the write cannot complete synchronously, TryWrite returns false to indicate that the write was unsuccessful.
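To see TryWrite fail, the sketch below creates a bounded channel with room for two items using Channel.CreateBounded<T>(int capacity); the capacity of 2 is an arbitrary choice for the example. With the default full mode (BoundedChannelFullMode.Wait), a write to a full channel returns false from TryWrite instead of blocking or dropping data.

```csharp
using System;
using System.Threading.Channels;

Channel<int> channel = Channel.CreateBounded<int>(2); // capacity chosen for illustration

Console.WriteLine(channel.Writer.TryWrite(1)); // True
Console.WriteLine(channel.Writer.TryWrite(2)); // True
Console.WriteLine(channel.Writer.TryWrite(3)); // False: the buffer is full

// Reading one item frees a slot, so the next synchronous write succeeds.
channel.Reader.TryRead(out int first);
Console.WriteLine(first);                      // 1
Console.WriteLine(channel.Writer.TryWrite(3)); // True
```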


However, ChannelWriter<T> also provides WriteAsync. When the channel is full and writing would have to wait, a producer can await WriteAsync instead; the returned task completes once room becomes available and the item has been written, allowing the producer to continue.
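A minimal sketch of that waiting behavior, using a bounded channel with a single slot (an arbitrary capacity for the example): the second WriteAsync cannot complete until a read frees the slot.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

Channel<int> channel = Channel.CreateBounded<int>(1); // one slot, for illustration

await channel.Writer.WriteAsync(1);           // room available: completes immediately

ValueTask pending = channel.Writer.WriteAsync(2);
Console.WriteLine(pending.IsCompleted);       // False: waiting for room

int first = await channel.Reader.ReadAsync(); // frees the only slot
await pending;                                // the waiting write now finishes
Console.WriteLine(first);                     // 1
Console.WriteLine(await channel.Reader.ReadAsync()); // 2
```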

There are situations where code may not want to produce a value immediately. If producing a value is expensive, or the value represents an expensive resource, and there is a reasonable chance that the producer is running faster than the consumer, the producer may want to delay producing the value until it knows that a write will succeed immediately.

For that, there is WaitToWriteAsync. A producer can await WaitToWriteAsync until it returns true, then produce the value and write it to the channel with TryWrite or WriteAsync. WaitToWriteAsync returns false once the channel has been completed.
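A minimal sketch of that pattern follows. ProduceExpensiveValue is a hypothetical stand-in for costly work; the producer only calls it after WaitToWriteAsync reports that there is room, and it completes the writer when it is done so that the reading loop ends.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

Channel<string> channel = Channel.CreateBounded<string>(1);

Task producer = Task.Run(async () =>
{
    for (int n = 1; n <= 3; n++)
    {
        // Wait for room before paying for the value; false means the channel was completed.
        if (!await channel.Writer.WaitToWriteAsync())
            break;

        string value = ProduceExpensiveValue(n);

        // Room was available a moment ago; with several producers it might be gone,
        // so fall back to WriteAsync if the synchronous attempt fails.
        if (!channel.Writer.TryWrite(value))
            await channel.Writer.WriteAsync(value);
    }
    channel.Writer.Complete();
});

// ReadAllAsync ends once the writer completes and the buffer is drained.
await foreach (string item in channel.Reader.ReadAllAsync())
    Console.WriteLine(item); // value 1, value 2, value 3

await producer;

// Hypothetical placeholder for an expensive computation or resource.
static string ProduceExpensiveValue(int n) => $"value {n}";
```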

Note that WriteAsync is virtual. Some implementations may override it with a more optimized version, but the base type can provide a reasonable default built on just the abstract TryWrite and WaitToWriteAsync methods, roughly like this:

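// Sketch of a default WriteAsync inside ChannelWriter<T>.
public async ValueTask WriteAsync(T item, CancellationToken cancellationToken = default)
{
    // WaitToWriteAsync returns false once the channel has been completed.
    while (await WaitToWriteAsync(cancellationToken).ConfigureAwait(false))
        if (TryWrite(item)) // can fail if another producer took the free space
            return;

    throw new ChannelClosedException();
}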

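The while loop is needed because, by default, channels can be used by any number of producers and consumers concurrently. When WaitToWriteAsync returns true, it only means that room was available at that moment; another producer may fill the space before our TryWrite runs, in which case TryWrite returns false and we wait again. When WaitToWriteAsync returns false, the channel has been completed and no write can ever succeed, so WriteAsync throws.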
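The race that makes the loop necessary can be reproduced deterministically. In the sketch below, two producers share a single-slot bounded channel: both see room, but only one TryWrite can succeed, so the other must wait and retry. WriteWithRetryAsync is a hypothetical helper that mirrors the default WriteAsync loop shown above; it is not a library API.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

Channel<string> channel = Channel.CreateBounded<string>(1); // a single slot

// Two producers both observe that there is room...
bool aSawRoom = await channel.Writer.WaitToWriteAsync();
bool bSawRoom = await channel.Writer.WaitToWriteAsync();
Console.WriteLine($"{aSawRoom} {bSawRoom}");          // True True

// ...but only one of them can claim the slot.
Console.WriteLine(channel.Writer.TryWrite("from A")); // True
Console.WriteLine(channel.Writer.TryWrite("from B")); // False

// Producer B therefore has to wait again and retry, which is what the loop does.
Task b = WriteWithRetryAsync(channel.Writer, "from B");
Console.WriteLine(b.IsCompleted);                     // False: no room yet
Console.WriteLine(await channel.Reader.ReadAsync());  // from A (frees the slot)
await b;                                              // B's retry succeeds
Console.WriteLine(await channel.Reader.ReadAsync());  // from B

// Hypothetical helper mirroring the default WriteAsync loop.
static async Task WriteWithRetryAsync<T>(ChannelWriter<T> writer, T item)
{
    while (await writer.WaitToWriteAsync())
        if (writer.TryWrite(item))
            return;

    throw new ChannelClosedException(); // the channel was completed
}
```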

There are a variety of typical patterns for consuming from a ChannelReader<T>. If the channel represents an infinite stream of values, a consumer can simply call ReadAsync in a loop:

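while (true)
{
    T item = await channelReader.ReadAsync(); // throws ChannelClosedException once completed and empty
    Use(item);                                // Use stands for the consumer's own processing
}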
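The infinite loop never ends on its own; if the channel is completed, ReadAsync throws ChannelClosedException. For a channel that eventually completes, a common pattern combines WaitToReadAsync with TryRead, so the consumer drains whatever is buffered and exits cleanly once the writer calls Complete. In this sketch, Use is again a hypothetical placeholder for real work.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

Channel<int> channel = Channel.CreateUnbounded<int>();

Task consumer = ConsumeAsync(channel.Reader);

for (int i = 1; i <= 3; i++)
    await channel.Writer.WriteAsync(i);
channel.Writer.Complete(); // lets the consumer's loop finish

await consumer;

static async Task ConsumeAsync(ChannelReader<int> reader)
{
    // WaitToReadAsync returns false only after Complete and once the buffer is empty.
    while (await reader.WaitToReadAsync())
    {
        // Drain everything already buffered without awaiting per item.
        while (reader.TryRead(out int item))
            Use(item);
    }
    Console.WriteLine("done");
}

// Hypothetical stand-in for real work.
static void Use(int item) => Console.WriteLine($"consumed {item}");
```

Where ChannelReader<T>.ReadAllAsync is available, `await foreach (var item in reader.ReadAllAsync())` expresses the same loop more compactly.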

So far we have seen how to read from a ChannelReader<T> and how to write to a ChannelWriter<T>. Both come from a Channel<T>, which simply exposes one of each:

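// Simplified: Channel<T> actually derives from Channel<T, T>, which declares these properties.
public abstract class Channel<T>
{
    public ChannelReader<T> Reader { get; protected set; }
    public ChannelWriter<T> Writer { get; protected set; }
}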
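Because a Channel<T> exposes its two halves separately, each stage of a pipeline can be handed only the half it needs. The sketch below wires a hypothetical three-stage pipeline (generate numbers, square them, print them) with two unbounded channels; GenerateAsync and SquareAsync are illustrative names, not part of the library.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

Channel<int> numbers = Channel.CreateUnbounded<int>();
Channel<int> squares = Channel.CreateUnbounded<int>();

// Each stage only sees the half of a channel it needs.
Task generate = GenerateAsync(numbers.Writer, count: 5);
Task square = SquareAsync(numbers.Reader, squares.Writer);

await foreach (int value in squares.Reader.ReadAllAsync())
    Console.WriteLine(value); // 1, 4, 9, 16, 25

await Task.WhenAll(generate, square);

static async Task GenerateAsync(ChannelWriter<int> writer, int count)
{
    for (int i = 1; i <= count; i++)
        await writer.WriteAsync(i);
    writer.Complete(); // no more input
}

static async Task SquareAsync(ChannelReader<int> reader, ChannelWriter<int> writer)
{
    await foreach (int n in reader.ReadAllAsync())
        await writer.WriteAsync(n * n);
    writer.Complete(); // propagate completion downstream
}
```

In a real pipeline, a stage would likely call Complete(exception) from a catch block so that a failure reaches the next stage instead of leaving it waiting forever.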

Conclusion

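In this article we have seen what a channel is, how producers and consumers interact through one, and what the System.Threading.Channels namespace provides. Note that concurrent execution here does not necessarily mean parallel execution. Because producers and consumers use async and await, a consumer waiting for data does not block a thread, so the program can do CPU work and wait for I/O at the same time.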
