fix: Fix DbUpdateConcurrencyException in PersistMessageDbContext

This commit is contained in:
meysamhadeli 2023-01-22 19:11:34 +03:30
parent e57ec7315c
commit 7a8cc281af
11 changed files with 99 additions and 67 deletions

View File

@ -1,18 +1,13 @@
namespace BuildingBlocks.EFCore;
using System.Collections.Immutable;
using BuildingBlocks.Core.Event;
using BuildingBlocks.Core.Model;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
namespace BuildingBlocks.EFCore;
using System.Data;
using System.Net;
using System.Security.Claims;
using global::Polly;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Exception = System.Exception;
public abstract class AppDbContextBase : DbContext, IDbContext
@ -76,51 +71,7 @@ public abstract class AppDbContextBase : DbContext, IDbContext
public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
{
OnBeforeSaving();
try
{
return await base.SaveChangesAsync(cancellationToken);
}
//ref: https://learn.microsoft.com/en-us/ef/core/saving/concurrency?tabs=fluent-api#resolving-concurrency-conflicts
catch (DbUpdateConcurrencyException ex)
{
throw new DbUpdateConcurrencyException("try for get unhandled exception with DbUpdateConcurrencyException", ex);
var logger = _httpContextAccessor?.HttpContext?.RequestServices
.GetRequiredService<ILogger<AppDbContextBase>>();
var entry = ex.Entries.SingleOrDefault();
if (entry == null)
{
return 0;
}
var currentValue = entry.CurrentValues;
var databaseValue = await entry.GetDatabaseValuesAsync(cancellationToken);
logger?.LogInformation("The entity being updated is already use by another Thread!" +
" database value is: {DatabaseValue} and current value is: {CurrentValue}",
databaseValue, currentValue);
var policy = Policy.Handle<DbUpdateConcurrencyException>()
.WaitAndRetryAsync(retryCount: 3,
sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(1),
onRetry: (exception, timeSpan, retryCount, context) =>
{
if (exception != null)
{
logger?.LogError(exception,
"Request failed with {StatusCode}. Waiting {TimeSpan} before next retry. Retry attempt {RetryCount}.",
HttpStatusCode.Conflict,
timeSpan,
retryCount);
}
});
return await policy.ExecuteAsync(async () => await base.SaveChangesAsync(cancellationToken));
}
catch (Exception ex)
{
throw new Exception("try for get unhandled exception bt default", ex);
}
return await base.SaveChangesAsync(cancellationToken);
}
public IReadOnlyList<IDomainEvent> GetDomainEvents()

View File

