Files
Cleanuparr/code/backend/Cleanuparr.Infrastructure/Events/EventPublisher.cs
2026-03-20 01:51:03 +02:00

380 lines
15 KiB
C#

using System.Text.Json;
using System.Text.Json.Serialization;
using Cleanuparr.Domain.Entities.Arr.Queue;
using Cleanuparr.Domain.Enums;
using Cleanuparr.Infrastructure.Events.Interfaces;
using Cleanuparr.Infrastructure.Features.Context;
using Cleanuparr.Infrastructure.Features.Notifications;
using Cleanuparr.Infrastructure.Hubs;
using Cleanuparr.Infrastructure.Interceptors;
using Cleanuparr.Persistence;
using Cleanuparr.Persistence.Models.Configuration.Arr;
using Cleanuparr.Persistence.Models.Events;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;
namespace Cleanuparr.Infrastructure.Events;
/// <summary>
/// Service for publishing events to database and SignalR hub
/// </summary>
public class EventPublisher : IEventPublisher
{
private readonly EventsContext _context;
private readonly IHubContext<AppHub> _appHubContext;
private readonly ILogger<EventPublisher> _logger;
private readonly INotificationPublisher _notificationPublisher;
private readonly IDryRunInterceptor _dryRunInterceptor;
public EventPublisher(
EventsContext context,
IHubContext<AppHub> appHubContext,
ILogger<EventPublisher> logger,
INotificationPublisher notificationPublisher,
IDryRunInterceptor dryRunInterceptor)
{
_context = context;
_appHubContext = appHubContext;
_logger = logger;
_notificationPublisher = notificationPublisher;
_dryRunInterceptor = dryRunInterceptor;
}
/// <summary>
/// Generic method for publishing events to database and SignalR clients
/// </summary>
public async Task PublishAsync(EventType eventType, string message, EventSeverity severity, object? data = null, Guid? trackingId = null, Guid? strikeId = null, bool? isDryRun = null)
{
AppEvent eventEntity = new()
{
EventType = eventType,
Message = message,
Severity = severity,
Data = data != null ? JsonSerializer.Serialize(data, new JsonSerializerOptions
{
Converters = { new JsonStringEnumConverter() }
}) : null,
TrackingId = trackingId,
StrikeId = strikeId,
JobRunId = ContextProvider.TryGetJobRunId(),
InstanceType = ContextProvider.Get(nameof(InstanceType)) is InstanceType it ? it : null,
InstanceUrl = (ContextProvider.Get(ContextProvider.Keys.ArrInstanceUrl) as Uri)?.ToString(),
DownloadClientType = ContextProvider.Get(ContextProvider.Keys.DownloadClientType) is DownloadClientTypeName dct ? dct : null,
DownloadClientName = ContextProvider.Get(ContextProvider.Keys.DownloadClientName) as string,
};
eventEntity.IsDryRun = isDryRun ?? await _dryRunInterceptor.IsDryRunEnabled();
await SaveEventToDatabase(eventEntity);
await NotifyClientsAsync(eventEntity);
_logger.LogTrace("Published event: {eventType}", eventType);
}
public async Task PublishManualAsync(string message, EventSeverity severity, object? data = null, bool? isDryRun = null)
{
ManualEvent eventEntity = new()
{
Message = message,
Severity = severity,
Data = data != null ? JsonSerializer.Serialize(data, new JsonSerializerOptions
{
Converters = { new JsonStringEnumConverter() }
}) : null,
JobRunId = ContextProvider.TryGetJobRunId(),
InstanceType = ContextProvider.Get(nameof(InstanceType)) is InstanceType it ? it : null,
InstanceUrl = (ContextProvider.Get(ContextProvider.Keys.ArrInstanceUrl) as Uri)?.ToString(),
DownloadClientType = ContextProvider.Get(ContextProvider.Keys.DownloadClientType) is DownloadClientTypeName dct ? dct : null,
DownloadClientName = ContextProvider.Get(ContextProvider.Keys.DownloadClientName) as string,
};
eventEntity.IsDryRun = isDryRun ?? await _dryRunInterceptor.IsDryRunEnabled();
await SaveManualEventToDatabase(eventEntity);
await NotifyClientsAsync(eventEntity);
_logger.LogTrace("Published manual event: {message}", message);
}
/// <summary>
/// Publishes a strike event with context data and notifications
/// </summary>
public async Task PublishStrike(StrikeType strikeType, int strikeCount, string hash, string itemName, Guid? strikeId = null)
{
// Determine the appropriate EventType based on StrikeType
EventType eventType = strikeType switch
{
StrikeType.Stalled => EventType.StalledStrike,
StrikeType.DownloadingMetadata => EventType.DownloadingMetadataStrike,
StrikeType.FailedImport => EventType.FailedImportStrike,
StrikeType.SlowSpeed => EventType.SlowSpeedStrike,
StrikeType.SlowTime => EventType.SlowTimeStrike,
_ => throw new ArgumentOutOfRangeException(nameof(strikeType), strikeType, null)
};
dynamic data;
if (strikeType is StrikeType.FailedImport)
{
QueueRecord record = ContextProvider.Get<QueueRecord>(nameof(QueueRecord));
data = new
{
hash,
itemName,
strikeCount,
strikeType,
failedImportReasons = record.StatusMessages ?? [],
};
}
else
{
data = new
{
hash,
itemName,
strikeCount,
strikeType,
};
}
bool isDryRun = await _dryRunInterceptor.IsDryRunEnabled();
// Publish the event
await PublishAsync(
eventType,
$"Item '{itemName}' has been struck {strikeCount} times for reason '{strikeType}'",
EventSeverity.Important,
data: data,
strikeId: strikeId,
isDryRun: isDryRun);
// Broadcast strike to SignalR clients for real-time dashboard updates
await BroadcastStrikeAsync(strikeId, strikeType, hash, itemName, isDryRun);
// Send notification (uses ContextProvider internally)
await _notificationPublisher.NotifyStrike(strikeType, strikeCount);
}
/// <summary>
/// Publishes a queue item deleted event with context data and notifications
/// </summary>
public async Task PublishQueueItemDeleted(bool removeFromClient, DeleteReason deleteReason)
{
// Get context data for the event
string itemName = ContextProvider.Get<string>(ContextProvider.Keys.ItemName);
string hash = ContextProvider.Get<string>(ContextProvider.Keys.Hash);
// Publish the event
await PublishAsync(
EventType.QueueItemDeleted,
$"Deleting item from queue with reason: {deleteReason}",
EventSeverity.Important,
data: new { itemName, hash, removeFromClient, deleteReason });
// Send notification (uses ContextProvider internally)
await _notificationPublisher.NotifyQueueItemDeleted(removeFromClient, deleteReason);
}
/// <summary>
/// Publishes a download cleaned event with context data and notifications
/// </summary>
public async Task PublishDownloadCleaned(double ratio, TimeSpan seedingTime, string categoryName, CleanReason reason)
{
// Get context data for the event
string itemName = ContextProvider.Get<string>(ContextProvider.Keys.ItemName);
string hash = ContextProvider.Get<string>(ContextProvider.Keys.Hash);
// Publish the event
await PublishAsync(
EventType.DownloadCleaned,
$"Cleaned item from download client with reason: {reason}",
EventSeverity.Important,
data: new { itemName, hash, categoryName, ratio, seedingTime = seedingTime.TotalHours, reason });
// Send notification (uses ContextProvider internally)
await _notificationPublisher.NotifyDownloadCleaned(ratio, seedingTime, categoryName, reason);
}
/// <summary>
/// Publishes a category changed event with context data and notifications
/// </summary>
public async Task PublishCategoryChanged(string oldCategory, string newCategory, bool isTag = false)
{
// Get context data for the event
string itemName = ContextProvider.Get<string>(ContextProvider.Keys.ItemName);
string hash = ContextProvider.Get<string>(ContextProvider.Keys.Hash);
// Publish the event
await PublishAsync(
EventType.CategoryChanged,
isTag ? $"Tag '{newCategory}' added to download" : $"Category changed from '{oldCategory}' to '{newCategory}'",
EventSeverity.Information,
data: new { itemName, hash, oldCategory, newCategory, isTag });
// Send notification (uses ContextProvider internally)
await _notificationPublisher.NotifyCategoryChanged(oldCategory, newCategory, isTag);
}
/// <summary>
/// Publishes an event alerting that an item keeps coming back
/// </summary>
public async Task PublishRecurringItem(string hash, string itemName, int strikeCount)
{
await PublishManualAsync(
"Download keeps coming back after deletion\nTo prevent further issues, please consult the prerequisites: https://cleanuparr.github.io/Cleanuparr/docs/installation/",
EventSeverity.Important,
data: new { itemName, hash, strikeCount }
);
}
/// <summary>
/// Publishes a search triggered event with context data and notifications.
/// Returns the event ID so the SeekerCommandMonitor can update it on completion.
/// </summary>
public async Task<Guid> PublishSearchTriggered(string instanceName, int itemCount, IEnumerable<string> items, SeekerSearchType searchType, Guid? cycleRunId = null)
{
var itemList = items as string[] ?? items.ToArray();
var itemsDisplay = string.Join(", ", itemList.Take(5)) + (itemList.Length > 5 ? $" (+{itemList.Length - 5} more)" : "");
AppEvent eventEntity = new()
{
EventType = EventType.SearchTriggered,
Message = $"Searched {itemCount} items on {instanceName}: {itemsDisplay}",
Severity = EventSeverity.Information,
Data = JsonSerializer.Serialize(
new { InstanceName = instanceName, ItemCount = itemCount, Items = itemList, SearchType = searchType.ToString(), CycleRunId = cycleRunId },
new JsonSerializerOptions { Converters = { new JsonStringEnumConverter() } }),
SearchStatus = SearchCommandStatus.Pending,
JobRunId = ContextProvider.TryGetJobRunId(),
InstanceType = ContextProvider.Get(nameof(InstanceType)) is InstanceType it ? it : null,
InstanceUrl = (ContextProvider.Get(ContextProvider.Keys.ArrInstanceUrl) as Uri)?.ToString(),
DownloadClientType = ContextProvider.Get(ContextProvider.Keys.DownloadClientType) is DownloadClientTypeName dct ? dct : null,
DownloadClientName = ContextProvider.Get(ContextProvider.Keys.DownloadClientName) as string,
CycleRunId = cycleRunId,
};
eventEntity.IsDryRun = await _dryRunInterceptor.IsDryRunEnabled();
await SaveEventToDatabase(eventEntity);
await NotifyClientsAsync(eventEntity);
await _notificationPublisher.NotifySearchTriggered(instanceName, itemCount, itemList);
return eventEntity.Id;
}
/// <summary>
/// Updates an existing search event with completion status and optional result data
/// </summary>
public async Task PublishSearchCompleted(Guid eventId, SearchCommandStatus status, object? resultData = null)
{
var existingEvent = await _context.Events.FindAsync(eventId);
if (existingEvent is null)
{
_logger.LogWarning("Could not find search event {EventId} to update completion status", eventId);
return;
}
existingEvent.SearchStatus = status;
existingEvent.CompletedAt = DateTime.UtcNow;
if (resultData is not null)
{
// Merge result data into existing Data JSON
var existingData = existingEvent.Data is not null
? JsonSerializer.Deserialize<Dictionary<string, object>>(existingEvent.Data)
: new Dictionary<string, object>();
var resultJson = JsonSerializer.Serialize(resultData, new JsonSerializerOptions
{
Converters = { new JsonStringEnumConverter() }
});
var resultDict = JsonSerializer.Deserialize<Dictionary<string, object>>(resultJson);
if (existingData is not null && resultDict is not null)
{
foreach (var kvp in resultDict)
{
existingData[kvp.Key] = kvp.Value;
}
existingEvent.Data = JsonSerializer.Serialize(existingData, new JsonSerializerOptions
{
Converters = { new JsonStringEnumConverter() }
});
}
}
await _context.SaveChangesAsync();
await NotifyClientsAsync(existingEvent);
}
/// <summary>
/// Publishes an event alerting that search was not triggered for an item
/// </summary>
public async Task PublishSearchNotTriggered(string hash, string itemName)
{
await PublishManualAsync(
"Replacement search was not triggered after removal\nPlease trigger a manual search if needed",
EventSeverity.Warning,
data: new { itemName, hash }
);
}
private async Task SaveEventToDatabase(AppEvent eventEntity)
{
_context.Events.Add(eventEntity);
await _context.SaveChangesAsync();
}
private async Task SaveManualEventToDatabase(ManualEvent eventEntity)
{
_context.ManualEvents.Add(eventEntity);
await _context.SaveChangesAsync();
}
private async Task NotifyClientsAsync(AppEvent appEventEntity)
{
try
{
// Send to all connected clients via the unified AppHub
await _appHubContext.Clients.All.SendAsync("EventReceived", appEventEntity);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to send event {eventId} to SignalR clients", appEventEntity.Id);
}
}
private async Task NotifyClientsAsync(ManualEvent appEventEntity)
{
try
{
// Send to all connected clients via the unified AppHub
await _appHubContext.Clients.All.SendAsync("ManualEventReceived", appEventEntity);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to send event {eventId} to SignalR clients", appEventEntity.Id);
}
}
private async Task BroadcastStrikeAsync(Guid? strikeId, StrikeType strikeType, string hash, string itemName, bool isDryRun)
{
try
{
var strike = new
{
Id = strikeId ?? Guid.Empty,
Type = strikeType.ToString(),
CreatedAt = DateTime.UtcNow,
DownloadId = hash,
Title = itemName,
IsDryRun = isDryRun,
};
await _appHubContext.Clients.All.SendAsync("StrikeReceived", strike);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to send strike to SignalR clients");
}
}
}