Skip to content

Streams

Stream Request Handlers behave exactly like regular request handlers but return an AsyncEnumerable instead of a single value. This allows you to continously send data or efficiently stream data until completion.

Creating a Stream

  1. First off, let’s create our request contract.

    public record MyStreamRequest(string SomeArg) : IStreamRequest;
  2. Next up, let’s create a handler to process our requests.

    public class MyStreamHandler : IStreamRequestHandler<MyStreamRequest, string>
    {
    public async IAsyncEnumerable<string> Handle(MyStreamRequest request, CancellationToken cancellationToken)
    {
    for (int i = 0; i < 10; i++)
    {
    yield return $"Hello {request.SomeArg} {i}";
    await Task.Delay(1000);
    }
    }
    }
  3. Now to wire up our handler with DI

    // in your host builder
    services.AddSingletonAsImplementedInterfaces<MyStreamHandler>();
  4. Last - let’s call our stream

    IMediator mediator; // injected
    var stream = await mediator.Request(new MyStreamRequest("World"));
    await foreach (var item in stream)
    {
    Console.WriteLine(item);
    }

Middleware

Stream middleware is a bit different than request/event middleware in that it survives the entire stream allowing you to post process the stream data.

Here is a simple example of a stream middleware that converts all items to uppercase:

public class MyStreamMiddleware : IStreamMiddleware<MyStreamRequest, string>
{
public async IAsyncEnumerable<string> Process(MyStreamRequest request, StreamHandlerDelegate<string> next, IStreamRequestHandler<TRequest, TResult> requestHandler, CancellationToken cancellationToken)
{
await foreach (var item in next())
{
// here, you can do whatever you want on top of the stream
yield return item.ToUpper();
}
}
}