mirror of
https://github.com/meysamhadeli/booking-microservices.git
synced 2026-05-03 03:11:51 +08:00
Merge pull request #265 from meysamhadeli/feat/add_retry_for_distributed_transaction_in_postgres
feat: Add retry for distributed transaction in postgres
This commit is contained in:
commit
7e9cea74f6
@ -4,6 +4,7 @@ using System.Collections.Immutable;
|
|||||||
using Core.Event;
|
using Core.Event;
|
||||||
using Core.Model;
|
using Core.Model;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.EntityFrameworkCore.Storage;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Web;
|
using Web;
|
||||||
using Exception = System.Exception;
|
using Exception = System.Exception;
|
||||||
@ -13,6 +14,7 @@ public abstract class AppDbContextBase : DbContext, IDbContext
|
|||||||
{
|
{
|
||||||
private readonly ICurrentUserProvider? _currentUserProvider;
|
private readonly ICurrentUserProvider? _currentUserProvider;
|
||||||
private readonly ILogger<AppDbContextBase>? _logger;
|
private readonly ILogger<AppDbContextBase>? _logger;
|
||||||
|
private IDbContextTransaction _currentTransaction;
|
||||||
|
|
||||||
protected AppDbContextBase(DbContextOptions options, ICurrentUserProvider? currentUserProvider = null, ILogger<AppDbContextBase>? logger = null) :
|
protected AppDbContextBase(DbContextOptions options, ICurrentUserProvider? currentUserProvider = null, ILogger<AppDbContextBase>? logger = null) :
|
||||||
base(options)
|
base(options)
|
||||||
@ -26,10 +28,52 @@ public abstract class AppDbContextBase : DbContext, IDbContext
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public IExecutionStrategy CreateExecutionStrategy() => Database.CreateExecutionStrategy();
|
||||||
|
|
||||||
|
public async Task BeginTransactionAsync(CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
if (_currentTransaction != null) return;
|
||||||
|
|
||||||
|
_currentTransaction = await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task CommitTransactionAsync(CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await SaveChangesAsync(cancellationToken);
|
||||||
|
await _currentTransaction?.CommitAsync(cancellationToken)!;
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
await RollbackTransactionAsync(cancellationToken);
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_currentTransaction?.Dispose();
|
||||||
|
_currentTransaction = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task RollbackTransactionAsync(CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await _currentTransaction?.RollbackAsync(cancellationToken)!;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_currentTransaction?.Dispose();
|
||||||
|
_currentTransaction = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
//ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency#execution-strategies-and-transactions
|
//ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency#execution-strategies-and-transactions
|
||||||
public Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default)
|
public Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
var strategy = Database.CreateExecutionStrategy();
|
var strategy = CreateExecutionStrategy();
|
||||||
return strategy.ExecuteAsync(async () =>
|
return strategy.ExecuteAsync(async () =>
|
||||||
{
|
{
|
||||||
await using var transaction =
|
await using var transaction =
|
||||||
|
|||||||
@ -7,6 +7,7 @@ namespace BuildingBlocks.EFCore;
|
|||||||
|
|
||||||
using System.Transactions;
|
using System.Transactions;
|
||||||
using PersistMessageProcessor;
|
using PersistMessageProcessor;
|
||||||
|
using Polly;
|
||||||
|
|
||||||
public class EfTxBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
|
public class EfTxBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
|
||||||
where TRequest : notnull, IRequest<TResponse>
|
where TRequest : notnull, IRequest<TResponse>
|
||||||
@ -48,10 +49,10 @@ public class EfTxBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TRe
|
|||||||
nameof(EfTxBehavior<TRequest, TResponse>),
|
nameof(EfTxBehavior<TRequest, TResponse>),
|
||||||
typeof(TRequest).FullName);
|
typeof(TRequest).FullName);
|
||||||
|
|
||||||
// ref: https://learn.microsoft.com/en-us/ef/core/saving/transactions#using-systemtransactions
|
//ref: https://learn.microsoft.com/en-us/ef/core/saving/transactions#using-systemtransactions
|
||||||
using var scope = new TransactionScope(TransactionScopeOption.Required,
|
using var scope = new TransactionScope(TransactionScopeOption.Required,
|
||||||
new TransactionOptions { IsolationLevel = IsolationLevel.ReadCommitted },
|
new TransactionOptions { IsolationLevel = IsolationLevel.ReadCommitted },
|
||||||
TransactionScopeAsyncFlowOption.Enabled);
|
TransactionScopeAsyncFlowOption.Enabled);
|
||||||
|
|
||||||
var response = await next();
|
var response = await next();
|
||||||
|
|
||||||
@ -71,10 +72,23 @@ public class EfTxBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TRe
|
|||||||
|
|
||||||
await _eventDispatcher.SendAsync(domainEvents.ToArray(), typeof(TRequest), cancellationToken);
|
await _eventDispatcher.SendAsync(domainEvents.ToArray(), typeof(TRequest), cancellationToken);
|
||||||
|
|
||||||
await _dbContextBase.SaveChangesAsync(cancellationToken);
|
// Save data to database with some retry policy in distributed transaction
|
||||||
|
await _dbContextBase.RetryOnFailure(async () =>
|
||||||
|
{
|
||||||
|
await _dbContextBase.SaveChangesAsync(cancellationToken);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Save data to database with some retry policy in distributed transaction
|
||||||
|
await _dbContextBase.RetryOnFailure(async () =>
|
||||||
|
{
|
||||||
|
await _dbContextBase.SaveChangesAsync(cancellationToken);
|
||||||
|
});
|
||||||
|
|
||||||
await _persistMessageDbContext.SaveChangesAsync(cancellationToken);
|
await _persistMessageDbContext.SaveChangesAsync(cancellationToken);
|
||||||
|
|
||||||
scope.Complete();
|
scope.Complete();
|
||||||
|
|
||||||
|
return response;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -34,8 +34,6 @@ public static class Extensions
|
|||||||
dbOptions =>
|
dbOptions =>
|
||||||
{
|
{
|
||||||
dbOptions.MigrationsAssembly(typeof(TContext).Assembly.GetName().Name);
|
dbOptions.MigrationsAssembly(typeof(TContext).Assembly.GetName().Name);
|
||||||
//ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency
|
|
||||||
// dbOptions.EnableRetryOnFailure(3, TimeSpan.FromSeconds(1), null);
|
|
||||||
})
|
})
|
||||||
// https://github.com/efcore/EFCore.NamingConventions
|
// https://github.com/efcore/EFCore.NamingConventions
|
||||||
.UseSnakeCaseNamingConvention();
|
.UseSnakeCaseNamingConvention();
|
||||||
|
|||||||
@ -3,10 +3,16 @@ using Microsoft.EntityFrameworkCore;
|
|||||||
|
|
||||||
namespace BuildingBlocks.EFCore;
|
namespace BuildingBlocks.EFCore;
|
||||||
|
|
||||||
|
using Microsoft.EntityFrameworkCore.Storage;
|
||||||
|
|
||||||
public interface IDbContext
|
public interface IDbContext
|
||||||
{
|
{
|
||||||
DbSet<TEntity> Set<TEntity>() where TEntity : class;
|
DbSet<TEntity> Set<TEntity>() where TEntity : class;
|
||||||
IReadOnlyList<IDomainEvent> GetDomainEvents();
|
IReadOnlyList<IDomainEvent> GetDomainEvents();
|
||||||
Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default);
|
|
||||||
Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
|
Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
|
||||||
|
Task BeginTransactionAsync(CancellationToken cancellationToken = default);
|
||||||
|
Task CommitTransactionAsync(CancellationToken cancellationToken = default);
|
||||||
|
Task RollbackTransactionAsync(CancellationToken cancellationToken = default);
|
||||||
|
IExecutionStrategy CreateExecutionStrategy();
|
||||||
|
Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -7,6 +7,7 @@ using Configurations;
|
|||||||
using Core.Model;
|
using Core.Model;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Exception = System.Exception;
|
using Exception = System.Exception;
|
||||||
|
using IsolationLevel = System.Data.IsolationLevel;
|
||||||
|
|
||||||
public class PersistMessageDbContext : DbContext, IPersistMessageDbContext
|
public class PersistMessageDbContext : DbContext, IPersistMessageDbContext
|
||||||
{
|
{
|
||||||
@ -28,6 +29,27 @@ public class PersistMessageDbContext : DbContext, IPersistMessageDbContext
|
|||||||
builder.ToSnakeCaseTables();
|
builder.ToSnakeCaseTables();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency#execution-strategies-and-transactions
|
||||||
|
public Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
var strategy = Database.CreateExecutionStrategy();
|
||||||
|
return strategy.ExecuteAsync(async () =>
|
||||||
|
{
|
||||||
|
await using var transaction =
|
||||||
|
await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await SaveChangesAsync(cancellationToken);
|
||||||
|
await transaction.CommitAsync(cancellationToken);
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
await transaction.RollbackAsync(cancellationToken);
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
|
public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
OnBeforeSaving();
|
OnBeforeSaving();
|
||||||
|
|||||||
@ -6,4 +6,5 @@ public interface IPersistMessageDbContext
|
|||||||
{
|
{
|
||||||
DbSet<PersistMessage> PersistMessages { get; }
|
DbSet<PersistMessage> PersistMessages { get; }
|
||||||
Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
|
Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
|
||||||
|
Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -10,6 +10,7 @@ using Microsoft.Extensions.Logging;
|
|||||||
namespace BuildingBlocks.PersistMessageProcessor;
|
namespace BuildingBlocks.PersistMessageProcessor;
|
||||||
|
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Polly;
|
||||||
|
|
||||||
public class PersistMessageProcessor : IPersistMessageProcessor
|
public class PersistMessageProcessor : IPersistMessageProcessor
|
||||||
{
|
{
|
||||||
@ -199,7 +200,10 @@ public class PersistMessageProcessor : IPersistMessageProcessor
|
|||||||
deliveryType),
|
deliveryType),
|
||||||
cancellationToken);
|
cancellationToken);
|
||||||
|
|
||||||
await _persistMessageDbContext.SaveChangesAsync(cancellationToken);
|
await _persistMessageDbContext.RetryOnFailure(async () =>
|
||||||
|
{
|
||||||
|
await _persistMessageDbContext.SaveChangesAsync(cancellationToken);
|
||||||
|
});
|
||||||
|
|
||||||
_logger.LogInformation(
|
_logger.LogInformation(
|
||||||
"Message with id: {MessageID} and delivery type: {DeliveryType} saved in persistence message store.",
|
"Message with id: {MessageID} and delivery type: {DeliveryType} saved in persistence message store.",
|
||||||
@ -215,6 +219,9 @@ public class PersistMessageProcessor : IPersistMessageProcessor
|
|||||||
|
|
||||||
_persistMessageDbContext.PersistMessages.Update(message);
|
_persistMessageDbContext.PersistMessages.Update(message);
|
||||||
|
|
||||||
await _persistMessageDbContext.SaveChangesAsync(cancellationToken);
|
await _persistMessageDbContext.RetryOnFailure(async () =>
|
||||||
|
{
|
||||||
|
await _persistMessageDbContext.SaveChangesAsync(cancellationToken);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
20
src/BuildingBlocks/Polly/Extensions.cs
Normal file
20
src/BuildingBlocks/Polly/Extensions.cs
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
namespace BuildingBlocks.Polly;
|
||||||
|
|
||||||
|
using global::Polly;
|
||||||
|
using Exception = System.Exception;
|
||||||
|
|
||||||
|
public static class Extensions
|
||||||
|
{
|
||||||
|
public static T RetryOnFailure<T>(this object retrySource, Func<T> action, int retryCount = 3)
|
||||||
|
{
|
||||||
|
var retryPolicy = Policy
|
||||||
|
.Handle<Exception>()
|
||||||
|
.Retry(retryCount, (exception, retryAttempt) =>
|
||||||
|
{
|
||||||
|
Console.WriteLine($"Retry attempt: {retryAttempt}");
|
||||||
|
Console.WriteLine($"Exception: {exception.Message}");
|
||||||
|
});
|
||||||
|
|
||||||
|
return retryPolicy.Execute(action);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -10,5 +10,6 @@ public class FakeCreateAircraftCommand : AutoFaker<CreateAircraft>
|
|||||||
public FakeCreateAircraftCommand()
|
public FakeCreateAircraftCommand()
|
||||||
{
|
{
|
||||||
RuleFor(r => r.Id, _ => NewId.NextGuid());
|
RuleFor(r => r.Id, _ => NewId.NextGuid());
|
||||||
|
RuleFor(r => r.ManufacturingYear, _ => 2000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -22,6 +22,7 @@ public sealed class IdentityContext : IdentityDbContext<User, Role, Guid,
|
|||||||
UserClaim, UserRole, UserLogin, RoleClaim, UserToken>, IDbContext
|
UserClaim, UserRole, UserLogin, RoleClaim, UserToken>, IDbContext
|
||||||
{
|
{
|
||||||
private readonly ILogger<IdentityContext>? _logger;
|
private readonly ILogger<IdentityContext>? _logger;
|
||||||
|
private IDbContextTransaction _currentTransaction;
|
||||||
|
|
||||||
public IdentityContext(DbContextOptions<IdentityContext> options, ILogger<IdentityContext>? logger = null) : base(options)
|
public IdentityContext(DbContextOptions<IdentityContext> options, ILogger<IdentityContext>? logger = null) : base(options)
|
||||||
{
|
{
|
||||||
@ -36,10 +37,51 @@ public sealed class IdentityContext : IdentityDbContext<User, Role, Guid,
|
|||||||
builder.ToSnakeCaseTables();
|
builder.ToSnakeCaseTables();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public IExecutionStrategy CreateExecutionStrategy() => Database.CreateExecutionStrategy();
|
||||||
|
|
||||||
|
public async Task BeginTransactionAsync(CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
if (_currentTransaction != null) return;
|
||||||
|
|
||||||
|
_currentTransaction = await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task CommitTransactionAsync(CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await SaveChangesAsync(cancellationToken);
|
||||||
|
await _currentTransaction?.CommitAsync(cancellationToken)!;
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
await RollbackTransactionAsync(cancellationToken);
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_currentTransaction?.Dispose();
|
||||||
|
_currentTransaction = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task RollbackTransactionAsync(CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await _currentTransaction?.RollbackAsync(cancellationToken)!;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_currentTransaction?.Dispose();
|
||||||
|
_currentTransaction = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency#execution-strategies-and-transactions
|
//ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency#execution-strategies-and-transactions
|
||||||
public Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default)
|
public Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
var strategy = Database.CreateExecutionStrategy();
|
var strategy = CreateExecutionStrategy();
|
||||||
return strategy.ExecuteAsync(async () =>
|
return strategy.ExecuteAsync(async () =>
|
||||||
{
|
{
|
||||||
await using var transaction =
|
await using var transaction =
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user