Merge pull request #263 from meysamhadeli/refactor/refactor-message-processor

refactor: Refactor persist message processor
This commit is contained in:
Meysam Hadeli 2023-06-01 17:26:11 +03:30 committed by GitHub
commit 8a02c0a234
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 88 additions and 53 deletions

View File

@ -3,9 +3,9 @@
<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Ardalis.GuardClauses" Version="4.0.1" />
<PackageReference Include="Asp.Versioning.Abstractions" Version="7.0.0" />
@ -144,7 +144,7 @@
<PackageReference Include="Google.Protobuf" Version="3.21.12" />
<PackageReference Include="Grpc.Net.Client" Version="2.51.0" />
<PackageReference Include="Grpc.Net.ClientFactory" Version="2.51.0" />
<PackageReference Update="Meziantou.Analyzer" Version="1.0.758" />
<PackageReference Update="AsyncFixer" Version="1.6.0" />
<PackageReference Update="Roslynator.Analyzers" Version="4.2.0" />

View File

@ -1,5 +1,6 @@
namespace BuildingBlocks.Core.Model;
// For handling optimistic concurrency
public interface IVersion
{
long Version { get; set; }

View File

@ -4,22 +4,24 @@ using System.Collections.Immutable;
using Core.Event;
using Core.Model;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;
using Web;
using Exception = System.Exception;
using IsolationLevel = System.Data.IsolationLevel;
public abstract class AppDbContextBase : DbContext, IDbContext
{
private readonly ICurrentUserProvider _currentUserProvider;
private IDbContextTransaction? _currentTransaction;
private readonly ICurrentUserProvider? _currentUserProvider;
private readonly ILogger<AppDbContextBase>? _logger;
protected AppDbContextBase(DbContextOptions options, ICurrentUserProvider currentUserProvider) :
protected AppDbContextBase(DbContextOptions options, ICurrentUserProvider? currentUserProvider = null, ILogger<AppDbContextBase>? logger = null) :
base(options)
{
_currentUserProvider = currentUserProvider;
_logger = logger;
}
protected override void OnModelCreating(ModelBuilder builder)
{
}
@ -48,7 +50,29 @@ public abstract class AppDbContextBase : DbContext, IDbContext
public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
{
OnBeforeSaving();
return await base.SaveChangesAsync(cancellationToken);
try
{
return await base.SaveChangesAsync(cancellationToken);
}
//ref: https://learn.microsoft.com/en-us/ef/core/saving/concurrency?tabs=data-annotations#resolving-concurrency-conflicts
catch (DbUpdateConcurrencyException ex)
{
foreach (var entry in ex.Entries)
{
var databaseValues = await entry.GetDatabaseValuesAsync(cancellationToken);
if (databaseValues == null)
{
_logger.LogError("The record no longer exists in the database, The record has been deleted by another user.");
throw;
}
// Refresh the original values to bypass next concurrency check
entry.OriginalValues.SetValues(databaseValues);
}
return await base.SaveChangesAsync(cancellationToken);
}
}
public IReadOnlyList<IDomainEvent> GetDomainEvents()

View File

@ -11,6 +11,6 @@ public class DesignTimeDbContextFactory : IDesignTimeDbContextFactory<PersistMes
builder.UseNpgsql("Server=localhost;Port=5432;Database=persist_message;User Id=postgres;Password=postgres;Include Error Detail=true")
.UseSnakeCaseNamingConvention();
return new PersistMessageDbContext(builder.Options, null);
return new PersistMessageDbContext(builder.Options);
}
}

View File

