Merge pull request #121 from meysamhadeli/feat/add-retry-failure-for-npgsql

feat: Add retry failure for Npgsql
This commit is contained in:
Meysam Hadeli 2023-01-23 16:38:46 +03:30 committed by GitHub
commit bbcfdf1f82
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 112 additions and 169 deletions

View File

@ -4,68 +4,42 @@ using System.Collections.Immutable;
using BuildingBlocks.Core.Event; using BuildingBlocks.Core.Event;
using BuildingBlocks.Core.Model; using BuildingBlocks.Core.Model;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using System.Data; using System.Data;
using System.Security.Claims; using Web;
using Microsoft.AspNetCore.Http;
using Exception = System.Exception; using Exception = System.Exception;
public abstract class AppDbContextBase : DbContext, IDbContext public abstract class AppDbContextBase : DbContext, IDbContext
{ {
private readonly IHttpContextAccessor _httpContextAccessor; private readonly ICurrentUserProvider _currentUserProvider;
private IDbContextTransaction _currentTransaction;
protected AppDbContextBase(DbContextOptions options, IHttpContextAccessor httpContextAccessor = default) : protected AppDbContextBase(DbContextOptions options, ICurrentUserProvider currentUserProvider) :
base(options) base(options)
{ {
_httpContextAccessor = httpContextAccessor; _currentUserProvider = currentUserProvider;
} }
protected override void OnModelCreating(ModelBuilder builder) protected override void OnModelCreating(ModelBuilder builder)
{ {
// ref: https://github.com/pdevito3/MessageBusTestingInMemHarness/blob/main/RecipeManagement/src/RecipeManagement/Databases/RecipesDbContext.cs
} }
public async Task BeginTransactionAsync(CancellationToken cancellationToken = default) //ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency#execution-strategies-and-transactions
public Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default)
{ {
if (_currentTransaction != null) var strategy = Database.CreateExecutionStrategy();
return strategy.ExecuteAsync(async () =>
{ {
return; await using var transaction = await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);
} try
{
_currentTransaction ??= await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken); await SaveChangesAsync(cancellationToken);
} await transaction.CommitAsync(cancellationToken);
}
public async Task CommitTransactionAsync(CancellationToken cancellationToken = default) catch
{ {
try await transaction.RollbackAsync(cancellationToken);
{ throw;
await SaveChangesAsync(cancellationToken); }
await _currentTransaction?.CommitAsync(cancellationToken)!; });
}
catch
{
await RollbackTransactionAsync(cancellationToken);
throw;
}
finally
{
_currentTransaction?.Dispose();
_currentTransaction = null;
}
}
public async Task RollbackTransactionAsync(CancellationToken cancellationToken = default)
{
try
{
await _currentTransaction?.RollbackAsync(cancellationToken)!;
}
finally
{
_currentTransaction?.Dispose();
_currentTransaction = null;
}
} }
public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default) public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
@ -100,7 +74,7 @@ public abstract class AppDbContextBase : DbContext, IDbContext
foreach (var entry in ChangeTracker.Entries<IAggregate>()) foreach (var entry in ChangeTracker.Entries<IAggregate>())
{ {
var isAuditable = entry.Entity.GetType().IsAssignableTo(typeof(IAggregate)); var isAuditable = entry.Entity.GetType().IsAssignableTo(typeof(IAggregate));
var userId = GetCurrentUserId(); var userId = _currentUserProvider?.GetCurrentUserId() ?? 0;
if (isAuditable) if (isAuditable)
{ {
@ -133,13 +107,4 @@ public abstract class AppDbContextBase : DbContext, IDbContext
throw new Exception("try for find IAggregate", ex); throw new Exception("try for find IAggregate", ex);
} }
} }
private long? GetCurrentUserId()
{
var nameIdentifier = _httpContextAccessor?.HttpContext?.User?.FindFirstValue(ClaimTypes.NameIdentifier);
long.TryParse(nameIdentifier, out var userId);
return userId;
}
} }

View File

