feat: Add retry for distributed transaction in postgres

This commit is contained in:
Pc 2023-06-04 03:17:45 +03:30
parent 39bee67fa1
commit 66cd7c54be
9 changed files with 166 additions and 12 deletions

View File

@ -4,6 +4,7 @@ using System.Collections.Immutable;
using Core.Event;
using Core.Model;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;
using Web;
using Exception = System.Exception;
@ -13,6 +14,7 @@ public abstract class AppDbContextBase : DbContext, IDbContext
{
private readonly ICurrentUserProvider? _currentUserProvider;
private readonly ILogger<AppDbContextBase>? _logger;
private IDbContextTransaction _currentTransaction;
protected AppDbContextBase(DbContextOptions options, ICurrentUserProvider? currentUserProvider = null, ILogger<AppDbContextBase>? logger = null) :
base(options)
@ -26,10 +28,52 @@ public abstract class AppDbContextBase : DbContext, IDbContext
{
}
public IExecutionStrategy CreateExecutionStrategy() => Database.CreateExecutionStrategy();
public async Task BeginTransactionAsync(CancellationToken cancellationToken = default)
{
if (_currentTransaction != null) return;
_currentTransaction = await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);
}
public async Task CommitTransactionAsync(CancellationToken cancellationToken = default)
{
try
{
await SaveChangesAsync(cancellationToken);
await _currentTransaction?.CommitAsync(cancellationToken)!;
}
catch
{
await RollbackTransactionAsync(cancellationToken);
throw;
}
finally
{
_currentTransaction?.Dispose();
_currentTransaction = null;
}
}
public async Task RollbackTransactionAsync(CancellationToken cancellationToken = default)
{
try
{
await _currentTransaction?.RollbackAsync(cancellationToken)!;
}
finally
{
_currentTransaction?.Dispose();
_currentTransaction = null;
}
}
//ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency#execution-strategies-and-transactions
public Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default)
{
var strategy = Database.CreateExecutionStrategy();
var strategy = CreateExecutionStrategy();
return strategy.ExecuteAsync(async () =>
{
await using var transaction =

View File

@ -7,6 +7,7 @@ namespace BuildingBlocks.EFCore;
using System.Transactions;
using PersistMessageProcessor;
using Polly;
public class EfTxBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
where TRequest : notnull, IRequest<TResponse>
@ -48,10 +49,10 @@ public class EfTxBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TRe
nameof(EfTxBehavior<TRequest, TResponse>),
typeof(TRequest).FullName);
// ref: https://learn.microsoft.com/en-us/ef/core/saving/transactions#using-systemtransactions
using var scope = new TransactionScope(TransactionScopeOption.Required,
new TransactionOptions { IsolationLevel = IsolationLevel.ReadCommitted },
TransactionScopeAsyncFlowOption.Enabled);
//ref: https://learn.microsoft.com/en-us/ef/core/saving/transactions#using-systemtransactions
using var scope = new TransactionScope(TransactionScopeOption.Required,
new TransactionOptions { IsolationLevel = IsolationLevel.ReadCommitted },
TransactionScopeAsyncFlowOption.Enabled);
var response = await next();
@ -71,10 +72,23 @@ public class EfTxBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TRe
await _eventDispatcher.SendAsync(domainEvents.ToArray(), typeof(TRequest), cancellationToken);
await _dbContextBase.SaveChangesAsync(cancellationToken);
// Save data to database with some retry policy in distributed transaction
await _dbContextBase.RetryOnFailure(async () =>
{
await _dbContextBase.SaveChangesAsync(cancellationToken);
});
// Save data to database with some retry policy in distributed transaction
await _dbContextBase.RetryOnFailure(async () =>
{
await _dbContextBase.SaveChangesAsync(cancellationToken);
});
await _persistMessageDbContext.SaveChangesAsync(cancellationToken);
scope.Complete();
return response;
}
}
}

View File

@ -34,8 +34,6 @@ public static class Extensions
dbOptions =>
{
dbOptions.MigrationsAssembly(typeof(TContext).Assembly.GetName().Name);
//ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency
// dbOptions.EnableRetryOnFailure(3, TimeSpan.FromSeconds(1), null);
})
// https://github.com/efcore/EFCore.NamingConventions
.UseSnakeCaseNamingConvention();

View File

@ -3,10 +3,16 @@ using Microsoft.EntityFrameworkCore;
namespace BuildingBlocks.EFCore;
using Microsoft.EntityFrameworkCore.Storage;
public interface IDbContext
{
DbSet<TEntity> Set<TEntity>() where TEntity : class;
IReadOnlyList<IDomainEvent> GetDomainEvents();
Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default);
Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
Task BeginTransactionAsync(CancellationToken cancellationToken = default);
Task CommitTransactionAsync(CancellationToken cancellationToken = default);
Task RollbackTransactionAsync(CancellationToken cancellationToken = default);
IExecutionStrategy CreateExecutionStrategy();
Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default);
}

