Merge pull request #19 from meysamhadeli/develop

add inbox pattern
This commit is contained in:
Meysam Hadeli 2022-06-17 22:46:05 +04:30 committed by GitHub
commit 91950f6f5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 85 additions and 18 deletions

View File

@ -0,0 +1,38 @@
using BuildingBlocks.Core.Event;
using BuildingBlocks.MessageProcessor;
using MassTransit;
namespace BuildingBlocks.MassTransit;
// Handle inbox messages with masstransit pipeline
public class ConsumeFilter<T> : IFilter<ConsumeContext<T>>
where T : class
{
private readonly IPersistMessageProcessor _persistMessageProcessor;
public ConsumeFilter(IPersistMessageProcessor persistMessageProcessor)
{
_persistMessageProcessor = persistMessageProcessor;
}
public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
{
var id = await _persistMessageProcessor.AddReceivedMessageAsync(
new MessageEnvelope(
context.Message,
context.Headers.ToDictionary(x => x.Key, x => x.Value))
);
var message = await _persistMessageProcessor.ExistMessageAsync(id);
if (message is null)
{
await next.Send(context);
await _persistMessageProcessor.ProcessInboxAsync(id);
}
}
public void Probe(ProbeContext context)
{
}
}

View File

@ -54,6 +54,8 @@ public static class Extensions
? type.Name.Underscore()
: $"{rabbitMqOptions.ExchangeName}_{type.Name.Underscore()}", e =>
{
e.UseConsumeFilter(typeof(ConsumeFilter<>), context); //generic filter
foreach (var consumer in consumers)
{
configurator.ConfigureEndpoints(context, x => x.Exclude(consumer));

View File

@ -14,7 +14,7 @@ public interface IPersistMessageProcessor
CancellationToken cancellationToken = default)
where TMessageEnvelope : MessageEnvelope;
Task AddReceivedMessageAsync<TMessageEnvelope>(
Task<Guid> AddReceivedMessageAsync<TMessageEnvelope>(
TMessageEnvelope messageEnvelope,
CancellationToken cancellationToken = default)
where TMessageEnvelope : MessageEnvelope;
@ -24,6 +24,13 @@ public interface IPersistMessageProcessor
CancellationToken cancellationToken = default)
where TCommand : class, IInternalCommand;
Task<PersistMessage> ExistMessageAsync(
Guid messageId,
CancellationToken cancellationToken = default);
Task ProcessInboxAsync(
Guid messageId,
CancellationToken cancellationToken = default);
Task ProcessAsync(Guid messageId, MessageDeliveryType deliveryType, CancellationToken cancellationToken = default);

View File

@ -37,10 +37,10 @@ public class PersistMessageProcessor : IPersistMessageProcessor
await SavePersistMessageAsync(messageEnvelope, MessageDeliveryType.Outbox, cancellationToken);
}
public async Task AddReceivedMessageAsync<TMessageEnvelope>(TMessageEnvelope messageEnvelope,
public Task<Guid> AddReceivedMessageAsync<TMessageEnvelope>(TMessageEnvelope messageEnvelope,
CancellationToken cancellationToken = default) where TMessageEnvelope : MessageEnvelope
{
await SavePersistMessageAsync(messageEnvelope, MessageDeliveryType.Inbox, cancellationToken);
return SavePersistMessageAsync(messageEnvelope, MessageDeliveryType.Inbox, cancellationToken);
}
public async Task AddInternalMessageAsync<TCommand>(TCommand internalCommand,
@ -50,6 +50,15 @@ public class PersistMessageProcessor : IPersistMessageProcessor
cancellationToken);
}
public Task<PersistMessage> ExistMessageAsync(Guid messageId, CancellationToken cancellationToken = default)
{
return _dbContext.PersistMessages.FirstOrDefaultAsync(x =>
x.Id == messageId &&
x.DeliveryType == MessageDeliveryType.Inbox &&
x.MessageStatus == MessageStatus.Processed,
cancellationToken);
}
public async Task ProcessAsync(
Guid messageId,
MessageDeliveryType deliveryType,
@ -64,24 +73,18 @@ public class PersistMessageProcessor : IPersistMessageProcessor
switch (deliveryType)
{
case MessageDeliveryType.Inbox:
await ProcessInbox(message, cancellationToken);
break;
case MessageDeliveryType.Internal:
await ProcessInternal(message, cancellationToken);
await ProcessInternalAsync(message, cancellationToken);
break;
case MessageDeliveryType.Outbox:
await ProcessOutbox(message, cancellationToken);
await ProcessOutboxAsync(message, cancellationToken);
break;
}
message.ChangeState(MessageStatus.Processed);
_dbContext.PersistMessages.Update(message);
await _dbContext.SaveChangesAsync(cancellationToken);
await ChangeMessageStatusAsync(message, cancellationToken);
}
public async Task ProcessAllAsync(CancellationToken cancellationToken = default)
{
var messages = await _dbContext.PersistMessages.Where(x => x.MessageStatus != MessageStatus.Processed)
@ -93,7 +96,18 @@ public class PersistMessageProcessor : IPersistMessageProcessor
}
}
private async Task SavePersistMessageAsync(
public async Task ProcessInboxAsync(Guid messageId, CancellationToken cancellationToken = default)
{
var message = await _dbContext.PersistMessages.FirstOrDefaultAsync(
x => x.Id == messageId &&
x.DeliveryType == MessageDeliveryType.Inbox &&
x.MessageStatus == MessageStatus.InProgress,
cancellationToken);
await ChangeMessageStatusAsync(message, cancellationToken);
}
private async Task<Guid> SavePersistMessageAsync(
MessageEnvelope messageEnvelope,
MessageDeliveryType deliveryType,
CancellationToken cancellationToken = default)
@ -124,9 +138,11 @@ public class PersistMessageProcessor : IPersistMessageProcessor
"Message with id: {MessageID} and delivery type: {DeliveryType} saved in persistence message store.",
id,
deliveryType.ToString());
return id;
}
private async Task ProcessOutbox(PersistMessage message, CancellationToken cancellationToken)
private async Task ProcessOutboxAsync(PersistMessage message, CancellationToken cancellationToken)
{
MessageEnvelope? messageEnvelope = JsonSerializer.Deserialize<MessageEnvelope>(message.Data);
@ -153,7 +169,7 @@ public class PersistMessageProcessor : IPersistMessageProcessor
}
}
private async Task ProcessInternal(PersistMessage message, CancellationToken cancellationToken)
private async Task ProcessInternalAsync(PersistMessage message, CancellationToken cancellationToken)
{
MessageEnvelope? messageEnvelope = JsonSerializer.Deserialize<MessageEnvelope>(message.Data);
@ -174,8 +190,12 @@ public class PersistMessageProcessor : IPersistMessageProcessor
}
}
private Task ProcessInbox(PersistMessage message, CancellationToken cancellationToken)
private async Task ChangeMessageStatusAsync(PersistMessage message, CancellationToken cancellationToken)
{
return Task.CompletedTask;
message.ChangeState(MessageStatus.Processed);
_dbContext.PersistMessages.Update(message);
await _dbContext.SaveChangesAsync(cancellationToken);
}
}