@ -11,6 +11,7 @@ using Microsoft.Extensions.Hosting;
namespace BuildingBlocks.EFCore;
using Ardalis.GuardClauses;
using Humanizer;
using Microsoft.EntityFrameworkCore.Metadata;
@ -28,6 +29,8 @@ public static class Extensions
{
var postgresOptions = sp.GetRequiredService<PostgresOptions>();
Guard.Against.Null(options, nameof(postgresOptions));
options.UseNpgsql(postgresOptions?.ConnectionString,
dbOptions =>
{

View File

@ -14,6 +14,9 @@ public class PersistMessageConfiguration : IEntityTypeConfiguration<PersistMessa
builder.Property(r => r.Id)
.IsRequired().ValueGeneratedNever();
// // ref: https://learn.microsoft.com/en-us/ef/core/saving/concurrency?tabs=fluent-api
builder.Property(r => r.Version).IsConcurrencyToken();
builder.Property(x => x.DeliveryType)
.HasDefaultValue(MessageDeliveryType.Outbox)
.HasConversion(

View File

@ -12,7 +12,7 @@ using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
namespace BuildingBlocks.PersistMessageProcessor.Data.Migrations
{
[DbContext(typeof(PersistMessageDbContext))]
[Migration("20230120222214_initial")]
[Migration("20230122153121_initial")]
partial class initial
{
/// <inheritdoc />
@ -61,6 +61,11 @@ namespace BuildingBlocks.PersistMessageProcessor.Data.Migrations
.HasColumnType("integer")
.HasColumnName("retry_count");
b.Property<long>("Version")
.IsConcurrencyToken()
.HasColumnType("bigint")
.HasColumnName("version");
b.HasKey("Id")
.HasName("pk_persist_message");

View File

@ -21,7 +21,8 @@ namespace BuildingBlocks.PersistMessageProcessor.Data.Migrations
created = table.Column<DateTime>(type: "timestamp with time zone", nullable: false),
retrycount = table.Column<int>(name: "retry_count", type: "integer", nullable: false),
messagestatus = table.Column<string>(name: "message_status", type: "text", nullable: false, defaultValue: "InProgress"),
deliverytype = table.Column<string>(name: "delivery_type", type: "text", nullable: false, defaultValue: "Outbox")
deliverytype = table.Column<string>(name: "delivery_type", type: "text", nullable: false, defaultValue: "Outbox"),
version = table.Column<long>(type: "bigint", nullable: false)
},
constraints: table =>
{

View File

@ -58,6 +58,11 @@ namespace BuildingBlocks.PersistMessageProcessor.Data.Migrations
.HasColumnType("integer")
.HasColumnName("retry_count");
b.Property<long>("Version")
.IsConcurrencyToken()
.HasColumnType("bigint")
.HasColumnName("version");
b.HasKey("Id")
.HasName("pk_persist_message");

View File

@ -1,22 +1,85 @@
using BuildingBlocks.EFCore;
using BuildingBlocks.PersistMessageProcessor.Data.Configurations;
using Microsoft.EntityFrameworkCore;
namespace BuildingBlocks.PersistMessageProcessor.Data;
using Microsoft.AspNetCore.Http;
using System.Net;
using Configurations;
using global::Polly;
using Microsoft.Extensions.Logging;
public class PersistMessageDbContext : AppDbContextBase, IPersistMessageDbContext
public class PersistMessageDbContext : DbContext, IPersistMessageDbContext
{
public PersistMessageDbContext(DbContextOptions<PersistMessageDbContext> options, IHttpContextAccessor httpContextAccessor = default)
: base(options, httpContextAccessor)
public PersistMessageDbContext(DbContextOptions<PersistMessageDbContext> options)
: base(options)
{
}
public DbSet<PersistMessage> PersistMessages { get; set; }
protected override void OnModelCreating(ModelBuilder builder)
{
builder.ApplyConfiguration(new PersistMessageConfiguration());
base.OnModelCreating(builder);
builder.ToSnakeCaseTables();
}
public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
{
OnBeforeSaving();
var policy = Policy.Handle<DbUpdateConcurrencyException>()
.WaitAndRetryAsync(retryCount: 3,
sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(1),
onRetry: (exception, timeSpan, retryCount, context) =>
{
if (exception != null)
{
var factory = LoggerFactory.Create(b => b.AddConsole());
var logger = factory.CreateLogger<PersistMessageDbContext>();
logger.LogError(exception,
"Request failed with {StatusCode}. Waiting {TimeSpan} before next retry. Retry attempt {RetryCount}.",
HttpStatusCode.Conflict,
timeSpan,
retryCount);
}
});
try
{
await policy.ExecuteAsync(async () => await base.SaveChangesAsync(cancellationToken));
}
catch (DbUpdateConcurrencyException ex)
{
foreach (var entry in ex.Entries)
{
var currentEntity = entry.Entity;
var databaseValues = await entry.GetDatabaseValuesAsync(cancellationToken);
if (databaseValues != null)
entry.OriginalValues.SetValues(databaseValues);
}
return await base.SaveChangesAsync(cancellationToken);
}
return 0;
}
private void OnBeforeSaving()
{
foreach (var entry in ChangeTracker.Entries<PersistMessage>())
{
switch (entry.State)
{
case EntityState.Modified:
entry.Entity.Version++;
break;
case EntityState.Deleted:
entry.Entity.Version++;
break;
}
}
}
}

View File

@ -1,9 +1,11 @@
using BuildingBlocks.EFCore;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore;
namespace BuildingBlocks.PersistMessageProcessor;
public interface IPersistMessageDbContext : IDbContext
using EFCore;
public interface IPersistMessageDbContext
{
DbSet<PersistMessage> PersistMessages => Set<PersistMessage>();
DbSet<PersistMessage> PersistMessages { get; set; }
Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
}

View File

@ -1,6 +1,4 @@
using System.Reflection;
namespace BuildingBlocks.PersistMessageProcessor;
namespace BuildingBlocks.PersistMessageProcessor;
public class PersistMessage
{
@ -22,6 +20,7 @@ public class PersistMessage
public int RetryCount { get; private set; }
public MessageStatus MessageStatus { get; private set; }
public MessageDeliveryType DeliveryType { get; private set; }
public long Version { get; set; }
public void ChangeState(MessageStatus messageStatus)
{

View File

@ -100,7 +100,7 @@ public static class InfrastructureExtensions
});
app.UseCorrelationId();
app.UseHttpMetrics();
app.UseMigration<PersistMessageDbContext>(env);
// app.UseMigration<PersistMessageDbContext>(env);
app.UseCustomHealthCheck();
app.MapMetrics();
app.MapGet("/", x => x.Response.WriteAsync(appOptions.Name));