From 196f0a0a178eae9d53bdeb2f08fdbe91c9350792 Mon Sep 17 00:00:00 2001 From: meysamhadeli Date: Sun, 29 Jan 2023 22:28:29 +0330 Subject: [PATCH 1/2] refactor: Refactor PersistMessageProcessor with locking --- .../Data/PersistMessageDbContext.cs | 14 +++++++------- .../PersistMessageProcessor.cs | 15 ++++++++++++++- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/src/BuildingBlocks/PersistMessageProcessor/Data/PersistMessageDbContext.cs b/src/BuildingBlocks/PersistMessageProcessor/Data/PersistMessageDbContext.cs index 9972b7d..fe57458 100644 --- a/src/BuildingBlocks/PersistMessageProcessor/Data/PersistMessageDbContext.cs +++ b/src/BuildingBlocks/PersistMessageProcessor/Data/PersistMessageDbContext.cs @@ -58,16 +58,16 @@ public class PersistMessageDbContext : DbContext, IPersistMessageDbContext { foreach (var entry in ex.Entries) { - var currentValues = entry.CurrentValues; var databaseValues = await entry.GetDatabaseValuesAsync(cancellationToken); - _logger.LogInformation( - "Entry to entity with database-value: {@databaseValues} and current-value: {@currentValues}", - databaseValues, - currentValues); + if (databaseValues == null) + { + _logger.LogError("The record no longer exists in the database, The record has been deleted by another user."); + throw; + } - // Refresh the original values with current values - entry.OriginalValues.SetValues(currentValues); + // Refresh the original values to bypass next concurrency check + entry.OriginalValues.SetValues(databaseValues); } return await base.SaveChangesAsync(cancellationToken); diff --git a/src/BuildingBlocks/PersistMessageProcessor/PersistMessageProcessor.cs b/src/BuildingBlocks/PersistMessageProcessor/PersistMessageProcessor.cs index 8dc5c6e..ce2c8eb 100644 --- a/src/BuildingBlocks/PersistMessageProcessor/PersistMessageProcessor.cs +++ b/src/BuildingBlocks/PersistMessageProcessor/PersistMessageProcessor.cs @@ -18,6 +18,7 @@ public class PersistMessageProcessor : IPersistMessageProcessor private readonly IMediator _mediator; private readonly IPersistMessageDbContext _persistMessageDbContext; private readonly IPublishEndpoint _publishEndpoint; + private static SemaphoreSlim Semaphore => new SemaphoreSlim(initialCount: 0, maxCount: 1); public PersistMessageProcessor( ILogger logger, @@ -115,7 +116,19 @@ public class PersistMessageProcessor : IPersistMessageProcessor .Where(x => x.MessageStatus != MessageStatus.Processed) .ToListAsync(cancellationToken); - foreach (var message in messages) await ProcessAsync(message.Id, message.DeliveryType, cancellationToken); + foreach (var message in messages) + { + await Semaphore.WaitAsync(cancellationToken); + + try + { + await ProcessAsync(message.Id, message.DeliveryType, cancellationToken); + } + finally + { + Semaphore.Release(); + } + } } public async Task ProcessInboxAsync(long messageId, CancellationToken cancellationToken = default) From a6957c272ccdfb60bdbbc3366ae892876cdb733d Mon Sep 17 00:00:00 2001 From: meysamhadeli Date: Sun, 29 Jan 2023 22:47:15 +0330 Subject: [PATCH 2/2] Update PersistMessageProcessor --- .../PersistMessageProcessor/PersistMessageProcessor.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/BuildingBlocks/PersistMessageProcessor/PersistMessageProcessor.cs b/src/BuildingBlocks/PersistMessageProcessor/PersistMessageProcessor.cs index ce2c8eb..674a438 100644 --- a/src/BuildingBlocks/PersistMessageProcessor/PersistMessageProcessor.cs +++ b/src/BuildingBlocks/PersistMessageProcessor/PersistMessageProcessor.cs @@ -18,7 +18,7 @@ public class PersistMessageProcessor : IPersistMessageProcessor private readonly IMediator _mediator; private readonly IPersistMessageDbContext _persistMessageDbContext; private readonly IPublishEndpoint _publishEndpoint; - private static SemaphoreSlim Semaphore => new SemaphoreSlim(initialCount: 0, maxCount: 1); + private SemaphoreSlim Semaphore => new SemaphoreSlim(1); public PersistMessageProcessor( ILogger logger,