Merge pull request #137 from meysamhadeli/develop

Develop
This commit is contained in:
Meysam Hadeli 2023-01-29 23:20:02 +03:30 committed by GitHub
commit 5cb5dad0ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 21 additions and 8 deletions

View File

@ -58,16 +58,16 @@ public class PersistMessageDbContext : DbContext, IPersistMessageDbContext
{
foreach (var entry in ex.Entries)
{
var currentValues = entry.CurrentValues;
var databaseValues = await entry.GetDatabaseValuesAsync(cancellationToken);
_logger.LogInformation(
"Entry to entity with database-value: {@databaseValues} and current-value: {@currentValues}",
databaseValues,
currentValues);
if (databaseValues == null)
{
_logger.LogError("The record no longer exists in the database, The record has been deleted by another user.");
throw;
}
// Refresh the original values with current values
entry.OriginalValues.SetValues(currentValues);
// Refresh the original values to bypass next concurrency check
entry.OriginalValues.SetValues(databaseValues);
}
return await base.SaveChangesAsync(cancellationToken);

View File

@ -18,6 +18,7 @@ public class PersistMessageProcessor : IPersistMessageProcessor
private readonly IMediator _mediator;
private readonly IPersistMessageDbContext _persistMessageDbContext;
private readonly IPublishEndpoint _publishEndpoint;
private SemaphoreSlim Semaphore => new SemaphoreSlim(1);
public PersistMessageProcessor(
ILogger<PersistMessageProcessor> logger,
@ -115,7 +116,19 @@ public class PersistMessageProcessor : IPersistMessageProcessor
.Where(x => x.MessageStatus != MessageStatus.Processed)
.ToListAsync(cancellationToken);
foreach (var message in messages) await ProcessAsync(message.Id, message.DeliveryType, cancellationToken);
foreach (var message in messages)
{
await Semaphore.WaitAsync(cancellationToken);
try
{
await ProcessAsync(message.Id, message.DeliveryType, cancellationToken);
}
finally
{
Semaphore.Release();
}
}
}
public async Task ProcessInboxAsync(long messageId, CancellationToken cancellationToken = default)