feat: Add add distributed transaction to EfTxBehavior

This commit is contained in:
Pc 2023-05-12 01:13:03 +03:30
parent d59c17671e
commit 24d1ca2d65
14 changed files with 171 additions and 39 deletions

View File

@ -1,24 +1,27 @@
version: "3.3" version: "3.3"
services: services:
###################################################### #######################################################
# Postgres # Postgres
###################################################### ######################################################
postgres: postgres:
image: postgres:latest image: postgres:latest
container_name: postgres container_name: postgres
restart: on-failure restart: on-failure
ports: ports:
- '5432:5432' - '5432:5432'
environment: environment:
- POSTGRES_USER=postgres - POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres - POSTGRES_PASSWORD=postgres
command: command:
- "postgres" - "postgres"
- "-c" - "-c"
- "wal_level=logical" - "wal_level=logical"
networks: - "-c"
- booking - "max_prepared_transactions=10"
networks:
- booking
####################################################### #######################################################

View File

@ -31,6 +31,8 @@ services:
- "postgres" - "postgres"
- "-c" - "-c"
- "wal_level=logical" - "wal_level=logical"
- "-c"
- "max_prepared_transactions=10"
networks: networks:
- booking - booking

View File

@ -390,7 +390,13 @@ metadata:
name: postgres name: postgres
spec: spec:
containers: containers:
- env: - args:
- postgres
- -c
- wal_level=logical
- -c
- max_prepared_transactions=10
env:
- name: POSTGRES_PASSWORD - name: POSTGRES_PASSWORD
value: postgres value: postgres
- name: POSTGRES_USER - name: POSTGRES_USER

View File

