From c78367c29c9a8ab1f9427100b03fe4fdd3abceab Mon Sep 17 00:00:00 2001 From: meysamhadeli Date: Fri, 17 Jun 2022 22:44:59 +0430 Subject: [PATCH] add inbox pattern --- .../MassTransit/ConsumeFilter.cs | 38 +++++++++++++ src/BuildingBlocks/MassTransit/Extensions.cs | 2 + .../IPersistMessageProcessor.cs | 9 +++- .../PersistMessageProcessor.cs | 54 +++++++++++++------ 4 files changed, 85 insertions(+), 18 deletions(-) create mode 100644 src/BuildingBlocks/MassTransit/ConsumeFilter.cs diff --git a/src/BuildingBlocks/MassTransit/ConsumeFilter.cs b/src/BuildingBlocks/MassTransit/ConsumeFilter.cs new file mode 100644 index 0000000..297a85d --- /dev/null +++ b/src/BuildingBlocks/MassTransit/ConsumeFilter.cs @@ -0,0 +1,38 @@ +using BuildingBlocks.Core.Event; +using BuildingBlocks.MessageProcessor; +using MassTransit; + +namespace BuildingBlocks.MassTransit; + +// Handle inbox messages with masstransit pipeline +public class ConsumeFilter : IFilter> + where T : class +{ + private readonly IPersistMessageProcessor _persistMessageProcessor; + + public ConsumeFilter(IPersistMessageProcessor persistMessageProcessor) + { + _persistMessageProcessor = persistMessageProcessor; + } + + public async Task Send(ConsumeContext context, IPipe> next) + { + var id = await _persistMessageProcessor.AddReceivedMessageAsync( + new MessageEnvelope( + context.Message, + context.Headers.ToDictionary(x => x.Key, x => x.Value)) + ); + + var message = await _persistMessageProcessor.ExistMessageAsync(id); + + if (message is null) + { + await next.Send(context); + await _persistMessageProcessor.ProcessInboxAsync(id); + } + } + + public void Probe(ProbeContext context) + { + } +} diff --git a/src/BuildingBlocks/MassTransit/Extensions.cs b/src/BuildingBlocks/MassTransit/Extensions.cs index 4a7141d..edcd4c4 100644 --- a/src/BuildingBlocks/MassTransit/Extensions.cs +++ b/src/BuildingBlocks/MassTransit/Extensions.cs @@ -54,6 +54,8 @@ public static class Extensions ? type.Name.Underscore() : $"{rabbitMqOptions.ExchangeName}_{type.Name.Underscore()}", e => { + e.UseConsumeFilter(typeof(ConsumeFilter<>), context); //generic filter + foreach (var consumer in consumers) { configurator.ConfigureEndpoints(context, x => x.Exclude(consumer)); diff --git a/src/BuildingBlocks/MessageProcessor/IPersistMessageProcessor.cs b/src/BuildingBlocks/MessageProcessor/IPersistMessageProcessor.cs index 54968c9..d364f99 100644 --- a/src/BuildingBlocks/MessageProcessor/IPersistMessageProcessor.cs +++ b/src/BuildingBlocks/MessageProcessor/IPersistMessageProcessor.cs @@ -14,7 +14,7 @@ public interface IPersistMessageProcessor CancellationToken cancellationToken = default) where TMessageEnvelope : MessageEnvelope; - Task AddReceivedMessageAsync( + Task AddReceivedMessageAsync( TMessageEnvelope messageEnvelope, CancellationToken cancellationToken = default) where TMessageEnvelope : MessageEnvelope; @@ -24,6 +24,13 @@ public interface IPersistMessageProcessor CancellationToken cancellationToken = default) where TCommand : class, IInternalCommand; + Task ExistMessageAsync( + Guid messageId, + CancellationToken cancellationToken = default); + + Task ProcessInboxAsync( + Guid messageId, + CancellationToken cancellationToken = default); Task ProcessAsync(Guid messageId, MessageDeliveryType deliveryType, CancellationToken cancellationToken = default); diff --git a/src/BuildingBlocks/MessageProcessor/PersistMessageProcessor.cs b/src/BuildingBlocks/MessageProcessor/PersistMessageProcessor.cs index cc42392..c12bbb7 100644 --- a/src/BuildingBlocks/MessageProcessor/PersistMessageProcessor.cs +++ b/src/BuildingBlocks/MessageProcessor/PersistMessageProcessor.cs @@ -37,10 +37,10 @@ public class PersistMessageProcessor : IPersistMessageProcessor await SavePersistMessageAsync(messageEnvelope, MessageDeliveryType.Outbox, cancellationToken); } - public async Task AddReceivedMessageAsync(TMessageEnvelope messageEnvelope, + public Task AddReceivedMessageAsync(TMessageEnvelope messageEnvelope, CancellationToken cancellationToken = default) where TMessageEnvelope : MessageEnvelope { - await SavePersistMessageAsync(messageEnvelope, MessageDeliveryType.Inbox, cancellationToken); + return SavePersistMessageAsync(messageEnvelope, MessageDeliveryType.Inbox, cancellationToken); } public async Task AddInternalMessageAsync(TCommand internalCommand, @@ -50,6 +50,15 @@ public class PersistMessageProcessor : IPersistMessageProcessor cancellationToken); } + public Task ExistMessageAsync(Guid messageId, CancellationToken cancellationToken = default) + { + return _dbContext.PersistMessages.FirstOrDefaultAsync(x => + x.Id == messageId && + x.DeliveryType == MessageDeliveryType.Inbox && + x.MessageStatus == MessageStatus.Processed, + cancellationToken); + } + public async Task ProcessAsync( Guid messageId, MessageDeliveryType deliveryType, @@ -64,24 +73,18 @@ public class PersistMessageProcessor : IPersistMessageProcessor switch (deliveryType) { - case MessageDeliveryType.Inbox: - await ProcessInbox(message, cancellationToken); - break; case MessageDeliveryType.Internal: - await ProcessInternal(message, cancellationToken); + await ProcessInternalAsync(message, cancellationToken); break; case MessageDeliveryType.Outbox: - await ProcessOutbox(message, cancellationToken); + await ProcessOutboxAsync(message, cancellationToken); break; } - message.ChangeState(MessageStatus.Processed); - - _dbContext.PersistMessages.Update(message); - - await _dbContext.SaveChangesAsync(cancellationToken); + await ChangeMessageStatusAsync(message, cancellationToken); } + public async Task ProcessAllAsync(CancellationToken cancellationToken = default) { var messages = await _dbContext.PersistMessages.Where(x => x.MessageStatus != MessageStatus.Processed) @@ -93,7 +96,18 @@ public class PersistMessageProcessor : IPersistMessageProcessor } } - private async Task SavePersistMessageAsync( + public async Task ProcessInboxAsync(Guid messageId, CancellationToken cancellationToken = default) + { + var message = await _dbContext.PersistMessages.FirstOrDefaultAsync( + x => x.Id == messageId && + x.DeliveryType == MessageDeliveryType.Inbox && + x.MessageStatus == MessageStatus.InProgress, + cancellationToken); + + await ChangeMessageStatusAsync(message, cancellationToken); + } + + private async Task SavePersistMessageAsync( MessageEnvelope messageEnvelope, MessageDeliveryType deliveryType, CancellationToken cancellationToken = default) @@ -124,9 +138,11 @@ public class PersistMessageProcessor : IPersistMessageProcessor "Message with id: {MessageID} and delivery type: {DeliveryType} saved in persistence message store.", id, deliveryType.ToString()); + + return id; } - private async Task ProcessOutbox(PersistMessage message, CancellationToken cancellationToken) + private async Task ProcessOutboxAsync(PersistMessage message, CancellationToken cancellationToken) { MessageEnvelope? messageEnvelope = JsonSerializer.Deserialize(message.Data); @@ -153,7 +169,7 @@ public class PersistMessageProcessor : IPersistMessageProcessor } } - private async Task ProcessInternal(PersistMessage message, CancellationToken cancellationToken) + private async Task ProcessInternalAsync(PersistMessage message, CancellationToken cancellationToken) { MessageEnvelope? messageEnvelope = JsonSerializer.Deserialize(message.Data); @@ -174,8 +190,12 @@ public class PersistMessageProcessor : IPersistMessageProcessor } } - private Task ProcessInbox(PersistMessage message, CancellationToken cancellationToken) + private async Task ChangeMessageStatusAsync(PersistMessage message, CancellationToken cancellationToken) { - return Task.CompletedTask; + message.ChangeState(MessageStatus.Processed); + + _dbContext.PersistMessages.Update(message); + + await _dbContext.SaveChangesAsync(cancellationToken); } }