This commit is contained in:
meysamhadeli 2022-07-01 20:05:42 +04:30
parent b395a6b9f4
commit a702916b91
6 changed files with 47 additions and 31 deletions

View File

@ -4,5 +4,5 @@ namespace BuildingBlocks.Core.Event;
public enum EventType public enum EventType
{ {
IntegrationEvent = 1, IntegrationEvent = 1,
DomainEvent = 2, DomainEvent = 2
} }

View File

@ -2,5 +2,4 @@ namespace BuildingBlocks.Core.Event;
public interface IDomainEvent : IEvent public interface IDomainEvent : IEvent
{ {
} }

View File

@ -30,17 +30,12 @@ public sealed class EventDispatcher : IEventDispatcher
_httpContextAccessor = httpContextAccessor; _httpContextAccessor = httpContextAccessor;
} }
public async Task SendAsync(IDomainEvent domainEvent,
CancellationToken cancellationToken = default) => await SendAsync(new[] {domainEvent}, cancellationToken);
public async Task SendAsync(IReadOnlyList<IDomainEvent> domainEvents, CancellationToken cancellationToken = default) public async Task SendAsync<T>(IReadOnlyList<T> events, CancellationToken cancellationToken = default)
where T : IEvent
{
async Task PublishIntegrationEvent(IReadOnlyList<IIntegrationEvent> integrationEvents)
{ {
if (domainEvents is null) return;
var integrationEvents = await MapDomainEventToIntegrationEventAsync(domainEvents).ConfigureAwait(false);
if (integrationEvents.Count == 0) return;
foreach (var integrationEvent in integrationEvents) foreach (var integrationEvent in integrationEvents)
{ {
await _persistMessageProcessor.PublishMessageAsync(new MessageEnvelope(integrationEvent, SetHeaders()), await _persistMessageProcessor.PublishMessageAsync(new MessageEnvelope(integrationEvent, SetHeaders()),
@ -48,19 +43,31 @@ public sealed class EventDispatcher : IEventDispatcher
} }
} }
if (events.Count > 0)
public async Task SendAsync(IIntegrationEvent integrationEvent,
CancellationToken cancellationToken = default) => await SendAsync(new[] {integrationEvent}, cancellationToken);
public async Task SendAsync(IReadOnlyList<IIntegrationEvent> integrationEvents,
CancellationToken cancellationToken = default)
{ {
if (integrationEvents is null) return; switch (events)
{
case IReadOnlyList<IDomainEvent> domainEvents:
{
var integrationEvents = await MapDomainEventToIntegrationEventAsync(domainEvents)
.ConfigureAwait(false);
await _persistMessageProcessor.PublishMessageAsync(new MessageEnvelope(integrationEvents, SetHeaders()), await PublishIntegrationEvent(integrationEvents);
cancellationToken); break;
} }
case IReadOnlyList<IIntegrationEvent> integrationEvents:
await PublishIntegrationEvent(integrationEvents);
break;
}
}
}
public async Task SendAsync<T>(T @event, CancellationToken cancellationToken = default)
where T : IEvent =>
await SendAsync(new[] {@event}, cancellationToken);
private Task<IReadOnlyList<IIntegrationEvent>> MapDomainEventToIntegrationEventAsync( private Task<IReadOnlyList<IIntegrationEvent>> MapDomainEventToIntegrationEventAsync(
IReadOnlyList<IDomainEvent> events) IReadOnlyList<IDomainEvent> events)
{ {

View File

@ -4,9 +4,8 @@ namespace BuildingBlocks.Core;
public interface IEventDispatcher public interface IEventDispatcher
{ {
public Task SendAsync(IReadOnlyList<IDomainEvent> domainEvents, CancellationToken cancellationToken = default); public Task SendAsync<T>(IReadOnlyList<T> events, CancellationToken cancellationToken = default)
public Task SendAsync(IDomainEvent domainEvent, CancellationToken cancellationToken = default); where T : IEvent;
public Task SendAsync<T>(T @event, CancellationToken cancellationToken = default)
public Task SendAsync(IIntegrationEvent integrationEvent, CancellationToken cancellationToken = default); where T : IEvent;
public Task SendAsync(IReadOnlyList<IIntegrationEvent> integrationEvents, CancellationToken cancellationToken = default);
} }

View File

@ -1,6 +1,7 @@
using System.Data; using System.Data;
using System.Text.Json; using System.Text.Json;
using BuildingBlocks.Core; using BuildingBlocks.Core;
using BuildingBlocks.Core.Event;
using MediatR; using MediatR;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;

View File

@ -1,11 +1,15 @@
using System.Threading; using System.Linq;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Ardalis.GuardClauses; using Ardalis.GuardClauses;
using BuildingBlocks.Core.CQRS; using BuildingBlocks.Core.CQRS;
using Flight.Data; using Flight.Data;
using Flight.Flights.Exceptions;
using Flight.Flights.Models.Reads; using Flight.Flights.Models.Reads;
using MapsterMapper; using MapsterMapper;
using MediatR; using MediatR;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
namespace Flight.Flights.Features.CreateFlight.Reads; namespace Flight.Flights.Features.CreateFlight.Reads;
@ -28,6 +32,12 @@ public class CreateFlightMongoCommandHandler : ICommandHandler<CreateFlightMongo
var flightReadModel = _mapper.Map<FlightReadModel>(command); var flightReadModel = _mapper.Map<FlightReadModel>(command);
var flight = await _flightReadDbContext.Flight.AsQueryable()
.FirstOrDefaultAsync(x => x.Id == command.Id, cancellationToken);
if (flight is not null)
throw new FlightAlreadyExistException();
await _flightReadDbContext.Flight.InsertOneAsync(flightReadModel, cancellationToken: cancellationToken); await _flightReadDbContext.Flight.InsertOneAsync(flightReadModel, cancellationToken: cancellationToken);
return Unit.Value; return Unit.Value;