mirror of
https://github.com/meysamhadeli/booking-microservices.git
synced 2026-05-16 13:01:13 +08:00
feat: Add retry failure for Npgsql
This commit is contained in:
parent
c4002bf295
commit
17082a2365
@ -6,66 +6,41 @@ using BuildingBlocks.Core.Model;
|
|||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
using Microsoft.EntityFrameworkCore.Storage;
|
using Microsoft.EntityFrameworkCore.Storage;
|
||||||
using System.Data;
|
using System.Data;
|
||||||
using System.Security.Claims;
|
using Web;
|
||||||
using Microsoft.AspNetCore.Http;
|
|
||||||
using Exception = System.Exception;
|
using Exception = System.Exception;
|
||||||
|
|
||||||
public abstract class AppDbContextBase : DbContext, IDbContext
|
public abstract class AppDbContextBase : DbContext, IDbContext
|
||||||
{
|
{
|
||||||
private readonly IHttpContextAccessor _httpContextAccessor;
|
private readonly ICurrentUserProvider _currentUserProvider;
|
||||||
private IDbContextTransaction _currentTransaction;
|
|
||||||
|
|
||||||
protected AppDbContextBase(DbContextOptions options, IHttpContextAccessor httpContextAccessor = default) :
|
protected AppDbContextBase(DbContextOptions options, ICurrentUserProvider currentUserProvider) :
|
||||||
base(options)
|
base(options)
|
||||||
{
|
{
|
||||||
_httpContextAccessor = httpContextAccessor;
|
_currentUserProvider = currentUserProvider;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override void OnModelCreating(ModelBuilder builder)
|
protected override void OnModelCreating(ModelBuilder builder)
|
||||||
{
|
{
|
||||||
// ref: https://github.com/pdevito3/MessageBusTestingInMemHarness/blob/main/RecipeManagement/src/RecipeManagement/Databases/RecipesDbContext.cs
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task BeginTransactionAsync(CancellationToken cancellationToken = default)
|
//ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency#execution-strategies-and-transactions
|
||||||
|
public Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
if (_currentTransaction != null)
|
var strategy = Database.CreateExecutionStrategy();
|
||||||
|
return strategy.ExecuteAsync(async () =>
|
||||||
{
|
{
|
||||||
return;
|
await using var transaction = await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);
|
||||||
}
|
try
|
||||||
|
{
|
||||||
_currentTransaction ??= await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);
|
await SaveChangesAsync(cancellationToken);
|
||||||
}
|
await transaction.CommitAsync(cancellationToken);
|
||||||
|
}
|
||||||
public async Task CommitTransactionAsync(CancellationToken cancellationToken = default)
|
catch
|
||||||
{
|
{
|
||||||
try
|
await transaction.RollbackAsync(cancellationToken);
|
||||||
{
|
throw;
|
||||||
await SaveChangesAsync(cancellationToken);
|
}
|
||||||
await _currentTransaction?.CommitAsync(cancellationToken)!;
|
});
|
||||||
}
|
|
||||||
catch
|
|
||||||
{
|
|
||||||
await RollbackTransactionAsync(cancellationToken);
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
_currentTransaction?.Dispose();
|
|
||||||
_currentTransaction = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task RollbackTransactionAsync(CancellationToken cancellationToken = default)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
await _currentTransaction?.RollbackAsync(cancellationToken)!;
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
_currentTransaction?.Dispose();
|
|
||||||
_currentTransaction = null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
|
public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
|
||||||
@ -100,7 +75,7 @@ public abstract class AppDbContextBase : DbContext, IDbContext
|
|||||||
foreach (var entry in ChangeTracker.Entries<IAggregate>())
|
foreach (var entry in ChangeTracker.Entries<IAggregate>())
|
||||||
{
|
{
|
||||||
var isAuditable = entry.Entity.GetType().IsAssignableTo(typeof(IAggregate));
|
var isAuditable = entry.Entity.GetType().IsAssignableTo(typeof(IAggregate));
|
||||||
var userId = GetCurrentUserId();
|
var userId = _currentUserProvider.GetCurrentUserId();
|
||||||
|
|
||||||
if (isAuditable)
|
if (isAuditable)
|
||||||
{
|
{
|
||||||
@ -133,13 +108,4 @@ public abstract class AppDbContextBase : DbContext, IDbContext
|
|||||||
throw new Exception("try for find IAggregate", ex);
|
throw new Exception("try for find IAggregate", ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private long? GetCurrentUserId()
|
|
||||||
{
|
|
||||||
var nameIdentifier = _httpContextAccessor?.HttpContext?.User?.FindFirstValue(ClaimTypes.NameIdentifier);
|
|
||||||
|
|
||||||
long.TryParse(nameIdentifier, out var userId);
|
|
||||||
|
|
||||||
return userId;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -42,29 +42,20 @@ public class EfTxBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TRe
|
|||||||
nameof(EfTxBehavior<TRequest, TResponse>),
|
nameof(EfTxBehavior<TRequest, TResponse>),
|
||||||
typeof(TRequest).FullName);
|
typeof(TRequest).FullName);
|
||||||
|
|
||||||
await _dbContextBase.BeginTransactionAsync(cancellationToken);
|
|
||||||
|
|
||||||
try
|
var response = await next();
|
||||||
{
|
|
||||||
var response = await next();
|
|
||||||
|
|
||||||
_logger.LogInformation(
|
_logger.LogInformation(
|
||||||
"{Prefix} Executed the {MediatrRequest} request",
|
"{Prefix} Executed the {MediatrRequest} request",
|
||||||
nameof(EfTxBehavior<TRequest, TResponse>),
|
nameof(EfTxBehavior<TRequest, TResponse>),
|
||||||
typeof(TRequest).FullName);
|
typeof(TRequest).FullName);
|
||||||
|
|
||||||
var domainEvents = _dbContextBase.GetDomainEvents();
|
var domainEvents = _dbContextBase.GetDomainEvents();
|
||||||
|
|
||||||
await _eventDispatcher.SendAsync(domainEvents.ToArray(), typeof(TRequest), cancellationToken);
|
await _eventDispatcher.SendAsync(domainEvents.ToArray(), typeof(TRequest), cancellationToken);
|
||||||
|
|
||||||
await _dbContextBase.CommitTransactionAsync(cancellationToken);
|
await _dbContextBase.ExecuteTransactionalAsync(cancellationToken);
|
||||||
|
|
||||||
return response;
|
return response;
|
||||||
}
|
|
||||||
catch (System.Exception ex)
|
|
||||||
{
|
|
||||||
await _dbContextBase.RollbackTransactionAsync(cancellationToken);
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -35,6 +35,8 @@ public static class Extensions
|
|||||||
dbOptions =>
|
dbOptions =>
|
||||||
{
|
{
|
||||||
dbOptions.MigrationsAssembly(typeof(TContext).Assembly.GetName().Name);
|
dbOptions.MigrationsAssembly(typeof(TContext).Assembly.GetName().Name);
|
||||||
|
//ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency
|
||||||
|
dbOptions.EnableRetryOnFailure(3, TimeSpan.FromSeconds(1), null);
|
||||||
})
|
})
|
||||||
// https://github.com/efcore/EFCore.NamingConventions
|
// https://github.com/efcore/EFCore.NamingConventions
|
||||||
.UseSnakeCaseNamingConvention();
|
.UseSnakeCaseNamingConvention();
|
||||||
|
|||||||
@ -7,8 +7,6 @@ public interface IDbContext
|
|||||||
{
|
{
|
||||||
DbSet<TEntity> Set<TEntity>() where TEntity : class;
|
DbSet<TEntity> Set<TEntity>() where TEntity : class;
|
||||||
IReadOnlyList<IDomainEvent> GetDomainEvents();
|
IReadOnlyList<IDomainEvent> GetDomainEvents();
|
||||||
Task BeginTransactionAsync(CancellationToken cancellationToken = default);
|
Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default);
|
||||||
Task CommitTransactionAsync(CancellationToken cancellationToken = default);
|
|
||||||
Task RollbackTransactionAsync(CancellationToken cancellationToken = default);
|
|
||||||
Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
|
Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,7 +16,7 @@ public class PersistMessageDbContext : DbContext, IPersistMessageDbContext
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
public DbSet<PersistMessage> PersistMessages { get; set; }
|
public DbSet<PersistMessage> PersistMessages => Set<PersistMessage>();
|
||||||
|
|
||||||
protected override void OnModelCreating(ModelBuilder builder)
|
protected override void OnModelCreating(ModelBuilder builder)
|
||||||
{
|
{
|
||||||
@ -48,13 +48,13 @@ public class PersistMessageDbContext : DbContext, IPersistMessageDbContext
|
|||||||
});
|
});
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await policy.ExecuteAsync(async () => await base.SaveChangesAsync(cancellationToken));
|
return await policy.ExecuteAsync(async () => await base.SaveChangesAsync(cancellationToken));
|
||||||
}
|
}
|
||||||
catch (DbUpdateConcurrencyException ex)
|
catch (DbUpdateConcurrencyException ex)
|
||||||
{
|
{
|
||||||
foreach (var entry in ex.Entries)
|
foreach (var entry in ex.Entries)
|
||||||
{
|
{
|
||||||
var currentEntity = entry.Entity;
|
var currentEntity = entry.Entity; // we can use it for specific merging
|
||||||
var databaseValues = await entry.GetDatabaseValuesAsync(cancellationToken);
|
var databaseValues = await entry.GetDatabaseValuesAsync(cancellationToken);
|
||||||
|
|
||||||
if (databaseValues != null)
|
if (databaseValues != null)
|
||||||
@ -65,8 +65,6 @@ public class PersistMessageDbContext : DbContext, IPersistMessageDbContext
|
|||||||
|
|
||||||
return await base.SaveChangesAsync(cancellationToken);
|
return await base.SaveChangesAsync(cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void OnBeforeSaving()
|
private void OnBeforeSaving()
|
||||||
|
|||||||
@ -2,10 +2,8 @@
|
|||||||
|
|
||||||
namespace BuildingBlocks.PersistMessageProcessor;
|
namespace BuildingBlocks.PersistMessageProcessor;
|
||||||
|
|
||||||
using EFCore;
|
|
||||||
|
|
||||||
public interface IPersistMessageDbContext
|
public interface IPersistMessageDbContext
|
||||||
{
|
{
|
||||||
DbSet<PersistMessage> PersistMessages { get; set; }
|
DbSet<PersistMessage> PersistMessages { get; }
|
||||||
Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
|
Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -6,11 +6,12 @@ using BuildingBlocks.IdsGenerator;
|
|||||||
using BuildingBlocks.Utils;
|
using BuildingBlocks.Utils;
|
||||||
using MassTransit;
|
using MassTransit;
|
||||||
using MediatR;
|
using MediatR;
|
||||||
using Microsoft.EntityFrameworkCore;
|
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
namespace BuildingBlocks.PersistMessageProcessor;
|
namespace BuildingBlocks.PersistMessageProcessor;
|
||||||
|
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
|
||||||
public class PersistMessageProcessor : IPersistMessageProcessor
|
public class PersistMessageProcessor : IPersistMessageProcessor
|
||||||
{
|
{
|
||||||
private readonly ILogger<PersistMessageProcessor> _logger;
|
private readonly ILogger<PersistMessageProcessor> _logger;
|
||||||
|
|||||||
@ -6,12 +6,12 @@ using Microsoft.EntityFrameworkCore;
|
|||||||
|
|
||||||
namespace Flight.Data;
|
namespace Flight.Data;
|
||||||
|
|
||||||
using Microsoft.AspNetCore.Http;
|
using BuildingBlocks.Web;
|
||||||
|
|
||||||
public sealed class FlightDbContext : AppDbContextBase
|
public sealed class FlightDbContext : AppDbContextBase
|
||||||
{
|
{
|
||||||
public FlightDbContext(DbContextOptions<FlightDbContext> options, IHttpContextAccessor httpContextAccessor = default) : base(
|
public FlightDbContext(DbContextOptions<FlightDbContext> options, ICurrentUserProvider currentUserProvider = default) : base(
|
||||||
options, httpContextAccessor)
|
options, currentUserProvider)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -23,8 +23,8 @@ public sealed class FlightDbContext : AppDbContextBase
|
|||||||
protected override void OnModelCreating(ModelBuilder builder)
|
protected override void OnModelCreating(ModelBuilder builder)
|
||||||
{
|
{
|
||||||
base.OnModelCreating(builder);
|
base.OnModelCreating(builder);
|
||||||
builder.FilterSoftDeletedProperties();
|
|
||||||
builder.ApplyConfigurationsFromAssembly(typeof(FlightRoot).Assembly);
|
builder.ApplyConfigurationsFromAssembly(typeof(FlightRoot).Assembly);
|
||||||
|
builder.FilterSoftDeletedProperties();
|
||||||
builder.ToSnakeCaseTables();
|
builder.ToSnakeCaseTables();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -34,54 +34,32 @@ public sealed class IdentityContext : IdentityDbContext<User, Role, long,
|
|||||||
builder.ToSnakeCaseTables();
|
builder.ToSnakeCaseTables();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency#execution-strategies-and-transactions
|
||||||
|
public Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
var strategy = Database.CreateExecutionStrategy();
|
||||||
|
return strategy.ExecuteAsync(async () =>
|
||||||
|
{
|
||||||
|
await using var transaction = await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await SaveChangesAsync(cancellationToken);
|
||||||
|
await transaction.CommitAsync(cancellationToken);
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
await transaction.RollbackAsync(cancellationToken);
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
|
public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
OnBeforeSaving();
|
OnBeforeSaving();
|
||||||
return await base.SaveChangesAsync(cancellationToken);
|
return await base.SaveChangesAsync(cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task BeginTransactionAsync(CancellationToken cancellationToken = default)
|
|
||||||
{
|
|
||||||
if (_currentTransaction != null)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
_currentTransaction = await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task CommitTransactionAsync(CancellationToken cancellationToken = default)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
await SaveChangesAsync(cancellationToken);
|
|
||||||
await _currentTransaction?.CommitAsync(cancellationToken)!;
|
|
||||||
}
|
|
||||||
catch
|
|
||||||
{
|
|
||||||
await RollbackTransactionAsync(cancellationToken);
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
_currentTransaction?.Dispose();
|
|
||||||
_currentTransaction = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task RollbackTransactionAsync(CancellationToken cancellationToken = default)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
await _currentTransaction?.RollbackAsync(cancellationToken)!;
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
_currentTransaction?.Dispose();
|
|
||||||
_currentTransaction = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public IReadOnlyList<IDomainEvent> GetDomainEvents()
|
public IReadOnlyList<IDomainEvent> GetDomainEvents()
|
||||||
{
|
{
|
||||||
var domainEntities = ChangeTracker
|
var domainEntities = ChangeTracker
|
||||||
|
|||||||
@ -1,16 +1,15 @@
|
|||||||
using System.Reflection;
|
using System.Reflection;
|
||||||
using BuildingBlocks.EFCore;
|
using BuildingBlocks.EFCore;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using BuildingBlocks.Web;
|
||||||
|
|
||||||
namespace Passenger.Data;
|
namespace Passenger.Data;
|
||||||
|
|
||||||
using Microsoft.AspNetCore.Http;
|
|
||||||
|
|
||||||
public sealed class PassengerDbContext : AppDbContextBase
|
public sealed class PassengerDbContext : AppDbContextBase
|
||||||
{
|
{
|
||||||
public PassengerDbContext(DbContextOptions<PassengerDbContext> options,
|
public PassengerDbContext(DbContextOptions<PassengerDbContext> options,
|
||||||
IHttpContextAccessor httpContextAccessor = default) :
|
ICurrentUserProvider currentUserProvider = default) :
|
||||||
base(options, httpContextAccessor)
|
base(options, currentUserProvider)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -20,6 +19,7 @@ public sealed class PassengerDbContext : AppDbContextBase
|
|||||||
{
|
{
|
||||||
builder.ApplyConfigurationsFromAssembly(Assembly.GetExecutingAssembly());
|
builder.ApplyConfigurationsFromAssembly(Assembly.GetExecutingAssembly());
|
||||||
base.OnModelCreating(builder);
|
base.OnModelCreating(builder);
|
||||||
|
builder.FilterSoftDeletedProperties();
|
||||||
builder.ToSnakeCaseTables();
|
builder.ToSnakeCaseTables();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user