@ -3,19 +3,17 @@ using Microsoft.EntityFrameworkCore;
namespace BuildingBlocks.PersistMessageProcessor.Data;
using System.Net;
using Configurations;
using Core.Model;
using global::Polly;
using Microsoft.Extensions.Logging;
using Exception = System.Exception;
public class PersistMessageDbContext : DbContext, IPersistMessageDbContext
{
private readonly ILogger<PersistMessageDbContext> _logger;
private readonly ILogger<PersistMessageDbContext>? _logger;
public PersistMessageDbContext(DbContextOptions<PersistMessageDbContext> options,
ILogger<PersistMessageDbContext> logger)
ILogger<PersistMessageDbContext>? logger = null)
: base(options)
{
_logger = logger;
@ -34,23 +32,9 @@ public class PersistMessageDbContext : DbContext, IPersistMessageDbContext
{
OnBeforeSaving();
var policy = Policy.Handle<DbUpdateConcurrencyException>()
.WaitAndRetryAsync(retryCount: 3,
sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(1),
onRetry: (exception, timeSpan, retryCount, context) =>
{
if (exception != null)
{
_logger.LogError(exception,
"Request failed with {StatusCode}. Waiting {TimeSpan} before next retry. Retry attempt {RetryCount}.",
HttpStatusCode.Conflict,
timeSpan,
retryCount);
}
});
try
{
return await policy.ExecuteAsync(async () => await base.SaveChangesAsync(cancellationToken));
return await base.SaveChangesAsync(cancellationToken);
}
//ref: https://learn.microsoft.com/en-us/ef/core/saving/concurrency?tabs=data-annotations#resolving-concurrency-conflicts
catch (DbUpdateConcurrencyException ex)

View File

@ -17,8 +17,6 @@ public class PersistMessageProcessor : IPersistMessageProcessor
private readonly IMediator _mediator;
private readonly IPersistMessageDbContext _persistMessageDbContext;
private readonly IPublishEndpoint _publishEndpoint;
private SemaphoreSlim Semaphore => new SemaphoreSlim(1);
public PersistMessageProcessor(
ILogger<PersistMessageProcessor> logger,
IMediator mediator,
@ -108,7 +106,6 @@ public class PersistMessageProcessor : IPersistMessageProcessor
}
}
public async Task ProcessAllAsync(CancellationToken cancellationToken = default)
{
var messages = await _persistMessageDbContext.PersistMessages
@ -117,16 +114,7 @@ public class PersistMessageProcessor : IPersistMessageProcessor
foreach (var message in messages)
{
await Semaphore.WaitAsync(cancellationToken);
try
{
await ProcessAsync(message.Id, message.DeliveryType, cancellationToken);
}
finally
{
Semaphore.Release();
}
await ProcessAsync(message.Id, message.DeliveryType, cancellationToken);
}
}

View File

@ -3,6 +3,7 @@
<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
@ -29,11 +30,11 @@
<ItemGroup>
<Folder Include="GrpcClient\Protos" />
</ItemGroup>
<ItemGroup>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleTo">
<_Parameter1>Integration.Test</_Parameter1>
</AssemblyAttribute>
</ItemGroup>
</Project>

View File

@ -3,6 +3,7 @@
<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<IsPackable>false</IsPackable>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>

View File

@ -11,7 +11,7 @@ namespace Flight.Data
builder.UseNpgsql("Server=localhost;Port=5432;Database=flight;User Id=postgres;Password=postgres;Include Error Detail=true")
.UseSnakeCaseNamingConvention();
return new FlightDbContext(builder.Options, null);
return new FlightDbContext(builder.Options);
}
}
}

View File

