Add Outbox and Internal Command

This commit is contained in:
meysamhadeli 2022-06-17 01:44:05 +04:30
parent 489218dc95
commit aab226ae7a
134 changed files with 2202 additions and 248 deletions

View File

@ -269,7 +269,7 @@ dotnet_diagnostic.S3903.severity = None
# #
# MA0004: Use Task.ConfigureAwait(false) # MA0004: Use Task.ConfigureAwait(false)
dotnet_diagnostic.MA0004.severity = Suggestion dotnet_diagnostic.MA0004.severity = None
# MA0049: Type name should not match containing namespace # MA0049: Type name should not match containing namespace
dotnet_diagnostic.MA0049.severity = Suggestion dotnet_diagnostic.MA0049.severity = Suggestion

View File

@ -1,4 +1,4 @@
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
namespace BuildingBlocks.Contracts.EventBus.Messages; namespace BuildingBlocks.Contracts.EventBus.Messages;

View File

@ -1,4 +1,4 @@
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
namespace BuildingBlocks.Contracts.EventBus.Messages; namespace BuildingBlocks.Contracts.EventBus.Messages;

View File

@ -1,4 +1,4 @@
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
namespace BuildingBlocks.Contracts.EventBus.Messages; namespace BuildingBlocks.Contracts.EventBus.Messages;

View File

@ -0,0 +1,12 @@
using MediatR;
namespace BuildingBlocks.Core.CQRS;
public interface ICommand : ICommand<Unit>
{
}
public interface ICommand<out T> : IRequest<T>
where T : notnull
{
}

View File

@ -0,0 +1,14 @@
using MediatR;
namespace BuildingBlocks.Core.CQRS;
public interface ICommandHandler<in TCommand> : ICommandHandler<TCommand, Unit>
where TCommand : ICommand<Unit>
{
}
public interface ICommandHandler<in TCommand, TResponse> : IRequestHandler<TCommand, TResponse>
where TCommand : ICommand<TResponse>
where TResponse : notnull
{
}

View File

@ -0,0 +1,8 @@
using MediatR;
namespace BuildingBlocks.Core.CQRS;
public interface IQuery<out T> : IRequest<T>
where T : notnull
{
}

View File

@ -0,0 +1,9 @@
using MediatR;
namespace BuildingBlocks.Core.CQRS;
public interface IQueryHandler<in TQuery, TResponse> : IRequestHandler<TQuery, TResponse>
where TQuery : IQuery<TResponse>
where TResponse : notnull
{
}

View File

@ -1,4 +1,4 @@
namespace BuildingBlocks.Domain.Event; namespace BuildingBlocks.Core.Event;
[Flags] [Flags]
public enum EventType public enum EventType

View File

@ -0,0 +1,6 @@
namespace BuildingBlocks.Core.Event;
public interface IDomainEvent : IEvent
{
}

View File

@ -1,7 +1,6 @@
using MassTransit;
using MediatR; using MediatR;
namespace BuildingBlocks.Domain.Event; namespace BuildingBlocks.Core.Event;
public interface IEvent : INotification public interface IEvent : INotification
{ {

View File

@ -1,4 +1,4 @@
namespace BuildingBlocks.Domain.Event; namespace BuildingBlocks.Core.Event;
public interface IHaveIntegrationEvent public interface IHaveIntegrationEvent
{ {

View File

@ -1,7 +1,6 @@
using MassTransit; using MassTransit;
using MassTransit.Topology;
namespace BuildingBlocks.Domain.Event; namespace BuildingBlocks.Core.Event;
[ExcludeFromTopology] [ExcludeFromTopology]
public interface IIntegrationEvent : IEvent public interface IIntegrationEvent : IEvent

View File

@ -0,0 +1,10 @@
using BuildingBlocks.Core.CQRS;
namespace BuildingBlocks.Core.Event;
public interface IInternalCommand : ICommand
{
long Id { get; }
DateTime OccurredOn { get; }
string Type { get; }
}

View File

@ -0,0 +1,13 @@
using BuildingBlocks.IdsGenerator;
using BuildingBlocks.Utils;
namespace BuildingBlocks.Core.Event;
public class InternalCommand : IInternalCommand
{
public long Id { get; set; } = SnowFlakIdGenerator.NewId();
public DateTime OccurredOn => DateTime.Now;
public string Type { get => TypeProvider.GetTypeName(GetType()); }
}

View File

@ -0,0 +1,26 @@
using Google.Protobuf;
namespace BuildingBlocks.Core.Event;
public class MessageEnvelope
{
public MessageEnvelope(object? message, IDictionary<string, object?>? headers = null)
{
Message = message;
Headers = headers ?? new Dictionary<string, object?>();
}
public object? Message { get; init; }
public IDictionary<string, object?> Headers { get; init; }
}
public class MessageEnvelope<TMessage> : MessageEnvelope
where TMessage : class, IMessage
{
public MessageEnvelope(TMessage message, IDictionary<string, object?> header) : base(message, header)
{
Message = message;
}
public new TMessage? Message { get; }
}

View File

@ -1,88 +1,71 @@
using System.Security.Claims; using System.Security.Claims;
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
using BuildingBlocks.MessageProcessor;
using BuildingBlocks.Web; using BuildingBlocks.Web;
using MassTransit;
using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using MessageEnvelope = BuildingBlocks.Core.Event.MessageEnvelope;
namespace BuildingBlocks.Domain; namespace BuildingBlocks.Core;
public sealed class BusPublisher : IBusPublisher public sealed class EventDispatcher : IEventDispatcher
{ {
private readonly IEventMapper _eventMapper; private readonly IEventMapper _eventMapper;
private readonly ILogger<BusPublisher> _logger; private readonly ILogger<EventDispatcher> _logger;
private readonly IPublishEndpoint _publishEndpoint; private readonly IPersistMessageProcessor _persistMessageProcessor;
private readonly IHttpContextAccessor _httpContextAccessor; private readonly IHttpContextAccessor _httpContextAccessor;
private readonly IServiceScopeFactory _serviceScopeFactory; private readonly IServiceScopeFactory _serviceScopeFactory;
public BusPublisher(IServiceScopeFactory serviceScopeFactory, public EventDispatcher(IServiceScopeFactory serviceScopeFactory,
IEventMapper eventMapper, IEventMapper eventMapper,
ILogger<BusPublisher> logger, ILogger<EventDispatcher> logger,
IPublishEndpoint publishEndpoint, IPersistMessageProcessor persistMessageProcessor,
IHttpContextAccessor httpContextAccessor) IHttpContextAccessor httpContextAccessor)
{ {
_serviceScopeFactory = serviceScopeFactory; _serviceScopeFactory = serviceScopeFactory;
_eventMapper = eventMapper; _eventMapper = eventMapper;
_logger = logger; _logger = logger;
_publishEndpoint = publishEndpoint; _persistMessageProcessor = persistMessageProcessor;
_httpContextAccessor = httpContextAccessor; _httpContextAccessor = httpContextAccessor;
} }
public async Task SendAsync(IDomainEvent domainEvent, public async Task SendAsync(IDomainEvent domainEvent,
CancellationToken cancellationToken = default) => await SendAsync(new[] { domainEvent }, cancellationToken); CancellationToken cancellationToken = default) => await SendAsync(new[] {domainEvent}, cancellationToken);
public async Task SendAsync(IReadOnlyList<IDomainEvent> domainEvents, CancellationToken cancellationToken = default) public async Task SendAsync(IReadOnlyList<IDomainEvent> domainEvents, CancellationToken cancellationToken = default)
{ {
if (domainEvents is null) return; if (domainEvents is null) return;
_logger.LogTrace("Processing integration events start...");
var integrationEvents = await MapDomainEventToIntegrationEventAsync(domainEvents).ConfigureAwait(false); var integrationEvents = await MapDomainEventToIntegrationEventAsync(domainEvents).ConfigureAwait(false);
if (!integrationEvents.Any()) return; if (integrationEvents.Count == 0) return;
await PublishMessageToBroker(integrationEvents, cancellationToken); foreach (var integrationEvent in integrationEvents)
{
_logger.LogTrace("Processing integration events done..."); await _persistMessageProcessor.PublishMessageAsync(new MessageEnvelope(integrationEvent, SetHeaders()),
cancellationToken);
}
} }
public async Task SendAsync(IIntegrationEvent integrationEvent, public async Task SendAsync(IIntegrationEvent integrationEvent,
CancellationToken cancellationToken = default) => await SendAsync(new[] { integrationEvent }, cancellationToken); CancellationToken cancellationToken = default) => await SendAsync(new[] {integrationEvent}, cancellationToken);
public async Task SendAsync(IReadOnlyList<IIntegrationEvent> integrationEvents, CancellationToken cancellationToken = default) public async Task SendAsync(IReadOnlyList<IIntegrationEvent> integrationEvents,
CancellationToken cancellationToken = default)
{ {
if (integrationEvents is null) return; if (integrationEvents is null) return;
_logger.LogTrace("Processing integration events start..."); await _persistMessageProcessor.PublishMessageAsync(new MessageEnvelope(integrationEvents, SetHeaders()),
cancellationToken);
await PublishMessageToBroker(integrationEvents, cancellationToken);
_logger.LogTrace("Processing integration events done...");
}
private async Task PublishMessageToBroker(IReadOnlyList<IIntegrationEvent> integrationEvents, CancellationToken cancellationToken)
{
foreach (var integrationEvent in integrationEvents)
{
await _publishEndpoint.Publish((object) integrationEvent, context =>
{
context.CorrelationId = _httpContextAccessor?.HttpContext?.GetCorrelationId();
context.Headers.Set("UserId",
_httpContextAccessor?.HttpContext?.User?.FindFirstValue(ClaimTypes.NameIdentifier));
context.Headers.Set("UserName",
_httpContextAccessor?.HttpContext?.User?.FindFirstValue(ClaimTypes.Name));
}, cancellationToken);
_logger.LogTrace("Publish a message with ID: {Id}", integrationEvent?.EventId);
}
} }
private Task<IReadOnlyList<IIntegrationEvent>> MapDomainEventToIntegrationEventAsync( private Task<IReadOnlyList<IIntegrationEvent>> MapDomainEventToIntegrationEventAsync(
IReadOnlyList<IDomainEvent> events) IReadOnlyList<IDomainEvent> events)
{ {
_logger.LogTrace("Processing integration events start...");
var wrappedIntegrationEvents = GetWrappedIntegrationEvents(events.ToList())?.ToList(); var wrappedIntegrationEvents = GetWrappedIntegrationEvents(events.ToList())?.ToList();
if (wrappedIntegrationEvents?.Count > 0) if (wrappedIntegrationEvents?.Count > 0)
return Task.FromResult<IReadOnlyList<IIntegrationEvent>>(wrappedIntegrationEvents); return Task.FromResult<IReadOnlyList<IIntegrationEvent>>(wrappedIntegrationEvents);
@ -101,6 +84,8 @@ public sealed class BusPublisher : IBusPublisher
integrationEvents.Add(integrationEvent); integrationEvents.Add(integrationEvent);
} }
_logger.LogTrace("Processing integration events done...");
return Task.FromResult<IReadOnlyList<IIntegrationEvent>>(integrationEvents); return Task.FromResult<IReadOnlyList<IIntegrationEvent>>(integrationEvents);
} }
@ -118,4 +103,14 @@ public sealed class BusPublisher : IBusPublisher
yield return domainNotificationEvent; yield return domainNotificationEvent;
} }
} }
private IDictionary<string, object> SetHeaders()
{
var headers = new Dictionary<string, object>();
headers.Add("CorrelationId", _httpContextAccessor?.HttpContext?.GetCorrelationId());
headers.Add("UserId", _httpContextAccessor?.HttpContext?.User?.FindFirstValue(ClaimTypes.NameIdentifier));
headers.Add("UserName", _httpContextAccessor?.HttpContext?.User?.FindFirstValue(ClaimTypes.Name));
return headers;
}
} }