@ -42,29 +42,20 @@ public class EfTxBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TRe
nameof(EfTxBehavior<TRequest, TResponse>), nameof(EfTxBehavior<TRequest, TResponse>),
typeof(TRequest).FullName); typeof(TRequest).FullName);
await _dbContextBase.BeginTransactionAsync(cancellationToken);
try var response = await next();
{
var response = await next();
_logger.LogInformation( _logger.LogInformation(
"{Prefix} Executed the {MediatrRequest} request", "{Prefix} Executed the {MediatrRequest} request",
nameof(EfTxBehavior<TRequest, TResponse>), nameof(EfTxBehavior<TRequest, TResponse>),
typeof(TRequest).FullName); typeof(TRequest).FullName);
var domainEvents = _dbContextBase.GetDomainEvents(); var domainEvents = _dbContextBase.GetDomainEvents();
await _eventDispatcher.SendAsync(domainEvents.ToArray(), typeof(TRequest), cancellationToken); await _eventDispatcher.SendAsync(domainEvents.ToArray(), typeof(TRequest), cancellationToken);
await _dbContextBase.CommitTransactionAsync(cancellationToken); await _dbContextBase.ExecuteTransactionalAsync(cancellationToken);
return response; return response;
}
catch (System.Exception ex)
{
await _dbContextBase.RollbackTransactionAsync(cancellationToken);
throw;
}
} }
} }

View File

@ -32,10 +32,12 @@ public static class Extensions
Guard.Against.Null(options, nameof(postgresOptions)); Guard.Against.Null(options, nameof(postgresOptions));
options.UseNpgsql(postgresOptions?.ConnectionString, options.UseNpgsql(postgresOptions?.ConnectionString,
dbOptions => dbOptions =>
{ {
dbOptions.MigrationsAssembly(typeof(TContext).Assembly.GetName().Name); dbOptions.MigrationsAssembly(typeof(TContext).Assembly.GetName().Name);
}) //ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency
dbOptions.EnableRetryOnFailure(3, TimeSpan.FromSeconds(1), null);
})
// https://github.com/efcore/EFCore.NamingConventions // https://github.com/efcore/EFCore.NamingConventions
.UseSnakeCaseNamingConvention(); .UseSnakeCaseNamingConvention();
}); });
@ -86,7 +88,8 @@ public static class Extensions
// Replace table names // Replace table names
entity.SetTableName(entity.GetTableName()?.Underscore()); entity.SetTableName(entity.GetTableName()?.Underscore());
var tableObjectIdentifier = StoreObjectIdentifier.Table(entity.GetTableName()?.Underscore()!, entity.GetSchema()); var tableObjectIdentifier =
StoreObjectIdentifier.Table(entity.GetTableName()?.Underscore()!, entity.GetSchema());
// Replace column names // Replace column names
foreach (var property in entity.GetProperties()) foreach (var property in entity.GetProperties())

View File

@ -7,8 +7,6 @@ public interface IDbContext
{ {
DbSet<TEntity> Set<TEntity>() where TEntity : class; DbSet<TEntity> Set<TEntity>() where TEntity : class;
IReadOnlyList<IDomainEvent> GetDomainEvents(); IReadOnlyList<IDomainEvent> GetDomainEvents();
Task BeginTransactionAsync(CancellationToken cancellationToken = default); Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default);
Task CommitTransactionAsync(CancellationToken cancellationToken = default);
Task RollbackTransactionAsync(CancellationToken cancellationToken = default);
Task<int> SaveChangesAsync(CancellationToken cancellationToken = default); Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
} }

View File

