diff --git a/src/BuildingBlocks/MassTransit/Extensions.cs b/src/BuildingBlocks/MassTransit/Extensions.cs index 3e72815..1f46654 100644 --- a/src/BuildingBlocks/MassTransit/Extensions.cs +++ b/src/BuildingBlocks/MassTransit/Extensions.cs @@ -37,65 +37,27 @@ public static class Extensions IBusRegistrationConfigurator configure, Assembly assembly) { configure.AddConsumers(assembly); + configure.AddSagaStateMachines(assembly); + configure.AddSagas(assembly); + configure.AddActivities(assembly); configure.UsingRabbitMq((context, configurator) => { var rabbitMqOptions = services.GetOptions(nameof(RabbitMqOptions)); - configurator.Host(rabbitMqOptions?.HostName, rabbitMqOptions?.Port ?? 5672, "/", h => { h.Username(rabbitMqOptions?.UserName); h.Password(rabbitMqOptions?.Password); }); + configurator.ConfigureEndpoints(context); - var integrationEventRootAssemblies = Assembly.GetAssembly(typeof(IIntegrationEvent)); - - var types = integrationEventRootAssemblies?.GetTypes() - .Where(x => x.IsAssignableTo(typeof(IIntegrationEvent)) - && !x.IsInterface - && !x.IsAbstract - && !x.IsGenericType)?.ToList(); - - if (types != null && types.Any()) - { - foreach (var type in types) - { - var consumers = assembly?.GetTypes() - .Where(x => x.IsAssignableTo(typeof(IConsumer<>).MakeGenericType(type))).ToList(); - - if (consumers != null && consumers.Any()) - configurator.ReceiveEndpoint( - string.IsNullOrEmpty(rabbitMqOptions.ExchangeName) - ? type.Name.Underscore() - : $"{rabbitMqOptions.ExchangeName}_{type.Name.Underscore()}", e => - { - e.UseConsumeFilter(typeof(ConsumeFilter<>), context); //generic filter - - foreach (var consumer in consumers) - { - //ref: https://masstransit-project.com/usage/exceptions.html#retry - //ref: https://markgossa.com/2022/06/masstransit-exponential-back-off.html - configurator.UseMessageRetry(r => AddRetryConfiguration(r)); - - configurator.ConfigureEndpoints(context, x => x.Exclude(consumer)); - var methodInfo = typeof(DependencyInjectionReceiveEndpointExtensions) - .GetMethods() - .Where(x => x.GetParameters() - .Any(p => p.ParameterType == typeof(IServiceProvider))) - .FirstOrDefault(x => x.Name == "Consumer" && x.IsGenericMethod); - - var generic = methodInfo?.MakeGenericMethod(consumer); - generic?.Invoke(e, new object[] { e, context, null }); - } - }); - } - } + configurator.UseMessageRetry(AddRetryConfiguration); }); } - private static IRetryConfigurator AddRetryConfiguration(IRetryConfigurator retryConfigurator) + private static void AddRetryConfiguration(IRetryConfigurator retryConfigurator) { retryConfigurator.Exponential( 3, @@ -103,7 +65,5 @@ public static class Extensions TimeSpan.FromMinutes(120), TimeSpan.FromMilliseconds(200)) .Ignore(); // don't retry if we have invalid data and message goes to _error queue masstransit - - return retryConfigurator; } }