From 66cd7c54be249de66f12251818508cdaf480dd8a Mon Sep 17 00:00:00 2001 From: Pc Date: Sun, 4 Jun 2023 03:17:45 +0330 Subject: [PATCH] feat: Add retry for distributed transaction in postgres --- src/BuildingBlocks/EFCore/AppDbContextBase.cs | 46 ++++++++++++++++++- src/BuildingBlocks/EFCore/EfTxBehavior.cs | 24 ++++++++-- src/BuildingBlocks/EFCore/Extensions.cs | 2 - src/BuildingBlocks/EFCore/IDbContext.cs | 8 +++- .../Data/PersistMessageDbContext.cs | 22 +++++++++ .../IPersistMessageDbContext.cs | 1 + .../PersistMessageProcessor.cs | 11 ++++- src/BuildingBlocks/Polly/Extensions.cs | 20 ++++++++ .../src/Identity/Data/IdentityContext.cs | 44 +++++++++++++++++- 9 files changed, 166 insertions(+), 12 deletions(-) create mode 100644 src/BuildingBlocks/Polly/Extensions.cs diff --git a/src/BuildingBlocks/EFCore/AppDbContextBase.cs b/src/BuildingBlocks/EFCore/AppDbContextBase.cs index 9d33010..b9d1996 100644 --- a/src/BuildingBlocks/EFCore/AppDbContextBase.cs +++ b/src/BuildingBlocks/EFCore/AppDbContextBase.cs @@ -4,6 +4,7 @@ using System.Collections.Immutable; using Core.Event; using Core.Model; using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Storage; using Microsoft.Extensions.Logging; using Web; using Exception = System.Exception; @@ -13,6 +14,7 @@ public abstract class AppDbContextBase : DbContext, IDbContext { private readonly ICurrentUserProvider? _currentUserProvider; private readonly ILogger? _logger; + private IDbContextTransaction _currentTransaction; protected AppDbContextBase(DbContextOptions options, ICurrentUserProvider? currentUserProvider = null, ILogger? logger = null) : base(options) @@ -26,10 +28,52 @@ public abstract class AppDbContextBase : DbContext, IDbContext { } + public IExecutionStrategy CreateExecutionStrategy() => Database.CreateExecutionStrategy(); + + public async Task BeginTransactionAsync(CancellationToken cancellationToken = default) + { + if (_currentTransaction != null) return; + + _currentTransaction = await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken); + } + + public async Task CommitTransactionAsync(CancellationToken cancellationToken = default) + { + try + { + await SaveChangesAsync(cancellationToken); + await _currentTransaction?.CommitAsync(cancellationToken)!; + } + catch + { + await RollbackTransactionAsync(cancellationToken); + throw; + } + finally + { + _currentTransaction?.Dispose(); + _currentTransaction = null; + } + } + + public async Task RollbackTransactionAsync(CancellationToken cancellationToken = default) + { + try + { + await _currentTransaction?.RollbackAsync(cancellationToken)!; + } + finally + { + _currentTransaction?.Dispose(); + _currentTransaction = null; + } + } + + //ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency#execution-strategies-and-transactions public Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default) { - var strategy = Database.CreateExecutionStrategy(); + var strategy = CreateExecutionStrategy(); return strategy.ExecuteAsync(async () => { await using var transaction = diff --git a/src/BuildingBlocks/EFCore/EfTxBehavior.cs b/src/BuildingBlocks/EFCore/EfTxBehavior.cs index 87a09ec..f7e6eb9 100644 --- a/src/BuildingBlocks/EFCore/EfTxBehavior.cs +++ b/src/BuildingBlocks/EFCore/EfTxBehavior.cs @@ -7,6 +7,7 @@ namespace BuildingBlocks.EFCore; using System.Transactions; using PersistMessageProcessor; +using Polly; public class EfTxBehavior : IPipelineBehavior where TRequest : notnull, IRequest @@ -48,10 +49,10 @@ public class EfTxBehavior : IPipelineBehavior), typeof(TRequest).FullName); - // ref: https://learn.microsoft.com/en-us/ef/core/saving/transactions#using-systemtransactions - using var scope = new TransactionScope(TransactionScopeOption.Required, - new TransactionOptions { IsolationLevel = IsolationLevel.ReadCommitted }, - TransactionScopeAsyncFlowOption.Enabled); + //ref: https://learn.microsoft.com/en-us/ef/core/saving/transactions#using-systemtransactions + using var scope = new TransactionScope(TransactionScopeOption.Required, + new TransactionOptions { IsolationLevel = IsolationLevel.ReadCommitted }, + TransactionScopeAsyncFlowOption.Enabled); var response = await next(); @@ -71,10 +72,23 @@ public class EfTxBehavior : IPipelineBehavior + { + await _dbContextBase.SaveChangesAsync(cancellationToken); + }); + + // Save data to database with some retry policy in distributed transaction + await _dbContextBase.RetryOnFailure(async () => + { + await _dbContextBase.SaveChangesAsync(cancellationToken); + }); + await _persistMessageDbContext.SaveChangesAsync(cancellationToken); scope.Complete(); + + return response; } } } diff --git a/src/BuildingBlocks/EFCore/Extensions.cs b/src/BuildingBlocks/EFCore/Extensions.cs index e81c336..497ffbd 100644 --- a/src/BuildingBlocks/EFCore/Extensions.cs +++ b/src/BuildingBlocks/EFCore/Extensions.cs @@ -34,8 +34,6 @@ public static class Extensions dbOptions => { dbOptions.MigrationsAssembly(typeof(TContext).Assembly.GetName().Name); - //ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency - // dbOptions.EnableRetryOnFailure(3, TimeSpan.FromSeconds(1), null); }) // https://github.com/efcore/EFCore.NamingConventions .UseSnakeCaseNamingConvention(); diff --git a/src/BuildingBlocks/EFCore/IDbContext.cs b/src/BuildingBlocks/EFCore/IDbContext.cs index 48ac238..4bb10b6 100644 --- a/src/BuildingBlocks/EFCore/IDbContext.cs +++ b/src/BuildingBlocks/EFCore/IDbContext.cs @@ -3,10 +3,16 @@ using Microsoft.EntityFrameworkCore; namespace BuildingBlocks.EFCore; +using Microsoft.EntityFrameworkCore.Storage; + public interface IDbContext { DbSet Set() where TEntity : class; IReadOnlyList GetDomainEvents(); - Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default); Task SaveChangesAsync(CancellationToken cancellationToken = default); + Task BeginTransactionAsync(CancellationToken cancellationToken = default); + Task CommitTransactionAsync(CancellationToken cancellationToken = default); + Task RollbackTransactionAsync(CancellationToken cancellationToken = default); + IExecutionStrategy CreateExecutionStrategy(); + Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default); } diff --git a/src/BuildingBlocks/PersistMessageProcessor/Data/PersistMessageDbContext.cs b/src/BuildingBlocks/PersistMessageProcessor/Data/PersistMessageDbContext.cs index 243f274..e7eacb7 100644 --- a/src/BuildingBlocks/PersistMessageProcessor/Data/PersistMessageDbContext.cs +++ b/src/BuildingBlocks/PersistMessageProcessor/Data/PersistMessageDbContext.cs @@ -7,6 +7,7 @@ using Configurations; using Core.Model; using Microsoft.Extensions.Logging; using Exception = System.Exception; +using IsolationLevel = System.Data.IsolationLevel; public class PersistMessageDbContext : DbContext, IPersistMessageDbContext { @@ -28,6 +29,27 @@ public class PersistMessageDbContext : DbContext, IPersistMessageDbContext builder.ToSnakeCaseTables(); } + //ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency#execution-strategies-and-transactions + public Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default) + { + var strategy = Database.CreateExecutionStrategy(); + return strategy.ExecuteAsync(async () => + { + await using var transaction = + await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken); + try + { + await SaveChangesAsync(cancellationToken); + await transaction.CommitAsync(cancellationToken); + } + catch + { + await transaction.RollbackAsync(cancellationToken); + throw; + } + }); + } + public override async Task SaveChangesAsync(CancellationToken cancellationToken = default) { OnBeforeSaving(); diff --git a/src/BuildingBlocks/PersistMessageProcessor/IPersistMessageDbContext.cs b/src/BuildingBlocks/PersistMessageProcessor/IPersistMessageDbContext.cs index e74d944..2995a31 100644 --- a/src/BuildingBlocks/PersistMessageProcessor/IPersistMessageDbContext.cs +++ b/src/BuildingBlocks/PersistMessageProcessor/IPersistMessageDbContext.cs @@ -6,4 +6,5 @@ public interface IPersistMessageDbContext { DbSet PersistMessages { get; } Task SaveChangesAsync(CancellationToken cancellationToken = default); + Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default); } diff --git a/src/BuildingBlocks/PersistMessageProcessor/PersistMessageProcessor.cs b/src/BuildingBlocks/PersistMessageProcessor/PersistMessageProcessor.cs index d34d587..9ba47af 100644 --- a/src/BuildingBlocks/PersistMessageProcessor/PersistMessageProcessor.cs +++ b/src/BuildingBlocks/PersistMessageProcessor/PersistMessageProcessor.cs @@ -10,6 +10,7 @@ using Microsoft.Extensions.Logging; namespace BuildingBlocks.PersistMessageProcessor; using Microsoft.EntityFrameworkCore; +using Polly; public class PersistMessageProcessor : IPersistMessageProcessor { @@ -199,7 +200,10 @@ public class PersistMessageProcessor : IPersistMessageProcessor deliveryType), cancellationToken); - await _persistMessageDbContext.SaveChangesAsync(cancellationToken); + await _persistMessageDbContext.RetryOnFailure(async () => + { + await _persistMessageDbContext.SaveChangesAsync(cancellationToken); + }); _logger.LogInformation( "Message with id: {MessageID} and delivery type: {DeliveryType} saved in persistence message store.", @@ -215,6 +219,9 @@ public class PersistMessageProcessor : IPersistMessageProcessor _persistMessageDbContext.PersistMessages.Update(message); - await _persistMessageDbContext.SaveChangesAsync(cancellationToken); + await _persistMessageDbContext.RetryOnFailure(async () => + { + await _persistMessageDbContext.SaveChangesAsync(cancellationToken); + }); } } diff --git a/src/BuildingBlocks/Polly/Extensions.cs b/src/BuildingBlocks/Polly/Extensions.cs new file mode 100644 index 0000000..81fe460 --- /dev/null +++ b/src/BuildingBlocks/Polly/Extensions.cs @@ -0,0 +1,20 @@ +namespace BuildingBlocks.Polly; + +using global::Polly; +using Exception = System.Exception; + +public static class Extensions +{ + public static T RetryOnFailure(this object retrySource, Func action, int retryCount = 3) + { + var retryPolicy = Policy + .Handle() + .Retry(retryCount, (exception, retryAttempt) => + { + Console.WriteLine($"Retry attempt: {retryAttempt}"); + Console.WriteLine($"Exception: {exception.Message}"); + }); + + return retryPolicy.Execute(action); + } +} diff --git a/src/Services/Identity/src/Identity/Data/IdentityContext.cs b/src/Services/Identity/src/Identity/Data/IdentityContext.cs index 2195862..b896a98 100644 --- a/src/Services/Identity/src/Identity/Data/IdentityContext.cs +++ b/src/Services/Identity/src/Identity/Data/IdentityContext.cs @@ -22,6 +22,7 @@ public sealed class IdentityContext : IdentityDbContext, IDbContext { private readonly ILogger? _logger; + private IDbContextTransaction _currentTransaction; public IdentityContext(DbContextOptions options, ILogger? logger = null) : base(options) { @@ -36,10 +37,51 @@ public sealed class IdentityContext : IdentityDbContext Database.CreateExecutionStrategy(); + + public async Task BeginTransactionAsync(CancellationToken cancellationToken = default) + { + if (_currentTransaction != null) return; + + _currentTransaction = await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken); + } + + public async Task CommitTransactionAsync(CancellationToken cancellationToken = default) + { + try + { + await SaveChangesAsync(cancellationToken); + await _currentTransaction?.CommitAsync(cancellationToken)!; + } + catch + { + await RollbackTransactionAsync(cancellationToken); + throw; + } + finally + { + _currentTransaction?.Dispose(); + _currentTransaction = null; + } + } + + public async Task RollbackTransactionAsync(CancellationToken cancellationToken = default) + { + try + { + await _currentTransaction?.RollbackAsync(cancellationToken)!; + } + finally + { + _currentTransaction?.Dispose(); + _currentTransaction = null; + } + } + //ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency#execution-strategies-and-transactions public Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default) { - var strategy = Database.CreateExecutionStrategy(); + var strategy = CreateExecutionStrategy(); return strategy.ExecuteAsync(async () => { await using var transaction =