@ -1,16 +1,18 @@
namespace BuildingBlocks.EFCore; namespace BuildingBlocks.EFCore;
using System.Collections.Immutable; using System.Collections.Immutable;
using BuildingBlocks.Core.Event; using Core.Event;
using BuildingBlocks.Core.Model; using Core.Model;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using System.Data; using Microsoft.EntityFrameworkCore.Storage;
using Web; using Web;
using Exception = System.Exception; using Exception = System.Exception;
using IsolationLevel = System.Data.IsolationLevel;
public abstract class AppDbContextBase : DbContext, IDbContext public abstract class AppDbContextBase : DbContext, IDbContext
{ {
private readonly ICurrentUserProvider _currentUserProvider; private readonly ICurrentUserProvider _currentUserProvider;
private IDbContextTransaction? _currentTransaction;
protected AppDbContextBase(DbContextOptions options, ICurrentUserProvider currentUserProvider) : protected AppDbContextBase(DbContextOptions options, ICurrentUserProvider currentUserProvider) :
base(options) base(options)
@ -22,13 +24,53 @@ public abstract class AppDbContextBase : DbContext, IDbContext
{ {
} }
public async Task BeginTransactionalAsync(CancellationToken cancellationToken = default)
{
_currentTransaction ??= await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);
}
//ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency#execution-strategies-and-transactions
public async Task CommitTransactionalAsync(CancellationToken cancellationToken = default)
{
try
{
await SaveChangesAsync(cancellationToken);
await _currentTransaction?.CommitAsync(cancellationToken)!;
}
catch
{
await _currentTransaction?.RollbackAsync(cancellationToken)!;
throw;
}
finally
{
_currentTransaction?.Dispose();
_currentTransaction = null;
}
}
public async Task RollbackTransactionAsync(CancellationToken cancellationToken = default)
{
try
{
await _currentTransaction?.RollbackAsync(cancellationToken)!;
}
finally
{
_currentTransaction?.Dispose();
_currentTransaction = null;
}
}
//ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency#execution-strategies-and-transactions //ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency#execution-strategies-and-transactions
public Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default) public Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default)
{ {
var strategy = Database.CreateExecutionStrategy(); var strategy = Database.CreateExecutionStrategy();
return strategy.ExecuteAsync(async () => return strategy.ExecuteAsync(async () =>
{ {
await using var transaction = await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken); await using var transaction =
await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);
try try
{ {
await SaveChangesAsync(cancellationToken); await SaveChangesAsync(cancellationToken);

View File

@ -5,21 +5,27 @@ using Microsoft.Extensions.Logging;
namespace BuildingBlocks.EFCore; namespace BuildingBlocks.EFCore;
using System.Transactions;
using PersistMessageProcessor;
public class EfTxBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse> public class EfTxBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
where TRequest : notnull, IRequest<TResponse> where TRequest : notnull, IRequest<TResponse>
where TResponse : notnull where TResponse : notnull
{ {
private readonly ILogger<EfTxBehavior<TRequest, TResponse>> _logger; private readonly ILogger<EfTxBehavior<TRequest, TResponse>> _logger;
private readonly IDbContext _dbContextBase; private readonly IDbContext _dbContextBase;
private readonly IPersistMessageDbContext _persistMessageDbContext;
private readonly IEventDispatcher _eventDispatcher; private readonly IEventDispatcher _eventDispatcher;
public EfTxBehavior( public EfTxBehavior(
ILogger<EfTxBehavior<TRequest, TResponse>> logger, ILogger<EfTxBehavior<TRequest, TResponse>> logger,
IDbContext dbContextBase, IDbContext dbContextBase,
IPersistMessageDbContext persistMessageDbContext,
IEventDispatcher eventDispatcher) IEventDispatcher eventDispatcher)
{ {
_logger = logger; _logger = logger;
_dbContextBase = dbContextBase; _dbContextBase = dbContextBase;
_persistMessageDbContext = persistMessageDbContext;
_eventDispatcher = eventDispatcher; _eventDispatcher = eventDispatcher;
} }
@ -42,6 +48,10 @@ public class EfTxBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TRe
nameof(EfTxBehavior<TRequest, TResponse>), nameof(EfTxBehavior<TRequest, TResponse>),
typeof(TRequest).FullName); typeof(TRequest).FullName);
// ref: https://learn.microsoft.com/en-us/ef/core/saving/transactions#using-systemtransactions
using var scope = new TransactionScope(TransactionScopeOption.Required,
new TransactionOptions { IsolationLevel = IsolationLevel.ReadCommitted },
TransactionScopeAsyncFlowOption.Enabled);
var response = await next(); var response = await next();
@ -55,11 +65,16 @@ public class EfTxBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TRe
var domainEvents = _dbContextBase.GetDomainEvents(); var domainEvents = _dbContextBase.GetDomainEvents();
if (domainEvents is null || !domainEvents.Any()) if (domainEvents is null || !domainEvents.Any())
{
return response; return response;
}
await _dbContextBase.ExecuteTransactionalAsync(cancellationToken);
await _eventDispatcher.SendAsync(domainEvents.ToArray(), typeof(TRequest), cancellationToken); await _eventDispatcher.SendAsync(domainEvents.ToArray(), typeof(TRequest), cancellationToken);
await _dbContextBase.SaveChangesAsync(cancellationToken);
await _persistMessageDbContext.SaveChangesAsync(cancellationToken);
scope.Complete();
} }
} }
} }

View File

@ -36,7 +36,7 @@ public static class Extensions
{ {
dbOptions.MigrationsAssembly(typeof(TContext).Assembly.GetName().Name); dbOptions.MigrationsAssembly(typeof(TContext).Assembly.GetName().Name);
//ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency //ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency
dbOptions.EnableRetryOnFailure(3, TimeSpan.FromSeconds(1), null); // dbOptions.EnableRetryOnFailure(3, TimeSpan.FromSeconds(1), null);
}) })
// https://github.com/efcore/EFCore.NamingConventions // https://github.com/efcore/EFCore.NamingConventions
.UseSnakeCaseNamingConvention(); .UseSnakeCaseNamingConvention();

View File

@ -7,6 +7,9 @@ public interface IDbContext
{ {
DbSet<TEntity> Set<TEntity>() where TEntity : class; DbSet<TEntity> Set<TEntity>() where TEntity : class;
IReadOnlyList<IDomainEvent> GetDomainEvents(); IReadOnlyList<IDomainEvent> GetDomainEvents();
public Task BeginTransactionalAsync(CancellationToken cancellationToken = default);
public Task CommitTransactionalAsync(CancellationToken cancellationToken = default);
public Task RollbackTransactionAsync(CancellationToken cancellationToken = default);
Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default); Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default);
Task<int> SaveChangesAsync(CancellationToken cancellationToken = default); Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
} }

View File

