Streams
Stream Request Handlers behave exactly like regular request handlers but return an AsyncEnumerable instead of a single value. This allows you to continously send data or efficiently stream data until completion.
Creating a Stream
-
First off, let’s create our request contract.
public record MyStreamRequest(string SomeArg) : IStreamRequest; -
Next up, let’s create a handler to process our requests.
public class MyStreamHandler : IStreamRequestHandler<MyStreamRequest, string>{public async IAsyncEnumerable<string> Handle(MyStreamRequest request, CancellationToken cancellationToken){for (int i = 0; i < 10; i++){yield return $"Hello {request.SomeArg} {i}";await Task.Delay(1000);}}} -
Now to wire up our handler with DI
// in your host builderservices.AddSingletonAsImplementedInterfaces<MyStreamHandler>(); -
Last - let’s call our stream
IMediator mediator; // injectedvar stream = await mediator.Request(new MyStreamRequest("World"));await foreach (var item in stream){Console.WriteLine(item);}
Middleware
Stream middleware is a bit different than request/event middleware in that it survives the entire stream allowing you to post process the stream data.
Here is a simple example of a stream middleware that converts all items to uppercase:
public class MyStreamMiddleware : IStreamMiddleware<MyStreamRequest, string>{ public async IAsyncEnumerable<string> Process(MyStreamRequest request, StreamHandlerDelegate<string> next, IStreamRequestHandler<TRequest, TResult> requestHandler, CancellationToken cancellationToken) { await foreach (var item in next()) { // here, you can do whatever you want on top of the stream yield return item.ToUpper(); } }}