@ -8,6 +8,7 @@ using Configurations;
using Core.Model; using Core.Model;
using global::Polly; using global::Polly;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Exception = System.Exception;
public class PersistMessageDbContext : DbContext, IPersistMessageDbContext public class PersistMessageDbContext : DbContext, IPersistMessageDbContext
{ {
@ -16,7 +17,7 @@ public class PersistMessageDbContext : DbContext, IPersistMessageDbContext
{ {
} }
public DbSet<PersistMessage> PersistMessages { get; set; } public DbSet<PersistMessage> PersistMessages => Set<PersistMessage>();
protected override void OnModelCreating(ModelBuilder builder) protected override void OnModelCreating(ModelBuilder builder)
{ {
@ -48,13 +49,13 @@ public class PersistMessageDbContext : DbContext, IPersistMessageDbContext
}); });
try try
{ {
await policy.ExecuteAsync(async () => await base.SaveChangesAsync(cancellationToken)); return await policy.ExecuteAsync(async () => await base.SaveChangesAsync(cancellationToken));
} }
catch (DbUpdateConcurrencyException ex) catch (DbUpdateConcurrencyException ex)
{ {
foreach (var entry in ex.Entries) foreach (var entry in ex.Entries)
{ {
var currentEntity = entry.Entity; var currentEntity = entry.Entity; // we can use it for specific merging
var databaseValues = await entry.GetDatabaseValuesAsync(cancellationToken); var databaseValues = await entry.GetDatabaseValuesAsync(cancellationToken);
if (databaseValues != null) if (databaseValues != null)
@ -65,24 +66,29 @@ public class PersistMessageDbContext : DbContext, IPersistMessageDbContext
return await base.SaveChangesAsync(cancellationToken); return await base.SaveChangesAsync(cancellationToken);
} }
return 0;
} }
private void OnBeforeSaving() private void OnBeforeSaving()
{ {
foreach (var entry in ChangeTracker.Entries<IVersion>()) try
{ {
switch (entry.State) foreach (var entry in ChangeTracker.Entries<IVersion>())
{ {
case EntityState.Modified: switch (entry.State)
entry.Entity.Version++; {
break; case EntityState.Modified:
entry.Entity.Version++;
break;
case EntityState.Deleted: case EntityState.Deleted:
entry.Entity.Version++; entry.Entity.Version++;
break; break;
}
} }
} }
catch (Exception ex)
{
throw new Exception("try for find IVersion", ex);
}
} }
} }

View File

@ -2,10 +2,8 @@
namespace BuildingBlocks.PersistMessageProcessor; namespace BuildingBlocks.PersistMessageProcessor;
using EFCore;
public interface IPersistMessageDbContext public interface IPersistMessageDbContext
{ {
DbSet<PersistMessage> PersistMessages { get; set; } DbSet<PersistMessage> PersistMessages { get; }
Task<int> SaveChangesAsync(CancellationToken cancellationToken = default); Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
} }

View File

@ -6,11 +6,12 @@ using BuildingBlocks.IdsGenerator;
using BuildingBlocks.Utils; using BuildingBlocks.Utils;
using MassTransit; using MassTransit;
using MediatR; using MediatR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
namespace BuildingBlocks.PersistMessageProcessor; namespace BuildingBlocks.PersistMessageProcessor;
using Microsoft.EntityFrameworkCore;
public class PersistMessageProcessor : IPersistMessageProcessor public class PersistMessageProcessor : IPersistMessageProcessor
{ {
private readonly ILogger<PersistMessageProcessor> _logger; private readonly ILogger<PersistMessageProcessor> _logger;

View File

@ -11,7 +11,7 @@ namespace Flight.Data
builder.UseNpgsql("Server=localhost;Port=5432;Database=flight;User Id=postgres;Password=postgres;Include Error Detail=true") builder.UseNpgsql("Server=localhost;Port=5432;Database=flight;User Id=postgres;Password=postgres;Include Error Detail=true")
.UseSnakeCaseNamingConvention(); .UseSnakeCaseNamingConvention();
return new FlightDbContext(builder.Options); return new FlightDbContext(builder.Options, null);
} }
} }
} }

View File