@ -8,7 +8,6 @@ using Configurations;
using Core.Model; using Core.Model;
using global::Polly; using global::Polly;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Exception = System.Exception; using Exception = System.Exception;
public class PersistMessageDbContext : DbContext, IPersistMessageDbContext public class PersistMessageDbContext : DbContext, IPersistMessageDbContext

View File

@ -1,4 +1,5 @@
namespace BuildingBlocks.TestBase; namespace BuildingBlocks.TestBase;
using Testcontainers.EventStoreDb; using Testcontainers.EventStoreDb;
using Testcontainers.MongoDb; using Testcontainers.MongoDb;
using Testcontainers.PostgreSql; using Testcontainers.PostgreSql;
@ -7,21 +8,25 @@ using Web;
public static class TestContainers public static class TestContainers
{ {
public static RabbitMqContainerOptions RabbitMqContainerConfiguration { get;} public static RabbitMqContainerOptions RabbitMqContainerConfiguration { get; }
public static PostgresContainerOptions PostgresContainerConfiguration { get;} public static PostgresContainerOptions PostgresContainerConfiguration { get; }
public static PostgresPersistContainerOptions PostgresPersistContainerConfiguration { get;} public static PostgresPersistContainerOptions PostgresPersistContainerConfiguration { get; }
public static MongoContainerOptions MongoContainerConfiguration { get;} public static MongoContainerOptions MongoContainerConfiguration { get; }
public static EventStoreContainerOptions EventStoreContainerConfiguration { get;} public static EventStoreContainerOptions EventStoreContainerConfiguration { get; }
static TestContainers() static TestContainers()
{ {
var configuration = ConfigurationHelper.GetConfiguration(); var configuration = ConfigurationHelper.GetConfiguration();
RabbitMqContainerConfiguration = configuration.GetOptions<RabbitMqContainerOptions>(nameof(RabbitMqContainerOptions)); RabbitMqContainerConfiguration =
PostgresContainerConfiguration = configuration.GetOptions<PostgresContainerOptions>(nameof(PostgresContainerOptions)); configuration.GetOptions<RabbitMqContainerOptions>(nameof(RabbitMqContainerOptions));
PostgresPersistContainerConfiguration = configuration.GetOptions<PostgresPersistContainerOptions>(nameof(PostgresPersistContainerOptions)); PostgresContainerConfiguration =
configuration.GetOptions<PostgresContainerOptions>(nameof(PostgresContainerOptions));
PostgresPersistContainerConfiguration =
configuration.GetOptions<PostgresPersistContainerOptions>(nameof(PostgresPersistContainerOptions));
MongoContainerConfiguration = configuration.GetOptions<MongoContainerOptions>(nameof(MongoContainerOptions)); MongoContainerConfiguration = configuration.GetOptions<MongoContainerOptions>(nameof(MongoContainerOptions));
EventStoreContainerConfiguration = configuration.GetOptions<EventStoreContainerOptions>(nameof(EventStoreContainerOptions)); EventStoreContainerConfiguration =
configuration.GetOptions<EventStoreContainerOptions>(nameof(EventStoreContainerOptions));
} }
public static PostgreSqlContainer PostgresTestContainer() public static PostgreSqlContainer PostgresTestContainer()
@ -34,6 +39,7 @@ public static class TestContainers
var builder = baseBuilder var builder = baseBuilder
.WithImage(PostgresContainerConfiguration.ImageName) .WithImage(PostgresContainerConfiguration.ImageName)
.WithName(PostgresContainerConfiguration.Name) .WithName(PostgresContainerConfiguration.Name)
.WithCommand(new string[2] { "-c", "max_prepared_transactions=10" })
.WithPortBinding(PostgresContainerConfiguration.Port, true) .WithPortBinding(PostgresContainerConfiguration.Port, true)
.Build(); .Build();
@ -50,6 +56,7 @@ public static class TestContainers
var builder = baseBuilder var builder = baseBuilder
.WithImage(PostgresPersistContainerConfiguration.ImageName) .WithImage(PostgresPersistContainerConfiguration.ImageName)
.WithName(PostgresPersistContainerConfiguration.Name) .WithName(PostgresPersistContainerConfiguration.Name)
.WithCommand(new string[2] { "-c", "max_prepared_transactions=10" })
.WithPortBinding(PostgresPersistContainerConfiguration.Port, true) .WithPortBinding(PostgresPersistContainerConfiguration.Port, true)
.Build(); .Build();

View File

@ -5,7 +5,7 @@ using MassTransit;
namespace Flight; namespace Flight;
public class CreateFlightConsumerHandler : IConsumer<FlightCreated> public class FlightCreatedConsumerHandler : IConsumer<FlightCreated>
{ {
public Task Consume(ConsumeContext<FlightCreated> context) public Task Consume(ConsumeContext<FlightCreated> context)
{ {

View File

@ -54,7 +54,7 @@ public class CreateFlightEndpoint : IMinimalEndpoint
return Results.CreatedAtRoute("GetFlightById", new { id = result.Id }, response); return Results.CreatedAtRoute("GetFlightById", new { id = result.Id }, response);
}) })
// .RequireAuthorization() .RequireAuthorization()
.WithName("CreateFlight") .WithName("CreateFlight")
.WithApiVersionSet(builder.NewApiVersionSet("Flight").Build()) .WithApiVersionSet(builder.NewApiVersionSet("Flight").Build())
.Produces<CreateFlightResponseDto>(StatusCodes.Status201Created) .Produces<CreateFlightResponseDto>(StatusCodes.Status201Created)

View File

@ -9,18 +9,19 @@ using BuildingBlocks.Core.Event;
using BuildingBlocks.Core.Model; using BuildingBlocks.Core.Model;
using BuildingBlocks.EFCore; using BuildingBlocks.EFCore;
using Identity.Identity.Models; using Identity.Identity.Models;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Identity.EntityFrameworkCore; using Microsoft.AspNetCore.Identity.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
namespace Identity.Data; namespace Identity.Data;
using System; using System;
using Microsoft.EntityFrameworkCore.Storage;
public sealed class IdentityContext : IdentityDbContext<User, Role, Guid, public sealed class IdentityContext : IdentityDbContext<User, Role, Guid,
UserClaim, UserRole, UserLogin, RoleClaim, UserToken>, IDbContext UserClaim, UserRole, UserLogin, RoleClaim, UserToken>, IDbContext
{ {
private IDbContextTransaction? _currentTransaction;
public IdentityContext(DbContextOptions<IdentityContext> options) : base(options) public IdentityContext(DbContextOptions<IdentityContext> options) : base(options)
{ {
} }
@ -33,6 +34,45 @@ public sealed class IdentityContext : IdentityDbContext<User, Role, Guid,
builder.ToSnakeCaseTables(); builder.ToSnakeCaseTables();
} }
public async Task BeginTransactionalAsync(CancellationToken cancellationToken = default)
{
_currentTransaction ??= await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);
}
//ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency#execution-strategies-and-transactions
public async Task CommitTransactionalAsync(CancellationToken cancellationToken = default)
{
try
{
await SaveChangesAsync(cancellationToken);
await _currentTransaction?.CommitAsync(cancellationToken)!;
}
catch
{
await _currentTransaction?.RollbackAsync(cancellationToken)!;
throw;
}
finally
{
_currentTransaction?.Dispose();
_currentTransaction = null;
}
}
public async Task RollbackTransactionAsync(CancellationToken cancellationToken = default)
{
try
{
await _currentTransaction?.RollbackAsync(cancellationToken)!;
}
finally
{
_currentTransaction?.Dispose();
_currentTransaction = null;
}
}
//ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency#execution-strategies-and-transactions //ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency#execution-strategies-and-transactions
public Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default) public Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default)
{ {

View File

@ -46,6 +46,7 @@ public class RegisterNewUserEndpoint : IMinimalEndpoint
return Results.Ok(response); return Results.Ok(response);
}) })
.RequireAuthorization()
.WithName("RegisterUser") .WithName("RegisterUser")
.WithApiVersionSet(builder.NewApiVersionSet("Identity").Build()) .WithApiVersionSet(builder.NewApiVersionSet("Identity").Build())
.Produces<RegisterNewUserResponseDto>() .Produces<RegisterNewUserResponseDto>()

View File

@ -0,0 +1,14 @@
namespace Identity;
using System;
using System.Threading.Tasks;
using BuildingBlocks.Contracts.EventBus.Messages;
using MassTransit;
public class UserCreatedConsumerHandler : IConsumer<UserCreated>
{
public Task Consume(ConsumeContext<UserCreated> context)
{
Console.WriteLine("It's for test"); return Task.CompletedTask;
}
}