View File

@ -7,6 +7,7 @@ using Configurations;
using Core.Model;
using Microsoft.Extensions.Logging;
using Exception = System.Exception;
using IsolationLevel = System.Data.IsolationLevel;
public class PersistMessageDbContext : DbContext, IPersistMessageDbContext
{
@ -28,6 +29,27 @@ public class PersistMessageDbContext : DbContext, IPersistMessageDbContext
builder.ToSnakeCaseTables();
}
//ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency#execution-strategies-and-transactions
public Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default)
{
var strategy = Database.CreateExecutionStrategy();
return strategy.ExecuteAsync(async () =>
{
await using var transaction =
await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);
try
{
await SaveChangesAsync(cancellationToken);
await transaction.CommitAsync(cancellationToken);
}
catch
{
await transaction.RollbackAsync(cancellationToken);
throw;
}
});
}
public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
{
OnBeforeSaving();

View File

@ -6,4 +6,5 @@ public interface IPersistMessageDbContext
{
DbSet<PersistMessage> PersistMessages { get; }
Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default);
}

View File

@ -10,6 +10,7 @@ using Microsoft.Extensions.Logging;
namespace BuildingBlocks.PersistMessageProcessor;
using Microsoft.EntityFrameworkCore;
using Polly;
public class PersistMessageProcessor : IPersistMessageProcessor
{
@ -199,7 +200,10 @@ public class PersistMessageProcessor : IPersistMessageProcessor
deliveryType),
cancellationToken);
await _persistMessageDbContext.SaveChangesAsync(cancellationToken);
await _persistMessageDbContext.RetryOnFailure(async () =>
{
await _persistMessageDbContext.SaveChangesAsync(cancellationToken);
});
_logger.LogInformation(
"Message with id: {MessageID} and delivery type: {DeliveryType} saved in persistence message store.",
@ -215,6 +219,9 @@ public class PersistMessageProcessor : IPersistMessageProcessor
_persistMessageDbContext.PersistMessages.Update(message);
await _persistMessageDbContext.SaveChangesAsync(cancellationToken);
await _persistMessageDbContext.RetryOnFailure(async () =>
{
await _persistMessageDbContext.SaveChangesAsync(cancellationToken);
});
}
}

View File

@ -0,0 +1,20 @@
namespace BuildingBlocks.Polly;
using global::Polly;
using Exception = System.Exception;
public static class Extensions
{
public static T RetryOnFailure<T>(this object retrySource, Func<T> action, int retryCount = 3)
{
var retryPolicy = Policy
.Handle<Exception>()
.Retry(retryCount, (exception, retryAttempt) =>
{
Console.WriteLine($"Retry attempt: {retryAttempt}");
Console.WriteLine($"Exception: {exception.Message}");
});
return retryPolicy.Execute(action);
}
}

View File

@ -22,6 +22,7 @@ public sealed class IdentityContext : IdentityDbContext<User, Role, Guid,
UserClaim, UserRole, UserLogin, RoleClaim, UserToken>, IDbContext
{
private readonly ILogger<IdentityContext>? _logger;
private IDbContextTransaction _currentTransaction;
public IdentityContext(DbContextOptions<IdentityContext> options, ILogger<IdentityContext>? logger = null) : base(options)
{
@ -36,10 +37,51 @@ public sealed class IdentityContext : IdentityDbContext<User, Role, Guid,
builder.ToSnakeCaseTables();
}
public IExecutionStrategy CreateExecutionStrategy() => Database.CreateExecutionStrategy();
public async Task BeginTransactionAsync(CancellationToken cancellationToken = default)
{
if (_currentTransaction != null) return;
_currentTransaction = await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);
}
public async Task CommitTransactionAsync(CancellationToken cancellationToken = default)
{
try
{
await SaveChangesAsync(cancellationToken);
await _currentTransaction?.CommitAsync(cancellationToken)!;
}
catch
{
await RollbackTransactionAsync(cancellationToken);
throw;
}
finally
{
_currentTransaction?.Dispose();
_currentTransaction = null;
}
}
public async Task RollbackTransactionAsync(CancellationToken cancellationToken = default)
{
try
{
await _currentTransaction?.RollbackAsync(cancellationToken)!;
}
finally
{
_currentTransaction?.Dispose();
_currentTransaction = null;
}
}
//ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency#execution-strategies-and-transactions
public Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default)
{
var strategy = Database.CreateExecutionStrategy();
var strategy = CreateExecutionStrategy();
return strategy.ExecuteAsync(async () =>
{
await using var transaction =