View File

@ -1,8 +1,8 @@
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
namespace BuildingBlocks.Domain; namespace BuildingBlocks.Core;
public interface IBusPublisher public interface IEventDispatcher
{ {
public Task SendAsync(IReadOnlyList<IDomainEvent> domainEvents, CancellationToken cancellationToken = default); public Task SendAsync(IReadOnlyList<IDomainEvent> domainEvents, CancellationToken cancellationToken = default);
public Task SendAsync(IDomainEvent domainEvent, CancellationToken cancellationToken = default); public Task SendAsync(IDomainEvent domainEvent, CancellationToken cancellationToken = default);

View File

@ -1,6 +1,6 @@
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
namespace BuildingBlocks.Domain; namespace BuildingBlocks.Core;
public interface IEventMapper public interface IEventMapper
{ {

View File

@ -1,6 +1,6 @@
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
namespace BuildingBlocks.Domain; namespace BuildingBlocks.Core;
public record IntegrationEventWrapper<TDomainEventType>(TDomainEventType DomainEvent) : IIntegrationEvent public record IntegrationEventWrapper<TDomainEventType>(TDomainEventType DomainEvent) : IIntegrationEvent
where TDomainEventType : IDomainEvent; where TDomainEventType : IDomainEvent;

View File

@ -1,6 +1,6 @@
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
namespace BuildingBlocks.Domain.Model namespace BuildingBlocks.Core.Model
{ {
public abstract class Aggregate : Aggregate<long> public abstract class Aggregate : Aggregate<long>
{ {

View File

@ -1,4 +1,4 @@
namespace BuildingBlocks.Domain.Model; namespace BuildingBlocks.Core.Model;
public abstract class Entity : IEntity public abstract class Entity : IEntity
{ {

View File

@ -1,6 +1,6 @@
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
namespace BuildingBlocks.Domain.Model; namespace BuildingBlocks.Core.Model;
public interface IAggregate : IEntity public interface IAggregate : IEntity
{ {

View File

@ -1,4 +1,4 @@
namespace BuildingBlocks.Domain.Model; namespace BuildingBlocks.Core.Model;
public interface IEntity public interface IEntity
{ {

View File

@ -1,8 +0,0 @@
using MediatR;
namespace BuildingBlocks.Domain.Event;
public interface IDomainEvent : IEvent
{
}

View File

@ -2,8 +2,8 @@ using System.Collections.Immutable;
using System.Data; using System.Data;
using System.Reflection; using System.Reflection;
using System.Security.Claims; using System.Security.Claims;
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
using BuildingBlocks.Domain.Model; using BuildingBlocks.Core.Model;
using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Http;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage; using Microsoft.EntityFrameworkCore.Storage;
@ -21,6 +21,12 @@ public abstract class AppDbContextBase : DbContext, IDbContext
_httpContextAccessor = httpContextAccessor; _httpContextAccessor = httpContextAccessor;
} }
protected override void OnModelCreating(ModelBuilder builder)
{
// ref: https://github.com/pdevito3/MessageBusTestingInMemHarness/blob/main/RecipeManagement/src/RecipeManagement/Databases/RecipesDbContext.cs
builder.FilterSoftDeletedProperties();
}
public async Task BeginTransactionAsync(CancellationToken cancellationToken = default) public async Task BeginTransactionAsync(CancellationToken cancellationToken = default)
{ {
if (_currentTransaction != null) return; if (_currentTransaction != null) return;
@ -83,15 +89,6 @@ public abstract class AppDbContextBase : DbContext, IDbContext
return domainEvents.ToImmutableList(); return domainEvents.ToImmutableList();
} }
protected override void OnModelCreating(ModelBuilder builder)
{
builder.ApplyConfigurationsFromAssembly(Assembly.GetExecutingAssembly());
base.OnModelCreating(builder);
// ref: https://github.com/pdevito3/MessageBusTestingInMemHarness/blob/main/RecipeManagement/src/RecipeManagement/Databases/RecipesDbContext.cs
builder.FilterSoftDeletedProperties();
}
// ref: https://www.meziantou.net/entity-framework-core-generate-tracking-columns.htm // ref: https://www.meziantou.net/entity-framework-core-generate-tracking-columns.htm
// ref: https://www.meziantou.net/entity-framework-core-soft-delete-using-query-filters.htm // ref: https://www.meziantou.net/entity-framework-core-soft-delete-using-query-filters.htm
private void OnBeforeSaving() private void OnBeforeSaving()

View File

@ -1,6 +1,6 @@
using System.Data; using System.Data;
using System.Text.Json; using System.Text.Json;
using BuildingBlocks.Domain; using BuildingBlocks.Core;
using MediatR; using MediatR;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@ -12,16 +12,16 @@ public class EfTxBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TRe
{ {
private readonly ILogger<EfTxBehavior<TRequest, TResponse>> _logger; private readonly ILogger<EfTxBehavior<TRequest, TResponse>> _logger;
private readonly IDbContext _dbContextBase; private readonly IDbContext _dbContextBase;
private readonly IBusPublisher _busPublisher; private readonly IEventDispatcher _eventDispatcher;
public EfTxBehavior( public EfTxBehavior(
ILogger<EfTxBehavior<TRequest, TResponse>> logger, ILogger<EfTxBehavior<TRequest, TResponse>> logger,
IDbContext dbContextBase, IDbContext dbContextBase,
IBusPublisher busPublisher) IEventDispatcher eventDispatcher)
{ {
_logger = logger; _logger = logger;
_dbContextBase = dbContextBase; _dbContextBase = dbContextBase;
_busPublisher = busPublisher; _eventDispatcher = eventDispatcher;
} }
public async Task<TResponse> Handle( public async Task<TResponse> Handle(
@ -60,7 +60,7 @@ public class EfTxBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TRe
var domainEvents = _dbContextBase.GetDomainEvents(); var domainEvents = _dbContextBase.GetDomainEvents();
await _busPublisher.SendAsync(domainEvents.ToArray(), cancellationToken); await _eventDispatcher.SendAsync(domainEvents.ToArray(), cancellationToken);
return response; return response;
} }

View File

@ -1,5 +1,5 @@
using System.Linq.Expressions; using System.Linq.Expressions;
using BuildingBlocks.Domain.Model; using BuildingBlocks.Core.Model;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Query; using Microsoft.EntityFrameworkCore.Query;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;

View File

@ -1,5 +1,6 @@
using System.Data; using System.Data;
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
using BuildingBlocks.MessageProcessor;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
namespace BuildingBlocks.EFCore; namespace BuildingBlocks.EFCore;
@ -8,6 +9,7 @@ public interface IDbContext
{ {
DbSet<TEntity> Set<TEntity>() DbSet<TEntity> Set<TEntity>()
where TEntity : class; where TEntity : class;
DbSet<PersistMessage> PersistMessages => Set<PersistMessage>();
IReadOnlyList<IDomainEvent> GetDomainEvents(); IReadOnlyList<IDomainEvent> GetDomainEvents();
Task BeginTransactionAsync(CancellationToken cancellationToken = default); Task BeginTransactionAsync(CancellationToken cancellationToken = default);
Task CommitTransactionAsync(CancellationToken cancellationToken = default); Task CommitTransactionAsync(CancellationToken cancellationToken = default);

View File

@ -1,5 +1,5 @@
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
using BuildingBlocks.Domain.Model; using BuildingBlocks.Core.Model;
namespace BuildingBlocks.EventStoreDB.Events namespace BuildingBlocks.EventStoreDB.Events
{ {

View File

@ -1,5 +1,4 @@
using BuildingBlocks.Domain.Model; using BuildingBlocks.EventStoreDB.Serialization;
using BuildingBlocks.EventStoreDB.Serialization;
using EventStore.Client; using EventStore.Client;
namespace BuildingBlocks.EventStoreDB.Events; namespace BuildingBlocks.EventStoreDB.Events;

View File

@ -1,5 +1,5 @@
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
using BuildingBlocks.Domain.Model; using BuildingBlocks.Core.Model;
namespace BuildingBlocks.EventStoreDB.Events namespace BuildingBlocks.EventStoreDB.Events
{ {

View File

@ -1,4 +1,4 @@
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
using MediatR; using MediatR;
namespace BuildingBlocks.EventStoreDB.Events; namespace BuildingBlocks.EventStoreDB.Events;

View File

@ -1,4 +1,4 @@
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
namespace BuildingBlocks.EventStoreDB.Events; namespace BuildingBlocks.EventStoreDB.Events;

View File

@ -1,4 +1,4 @@
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
namespace BuildingBlocks.EventStoreDB.Events; namespace BuildingBlocks.EventStoreDB.Events;

View File

@ -1,4 +1,3 @@
using BuildingBlocks.Domain.Model;
using BuildingBlocks.EventStoreDB.Events; using BuildingBlocks.EventStoreDB.Events;
using BuildingBlocks.Exception; using BuildingBlocks.Exception;

View File

@ -1,4 +1,4 @@
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
using BuildingBlocks.EventStoreDB.Events; using BuildingBlocks.EventStoreDB.Events;
using BuildingBlocks.EventStoreDB.Serialization; using BuildingBlocks.EventStoreDB.Serialization;
using EventStore.Client; using EventStore.Client;

View File

@ -1,5 +1,5 @@
using System.Reflection; using System.Reflection;
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
using BuildingBlocks.Utils; using BuildingBlocks.Utils;
using BuildingBlocks.Web; using BuildingBlocks.Web;
using Humanizer; using Humanizer;

View File

@ -0,0 +1,21 @@
using BuildingBlocks.Core;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
namespace BuildingBlocks.MessageProcessor;
public static class Extensions
{
public static IServiceCollection AddPersistMessage(this IServiceCollection services, IConfiguration configuration)
{
services.AddOptions<PersistMessageOptions>()
.Bind(configuration.GetSection(nameof(PersistMessageOptions)))
.ValidateDataAnnotations();
services.AddScoped<IPersistMessageProcessor, PersistMessageProcessor>();
services.AddScoped<IEventDispatcher, EventDispatcher>();
services.AddHostedService<PersistMessageBackgroundService>();
return services;
}
}

View File

@ -0,0 +1,31 @@
using BuildingBlocks.Core.Event;
namespace BuildingBlocks.MessageProcessor;
// Ref: http://www.kamilgrzybek.com/design/the-outbox-pattern/
// Ref: https://event-driven.io/en/outbox_inbox_patterns_and_delivery_guarantees_explained/
// Ref: https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/
// Ref: https://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/multi-container-microservice-net-applications/subscribe-events#designing-atomicity-and-resiliency-when-publishing-to-the-event-bus
// Ref: https://github.com/kgrzybek/modular-monolith-with-ddd#38-internal-processing
public interface IPersistMessageProcessor
{
Task PublishMessageAsync<TMessageEnvelope>(
TMessageEnvelope messageEnvelope,
CancellationToken cancellationToken = default)
where TMessageEnvelope : MessageEnvelope;
Task AddReceivedMessageAsync<TMessageEnvelope>(
TMessageEnvelope messageEnvelope,
CancellationToken cancellationToken = default)
where TMessageEnvelope : MessageEnvelope;
Task AddInternalMessageAsync<TCommand>(
TCommand internalCommand,
CancellationToken cancellationToken = default)
where TCommand : class, IInternalCommand;
Task ProcessAsync(Guid messageId, MessageDeliveryType deliveryType, CancellationToken cancellationToken = default);
Task ProcessAllAsync(CancellationToken cancellationToken = default);
}

View File

@ -0,0 +1,9 @@
namespace BuildingBlocks.MessageProcessor;
[Flags]
public enum MessageDeliveryType
{
Outbox = 1,
Inbox = 2,
Internal = 4
}

View File

@ -0,0 +1,7 @@
namespace BuildingBlocks.MessageProcessor;
public enum MessageStatus
{
InProgress = 1,
Processed = 2
}

View File

@ -0,0 +1,33 @@
namespace BuildingBlocks.MessageProcessor;
public class PersistMessage
{
public PersistMessage(Guid id, string dataType, string data, MessageDeliveryType deliveryType)
{
Id = id;
DataType = dataType;
Data = data;
DeliveryType = deliveryType;
Created = DateTime.Now;
MessageStatus = MessageStatus.InProgress;
RetryCount = 0;
}
public Guid Id { get; private set; }
public string DataType { get; private set; }
public string Data { get; private set; }
public DateTime Created { get; private set; }
public int RetryCount { get; private set; }
public MessageStatus MessageStatus { get; private set; }
public MessageDeliveryType DeliveryType { get; private set; }
public void ChangeState(MessageStatus messageStatus)
{
MessageStatus = messageStatus;
}
public void IncreaseRetry()
{
RetryCount++;
}
}

View File

@ -0,0 +1,59 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace BuildingBlocks.MessageProcessor;
public class PersistMessageBackgroundService : BackgroundService
{
private readonly ILogger<PersistMessageBackgroundService> _logger;
private readonly IServiceProvider _serviceProvider;
private PersistMessageOptions _options;
private Task? _executingTask;
public PersistMessageBackgroundService(
ILogger<PersistMessageBackgroundService> logger,
IServiceProvider serviceProvider,
IOptions<PersistMessageOptions> options)
{
_logger = logger;
_serviceProvider = serviceProvider;
_options = options.Value;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("PersistMessage Background Service Start");
_executingTask = ProcessAsync(stoppingToken);
return _executingTask;
}
public override Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("PersistMessage Background Service Stop");
return base.StopAsync(cancellationToken);
}
private async Task ProcessAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
await using (var scope = _serviceProvider.CreateAsyncScope())
{
var service = scope.ServiceProvider.GetRequiredService<IPersistMessageProcessor>();
await service.ProcessAllAsync(stoppingToken);
}
var delay = _options.Interval is { }
? TimeSpan.FromSeconds((int)_options.Interval)
: TimeSpan.FromSeconds(30);
await Task.Delay(delay, stoppingToken);
}
}
}

View File

@ -0,0 +1,7 @@
namespace BuildingBlocks.MessageProcessor;
public class PersistMessageOptions
{
public int? Interval { get; set; } = 30;
public bool Enabled { get; set; } = true;
}

View File

@ -0,0 +1,181 @@
using System.Text.Json;
using Ardalis.GuardClauses;
using BuildingBlocks.Core.Event;
using BuildingBlocks.EFCore;
using BuildingBlocks.Utils;
using MassTransit;
using MediatR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
namespace BuildingBlocks.MessageProcessor;
public class PersistMessageProcessor : IPersistMessageProcessor
{
private readonly ILogger<PersistMessageProcessor> _logger;
private readonly IMediator _mediator;
private readonly IDbContext _dbContext;
private readonly IPublishEndpoint _publishEndpoint;
public PersistMessageProcessor(
ILogger<PersistMessageProcessor> logger,
IMediator mediator,
IDbContext dbContext,
IPublishEndpoint publishEndpoint)
{
_logger = logger;
_mediator = mediator;
_dbContext = dbContext;
_publishEndpoint = publishEndpoint;
}
public async Task PublishMessageAsync<TMessageEnvelope>(
TMessageEnvelope messageEnvelope,
CancellationToken cancellationToken = default)
where TMessageEnvelope : MessageEnvelope
{
await SavePersistMessageAsync(messageEnvelope, MessageDeliveryType.Outbox, cancellationToken);
}
public async Task AddReceivedMessageAsync<TMessageEnvelope>(TMessageEnvelope messageEnvelope,
CancellationToken cancellationToken = default) where TMessageEnvelope : MessageEnvelope
{
await SavePersistMessageAsync(messageEnvelope, MessageDeliveryType.Inbox, cancellationToken);
}
public async Task AddInternalMessageAsync<TCommand>(TCommand internalCommand,
CancellationToken cancellationToken = default) where TCommand : class, IInternalCommand
{
await SavePersistMessageAsync(new MessageEnvelope(internalCommand), MessageDeliveryType.Internal,
cancellationToken);
}
public async Task ProcessAsync(
Guid messageId,
MessageDeliveryType deliveryType,
CancellationToken cancellationToken = default)
{
var message =
await _dbContext.PersistMessages.FirstOrDefaultAsync(
x => x.Id == messageId && x.DeliveryType == deliveryType, cancellationToken);
if (message is null)
return;
switch (deliveryType)
{
case MessageDeliveryType.Inbox:
await ProcessInbox(message, cancellationToken);
break;
case MessageDeliveryType.Internal:
await ProcessInternal(message, cancellationToken);
break;
case MessageDeliveryType.Outbox:
await ProcessOutbox(message, cancellationToken);
break;
}
message.ChangeState(MessageStatus.Processed);
_dbContext.PersistMessages.Update(message);
await _dbContext.SaveChangesAsync(cancellationToken);
}
public async Task ProcessAllAsync(CancellationToken cancellationToken = default)
{
var messages = await _dbContext.PersistMessages.Where(x => x.MessageStatus != MessageStatus.Processed)
.ToListAsync(cancellationToken);
foreach (var message in messages)
{
await ProcessAsync(message.Id, message.DeliveryType, cancellationToken);
}
}
private async Task SavePersistMessageAsync(
MessageEnvelope messageEnvelope,
MessageDeliveryType deliveryType,
CancellationToken cancellationToken = default)
{
Guard.Against.Null(messageEnvelope.Message, nameof(messageEnvelope.Message));
Guid id;
if (messageEnvelope.Message is IEvent message)
{
id = message.EventId;
}
else
{
id = Guid.NewGuid();
}
await _dbContext.PersistMessages.AddAsync(
new PersistMessage(
id,
TypeProvider.GetTypeName(messageEnvelope.Message.GetType()),
JsonSerializer.Serialize(messageEnvelope),
deliveryType),
cancellationToken);
await _dbContext.SaveChangesAsync(cancellationToken);
_logger.LogInformation(
"Message with id: {MessageID} and delivery type: {DeliveryType} saved in persistence message store.",
id,
deliveryType.ToString());
}
private async Task ProcessOutbox(PersistMessage message, CancellationToken cancellationToken)
{
MessageEnvelope? messageEnvelope = JsonSerializer.Deserialize<MessageEnvelope>(message.Data);
if (messageEnvelope is null || messageEnvelope.Message is null)
return;
var data = JsonSerializer.Deserialize(messageEnvelope.Message.ToString()!,
TypeProvider.GetType(message.DataType));
if (data is IEvent)
{
await _publishEndpoint.Publish((object)data, context =>
{
foreach (var header in messageEnvelope.Headers)
{
context.Headers.Set(header.Key, header.Value);
}
}, cancellationToken);
_logger.LogInformation(
"Message with id: {MessageId} and delivery type: {DeliveryType} processed from the persistence message store.",
message.Id,
message.DeliveryType);
}
}
private async Task ProcessInternal(PersistMessage message, CancellationToken cancellationToken)
{
MessageEnvelope? messageEnvelope = JsonSerializer.Deserialize<MessageEnvelope>(message.Data);
if (messageEnvelope is null || messageEnvelope.Message is null)
return;
var data = JsonSerializer.Deserialize(messageEnvelope.Message.ToString()!,
TypeProvider.GetType(message.DataType));
if (data is IInternalCommand internalCommand)
{
await _mediator.Send(internalCommand, cancellationToken);
_logger.LogInformation(
"InternalCommand with id: {EventID} and delivery type: {DeliveryType} processed from the persistence message store.",
message.Id,
message.DeliveryType);
}
}
private Task ProcessInbox(PersistMessage message, CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}

View File

@ -17,8 +17,6 @@ namespace BuildingBlocks.Mongo
where TContextService : IMongoDbContext where TContextService : IMongoDbContext
where TContextImplementation : MongoDbContext, TContextService where TContextImplementation : MongoDbContext, TContextService
{ {
var mongoOptions = configuration.GetSection(nameof(MongoOptions)).Get<MongoOptions>() ?? new MongoOptions();
services.Configure<MongoOptions>(configuration.GetSection(nameof(MongoOptions))); services.Configure<MongoOptions>(configuration.GetSection(nameof(MongoOptions)));
if (configurator is { }) if (configurator is { })
{ {

View File

@ -1,4 +1,4 @@
using BuildingBlocks.Domain.Model; using BuildingBlocks.Core.Model;
namespace BuildingBlocks.Mongo; namespace BuildingBlocks.Mongo;

View File

@ -1,5 +1,5 @@
using System.Linq.Expressions; using System.Linq.Expressions;
using BuildingBlocks.Domain.Model; using BuildingBlocks.Core.Model;
namespace BuildingBlocks.Mongo; namespace BuildingBlocks.Mongo;

View File

@ -1,5 +1,5 @@
using System.Linq.Expressions; using System.Linq.Expressions;
using BuildingBlocks.Domain.Model; using BuildingBlocks.Core.Model;
using MongoDB.Driver; using MongoDB.Driver;
namespace BuildingBlocks.Mongo; namespace BuildingBlocks.Mongo;

View File

@ -1,10 +1,14 @@
using System.Collections.Concurrent;
using System.Reflection; using System.Reflection;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
using Ardalis.GuardClauses;
namespace BuildingBlocks.Utils; namespace BuildingBlocks.Utils;
public static class TypeProvider public static class TypeProvider
{ {
private static readonly ConcurrentDictionary<Type, string> TypeNameMap = new();
private static readonly ConcurrentDictionary<string, Type> TypeMap = new();
private static bool IsRecord(this Type objectType) private static bool IsRecord(this Type objectType)
{ {
return objectType.GetMethod("<Clone>$") != null || return objectType.GetMethod("<Clone>$") != null ||
@ -34,4 +38,72 @@ public static class TypeProvider
.SelectMany(a => a.GetTypes().Where(x => x.FullName == typeName || x.Name == typeName)) .SelectMany(a => a.GetTypes().Where(x => x.FullName == typeName || x.Name == typeName))
.FirstOrDefault(); .FirstOrDefault();
} }
/// <summary>
/// Gets the type name from a generic Type class.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <returns>GetTypeName</returns>
public static string GetTypeName<T>() => ToName(typeof(T));
/// <summary>
/// Gets the type name from a Type class.
/// </summary>
/// <param name="type"></param>
/// <returns>TypeName</returns>
public static string GetTypeName(Type type) => ToName(type);
/// <summary>
/// Gets the type name from a instance object.
/// </summary>
/// <param name="o"></param>
/// <returns>TypeName</returns>
public static string GetTypeNameByObject(object o) => ToName(o.GetType());
/// <summary>
/// Gets the type class from a type name.
/// </summary>
/// <param name="typeName"></param>
/// <returns>Type</returns>
public static Type GetType(string typeName) => ToType(typeName);
public static void AddType<T>(string name) => AddType(typeof(T), name);
private static void AddType(Type type, string name)
{
ToName(type);
ToType(name);
}
public static bool IsTypeRegistered<T>() => TypeNameMap.ContainsKey(typeof(T));
private static string ToName(Type type)
{
Guard.Against.Null(type, nameof(type));
return TypeNameMap.GetOrAdd(type, _ =>
{
var eventTypeName = type.FullName!.Replace(".", "_", StringComparison.Ordinal);
TypeMap.GetOrAdd(eventTypeName, type);
return eventTypeName;
});
}
private static Type ToType(string typeName) => TypeMap.GetOrAdd(typeName, _ =>
{
Guard.Against.NullOrEmpty(typeName, nameof(typeName));
return TypeMap.GetOrAdd(typeName, _ =>
{
var type = GetFirstMatchingTypeFromCurrentDomainAssembly(
typeName.Replace("_", ".", StringComparison.Ordinal))!;
if (type == null)
throw new System.Exception($"Type map for '{typeName}' wasn't found!");
return type;
});
});
} }

View File

@ -7,7 +7,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\Booking\Booking.csproj"/> <ProjectReference Include="..\Booking\Booking.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -2,7 +2,7 @@ using Booking;
using Booking.Configuration; using Booking.Configuration;
using Booking.Data; using Booking.Data;
using Booking.Extensions; using Booking.Extensions;
using BuildingBlocks.Domain; using BuildingBlocks.Core;
using BuildingBlocks.EFCore; using BuildingBlocks.EFCore;
using BuildingBlocks.EventStoreDB; using BuildingBlocks.EventStoreDB;
using BuildingBlocks.HealthCheck; using BuildingBlocks.HealthCheck;
@ -11,6 +11,7 @@ using BuildingBlocks.Jwt;
using BuildingBlocks.Logging; using BuildingBlocks.Logging;
using BuildingBlocks.Mapster; using BuildingBlocks.Mapster;
using BuildingBlocks.MassTransit; using BuildingBlocks.MassTransit;
using BuildingBlocks.MessageProcessor;
using BuildingBlocks.OpenTelemetry; using BuildingBlocks.OpenTelemetry;
using BuildingBlocks.Swagger; using BuildingBlocks.Swagger;
using BuildingBlocks.Web; using BuildingBlocks.Web;
@ -30,8 +31,8 @@ builder.Services.Configure<GrpcOptions>(options => configuration.GetSection("Grp
Console.WriteLine(FiggleFonts.Standard.Render(appOptions.Name)); Console.WriteLine(FiggleFonts.Standard.Render(appOptions.Name));
builder.Services.AddTransient<IBusPublisher, BusPublisher>();
builder.Services.AddCustomDbContext<BookingDbContext>(configuration); builder.Services.AddCustomDbContext<BookingDbContext>(configuration);
builder.Services.AddPersistMessage(configuration);
builder.AddCustomSerilog(); builder.AddCustomSerilog();
builder.Services.AddJwt(); builder.Services.AddJwt();
@ -46,7 +47,6 @@ builder.Services.AddCustomMapster(typeof(BookingRoot).Assembly);
builder.Services.AddHttpContextAccessor(); builder.Services.AddHttpContextAccessor();
builder.Services.AddTransient<IEventMapper, EventMapper>(); builder.Services.AddTransient<IEventMapper, EventMapper>();
builder.Services.AddTransient<IBusPublisher, BusPublisher>();
builder.Services.AddCustomHealthCheck(); builder.Services.AddCustomHealthCheck();
builder.Services.AddCustomMassTransit(typeof(BookingRoot).Assembly, env); builder.Services.AddCustomMassTransit(typeof(BookingRoot).Assembly, env);
builder.Services.AddCustomOpenTelemetry(); builder.Services.AddCustomOpenTelemetry();

View File

@ -27,5 +27,9 @@
"EventStore": { "EventStore": {
"ConnectionString": "esdb://localhost:2113?tls=false" "ConnectionString": "esdb://localhost:2113?tls=false"
}, },
"PersistMessageOptions": {
"Interval": 30,
"Enabled": true
},
"AllowedHosts": "*" "AllowedHosts": "*"
} }

View File

@ -0,0 +1,23 @@
{
"ConnectionStrings": {
"DefaultConnection": "Server=.\\sqlexpress;Database=BookingDB_Test;Trusted_Connection=True;MultipleActiveResultSets=true"
},
"RabbitMq": {
"HostName": "localhost",
"ExchangeName": "booking",
"UserName": "guest",
"Password": "guest"
},
"Logging": {
"LogLevel": {
"Default": "Debug",
"Microsoft": "Debug",
"Microsoft.Hosting.Lifetime": "Debug",
"Microsoft.EntityFrameworkCore.Database.Command": "Debug"
}
},
"PersistMessageOptions": {
"Interval": 1,
"Enabled": true
}
}

View File

@ -1,5 +1,5 @@
using Booking.Booking.Models.ValueObjects; using Booking.Booking.Models.ValueObjects;
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
namespace Booking.Booking.Events.Domain; namespace Booking.Booking.Events.Domain;

View File

@ -1,10 +1,9 @@
using BuildingBlocks.Core.CQRS;
using BuildingBlocks.IdsGenerator; using BuildingBlocks.IdsGenerator;
using MediatR;
namespace Booking.Booking.Features.CreateBooking; namespace Booking.Booking.Features.CreateBooking;
public record CreateBookingCommand public record CreateBookingCommand(long PassengerId, long FlightId, string Description) : ICommand<ulong>
(long PassengerId, long FlightId, string Description) : IRequest<ulong>
{ {
public long Id { get; set; } = SnowFlakIdGenerator.NewId(); public long Id { get; set; } = SnowFlakIdGenerator.NewId();
} }

View File

@ -2,12 +2,13 @@ using Ardalis.GuardClauses;
using Booking.Booking.Exceptions; using Booking.Booking.Exceptions;
using Booking.Booking.Models.ValueObjects; using Booking.Booking.Models.ValueObjects;
using BuildingBlocks.Contracts.Grpc; using BuildingBlocks.Contracts.Grpc;
using BuildingBlocks.Core.CQRS;
using BuildingBlocks.EventStoreDB.Repository; using BuildingBlocks.EventStoreDB.Repository;
using MediatR; using MediatR;
namespace Booking.Booking.Features.CreateBooking; namespace Booking.Booking.Features.CreateBooking;
public class CreateBookingCommandHandler : IRequestHandler<CreateBookingCommand, ulong> public class CreateBookingCommandHandler : ICommandHandler<CreateBookingCommand, ulong>
{ {
private readonly IEventStoreDBRepository<Models.Booking> _eventStoreDbRepository; private readonly IEventStoreDBRepository<Models.Booking> _eventStoreDbRepository;
private readonly IFlightGrpcService _flightGrpcService; private readonly IFlightGrpcService _flightGrpcService;

View File

@ -1,6 +1,5 @@
using Booking.Booking.Events.Domain; using Booking.Booking.Events.Domain;
using Booking.Booking.Models.ValueObjects; using Booking.Booking.Models.ValueObjects;
using BuildingBlocks.Domain.Model;
using BuildingBlocks.EventStoreDB.Events; using BuildingBlocks.EventStoreDB.Events;
namespace Booking.Booking.Models; namespace Booking.Booking.Models;

View File

@ -7,6 +7,8 @@ namespace Booking.Data;
public class BookingDbContext : AppDbContextBase public class BookingDbContext : AppDbContextBase
{ {
public const string DefaultSchema = "dbo";
public BookingDbContext(DbContextOptions options, IHttpContextAccessor httpContextAccessor) : base(options, httpContextAccessor) public BookingDbContext(DbContextOptions options, IHttpContextAccessor httpContextAccessor) : base(options, httpContextAccessor)
{ {
} }

View File

@ -7,7 +7,7 @@ public class BookingConfiguration : IEntityTypeConfiguration<Booking.Models.Book
{ {
public void Configure(EntityTypeBuilder<Booking.Models.Booking> builder) public void Configure(EntityTypeBuilder<Booking.Models.Booking> builder)
{ {
builder.ToTable("Booking", "dbo"); builder.ToTable("Booking", BookingDbContext.DefaultSchema);
builder.HasKey(r => r.Id); builder.HasKey(r => r.Id);
builder.Property(r => r.Id).ValueGeneratedNever(); builder.Property(r => r.Id).ValueGeneratedNever();

View File

@ -0,0 +1,42 @@
using BuildingBlocks.MessageProcessor;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
namespace Booking.Data.Configurations;
public class PersistMessageConfiguration : IEntityTypeConfiguration<PersistMessage>
{
public void Configure(EntityTypeBuilder<PersistMessage> builder)
{
builder.ToTable("PersistMessages", BookingDbContext.DefaultSchema);
builder.HasKey(x => x.Id);
builder.Property(x => x.Id)
.IsRequired();
builder.Property(x => x.DeliveryType)
.HasMaxLength(50)
.HasConversion(
v => v.ToString(),
v => (MessageDeliveryType)Enum.Parse(typeof(MessageDeliveryType), v))
.IsRequired()
.IsUnicode(false);
builder.Property(x => x.DeliveryType)
.HasMaxLength(50)
.HasConversion(
v => v.ToString(),
v => (MessageDeliveryType)Enum.Parse(typeof(MessageDeliveryType), v))
.IsRequired()
.IsUnicode(false);
builder.Property(x => x.MessageStatus)
.HasMaxLength(50)
.HasConversion(
v => v.ToString(),
v => (MessageStatus)Enum.Parse(typeof(MessageStatus), v))
.IsRequired()
.IsUnicode(false);
}
}

View File

@ -0,0 +1,152 @@
// <auto-generated />
using System;
using Booking.Data;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
#nullable disable
namespace Booking.Data.Migrations
{
[DbContext(typeof(BookingDbContext))]
[Migration("20220616121920_Add-PersistMessages")]
partial class AddPersistMessages
{
protected override void BuildTargetModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "6.0.1")
.HasAnnotation("Relational:MaxIdentifierLength", 128);
SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder, 1L, 1);
modelBuilder.Entity("Booking.Booking.Models.Booking", b =>
{
b.Property<long>("Id")
.HasColumnType("bigint");
b.Property<DateTime?>("CreatedAt")
.HasColumnType("datetime2");
b.Property<long?>("CreatedBy")
.HasColumnType("bigint");
b.Property<bool>("IsDeleted")
.HasColumnType("bit");
b.Property<DateTime?>("LastModified")
.HasColumnType("datetime2");
b.Property<long?>("LastModifiedBy")
.HasColumnType("bigint");
b.Property<long>("Version")
.HasColumnType("bigint");
b.HasKey("Id");
b.ToTable("Booking", "dbo");
});
modelBuilder.Entity("BuildingBlocks.MessageProcessor.PersistMessage", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uniqueidentifier");
b.Property<DateTime>("Created")
.HasColumnType("datetime2");
b.Property<string>("Data")
.HasColumnType("nvarchar(max)");
b.Property<string>("DataType")
.HasColumnType("nvarchar(max)");
b.Property<string>("DeliveryType")
.IsRequired()
.HasMaxLength(50)
.IsUnicode(false)
.HasColumnType("varchar(50)");
b.Property<string>("MessageStatus")
.IsRequired()
.HasMaxLength(50)
.IsUnicode(false)
.HasColumnType("varchar(50)");
b.Property<int>("RetryCount")
.HasColumnType("int");
b.HasKey("Id");
b.ToTable("PersistMessages", "dbo");
});
modelBuilder.Entity("Booking.Booking.Models.Booking", b =>
{
b.OwnsOne("Booking.Booking.Models.ValueObjects.PassengerInfo", "PassengerInfo", b1 =>
{
b1.Property<long>("BookingId")
.HasColumnType("bigint");
b1.Property<string>("Name")
.HasColumnType("nvarchar(max)");
b1.HasKey("BookingId");
b1.ToTable("Booking", "dbo");
b1.WithOwner()
.HasForeignKey("BookingId");
});
b.OwnsOne("Booking.Booking.Models.ValueObjects.Trip", "Trip", b1 =>
{
b1.Property<long>("BookingId")
.HasColumnType("bigint");
b1.Property<long>("AircraftId")
.HasColumnType("bigint");
b1.Property<long>("ArriveAirportId")
.HasColumnType("bigint");
b1.Property<long>("DepartureAirportId")
.HasColumnType("bigint");
b1.Property<string>("Description")
.HasColumnType("nvarchar(max)");
b1.Property<DateTime>("FlightDate")
.HasColumnType("datetime2");
b1.Property<string>("FlightNumber")
.HasColumnType("nvarchar(max)");
b1.Property<decimal>("Price")
.HasColumnType("decimal(18,2)");
b1.Property<string>("SeatNumber")
.HasColumnType("nvarchar(max)");
b1.HasKey("BookingId");
b1.ToTable("Booking", "dbo");
b1.WithOwner()
.HasForeignKey("BookingId");
});
b.Navigation("PassengerInfo");
b.Navigation("Trip");
});
#pragma warning restore 612, 618
}
}
}

View File

@ -0,0 +1,38 @@
using System;
using Microsoft.EntityFrameworkCore.Migrations;
#nullable disable
namespace Booking.Data.Migrations
{
public partial class AddPersistMessages : Migration
{
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.CreateTable(
name: "PersistMessages",
schema: "dbo",
columns: table => new
{
Id = table.Column<Guid>(type: "uniqueidentifier", nullable: false),
DataType = table.Column<string>(type: "nvarchar(max)", nullable: true),
Data = table.Column<string>(type: "nvarchar(max)", nullable: true),
Created = table.Column<DateTime>(type: "datetime2", nullable: false),
RetryCount = table.Column<int>(type: "int", nullable: false),
MessageStatus = table.Column<string>(type: "varchar(50)", unicode: false, maxLength: 50, nullable: false),
DeliveryType = table.Column<string>(type: "varchar(50)", unicode: false, maxLength: 50, nullable: false)
},
constraints: table =>
{
table.PrimaryKey("PK_PersistMessages", x => x.Id);
});
}
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "PersistMessages",
schema: "dbo");
}
}
}

View File

@ -50,6 +50,41 @@ namespace Booking.Data.Migrations
b.ToTable("Booking", "dbo"); b.ToTable("Booking", "dbo");
}); });
modelBuilder.Entity("BuildingBlocks.MessageProcessor.PersistMessage", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uniqueidentifier");
b.Property<DateTime>("Created")
.HasColumnType("datetime2");
b.Property<string>("Data")
.HasColumnType("nvarchar(max)");
b.Property<string>("DataType")
.HasColumnType("nvarchar(max)");
b.Property<string>("DeliveryType")
.IsRequired()
.HasMaxLength(50)
.IsUnicode(false)
.HasColumnType("varchar(50)");
b.Property<string>("MessageStatus")
.IsRequired()
.HasMaxLength(50)
.IsUnicode(false)
.HasColumnType("varchar(50)");
b.Property<int>("RetryCount")
.HasColumnType("int");
b.HasKey("Id");
b.ToTable("PersistMessages", "dbo");
});
modelBuilder.Entity("Booking.Booking.Models.Booking", b => modelBuilder.Entity("Booking.Booking.Models.Booking", b =>
{ {
b.OwnsOne("Booking.Booking.Models.ValueObjects.PassengerInfo", "PassengerInfo", b1 => b.OwnsOne("Booking.Booking.Models.ValueObjects.PassengerInfo", "PassengerInfo", b1 =>

View File

@ -1,7 +1,7 @@
using Booking.Booking.Events.Domain; using Booking.Booking.Events.Domain;
using BuildingBlocks.Contracts.EventBus.Messages; using BuildingBlocks.Contracts.EventBus.Messages;
using BuildingBlocks.Domain; using BuildingBlocks.Core;
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
namespace Booking; namespace Booking;

View File

@ -2,7 +2,7 @@
using System.Net.Http; using System.Net.Http;
using System.Threading.Tasks; using System.Threading.Tasks;
using Booking.Data; using Booking.Data;
using BuildingBlocks.Domain.Model; using BuildingBlocks.Core.Model;
using BuildingBlocks.MassTransit; using BuildingBlocks.MassTransit;
using BuildingBlocks.Web; using BuildingBlocks.Web;
using Grpc.Net.Client; using Grpc.Net.Client;
@ -48,7 +48,6 @@ public class IntegrationTestFixture : IAsyncLifetime
builder.UseEnvironment("test"); builder.UseEnvironment("test");
builder.ConfigureServices(services => builder.ConfigureServices(services =>
{ {
services.RemoveAll(typeof(IHostedService));
services.ReplaceSingleton(AddHttpContextAccessorMock); services.ReplaceSingleton(AddHttpContextAccessorMock);
TestRegistrationServices?.Invoke(services); TestRegistrationServices?.Invoke(services);
services.AddMassTransitTestHarness(x => services.AddMassTransitTestHarness(x =>

View File

@ -1,6 +1,6 @@
using System.Reflection; using System.Reflection;
using BuildingBlocks.Caching; using BuildingBlocks.Caching;
using BuildingBlocks.Domain; using BuildingBlocks.Core;
using BuildingBlocks.EFCore; using BuildingBlocks.EFCore;
using BuildingBlocks.Exception; using BuildingBlocks.Exception;
using BuildingBlocks.HealthCheck; using BuildingBlocks.HealthCheck;
@ -9,6 +9,8 @@ using BuildingBlocks.Jwt;
using BuildingBlocks.Logging; using BuildingBlocks.Logging;
using BuildingBlocks.Mapster; using BuildingBlocks.Mapster;
using BuildingBlocks.MassTransit; using BuildingBlocks.MassTransit;
using BuildingBlocks.MessageProcessor;
using BuildingBlocks.Mongo;
using BuildingBlocks.OpenTelemetry; using BuildingBlocks.OpenTelemetry;
using BuildingBlocks.Swagger; using BuildingBlocks.Swagger;
using BuildingBlocks.Web; using BuildingBlocks.Web;
@ -33,9 +35,11 @@ var env = builder.Environment;
var appOptions = builder.Services.GetOptions<AppOptions>("AppOptions"); var appOptions = builder.Services.GetOptions<AppOptions>("AppOptions");
Console.WriteLine(FiggleFonts.Standard.Render(appOptions.Name)); Console.WriteLine(FiggleFonts.Standard.Render(appOptions.Name));
builder.Services.AddTransient<IBusPublisher, BusPublisher>();
builder.Services.AddCustomDbContext<FlightDbContext>(configuration); builder.Services.AddCustomDbContext<FlightDbContext>(configuration);
builder.Services.AddScoped<IDataSeeder, FlightDataSeeder>(); builder.Services.AddScoped<IDataSeeder, FlightDataSeeder>();
builder.Services.AddMongoDbContext<FlightReadDbContext>(configuration);
builder.Services.AddPersistMessage(configuration);
builder.AddCustomSerilog(); builder.AddCustomSerilog();
builder.Services.AddJwt(); builder.Services.AddJwt();

View File

@ -10,6 +10,10 @@
"ConnectionStrings": { "ConnectionStrings": {
"DefaultConnection": "Server=.\\sqlexpress;Database=FlightDB;Trusted_Connection=True;MultipleActiveResultSets=true" "DefaultConnection": "Server=.\\sqlexpress;Database=FlightDB;Trusted_Connection=True;MultipleActiveResultSets=true"
}, },
"MongoOptions": {
"ConnectionString": "mongodb://localhost:27017",
"DatabaseName": "flight-db"
},
"Jwt": { "Jwt": {
"Authority": "https://localhost:5005", "Authority": "https://localhost:5005",
"Audience": "flight-api" "Audience": "flight-api"
@ -20,5 +24,9 @@
"UserName": "guest", "UserName": "guest",
"Password": "guest" "Password": "guest"
}, },
"PersistMessageOptions": {
"Interval": 30,
"Enabled": true
},
"AllowedHosts": "*" "AllowedHosts": "*"
} }

View File

@ -15,5 +15,9 @@
"Microsoft.Hosting.Lifetime": "Debug", "Microsoft.Hosting.Lifetime": "Debug",
"Microsoft.EntityFrameworkCore.Database.Command": "Debug" "Microsoft.EntityFrameworkCore.Database.Command": "Debug"
} }
},
"PersistMessageOptions": {
"Interval": 1,
"Enabled": true
} }
} }

View File

@ -1,4 +1,4 @@
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
namespace Flight.Aircrafts.Events; namespace Flight.Aircrafts.Events;

View File

@ -1,4 +1,4 @@
using BuildingBlocks.Domain.Model; using BuildingBlocks.Core.Model;
using BuildingBlocks.IdsGenerator; using BuildingBlocks.IdsGenerator;
using Flight.Aircrafts.Events; using Flight.Aircrafts.Events;

View File

@ -1,4 +1,4 @@
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
namespace Flight.Airports.Events; namespace Flight.Airports.Events;

View File

@ -1,4 +1,4 @@
using BuildingBlocks.Domain.Model; using BuildingBlocks.Core.Model;
using BuildingBlocks.IdsGenerator; using BuildingBlocks.IdsGenerator;
using Flight.Airports.Events; using Flight.Airports.Events;

View File

@ -8,7 +8,7 @@ public class AircraftConfiguration : IEntityTypeConfiguration<Aircraft>
{ {
public void Configure(EntityTypeBuilder<Aircraft> builder) public void Configure(EntityTypeBuilder<Aircraft> builder)
{ {
builder.ToTable("Aircraft", "dbo"); builder.ToTable("Aircraft", FlightDbContext.DefaultSchema);
builder.HasKey(r => r.Id); builder.HasKey(r => r.Id);
builder.Property(r => r.Id).ValueGeneratedNever(); builder.Property(r => r.Id).ValueGeneratedNever();
} }

View File

@ -8,7 +8,7 @@ public class AirportConfiguration: IEntityTypeConfiguration<Airport>
{ {
public void Configure(EntityTypeBuilder<Airport> builder) public void Configure(EntityTypeBuilder<Airport> builder)
{ {
builder.ToTable("Airport", "dbo"); builder.ToTable("Airport", FlightDbContext.DefaultSchema);
builder.HasKey(r => r.Id); builder.HasKey(r => r.Id);
builder.Property(r => r.Id).ValueGeneratedNever(); builder.Property(r => r.Id).ValueGeneratedNever();

View File

@ -9,7 +9,7 @@ public class FlightConfiguration : IEntityTypeConfiguration<Flights.Models.Fligh
{ {
public void Configure(EntityTypeBuilder<Flights.Models.Flight> builder) public void Configure(EntityTypeBuilder<Flights.Models.Flight> builder)
{ {
builder.ToTable("Flight", "dbo"); builder.ToTable("Flight", FlightDbContext.DefaultSchema);
builder.HasKey(r => r.Id); builder.HasKey(r => r.Id);
builder.Property(r => r.Id).ValueGeneratedNever(); builder.Property(r => r.Id).ValueGeneratedNever();

View File

@ -0,0 +1,43 @@
using System;
using BuildingBlocks.MessageProcessor;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
namespace Flight.Data.Configurations;
public class PersistMessageConfiguration : IEntityTypeConfiguration<PersistMessage>
{
public void Configure(EntityTypeBuilder<PersistMessage> builder)
{
builder.ToTable("PersistMessages", FlightDbContext.DefaultSchema);
builder.HasKey(x => x.Id);
builder.Property(x => x.Id)
.IsRequired();
builder.Property(x => x.DeliveryType)
.HasMaxLength(50)
.HasConversion(
v => v.ToString(),
v => (MessageDeliveryType)Enum.Parse(typeof(MessageDeliveryType), v))
.IsRequired()
.IsUnicode(false);
builder.Property(x => x.DeliveryType)
.HasMaxLength(50)
.HasConversion(
v => v.ToString(),
v => (MessageDeliveryType)Enum.Parse(typeof(MessageDeliveryType), v))
.IsRequired()
.IsUnicode(false);
builder.Property(x => x.MessageStatus)
.HasMaxLength(50)
.HasConversion(
v => v.ToString(),
v => (MessageStatus)Enum.Parse(typeof(MessageStatus), v))
.IsRequired()
.IsUnicode(false);
}
}

View File

@ -8,7 +8,7 @@ public class SeatConfiguration : IEntityTypeConfiguration<Seat>
{ {
public void Configure(EntityTypeBuilder<Seat> builder) public void Configure(EntityTypeBuilder<Seat> builder)
{ {
builder.ToTable("Seat", "dbo"); builder.ToTable("Seat", FlightDbContext.DefaultSchema);
builder.HasKey(r => r.Id); builder.HasKey(r => r.Id);
builder.Property(r => r.Id).ValueGeneratedNever(); builder.Property(r => r.Id).ValueGeneratedNever();

View File

@ -10,6 +10,7 @@ namespace Flight.Data;
public sealed class FlightDbContext : AppDbContextBase public sealed class FlightDbContext : AppDbContextBase
{ {
public const string DefaultSchema = "dbo";
public FlightDbContext(DbContextOptions<FlightDbContext> options, IHttpContextAccessor httpContextAccessor) : base( public FlightDbContext(DbContextOptions<FlightDbContext> options, IHttpContextAccessor httpContextAccessor) : base(
options, httpContextAccessor) options, httpContextAccessor)
{ {

View File

@ -0,0 +1,266 @@
// <auto-generated />
using System;
using Flight.Data;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
#nullable disable
namespace Flight.Data.Migrations
{
[DbContext(typeof(FlightDbContext))]
[Migration("20220616121204_Add-PersistMessages")]
partial class AddPersistMessages
{
protected override void BuildTargetModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "6.0.1")
.HasAnnotation("Relational:MaxIdentifierLength", 128);
SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder, 1L, 1);
modelBuilder.Entity("BuildingBlocks.MessageProcessor.PersistMessage", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uniqueidentifier");
b.Property<DateTime>("Created")
.HasColumnType("datetime2");
b.Property<string>("Data")
.HasColumnType("nvarchar(max)");
b.Property<string>("DataType")
.HasColumnType("nvarchar(max)");
b.Property<string>("DeliveryType")
.IsRequired()
.HasMaxLength(50)
.IsUnicode(false)
.HasColumnType("varchar(50)");
b.Property<string>("MessageStatus")
.IsRequired()
.HasMaxLength(50)
.IsUnicode(false)
.HasColumnType("varchar(50)");
b.Property<int>("RetryCount")
.HasColumnType("int");
b.HasKey("Id");
b.ToTable("PersistMessages", "dbo");
});
modelBuilder.Entity("Flight.Aircrafts.Models.Aircraft", b =>
{
b.Property<long>("Id")
.HasColumnType("bigint");
b.Property<DateTime?>("CreatedAt")
.HasColumnType("datetime2");
b.Property<long?>("CreatedBy")
.HasColumnType("bigint");
b.Property<bool>("IsDeleted")
.HasColumnType("bit");
b.Property<DateTime?>("LastModified")
.HasColumnType("datetime2");
b.Property<long?>("LastModifiedBy")
.HasColumnType("bigint");
b.Property<int>("ManufacturingYear")
.HasColumnType("int");
b.Property<string>("Model")
.HasColumnType("nvarchar(max)");
b.Property<string>("Name")
.HasColumnType("nvarchar(max)");
b.Property<long>("Version")
.HasColumnType("bigint");
b.HasKey("Id");
b.ToTable("Aircraft", "dbo");
});
modelBuilder.Entity("Flight.Airports.Models.Airport", b =>
{
b.Property<long>("Id")
.HasColumnType("bigint");
b.Property<string>("Address")
.HasColumnType("nvarchar(max)");
b.Property<string>("Code")
.HasColumnType("nvarchar(max)");
b.Property<DateTime?>("CreatedAt")
.HasColumnType("datetime2");
b.Property<long?>("CreatedBy")
.HasColumnType("bigint");
b.Property<bool>("IsDeleted")
.HasColumnType("bit");
b.Property<DateTime?>("LastModified")
.HasColumnType("datetime2");
b.Property<long?>("LastModifiedBy")
.HasColumnType("bigint");
b.Property<string>("Name")
.HasColumnType("nvarchar(max)");
b.Property<long>("Version")
.HasColumnType("bigint");
b.HasKey("Id");
b.ToTable("Airport", "dbo");
});
modelBuilder.Entity("Flight.Flights.Models.Flight", b =>
{
b.Property<long>("Id")
.HasColumnType("bigint");
b.Property<long>("AircraftId")
.HasColumnType("bigint");
b.Property<long>("ArriveAirportId")
.HasColumnType("bigint");
b.Property<DateTime>("ArriveDate")
.HasColumnType("datetime2");
b.Property<DateTime?>("CreatedAt")
.HasColumnType("datetime2");
b.Property<long?>("CreatedBy")
.HasColumnType("bigint");
b.Property<long>("DepartureAirportId")
.HasColumnType("bigint");
b.Property<DateTime>("DepartureDate")
.HasColumnType("datetime2");
b.Property<decimal>("DurationMinutes")
.HasColumnType("decimal(18,2)");
b.Property<DateTime>("FlightDate")
.HasColumnType("datetime2");
b.Property<string>("FlightNumber")
.HasColumnType("nvarchar(max)");
b.Property<bool>("IsDeleted")
.HasColumnType("bit");
b.Property<DateTime?>("LastModified")
.HasColumnType("datetime2");
b.Property<long?>("LastModifiedBy")
.HasColumnType("bigint");
b.Property<decimal>("Price")
.HasColumnType("decimal(18,2)");
b.Property<int>("Status")
.HasColumnType("int");
b.Property<long>("Version")
.HasColumnType("bigint");
b.HasKey("Id");
b.HasIndex("AircraftId");
b.HasIndex("ArriveAirportId");
b.ToTable("Flight", "dbo");
});
modelBuilder.Entity("Flight.Seats.Models.Seat", b =>
{
b.Property<long>("Id")
.HasColumnType("bigint");
b.Property<int>("Class")
.HasColumnType("int");
b.Property<DateTime?>("CreatedAt")
.HasColumnType("datetime2");
b.Property<long?>("CreatedBy")
.HasColumnType("bigint");
b.Property<long>("FlightId")
.HasColumnType("bigint");
b.Property<bool>("IsDeleted")
.HasColumnType("bit");
b.Property<DateTime?>("LastModified")
.HasColumnType("datetime2");
b.Property<long?>("LastModifiedBy")
.HasColumnType("bigint");
b.Property<string>("SeatNumber")
.HasColumnType("nvarchar(max)");
b.Property<int>("Type")
.HasColumnType("int");
b.Property<long>("Version")
.HasColumnType("bigint");
b.HasKey("Id");
b.HasIndex("FlightId");
b.ToTable("Seat", "dbo");
});
modelBuilder.Entity("Flight.Flights.Models.Flight", b =>
{
b.HasOne("Flight.Aircrafts.Models.Aircraft", null)
.WithMany()
.HasForeignKey("AircraftId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.HasOne("Flight.Airports.Models.Airport", null)
.WithMany()
.HasForeignKey("ArriveAirportId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
});
modelBuilder.Entity("Flight.Seats.Models.Seat", b =>
{
b.HasOne("Flight.Flights.Models.Flight", null)
.WithMany()
.HasForeignKey("FlightId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
});
#pragma warning restore 612, 618
}
}
}

View File

@ -0,0 +1,38 @@
using System;
using Microsoft.EntityFrameworkCore.Migrations;
#nullable disable
namespace Flight.Data.Migrations
{
public partial class AddPersistMessages : Migration
{
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.CreateTable(
name: "PersistMessages",
schema: "dbo",
columns: table => new
{
Id = table.Column<Guid>(type: "uniqueidentifier", nullable: false),
DataType = table.Column<string>(type: "nvarchar(max)", nullable: true),
Data = table.Column<string>(type: "nvarchar(max)", nullable: true),
Created = table.Column<DateTime>(type: "datetime2", nullable: false),
RetryCount = table.Column<int>(type: "int", nullable: false),
MessageStatus = table.Column<string>(type: "varchar(50)", unicode: false, maxLength: 50, nullable: false),
DeliveryType = table.Column<string>(type: "varchar(50)", unicode: false, maxLength: 50, nullable: false)
},
constraints: table =>
{
table.PrimaryKey("PK_PersistMessages", x => x.Id);
});
}
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "PersistMessages",
schema: "dbo");
}
}
}

View File

@ -22,6 +22,41 @@ namespace Flight.Data.Migrations
SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder, 1L, 1); SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder, 1L, 1);
modelBuilder.Entity("BuildingBlocks.MessageProcessor.PersistMessage", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uniqueidentifier");
b.Property<DateTime>("Created")
.HasColumnType("datetime2");
b.Property<string>("Data")
.HasColumnType("nvarchar(max)");
b.Property<string>("DataType")
.HasColumnType("nvarchar(max)");
b.Property<string>("DeliveryType")
.IsRequired()
.HasMaxLength(50)
.IsUnicode(false)
.HasColumnType("varchar(50)");
b.Property<string>("MessageStatus")
.IsRequired()
.HasMaxLength(50)
.IsUnicode(false)
.HasColumnType("varchar(50)");
b.Property<int>("RetryCount")
.HasColumnType("int");
b.HasKey("Id");
b.ToTable("PersistMessages", "dbo");
});
modelBuilder.Entity("Flight.Aircrafts.Models.Aircraft", b => modelBuilder.Entity("Flight.Aircrafts.Models.Aircraft", b =>
{ {
b.Property<long>("Id") b.Property<long>("Id")

View File

@ -1,8 +1,8 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using BuildingBlocks.Contracts.EventBus.Messages; using BuildingBlocks.Contracts.EventBus.Messages;
using BuildingBlocks.Domain; using BuildingBlocks.Core;
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
using Flight.Aircrafts.Events; using Flight.Aircrafts.Events;
using Flight.Airports.Events; using Flight.Airports.Events;
using Flight.Flights.Events.Domain; using Flight.Flights.Events.Domain;

View File

@ -1,5 +1,5 @@
using System; using System;
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
using Flight.Flights.Models; using Flight.Flights.Models;
namespace Flight.Flights.Events.Domain; namespace Flight.Flights.Events.Domain;

View File

@ -1,5 +1,5 @@
using System; using System;
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
using Flight.Flights.Models; using Flight.Flights.Models;
namespace Flight.Flights.Events.Domain; namespace Flight.Flights.Events.Domain;

View File

@ -1,5 +1,5 @@
using System; using System;
using BuildingBlocks.Domain.Event; using BuildingBlocks.Core.Event;
using Flight.Flights.Models; using Flight.Flights.Models;
namespace Flight.Flights.Events.Domain; namespace Flight.Flights.Events.Domain;

View File

@ -1,14 +1,14 @@
using System; using System;
using BuildingBlocks.Core.CQRS;
using BuildingBlocks.IdsGenerator; using BuildingBlocks.IdsGenerator;
using Flight.Flights.Dtos; using Flight.Flights.Dtos;
using Flight.Flights.Models; using Flight.Flights.Models;
using MediatR;
namespace Flight.Flights.Features.CreateFlight; namespace Flight.Flights.Features.CreateFlight;
public record CreateFlightCommand(string FlightNumber, long AircraftId, long DepartureAirportId, public record CreateFlightCommand(string FlightNumber, long AircraftId, long DepartureAirportId,
DateTime DepartureDate, DateTime ArriveDate, long ArriveAirportId, DateTime DepartureDate, DateTime ArriveDate, long ArriveAirportId,
decimal DurationMinutes, DateTime FlightDate, FlightStatus Status, decimal Price) : IRequest<FlightResponseDto> decimal DurationMinutes, DateTime FlightDate, FlightStatus Status, decimal Price) : ICommand<FlightResponseDto>
{ {
public long Id { get; set; } = SnowFlakIdGenerator.NewId(); public long Id { get; set; } = SnowFlakIdGenerator.NewId();
} }

View File

@ -1,25 +1,30 @@
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Ardalis.GuardClauses; using Ardalis.GuardClauses;
using BuildingBlocks.Core.CQRS;
using BuildingBlocks.MessageProcessor;
using Flight.Data; using Flight.Data;
using Flight.Flights.Dtos; using Flight.Flights.Dtos;
using Flight.Flights.Exceptions; using Flight.Flights.Exceptions;
using Flight.Flights.Models; using Flight.Flights.Features.CreateFlight.Reads;
using MapsterMapper; using MapsterMapper;
using MediatR;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
namespace Flight.Flights.Features.CreateFlight; namespace Flight.Flights.Features.CreateFlight;
public class CreateFlightCommandHandler : IRequestHandler<CreateFlightCommand, FlightResponseDto> public class CreateFlightCommandHandler : ICommandHandler<CreateFlightCommand, FlightResponseDto>
{ {
private readonly FlightDbContext _flightDbContext; private readonly FlightDbContext _flightDbContext;
private readonly IPersistMessageProcessor _persistMessageProcessor;
private readonly IMapper _mapper; private readonly IMapper _mapper;
public CreateFlightCommandHandler(IMapper mapper, FlightDbContext flightDbContext) public CreateFlightCommandHandler(IMapper mapper,
FlightDbContext flightDbContext,
IPersistMessageProcessor persistMessageProcessor)
{ {
_mapper = mapper; _mapper = mapper;
_flightDbContext = flightDbContext; _flightDbContext = flightDbContext;
_persistMessageProcessor = persistMessageProcessor;
} }
public async Task<FlightResponseDto> Handle(CreateFlightCommand command, CancellationToken cancellationToken) public async Task<FlightResponseDto> Handle(CreateFlightCommand command, CancellationToken cancellationToken)
@ -32,11 +37,17 @@ public class CreateFlightCommandHandler : IRequestHandler<CreateFlightCommand, F
if (flight is not null) if (flight is not null)
throw new FlightAlreadyExistException(); throw new FlightAlreadyExistException();
var flightEntity = Models.Flight.Create(command.Id, command.FlightNumber, command.AircraftId, command.DepartureAirportId, command.DepartureDate, var flightEntity = Models.Flight.Create(command.Id, command.FlightNumber, command.AircraftId,
command.ArriveDate, command.ArriveAirportId, command.DurationMinutes, command.FlightDate, command.Status, command.Price); command.DepartureAirportId, command.DepartureDate,
command.ArriveDate, command.ArriveAirportId, command.DurationMinutes, command.FlightDate, command.Status,
command.Price);
var newFlight = await _flightDbContext.Flights.AddAsync(flightEntity, cancellationToken); var newFlight = await _flightDbContext.Flights.AddAsync(flightEntity, cancellationToken);
var createFlightMongoCommand = _mapper.Map<CreateFlightMongoCommand>(newFlight.Entity);
await _persistMessageProcessor.AddInternalMessageAsync(createFlightMongoCommand, cancellationToken);
return _mapper.Map<FlightResponseDto>(newFlight.Entity); return _mapper.Map<FlightResponseDto>(newFlight.Entity);
} }
} }

View File

@ -0,0 +1,43 @@
using System;
using BuildingBlocks.Core.Event;
using Flight.Flights.Models;
namespace Flight.Flights.Features.CreateFlight.Reads;
public class CreateFlightMongoCommand : InternalCommand
{
public CreateFlightMongoCommand()
{
}
public CreateFlightMongoCommand(long Id, string FlightNumber, long AircraftId, DateTime DepartureDate, long DepartureAirportId,
DateTime ArriveDate, long ArriveAirportId, decimal DurationMinutes, DateTime FlightDate, FlightStatus Status,
decimal Price, bool IsDeleted)
{
this.Id = Id;
this.FlightNumber = FlightNumber;
this.AircraftId = AircraftId;
this.DepartureDate = DepartureDate;
this.DepartureAirportId = DepartureAirportId;
this.ArriveDate = ArriveDate;
this.ArriveAirportId = ArriveAirportId;
this.DurationMinutes = DurationMinutes;
this.FlightDate = FlightDate;
this.Status = Status;
this.Price = Price;
this.IsDeleted = IsDeleted;
}
public string FlightNumber { get; init; }
public long AircraftId { get; init; }
public DateTime DepartureDate { get; init; }
public long DepartureAirportId { get; init; }
public DateTime ArriveDate { get; init; }
public long ArriveAirportId { get; init; }
public decimal DurationMinutes { get; init; }
public DateTime FlightDate { get; init; }
public FlightStatus Status { get; init; }
public decimal Price { get; init; }
public bool IsDeleted { get; init; }
}

View File

@ -0,0 +1,35 @@
using System.Threading;
using System.Threading.Tasks;
using Ardalis.GuardClauses;
using BuildingBlocks.Core.CQRS;
using Flight.Data;
using Flight.Flights.Models.Reads;
using MapsterMapper;
using MediatR;
namespace Flight.Flights.Features.CreateFlight.Reads;
public class CreateFlightMongoCommandHandler : ICommandHandler<CreateFlightMongoCommand>
{
private readonly FlightReadDbContext _flightReadDbContext;
private readonly IMapper _mapper;
public CreateFlightMongoCommandHandler(
FlightReadDbContext flightReadDbContext,
IMapper mapper)
{
_flightReadDbContext = flightReadDbContext;
_mapper = mapper;
}
public async Task<Unit> Handle(CreateFlightMongoCommand command, CancellationToken cancellationToken)
{
Guard.Against.Null(command, nameof(command));
var flightReadModel = _mapper.Map<FlightReadModel>(command);
await _flightReadDbContext.Flight.InsertOneAsync(flightReadModel, cancellationToken: cancellationToken);
return Unit.Value;
}
}

View File

@ -1,6 +1,6 @@
using Flight.Flights.Dtos; using BuildingBlocks.Core.CQRS;
using MediatR; using Flight.Flights.Dtos;
namespace Flight.Flights.Features.DeleteFlight; namespace Flight.Flights.Features.DeleteFlight;
public record DeleteFlightCommand(long Id) : IRequest<FlightResponseDto>; public record DeleteFlightCommand(long Id) : ICommand<FlightResponseDto>;

View File

@ -1,6 +1,7 @@
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Ardalis.GuardClauses; using Ardalis.GuardClauses;
using BuildingBlocks.Core.CQRS;
using Flight.Data; using Flight.Data;
using Flight.Flights.Dtos; using Flight.Flights.Dtos;
using Flight.Flights.Exceptions; using Flight.Flights.Exceptions;
@ -11,7 +12,7 @@ using Microsoft.EntityFrameworkCore;
namespace Flight.Flights.Features.DeleteFlight; namespace Flight.Flights.Features.DeleteFlight;
public class DeleteFlightCommandHandler : IRequestHandler<DeleteFlightCommand, FlightResponseDto> public class DeleteFlightCommandHandler : ICommandHandler<DeleteFlightCommand, FlightResponseDto>
{ {
private readonly FlightDbContext _flightDbContext; private readonly FlightDbContext _flightDbContext;
private readonly IMapper _mapper; private readonly IMapper _mapper;

View File

@ -1,5 +1,7 @@
using AutoMapper; using AutoMapper;
using Flight.Flights.Dtos; using Flight.Flights.Dtos;
using Flight.Flights.Features.CreateFlight.Reads;
using Flight.Flights.Models.Reads;
using Mapster; using Mapster;
namespace Flight.Flights.Features; namespace Flight.Flights.Features;
@ -9,5 +11,7 @@ public class FlightMappings : Profile
public void Register(TypeAdapterConfig config) public void Register(TypeAdapterConfig config)
{ {
config.NewConfig<Models.Flight, FlightResponseDto>(); config.NewConfig<Models.Flight, FlightResponseDto>();
} config.NewConfig<Models.Flight, CreateFlightMongoCommand>();
config.NewConfig<CreateFlightMongoCommand, FlightReadModel>();
}
} }

View File

@ -1,12 +1,13 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using BuildingBlocks.Caching; using BuildingBlocks.Caching;
using BuildingBlocks.Core.CQRS;
using Flight.Flights.Dtos; using Flight.Flights.Dtos;
using MediatR; using MediatR;
namespace Flight.Flights.Features.GetAvailableFlights; namespace Flight.Flights.Features.GetAvailableFlights;
public record GetAvailableFlightsQuery : IRequest<IEnumerable<FlightResponseDto>>, ICacheRequest public record GetAvailableFlightsQuery : IQuery<IEnumerable<FlightResponseDto>>, ICacheRequest
{ {
public string CacheKey => "GetAvailableFlightsQuery"; public string CacheKey => "GetAvailableFlightsQuery";
public DateTime? AbsoluteExpirationRelativeToNow => DateTime.Now.AddHours(1); public DateTime? AbsoluteExpirationRelativeToNow => DateTime.Now.AddHours(1);

View File

@ -3,6 +3,7 @@ using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Ardalis.GuardClauses; using Ardalis.GuardClauses;
using BuildingBlocks.Core.CQRS;
using Flight.Data; using Flight.Data;
using Flight.Flights.Dtos; using Flight.Flights.Dtos;
using Flight.Flights.Exceptions; using Flight.Flights.Exceptions;
@ -12,7 +13,7 @@ using Microsoft.EntityFrameworkCore;
namespace Flight.Flights.Features.GetAvailableFlights; namespace Flight.Flights.Features.GetAvailableFlights;
public class GetAvailableFlightsQueryHandler : IRequestHandler<GetAvailableFlightsQuery, IEnumerable<FlightResponseDto>> public class GetAvailableFlightsQueryHandler : IQueryHandler<GetAvailableFlightsQuery, IEnumerable<FlightResponseDto>>
{ {
private readonly FlightDbContext _flightDbContext; private readonly FlightDbContext _flightDbContext;
private readonly IMapper _mapper; private readonly IMapper _mapper;

View File

@ -1,6 +1,6 @@
using BuildingBlocks.Core.CQRS;
using Flight.Flights.Dtos; using Flight.Flights.Dtos;
using MediatR;
namespace Flight.Flights.Features.GetFlightById; namespace Flight.Flights.Features.GetFlightById;
public record GetFlightByIdQuery(long Id) : IRequest<FlightResponseDto>; public record GetFlightByIdQuery(long Id) : IQuery<FlightResponseDto>;

View File

@ -1,6 +1,7 @@
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Ardalis.GuardClauses; using Ardalis.GuardClauses;
using BuildingBlocks.Core.CQRS;
using Flight.Data; using Flight.Data;
using Flight.Flights.Dtos; using Flight.Flights.Dtos;
using Flight.Flights.Exceptions; using Flight.Flights.Exceptions;
@ -10,7 +11,7 @@ using Microsoft.EntityFrameworkCore;
namespace Flight.Flights.Features.GetFlightById; namespace Flight.Flights.Features.GetFlightById;
public class GetFlightByIdQueryHandler : IRequestHandler<GetFlightByIdQuery, FlightResponseDto> public class GetFlightByIdQueryHandler : IQueryHandler<GetFlightByIdQuery, FlightResponseDto>
{ {
private readonly FlightDbContext _flightDbContext; private readonly FlightDbContext _flightDbContext;
private readonly IMapper _mapper; private readonly IMapper _mapper;

View File

@ -1,12 +1,13 @@
using System; using System;
using BuildingBlocks.Caching; using BuildingBlocks.Caching;
using BuildingBlocks.Core.CQRS;
using Flight.Flights.Dtos; using Flight.Flights.Dtos;
using Flight.Flights.Models; using Flight.Flights.Models;
using MediatR; using MediatR;
namespace Flight.Flights.Features.UpdateFlight; namespace Flight.Flights.Features.UpdateFlight;
public record UpdateFlightCommand : IRequest<FlightResponseDto>, IInvalidateCacheRequest public record UpdateFlightCommand : ICommand<FlightResponseDto>, IInvalidateCacheRequest
{ {
public long Id { get; init; } public long Id { get; init; }
public string FlightNumber { get; init; } public string FlightNumber { get; init; }

View File

@ -1,6 +1,7 @@
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Ardalis.GuardClauses; using Ardalis.GuardClauses;
using BuildingBlocks.Core.CQRS;
using BuildingBlocks.EventStoreDB.Repository; using BuildingBlocks.EventStoreDB.Repository;
using Flight.Data; using Flight.Data;
using Flight.Flights.Dtos; using Flight.Flights.Dtos;
@ -12,7 +13,7 @@ using Microsoft.EntityFrameworkCore;
namespace Flight.Flights.Features.UpdateFlight; namespace Flight.Flights.Features.UpdateFlight;
public class UpdateFlightCommandHandler : IRequestHandler<UpdateFlightCommand, FlightResponseDto> public class UpdateFlightCommandHandler : ICommandHandler<UpdateFlightCommand, FlightResponseDto>
{ {
private readonly FlightDbContext _flightDbContext; private readonly FlightDbContext _flightDbContext;
private readonly IMapper _mapper; private readonly IMapper _mapper;

Some files were not shown because too many files have changed in this diff Show More