Refactor StatusWorker to prevent race conditions and improve stability (#512)

This commit is contained in:
Leendert de Borst
2025-01-03 20:38:13 +01:00
parent f0d397c8af
commit f19db2c010
7 changed files with 150 additions and 96 deletions

View File

@@ -5,10 +5,10 @@
{
<button @onclick="() => ServiceClick(service.Name)"
class="@GetServiceButtonClasses(service) mx-3 inline-flex items-center justify-center rounded-xl px-8 py-2 text-white"
disabled="@(!IsHeartbeatValid(service.LastHeartbeat))"
title="@GetButtonTooltip(service.LastHeartbeat)">
disabled="@(!service.IsHeartBeatValid)"
title="@GetButtonTooltip(service)">
<span>@service.DisplayName</span>
@if (service.IsPending)
@if (service.IsHeartBeatValid && service.CurrentStatus != service.DesiredStatus && !string.IsNullOrEmpty(service.DesiredStatus))
{
<svg class="animate-spin ml-2 h-5 w-5 text-white" xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 24 24">
<circle class="opacity-25" cx="12" cy="12" r="10" stroke="currentColor" stroke-width="4"></circle>
@@ -54,9 +54,9 @@
{
public string Name { get; set; } = "";
public string DisplayName { get; set; } = "";
public bool Status { get; set; }
public bool IsPending { get; set; }
public DateTime LastHeartbeat { get; set; }
public string CurrentStatus { get; set; } = "";
public string DesiredStatus { get; set; } = "";
public bool IsHeartBeatValid { get; set; }
}
private List<ServiceState> Services { get; set; } = [];
@@ -112,15 +112,23 @@
{
string buttonClass = "cursor-pointer ";
if (!IsHeartbeatValid(service.LastHeartbeat))
if (!service.IsHeartBeatValid)
{
buttonClass += "bg-gray-600";
}
else if (service.Status)
else if (service.CurrentStatus == "Started" && (service.DesiredStatus == string.Empty || service.DesiredStatus == "Started"))
{
buttonClass += "bg-green-600";
}
else
else if (service.CurrentStatus == "Stopping" || (service.DesiredStatus == "Stopped" && service.CurrentStatus != service.DesiredStatus))
{
buttonClass += "bg-red-500";
}
else if (service.CurrentStatus == "Starting" || (service.DesiredStatus == "Started" && service.CurrentStatus != service.DesiredStatus))
{
buttonClass += "bg-emerald-500";
}
else if (service.CurrentStatus == "Stopped" && (service.DesiredStatus == string.Empty || service.DesiredStatus == "Stopped"))
{
// Fully stopped: the service reports "Stopped" and no different state is being requested.
// The check must be on CurrentStatus (mirroring the "Started" branch); testing DesiredStatus
// twice left a stopped service with an empty desired status without any colour class.
buttonClass += "bg-red-600";
}
@@ -131,9 +139,22 @@
/// <summary>
/// Gets the tooltip text for a service button based on its heartbeat validity and current status.
/// </summary>
private static string GetButtonTooltip(DateTime lastHeartbeat)
private static string GetButtonTooltip(ServiceState service)
{
return IsHeartbeatValid(lastHeartbeat) ? "" : "Heartbeat offline";
if (!service.IsHeartBeatValid)
{
return "Heartbeat offline";
}
var statusMessages = new Dictionary<string, string>
{
{ "Started", "Service is running" },
{ "Starting", "Service is starting..." },
{ "Stopped", "Service is stopped" },
{ "Stopping", "Service is stopping..." }
};
return statusMessages.GetValueOrDefault(service.CurrentStatus, string.Empty);
}
/// <summary>
@@ -143,18 +164,25 @@
{
var service = Services.First(s => s.Name == serviceName);
if (!IsHeartbeatValid(service.LastHeartbeat))
if (!service.IsHeartBeatValid)
{
return;
}
service.IsPending = true;
// If the service is running or has already been asked to start, clicking stops it. Otherwise, clicking starts it.
if (service.CurrentStatus == "Started" || service.DesiredStatus == "Started")
{
service.DesiredStatus = "Stopped";
}
else
{
service.DesiredStatus = "Started";
}
StateHasChanged();
service.Status = !service.Status;
await UpdateServiceStatus(serviceName, service.Status);
await UpdateServiceStatus(serviceName, service.DesiredStatus);
service.CurrentStatus = service.DesiredStatus;
service.IsPending = false;
StateHasChanged();
}
@@ -163,7 +191,7 @@
/// </summary>
private async Task InitPage()
{
if (InitInProgress || Services.Any(s => s.IsPending))
if (InitInProgress)
{
return;
}
@@ -179,8 +207,9 @@
var entry = ServiceStatus.Find(x => x.ServiceName == service.Name);
if (entry != null)
{
service.LastHeartbeat = entry.Heartbeat;
service.Status = IsHeartbeatValid(service.LastHeartbeat) && entry.CurrentStatus == "Started";
service.IsHeartBeatValid = IsHeartbeatValid(entry.Heartbeat);
service.CurrentStatus = entry.CurrentStatus;
service.DesiredStatus = entry.DesiredStatus;
}
}
@@ -195,14 +224,13 @@
/// <summary>
/// Updates the status of a service.
/// </summary>
private async Task<bool> UpdateServiceStatus(string serviceName, bool newStatus)
private async Task<bool> UpdateServiceStatus(string serviceName, string desiredStatus)
{
await using var dbContext = await DbContextFactory.CreateDbContextAsync();
var entry = await dbContext.WorkerServiceStatuses.Where(x => x.ServiceName == serviceName).FirstOrDefaultAsync();
if (entry != null)
{
string newDesiredStatus = newStatus ? "Started" : "Stopped";
entry.DesiredStatus = newDesiredStatus;
entry.DesiredStatus = desiredStatus;
await dbContext.SaveChangesAsync();
var timeout = DateTime.UtcNow.AddSeconds(30);
@@ -215,7 +243,7 @@
await using var dbContextInner = await DbContextFactory.CreateDbContextAsync();
var check = await dbContextInner.WorkerServiceStatuses.Where(x => x.ServiceName == serviceName).FirstAsync();
if (check.CurrentStatus == newDesiredStatus)
if (check.CurrentStatus == entry.DesiredStatus)
{
return true;
}

View File

@@ -26,17 +26,23 @@
private RegistrationStatisticsCard? _registrationStatisticsCard;
private EmailStatisticsCard? _emailStatisticsCard;
/// <inheritdoc />
protected override async Task OnInitializedAsync()
{
await base.OnInitializedAsync();
// Check if 2FA is enabled. If not, show a one-time warning on the dashboard.
if (!UserService.User().TwoFactorEnabled)
{
GlobalNotificationService.AddWarningMessage("Two-factor authentication is not enabled. It is recommended to enable it in Account Settings for better security.", true);
}
}
/// <inheritdoc />
protected override async Task OnAfterRenderAsync(bool firstRender)
{
if (firstRender)
{
// Check if 2FA is enabled. If not, show a one-time warning on the dashboard.
if (!UserService.User().TwoFactorEnabled)
{
GlobalNotificationService.AddWarningMessage("Two-factor authentication is not enabled. It is recommended to enable it in Account Settings for better security.", true);
}
await RefreshData();
}
}

View File

@@ -670,6 +670,10 @@ video {
margin-left: auto;
}
.mr-1 {
margin-right: 0.25rem;
}
.mr-14 {
margin-right: 3.5rem;
}
@@ -1148,6 +1152,11 @@ video {
background-color: rgb(59 130 246 / var(--tw-bg-opacity));
}
.bg-emerald-500 {
--tw-bg-opacity: 1;
background-color: rgb(16 185 129 / var(--tw-bg-opacity));
}
.bg-gray-100 {
--tw-bg-opacity: 1;
background-color: rgb(243 244 246 / var(--tw-bg-opacity));
@@ -1498,10 +1507,6 @@ video {
font-weight: 600;
}
.uppercase {
text-transform: uppercase;
}
.leading-6 {
line-height: 1.5rem;
}

View File

@@ -101,23 +101,34 @@ public class TaskRunnerWorker(
{
foreach (var task in tasks)
{
// Check cancellation before each task
stoppingToken.ThrowIfCancellationRequested();
try
{
job.Status = TaskRunnerJobStatus.Running;
await dbContext.SaveChangesAsync(stoppingToken);
await task.ExecuteAsync(stoppingToken);
}
catch (OperationCanceledException)
{
// Handle cancellation gracefully
job.Status = TaskRunnerJobStatus.Canceled;
job.ErrorMessage = "Task execution was canceled.";
// Persist with CancellationToken.None: stoppingToken is typically already canceled when we
// get here, so passing it would abort this save and the Canceled status would never be stored.
await dbContext.SaveChangesAsync(CancellationToken.None);
throw;
}
catch (Exception ex)
{
logger.LogError(ex, "Error executing task {TaskName}", task.Name);
job.ErrorMessage = $"Task {task.Name} failed: {ex.Message}";
job.Status = TaskRunnerJobStatus.Error;
job.ErrorMessage = $"Task {task.Name} failed: {ex.Message}";
await dbContext.SaveChangesAsync(stoppingToken);
break;
}
}
if (job.Status != TaskRunnerJobStatus.Error)
if (job.Status != TaskRunnerJobStatus.Error && job.Status != TaskRunnerJobStatus.Canceled)
{
job.Status = TaskRunnerJobStatus.Finished;
}

View File

@@ -27,6 +27,11 @@ public enum TaskRunnerJobStatus
/// </summary>
Finished = 2,
/// <summary>
/// The job has been canceled because the task runner has been stopped.
/// </summary>
Canceled = 8,
/// <summary>
/// The job has failed.
/// </summary>

View File

@@ -53,14 +53,8 @@ public class StatusHostedService<T>(ILogger<StatusHostedService<T>> logger, Glob
while (!stoppingToken.IsCancellationRequested)
{
// Add a second cancellationToken linked to the parent cancellation token.
// When the parent gets canceled this gets canceled as well. However, this one can also
// be canceled with a signal from the StatusWorker.
var workerCancellationTokenSource =
CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
// Start the inner while loop with the second cancellationToken.
await ExecuteInnerAsync(workerCancellationTokenSource);
await ExecuteInnerAsync(stoppingToken);
if (!stoppingToken.IsCancellationRequested)
{
@@ -73,11 +67,16 @@ public class StatusHostedService<T>(ILogger<StatusHostedService<T>> logger, Glob
/// <summary>
/// Start the inner while loop which adds a second cancellationToken that is controlled by the StatusWorker.
/// </summary>
/// <param name="workerCancellationTokenSource">Cancellation token.</param>
private async Task ExecuteInnerAsync(CancellationTokenSource workerCancellationTokenSource)
/// <param name="cancellationToken">Cancellation token.</param>
private async Task ExecuteInnerAsync(CancellationToken cancellationToken)
{
Task? workerTask = null;
// Add a second cancellationToken linked to the parent cancellation token.
// When the parent gets canceled this gets canceled as well. However, this one can also
// be canceled with a signal from the StatusWorker.
using var workerCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
while (!workerCancellationTokenSource.IsCancellationRequested)
{
if (globalServiceStatus.CurrentStatus.ToStatusEnum() == Status.Started || globalServiceStatus.CurrentStatus.ToStatusEnum() == Status.Starting)
@@ -86,7 +85,6 @@ public class StatusHostedService<T>(ILogger<StatusHostedService<T>> logger, Glob
{
if (workerTask == null)
{
globalServiceStatus.SetWorkerStatus(typeof(T).Name, true);
workerTask = Task.Run(() => WorkerLogic(workerCancellationTokenSource.Token), workerCancellationTokenSource.Token);
}
}
@@ -100,11 +98,15 @@ public class StatusHostedService<T>(ILogger<StatusHostedService<T>> logger, Glob
else if (globalServiceStatus.CurrentStatus.ToStatusEnum() == Status.Stopped)
{
// Do nothing, the worker is stopped.
globalServiceStatus.SetWorkerStatus(typeof(T).Name, false);
}
// Wait for a second before checking the status again.
await Task.Delay(1000);
}
// If we get here, cancel the worker task if it is still running.
await workerCancellationTokenSource.CancelAsync();
}
/// <summary>
@@ -142,36 +144,46 @@ public class StatusHostedService<T>(ILogger<StatusHostedService<T>> logger, Glob
{
// Expected so we only log information.
logger.LogInformation(ex, "StatusHostedService<{ServiceType}> is stopping due to a cancellation request.", typeof(T).Name);
break;
}
catch (Exception ex)
{
logger.LogError(ex, "An error occurred in StatusHostedService<{ServiceType}>", typeof(T).Name);
globalServiceStatus.SetWorkerStatus(typeof(T).Name, false);
// If service is explicitly stopped, break out of the loop immediately.
if (cancellationToken.IsCancellationRequested)
{
break;
}
}
finally
{
logger.LogWarning("StatusHostedService<{ServiceType}> stopped at: {Time}", typeof(T).Name, DateTimeOffset.Now);
globalServiceStatus.SetWorkerStatus(typeof(T).Name, false);
// Reset the delay when the service is explicitly stopped
if (cancellationToken.IsCancellationRequested)
{
_restartDelayInMs = _restartMinDelayInMs;
}
}
// If a fault occurred in the innerService, but it was not explicitly canceled,
// wait for the current restart delay before attempting to auto-restart the worker.
while (!cancellationToken.IsCancellationRequested)
if (cancellationToken.IsCancellationRequested)
{
try
{
await Task.Delay(_restartDelayInMs, cancellationToken);
break; // Exit the loop if delay is successful
}
catch (TaskCanceledException)
{
// If the delay is canceled, exit the loop
break;
}
return;
}
// Exponential backoff with a maximum delay
_restartDelayInMs = Math.Min(_restartDelayInMs * 2, _restartMaxDelayInMs);
try
{
// If an exception occurred, delay with exponential backoff with a maximum before retrying.
await Task.Delay(_restartDelayInMs, cancellationToken);
_restartDelayInMs = Math.Min(_restartDelayInMs * 2, _restartMaxDelayInMs);
}
catch (OperationCanceledException)
{
// Reset delay on cancellation
_restartDelayInMs = _restartMinDelayInMs;
return;
}
}
}
}

View File

@@ -40,20 +40,33 @@ public class StatusWorker(ILogger<StatusWorker> logger, Func<IWorkerStatusDbCont
if (!globalServiceStatus.AreAllWorkersRunning())
{
await SetServiceStatus(statusEntry, Status.Starting.ToString());
logger.LogInformation(
"Status was set to Started but not all workers are running (yet). Reverting to Starting.");
logger.LogInformation("Status was set to Started but not all workers are running (yet). Reverting to Starting.");
}
break;
case Status.Starting:
await WaitForAllWorkersToStart(stoppingToken);
await SetServiceStatus(statusEntry, Status.Started.ToString());
logger.LogInformation("All workers started.");
if (globalServiceStatus.AreAllWorkersRunning())
{
await SetServiceStatus(statusEntry, Status.Started.ToString());
logger.LogInformation("All workers started.");
}
else
{
logger.LogInformation("Waiting for all workers to start.");
}
break;
case Status.Stopping:
await WaitForAllWorkersToStop(stoppingToken);
await SetServiceStatus(statusEntry, Status.Stopped.ToString());
logger.LogInformation("All workers stopped.");
if (globalServiceStatus.AreAllWorkersStopped())
{
await SetServiceStatus(statusEntry, Status.Stopped.ToString());
logger.LogInformation("All workers stopped.");
}
else
{
logger.LogInformation("Waiting for all workers to stop.");
}
break;
case Status.Stopped:
logger.LogInformation("Service is (soft) stopped.");
@@ -126,32 +139,6 @@ public class StatusWorker(ILogger<StatusWorker> logger, Func<IWorkerStatusDbCont
await _dbContext.SaveChangesAsync();
}
/// <summary>
/// Waits for all workers to start.
/// </summary>
/// <param name="stoppingToken">CancellationToken.</param>
private async Task WaitForAllWorkersToStart(CancellationToken stoppingToken)
{
while (!globalServiceStatus.AreAllWorkersRunning() && !stoppingToken.IsCancellationRequested)
{
logger.LogInformation("Waiting for all workers to start...");
await Task.Delay(1000, stoppingToken);
}
}
/// <summary>
/// Waits for all workers to stop.
/// </summary>
/// <param name="stoppingToken">CancellationToken.</param>
private async Task WaitForAllWorkersToStop(CancellationToken stoppingToken)
{
while (!globalServiceStatus.AreAllWorkersStopped() && !stoppingToken.IsCancellationRequested)
{
logger.LogInformation("Waiting for all workers to stop...");
await Task.Delay(1000, stoppingToken);
}
}
/// <summary>
/// Retrieves status record or creates an initial status record if it does not exist.
/// </summary>