@ -6,12 +6,12 @@ using Microsoft.EntityFrameworkCore;
namespace Flight.Data; namespace Flight.Data;
using Microsoft.AspNetCore.Http; using BuildingBlocks.Web;
public sealed class FlightDbContext : AppDbContextBase public sealed class FlightDbContext : AppDbContextBase
{ {
public FlightDbContext(DbContextOptions<FlightDbContext> options, IHttpContextAccessor httpContextAccessor = default) : base( public FlightDbContext(DbContextOptions<FlightDbContext> options, ICurrentUserProvider currentUserProvider) : base(
options, httpContextAccessor) options, currentUserProvider)
{ {
} }
@ -23,8 +23,8 @@ public sealed class FlightDbContext : AppDbContextBase
protected override void OnModelCreating(ModelBuilder builder) protected override void OnModelCreating(ModelBuilder builder)
{ {
base.OnModelCreating(builder); base.OnModelCreating(builder);
builder.FilterSoftDeletedProperties();
builder.ApplyConfigurationsFromAssembly(typeof(FlightRoot).Assembly); builder.ApplyConfigurationsFromAssembly(typeof(FlightRoot).Assembly);
builder.FilterSoftDeletedProperties();
builder.ToSnakeCaseTables(); builder.ToSnakeCaseTables();
} }
} }

View File

@ -1,12 +1,8 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using Flight.Aircrafts.Models;
using Flight.Airports.Models;
using Flight.Data; using Flight.Data;
using Flight.Flights.Enums; using Flight.Flights.Enums;
using Flight.Flights.Models;
using Flight.Seats.Enums; using Flight.Seats.Enums;
using Flight.Seats.Models;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
namespace Unit.Test.Common namespace Unit.Test.Common
@ -18,7 +14,7 @@ namespace Unit.Test.Common
var options = new DbContextOptionsBuilder<FlightDbContext>() var options = new DbContextOptionsBuilder<FlightDbContext>()
.UseInMemoryDatabase(databaseName: Guid.NewGuid().ToString()).Options; .UseInMemoryDatabase(databaseName: Guid.NewGuid().ToString()).Options;
var context = new FlightDbContext(options); var context = new FlightDbContext(options, currentUserProvider: null);
// Seed our data // Seed our data
FlightDataSeeder(context); FlightDataSeeder(context);

View File

@ -16,13 +16,12 @@ using Microsoft.EntityFrameworkCore.Storage;
namespace Identity.Data; namespace Identity.Data;
using System;
public sealed class IdentityContext : IdentityDbContext<User, Role, long, public sealed class IdentityContext : IdentityDbContext<User, Role, long,
UserClaim, UserRole, UserLogin, RoleClaim, UserToken>, IDbContext UserClaim, UserRole, UserLogin, RoleClaim, UserToken>, IDbContext
{ {
private IDbContextTransaction _currentTransaction; public IdentityContext(DbContextOptions<IdentityContext> options) : base(options)
public IdentityContext(DbContextOptions<IdentityContext> options, IHttpContextAccessor httpContextAccessor = default) :
base(options)
{ {
} }
@ -34,54 +33,33 @@ public sealed class IdentityContext : IdentityDbContext<User, Role, long,
builder.ToSnakeCaseTables(); builder.ToSnakeCaseTables();
} }
//ref: https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency#execution-strategies-and-transactions
public Task ExecuteTransactionalAsync(CancellationToken cancellationToken = default)
{
var strategy = Database.CreateExecutionStrategy();
return strategy.ExecuteAsync(async () =>
{
await using var transaction =
await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);
try
{
await SaveChangesAsync(cancellationToken);
await transaction.CommitAsync(cancellationToken);
}
catch
{
await transaction.RollbackAsync(cancellationToken);
throw;
}
});
}
public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default) public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
{ {
OnBeforeSaving(); OnBeforeSaving();
return await base.SaveChangesAsync(cancellationToken); return await base.SaveChangesAsync(cancellationToken);
} }
public async Task BeginTransactionAsync(CancellationToken cancellationToken = default)
{
if (_currentTransaction != null)
{
return;
}
_currentTransaction = await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);
}
public async Task CommitTransactionAsync(CancellationToken cancellationToken = default)
{
try
{
await SaveChangesAsync(cancellationToken);
await _currentTransaction?.CommitAsync(cancellationToken)!;
}
catch
{
await RollbackTransactionAsync(cancellationToken);
throw;
}
finally
{
_currentTransaction?.Dispose();
_currentTransaction = null;
}
}
public async Task RollbackTransactionAsync(CancellationToken cancellationToken = default)
{
try
{
await _currentTransaction?.RollbackAsync(cancellationToken)!;
}
finally
{
_currentTransaction?.Dispose();
_currentTransaction = null;
}
}
public IReadOnlyList<IDomainEvent> GetDomainEvents() public IReadOnlyList<IDomainEvent> GetDomainEvents()
{ {
var domainEntities = ChangeTracker var domainEntities = ChangeTracker
@ -101,18 +79,25 @@ public sealed class IdentityContext : IdentityDbContext<User, Role, long,
private void OnBeforeSaving() private void OnBeforeSaving()
{ {
foreach (var entry in ChangeTracker.Entries<IVersion>()) try
{ {
switch (entry.State) foreach (var entry in ChangeTracker.Entries<IVersion>())
{ {
case EntityState.Modified: switch (entry.State)
entry.Entity.Version++; {
break; case EntityState.Modified:
entry.Entity.Version++;
break;
case EntityState.Deleted: case EntityState.Deleted:
entry.Entity.Version++; entry.Entity.Version++;
break; break;
}
} }
} }
catch (Exception ex)
{
throw new Exception("try for find IVersion", ex);
}
} }
} }