@ -7,11 +7,13 @@ using Microsoft.EntityFrameworkCore;
namespace Flight.Data;
using BuildingBlocks.Web;
using Microsoft.Extensions.Logging;
public sealed class FlightDbContext : AppDbContextBase
{
public FlightDbContext(DbContextOptions<FlightDbContext> options, ICurrentUserProvider currentUserProvider) : base(
options, currentUserProvider)
public FlightDbContext(DbContextOptions<FlightDbContext> options, ICurrentUserProvider? currentUserProvider = null,
ILogger<FlightDbContext>? logger = null) : base(
options, currentUserProvider, logger)
{
}

View File

@ -2,6 +2,7 @@
<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>

View File

@ -4,7 +4,6 @@
<TargetFramework>net7.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<IsPackable>false</IsPackable>
</PropertyGroup>

View File

@ -3,6 +3,7 @@
<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<IsPackable>false</IsPackable>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>

View File

@ -23,7 +23,7 @@ public static class DbContextFactory
var options = new DbContextOptionsBuilder<FlightDbContext>()
.UseInMemoryDatabase(databaseName: Guid.NewGuid().ToString()).Options;
var context = new FlightDbContext(options, currentUserProvider: null);
var context = new FlightDbContext(options, currentUserProvider: null, null);
// Seed our data
FlightDataSeeder(context);

View File

@ -3,6 +3,7 @@
<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<IsPackable>false</IsPackable>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>

View File

@ -16,14 +16,16 @@ namespace Identity.Data;
using System;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;
public sealed class IdentityContext : IdentityDbContext<User, Role, Guid,
UserClaim, UserRole, UserLogin, RoleClaim, UserToken>, IDbContext
{
private IDbContextTransaction? _currentTransaction;
private readonly ILogger<IdentityContext>? _logger;
public IdentityContext(DbContextOptions<IdentityContext> options) : base(options)
public IdentityContext(DbContextOptions<IdentityContext> options, ILogger<IdentityContext>? logger = null) : base(options)
{
_logger = logger;
}
protected override void OnModelCreating(ModelBuilder builder)
@ -58,7 +60,29 @@ public sealed class IdentityContext : IdentityDbContext<User, Role, Guid,
public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
{
OnBeforeSaving();
return await base.SaveChangesAsync(cancellationToken);
try
{
return await base.SaveChangesAsync(cancellationToken);
}
//ref: https://learn.microsoft.com/en-us/ef/core/saving/concurrency?tabs=data-annotations#resolving-concurrency-conflicts
catch (DbUpdateConcurrencyException ex)
{
foreach (var entry in ex.Entries)
{
var databaseValues = await entry.GetDatabaseValuesAsync(cancellationToken);
if (databaseValues == null)
{
_logger.LogError("The record no longer exists in the database, The record has been deleted by another user.");
throw;
}
// Refresh the original values to bypass next concurrency check
entry.OriginalValues.SetValues(databaseValues);
}
return await base.SaveChangesAsync(cancellationToken);
}
}
public IReadOnlyList<IDomainEvent> GetDomainEvents()

View File

@ -2,6 +2,7 @@
<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>

View File

@ -3,6 +3,7 @@
<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<IsPackable>false</IsPackable>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>

View File

@ -3,6 +3,7 @@
<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>

View File

@ -11,6 +11,6 @@ public class DesignTimeDbContextFactory: IDesignTimeDbContextFactory<PassengerDb
builder.UseNpgsql("Server=localhost;Port=5432;Database=passenger;User Id=postgres;Password=postgres;Include Error Detail=true")
.UseSnakeCaseNamingConvention();
return new PassengerDbContext(builder.Options, null);
return new PassengerDbContext(builder.Options);
}
}

View File

@ -5,10 +5,13 @@ using BuildingBlocks.Web;
namespace Passenger.Data;
using Microsoft.Extensions.Logging;
public sealed class PassengerDbContext : AppDbContextBase
{
public PassengerDbContext(DbContextOptions<PassengerDbContext> options, ICurrentUserProvider currentUserProvider) :
base(options, currentUserProvider)
public PassengerDbContext(DbContextOptions<PassengerDbContext> options,
ICurrentUserProvider? currentUserProvider = null, ILogger<PassengerDbContext>? logger = null) :
base(options, currentUserProvider, logger)
{
}

View File

@ -3,6 +3,7 @@
<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>

View File

@ -3,6 +3,7 @@
<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<IsPackable>false</IsPackable>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>