diff --git a/src/BuildingBlocks/PersistMessageProcessor/Data/PersistMessageDbContext.cs b/src/BuildingBlocks/PersistMessageProcessor/Data/PersistMessageDbContext.cs index 9972b7d..fe57458 100644 --- a/src/BuildingBlocks/PersistMessageProcessor/Data/PersistMessageDbContext.cs +++ b/src/BuildingBlocks/PersistMessageProcessor/Data/PersistMessageDbContext.cs @@ -58,16 +58,16 @@ public class PersistMessageDbContext : DbContext, IPersistMessageDbContext { foreach (var entry in ex.Entries) { - var currentValues = entry.CurrentValues; var databaseValues = await entry.GetDatabaseValuesAsync(cancellationToken); - _logger.LogInformation( - "Entry to entity with database-value: {@databaseValues} and current-value: {@currentValues}", - databaseValues, - currentValues); + if (databaseValues == null) + { + _logger.LogError("The record no longer exists in the database, The record has been deleted by another user."); + throw; + } - // Refresh the original values with current values - entry.OriginalValues.SetValues(currentValues); + // Refresh the original values to bypass next concurrency check + entry.OriginalValues.SetValues(databaseValues); } return await base.SaveChangesAsync(cancellationToken); diff --git a/src/BuildingBlocks/PersistMessageProcessor/PersistMessageProcessor.cs b/src/BuildingBlocks/PersistMessageProcessor/PersistMessageProcessor.cs index 8dc5c6e..674a438 100644 --- a/src/BuildingBlocks/PersistMessageProcessor/PersistMessageProcessor.cs +++ b/src/BuildingBlocks/PersistMessageProcessor/PersistMessageProcessor.cs @@ -18,6 +18,7 @@ public class PersistMessageProcessor : IPersistMessageProcessor private readonly IMediator _mediator; private readonly IPersistMessageDbContext _persistMessageDbContext; private readonly IPublishEndpoint _publishEndpoint; + private SemaphoreSlim Semaphore => new SemaphoreSlim(1); public PersistMessageProcessor( ILogger logger, @@ -115,7 +116,19 @@ public class PersistMessageProcessor : IPersistMessageProcessor .Where(x => x.MessageStatus != MessageStatus.Processed) .ToListAsync(cancellationToken); - foreach (var message in messages) await ProcessAsync(message.Id, message.DeliveryType, cancellationToken); + foreach (var message in messages) + { + await Semaphore.WaitAsync(cancellationToken); + + try + { + await ProcessAsync(message.Id, message.DeliveryType, cancellationToken); + } + finally + { + Semaphore.Release(); + } + } } public async Task ProcessInboxAsync(long messageId, CancellationToken cancellationToken = default)