View File

@ -33,6 +33,7 @@ public static class InfrastructureExtensions
var configuration = builder.Configuration; var configuration = builder.Configuration;
var env = builder.Environment; var env = builder.Environment;
builder.Services.AddScoped<ICurrentUserProvider, CurrentUserProvider>();
builder.Services.AddScoped<IEventMapper, EventMapper>(); builder.Services.AddScoped<IEventMapper, EventMapper>();
builder.Services.AddScoped<IEventDispatcher, EventDispatcher>(); builder.Services.AddScoped<IEventDispatcher, EventDispatcher>();

View File

@ -11,6 +11,6 @@ public class DesignTimeDbContextFactory: IDesignTimeDbContextFactory<PassengerDb
builder.UseNpgsql("Server=localhost;Port=5432;Database=passenger;User Id=postgres;Password=postgres;Include Error Detail=true") builder.UseNpgsql("Server=localhost;Port=5432;Database=passenger;User Id=postgres;Password=postgres;Include Error Detail=true")
.UseSnakeCaseNamingConvention(); .UseSnakeCaseNamingConvention();
return new PassengerDbContext(builder.Options); return new PassengerDbContext(builder.Options, null);
} }
} }

View File

@ -1,16 +1,14 @@
using System.Reflection; using System.Reflection;
using BuildingBlocks.EFCore; using BuildingBlocks.EFCore;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using BuildingBlocks.Web;
namespace Passenger.Data; namespace Passenger.Data;
using Microsoft.AspNetCore.Http;
public sealed class PassengerDbContext : AppDbContextBase public sealed class PassengerDbContext : AppDbContextBase
{ {
public PassengerDbContext(DbContextOptions<PassengerDbContext> options, public PassengerDbContext(DbContextOptions<PassengerDbContext> options, ICurrentUserProvider currentUserProvider) :
IHttpContextAccessor httpContextAccessor = default) : base(options, currentUserProvider)
base(options, httpContextAccessor)
{ {
} }
@ -20,6 +18,7 @@ public sealed class PassengerDbContext : AppDbContextBase
{ {
builder.ApplyConfigurationsFromAssembly(Assembly.GetExecutingAssembly()); builder.ApplyConfigurationsFromAssembly(Assembly.GetExecutingAssembly());
base.OnModelCreating(builder); base.OnModelCreating(builder);
builder.FilterSoftDeletedProperties();
builder.ToSnakeCaseTables(); builder.ToSnakeCaseTables();
} }
} }