Streams
Stream request handlers behave like regular request handlers, but return an IAsyncEnumerable<T> instead of a single value. This lets you continuously send data to the caller, or efficiently stream a large result until completion.
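To make the difference between a single value and a stream concrete, here is a minimal sketch in plain C#, independent of the mediator. The names LoadAllAsync and StreamAsync are hypothetical and exist only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Single-value style: the caller gets nothing until all the work is done.
static async Task<List<int>> LoadAllAsync()
{
    var items = new List<int>();
    for (var i = 1; i <= 3; i++)
    {
        await Task.Delay(10); // simulate slow work per item
        items.Add(i);
    }
    return items;
}

// Stream style: each item reaches the caller as soon as it is produced.
static async IAsyncEnumerable<int> StreamAsync()
{
    for (var i = 1; i <= 3; i++)
    {
        await Task.Delay(10); // simulate slow work per item
        yield return i;
    }
}

var all = await LoadAllAsync();
Console.WriteLine($"Buffered: {string.Join(", ", all)}");

await foreach (var item in StreamAsync())
{
    Console.WriteLine($"Streamed: {item}");
}
```

Both versions produce the same three values. The streamed version simply lets the consumer start working before the producer has finished.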
Creating a Stream
First off, let’s create our request contract.
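    // The type argument is the item type of the stream and must match the handler's result type.
    public record MyStreamRequest(string SomeArg) : IStreamRequest<string>;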
Next up, let’s create a handler to process our requests.
    public class MyStreamHandler : IStreamRequestHandler<MyStreamRequest, string>
    {
        public async IAsyncEnumerable<string> Handle(
            MyStreamRequest request,
            IMediatorContext context,
            CancellationToken cancellationToken
        )
        {
            // Emit one item per second, ten items in total.
            for (int i = 0; i < 10; i++)
            {
                yield return $"Hello {request.SomeArg} {i}";

                // Pass the token so the delay ends promptly when the stream is cancelled.
                await Task.Delay(1000, cancellationToken);
            }
        }
    }
Now, let’s register our handler with dependency injection (DI).
    ServiceCollection services;

    // 1. Using AddSingleton or AddScoped
    services.AddSingleton<IStreamRequestHandler<MyStreamRequest, string>, MyStreamHandler>();
    services.AddScoped<IStreamRequestHandler<MyStreamRequest, string>, MyStreamHandler>();

    // 2. Using our easy extension method to add against all interfaces the class implements
    services.AddSingletonAsImplementedInterfaces<MyStreamHandler>();
    services.AddScopedAsImplementedInterfaces<MyStreamHandler>();

    // 3. (Recommended due to AOT) Letting our source generation do the work for you with a simple attribute
    [MediatorSingleton] // or [MediatorScoped]
    public class MyStreamHandler : IStreamRequestHandler<MyStreamRequest, string>
Finally, let’s call our stream.
    IMediator mediator; // injected

    var response = await mediator.Request(new MyStreamRequest("World"));
    await foreach (var item in response.Result)
    {
        Console.WriteLine(item);
    }
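A stream that sends data continuously is usually ended by the consumer rather than the producer. The sketch below shows that pattern in plain C#, without the mediator: Ticks is a hypothetical endless producer standing in for a stream handler, and the cancellation token is passed to it directly, the same way a handler receives its cancellationToken parameter. How a token reaches the handler through the mediator is not shown here.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical endless producer standing in for a stream handler.
static async IAsyncEnumerable<int> Ticks(CancellationToken ct)
{
    var i = 0;
    while (true)
    {
        yield return i++;

        // Throws OperationCanceledException once the token is cancelled.
        await Task.Delay(10, ct);
    }
}

using var cts = new CancellationTokenSource();
var received = new List<int>();

try
{
    await foreach (var tick in Ticks(cts.Token))
    {
        received.Add(tick);
        if (received.Count == 3)
            cts.Cancel(); // the consumer decides it has seen enough
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine($"Stopped after {received.Count} items");
}
```

If the consumer only needs to stop reading, a plain break inside the await foreach loop also works. Cancelling the token is the better fit when the producer is in the middle of a long wait that should be interrupted.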
Middleware
Stream middleware differs from request/event middleware in that it stays active for the entire lifetime of the stream, which allows you to post-process each item as it passes through.
Here is a simple example of a stream middleware that converts all items to uppercase:
    public class MyStreamMiddleware : IStreamMiddleware<MyStreamRequest, string>
    {
        public async IAsyncEnumerable<string> Process(
            IMediatorContext context,
            StreamHandlerDelegate<string> next,
            CancellationToken cancellationToken
        )
        {
            await foreach (var item in next())
            {
                // here, you can do whatever you want on top of the stream
                yield return item.ToUpper();
            }
        }
    }
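Because the middleware wraps the whole enumeration, it can run code before the first item, around each item, and after the last one. The following sketch illustrates that lifetime in plain C#. It does not use the library's types: Inner stands in for the handler behind next(), Wrap has the same shape as a stream middleware, and a Func<IAsyncEnumerable<string>> replaces StreamHandlerDelegate<string>. All of these names are assumptions made for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical inner stream, standing in for the handler behind next().
static async IAsyncEnumerable<string> Inner()
{
    foreach (var word in new[] { "a", "b", "c" })
    {
        await Task.Yield();
        yield return word;
    }
}

// Hypothetical wrapper with the same shape as a stream middleware.
static async IAsyncEnumerable<string> Wrap(Func<IAsyncEnumerable<string>> next, List<string> events)
{
    events.Add("start"); // runs once, when the consumer asks for the first item
    var count = 0;
    try
    {
        await foreach (var word in next())
        {
            count++;
            yield return word.ToUpperInvariant(); // per-item post-processing
        }
    }
    finally
    {
        // Runs when the stream completes, fails, or the consumer stops early.
        events.Add($"end after {count} items");
    }
}

var log = new List<string>();
await foreach (var upper in Wrap(Inner, log))
{
    log.Add(upper);
}

Console.WriteLine(string.Join(" | ", log));
// prints: start | A | B | C | end after 3 items
```

The try/finally block is what makes this useful for things like timing or item counting: the cleanup code runs once per stream, not once per item.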