using System.Text.Json;
using System.Text.Json.Serialization;
using Cleanuparr.Domain.Entities.Arr.Queue;
using Cleanuparr.Domain.Enums;
using Cleanuparr.Infrastructure.Events.Interfaces;
using Cleanuparr.Infrastructure.Features.Context;
using Cleanuparr.Infrastructure.Features.Notifications;
using Cleanuparr.Infrastructure.Hubs;
using Cleanuparr.Infrastructure.Interceptors;
using Cleanuparr.Persistence;
using Cleanuparr.Persistence.Models.Configuration.Arr;
using Cleanuparr.Persistence.Models.Events;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;

namespace Cleanuparr.Infrastructure.Events;

/// <summary>
/// Service for publishing events to database and SignalR hub
/// </summary>
public class EventPublisher : IEventPublisher
{
    /// <summary>
    /// Shared serializer options (enums written as strings). A single cached instance is
    /// reused because System.Text.Json caches type metadata per options instance.
    /// </summary>
    private static readonly JsonSerializerOptions SerializerOptions = new()
    {
        Converters = { new JsonStringEnumConverter() },
    };

    // NOTE(review): the hub type (AppHub) and logger category (EventPublisher) are assumed
    // from the surrounding code — confirm against the DI registration.
    private readonly EventsContext _context;
    private readonly IHubContext<AppHub> _appHubContext;
    private readonly ILogger<EventPublisher> _logger;
    private readonly INotificationPublisher _notificationPublisher;
    private readonly IDryRunInterceptor _dryRunInterceptor;

    public EventPublisher(
        EventsContext context,
        IHubContext<AppHub> appHubContext,
        ILogger<EventPublisher> logger,
        INotificationPublisher notificationPublisher,
        IDryRunInterceptor dryRunInterceptor)
    {
        _context = context;
        _appHubContext = appHubContext;
        _logger = logger;
        _notificationPublisher = notificationPublisher;
        _dryRunInterceptor = dryRunInterceptor;
    }

    /// <summary>
    /// Generic method for publishing events to database and SignalR clients
    /// </summary>
    /// <param name="eventType">Type stored on the event.</param>
    /// <param name="message">Message stored on the event.</param>
    /// <param name="severity">Severity stored on the event.</param>
    /// <param name="data">Optional payload; serialized to JSON when not null.</param>
    /// <param name="trackingId">Optional tracking id stored on the event.</param>
    /// <param name="strikeId">Optional strike id stored on the event.</param>
    /// <param name="isDryRun">
    /// Explicit dry-run flag; when null the value is read from the dry-run interceptor.
    /// </param>
    public async Task PublishAsync(
        EventType eventType,
        string message,
        EventSeverity severity,
        object? data = null,
        Guid? trackingId = null,
        Guid? strikeId = null,
        bool? isDryRun = null)
    {
        AppEvent eventEntity = new()
        {
            EventType = eventType,
            Message = message,
            Severity = severity,
            Data = SerializeData(data),
            TrackingId = trackingId,
            StrikeId = strikeId,
            JobRunId = ContextProvider.TryGetJobRunId(),
            InstanceType = ReadInstanceType(),
            InstanceUrl = ReadInstanceUrl(),
            DownloadClientType = ReadDownloadClientType(),
            DownloadClientName = ReadDownloadClientName(),
        };

        eventEntity.IsDryRun = isDryRun ?? await _dryRunInterceptor.IsDryRunEnabled();

        await SaveEventToDatabase(eventEntity);
        await NotifyClientsAsync(eventEntity);

        _logger.LogTrace("Published event: {eventType}", eventType);
    }

    /// <summary>
    /// Publishes a manual event to the database and SignalR clients
    /// </summary>
    /// <param name="message">Message stored on the event.</param>
    /// <param name="severity">Severity stored on the event.</param>
    /// <param name="data">Optional payload; serialized to JSON when not null.</param>
    /// <param name="isDryRun">
    /// Explicit dry-run flag; when null the value is read from the dry-run interceptor.
    /// </param>
    public async Task PublishManualAsync(string message, EventSeverity severity, object? data = null, bool? isDryRun = null)
    {
        ManualEvent eventEntity = new()
        {
            Message = message,
            Severity = severity,
            Data = SerializeData(data),
            JobRunId = ContextProvider.TryGetJobRunId(),
            InstanceType = ReadInstanceType(),
            InstanceUrl = ReadInstanceUrl(),
            DownloadClientType = ReadDownloadClientType(),
            DownloadClientName = ReadDownloadClientName(),
        };

        eventEntity.IsDryRun = isDryRun ?? await _dryRunInterceptor.IsDryRunEnabled();

        await SaveManualEventToDatabase(eventEntity);
        await NotifyClientsAsync(eventEntity);

        _logger.LogTrace("Published manual event: {message}", message);
    }

    /// <summary>
    /// Publishes a strike event with context data and notifications
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException">
    /// Thrown when <paramref name="strikeType"/> has no matching event type.
    /// </exception>
    public async Task PublishStrike(StrikeType strikeType, int strikeCount, string hash, string itemName, Guid? strikeId = null)
    {
        // Determine the appropriate EventType based on StrikeType
        EventType eventType = strikeType switch
        {
            StrikeType.Stalled => EventType.StalledStrike,
            StrikeType.DownloadingMetadata => EventType.DownloadingMetadataStrike,
            StrikeType.FailedImport => EventType.FailedImportStrike,
            StrikeType.SlowSpeed => EventType.SlowSpeedStrike,
            StrikeType.SlowTime => EventType.SlowTimeStrike,
            _ => throw new ArgumentOutOfRangeException(nameof(strikeType), strikeType, null)
        };

        // Statically typed as object (not dynamic) so the PublishAsync call below is bound at compile time.
        object data;

        if (strikeType is StrikeType.FailedImport)
        {
            // Failed imports additionally carry the status messages of the current queue record.
            QueueRecord record = ContextProvider.Get<QueueRecord>(nameof(QueueRecord));
            data = new
            {
                hash,
                itemName,
                strikeCount,
                strikeType,
                failedImportReasons = record.StatusMessages ?? [],
            };
        }
        else
        {
            data = new
            {
                hash,
                itemName,
                strikeCount,
                strikeType,
            };
        }

        // Resolved once so the stored event and the broadcast strike carry the same flag.
        bool isDryRun = await _dryRunInterceptor.IsDryRunEnabled();

        // Publish the event
        await PublishAsync(
            eventType,
            $"Item '{itemName}' has been struck {strikeCount} times for reason '{strikeType}'",
            EventSeverity.Important,
            data: data,
            strikeId: strikeId,
            isDryRun: isDryRun);

        // Broadcast strike to SignalR clients for real-time dashboard updates
        await BroadcastStrikeAsync(strikeId, strikeType, hash, itemName, isDryRun);

        // Send notification (uses ContextProvider internally)
        await _notificationPublisher.NotifyStrike(strikeType, strikeCount);
    }

    /// <summary>
    /// Publishes a queue item deleted event with context data and notifications
    /// </summary>
    public async Task PublishQueueItemDeleted(bool removeFromClient, DeleteReason deleteReason)
    {
        // Get context data for the event
        string itemName = ContextProvider.Get<string>(ContextProvider.Keys.ItemName);
        string hash = ContextProvider.Get<string>(ContextProvider.Keys.Hash);

        // Publish the event
        await PublishAsync(
            EventType.QueueItemDeleted,
            $"Deleting item from queue with reason: {deleteReason}",
            EventSeverity.Important,
            data: new { itemName, hash, removeFromClient, deleteReason });

        // Send notification (uses ContextProvider internally)
        await _notificationPublisher.NotifyQueueItemDeleted(removeFromClient, deleteReason);
    }

    /// <summary>
    /// Publishes a download cleaned event with context data and notifications
    /// </summary>
    public async Task PublishDownloadCleaned(double ratio, TimeSpan seedingTime, string categoryName, CleanReason reason)
    {
        // Get context data for the event
        string itemName = ContextProvider.Get<string>(ContextProvider.Keys.ItemName);
        string hash = ContextProvider.Get<string>(ContextProvider.Keys.Hash);

        // Publish the event (seeding time is stored as total hours)
        await PublishAsync(
            EventType.DownloadCleaned,
            $"Cleaned item from download client with reason: {reason}",
            EventSeverity.Important,
            data: new { itemName, hash, categoryName, ratio, seedingTime = seedingTime.TotalHours, reason });

        // Send notification (uses ContextProvider internally)
        await _notificationPublisher.NotifyDownloadCleaned(ratio, seedingTime, categoryName, reason);
    }

    /// <summary>
    /// Publishes a category changed event with context data and notifications
    /// </summary>
    public async Task PublishCategoryChanged(string oldCategory, string newCategory, bool isTag = false)
    {
        // Get context data for the event
        string itemName = ContextProvider.Get<string>(ContextProvider.Keys.ItemName);
        string hash = ContextProvider.Get<string>(ContextProvider.Keys.Hash);

        // Publish the event
        await PublishAsync(
            EventType.CategoryChanged,
            isTag ? $"Tag '{newCategory}' added to download" : $"Category changed from '{oldCategory}' to '{newCategory}'",
            EventSeverity.Information,
            data: new { itemName, hash, oldCategory, newCategory, isTag });

        // Send notification (uses ContextProvider internally)
        await _notificationPublisher.NotifyCategoryChanged(oldCategory, newCategory, isTag);
    }

    /// <summary>
    /// Publishes an event alerting that an item keeps coming back
    /// </summary>
    public async Task PublishRecurringItem(string hash, string itemName, int strikeCount)
    {
        await PublishManualAsync(
            "Download keeps coming back after deletion\nTo prevent further issues, please consult the prerequisites: https://cleanuparr.github.io/Cleanuparr/docs/installation/",
            EventSeverity.Important,
            data: new { itemName, hash, strikeCount }
        );
    }

    /// <summary>
    /// Publishes a search triggered event with context data and notifications.
    /// Returns the event ID so the SeekerCommandMonitor can update it on completion.
    /// </summary>
    /// <returns>The id of the stored event.</returns>
    public async Task<Guid> PublishSearchTriggered(string instanceName, int itemCount, IEnumerable<string> items, SeekerSearchType searchType, Guid? cycleId = null)
    {
        // Materialize once: the sequence is read for the message, the payload and the notification.
        var itemList = items as string[] ?? items.ToArray();
        var itemsDisplay = string.Join(", ", itemList.Take(5)) + (itemList.Length > 5 ? $" (+{itemList.Length - 5} more)" : "");

        AppEvent eventEntity = new()
        {
            EventType = EventType.SearchTriggered,
            Message = $"Searched {itemCount} items on {instanceName}: {itemsDisplay}",
            Severity = EventSeverity.Information,
            Data = SerializeData(new
            {
                InstanceName = instanceName,
                ItemCount = itemCount,
                Items = itemList,
                SearchType = searchType.ToString(),
                CycleId = cycleId,
            }),
            SearchStatus = SearchCommandStatus.Pending,
            JobRunId = ContextProvider.TryGetJobRunId(),
            InstanceType = ReadInstanceType(),
            InstanceUrl = ReadInstanceUrl(),
            DownloadClientType = ReadDownloadClientType(),
            DownloadClientName = ReadDownloadClientName(),
            CycleId = cycleId,
        };

        eventEntity.IsDryRun = await _dryRunInterceptor.IsDryRunEnabled();

        await SaveEventToDatabase(eventEntity);
        await NotifyClientsAsync(eventEntity);

        await _notificationPublisher.NotifySearchTriggered(instanceName, itemCount, itemList);

        return eventEntity.Id;
    }

    /// <summary>
    /// Updates an existing search event with completion status and optional result data
    /// </summary>
    /// <param name="eventId">Id of the event to update; a warning is logged when it is not found.</param>
    /// <param name="status">Status written to the event.</param>
    /// <param name="resultData">Optional payload whose top-level properties are merged into the event data.</param>
    public async Task PublishSearchCompleted(Guid eventId, SearchCommandStatus status, object? resultData = null)
    {
        var existingEvent = await _context.Events.FindAsync(eventId);

        if (existingEvent is null)
        {
            _logger.LogWarning("Could not find search event {EventId} to update completion status", eventId);
            return;
        }

        existingEvent.SearchStatus = status;
        existingEvent.CompletedAt = DateTime.UtcNow;

        if (resultData is not null)
        {
            // Merge result data into existing Data JSON
            // NOTE(review): dictionary value type assumed to be JsonElement — confirm against the original declaration.
            Dictionary<string, JsonElement>? existingData = existingEvent.Data is not null
                ? JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(existingEvent.Data)
                : new Dictionary<string, JsonElement>();

            // Round-trip the result through JSON so it can be merged key by key.
            string resultJson = JsonSerializer.Serialize(resultData, SerializerOptions);
            var resultDict = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(resultJson);

            if (existingData is not null && resultDict is not null)
            {
                // Result values overwrite existing values with the same key.
                foreach (var (key, value) in resultDict)
                {
                    existingData[key] = value;
                }

                existingEvent.Data = JsonSerializer.Serialize(existingData, SerializerOptions);
            }
        }

        await _context.SaveChangesAsync();
        await NotifyClientsAsync(existingEvent);
    }

    /// <summary>
    /// Publishes an event alerting that search was not triggered for an item
    /// </summary>
    public async Task PublishSearchNotTriggered(string hash, string itemName)
    {
        await PublishManualAsync(
            "Replacement search was not triggered after removal\nPlease trigger a manual search if needed",
            EventSeverity.Warning,
            data: new { itemName, hash }
        );
    }

    /// <summary>
    /// Serializes an optional payload with the shared options; returns null for a null payload.
    /// </summary>
    private static string? SerializeData(object? data) =>
        data is null ? null : JsonSerializer.Serialize(data, SerializerOptions);

    /// <summary>
    /// Reads the instance type from the ambient context, or null when the stored value is not an InstanceType.
    /// </summary>
    private static InstanceType? ReadInstanceType() =>
        ContextProvider.Get(nameof(InstanceType)) is InstanceType it ? it : null;

    /// <summary>
    /// Reads the arr instance URL from the ambient context as a string, or null when the stored value is not a Uri.
    /// </summary>
    private static string? ReadInstanceUrl() =>
        (ContextProvider.Get(ContextProvider.Keys.ArrInstanceUrl) as Uri)?.ToString();

    /// <summary>
    /// Reads the download client type from the ambient context, or null when the stored value is not a DownloadClientTypeName.
    /// </summary>
    private static DownloadClientTypeName? ReadDownloadClientType() =>
        ContextProvider.Get(ContextProvider.Keys.DownloadClientType) is DownloadClientTypeName dct ? dct : null;

    /// <summary>
    /// Reads the download client name from the ambient context, or null when the stored value is not a string.
    /// </summary>
    private static string? ReadDownloadClientName() =>
        ContextProvider.Get(ContextProvider.Keys.DownloadClientName) as string;

    /// <summary>
    /// Adds the event to the events set and saves the context.
    /// </summary>
    private async Task SaveEventToDatabase(AppEvent eventEntity)
    {
        _context.Events.Add(eventEntity);
        await _context.SaveChangesAsync();
    }

    /// <summary>
    /// Adds the manual event to the manual events set and saves the context.
    /// </summary>
    private async Task SaveManualEventToDatabase(ManualEvent eventEntity)
    {
        _context.ManualEvents.Add(eventEntity);
        await _context.SaveChangesAsync();
    }

    /// <summary>
    /// Sends the event to all connected SignalR clients; failures are logged and not rethrown.
    /// </summary>
    private async Task NotifyClientsAsync(AppEvent appEventEntity)
    {
        try
        {
            // Send to all connected clients via the unified AppHub
            await _appHubContext.Clients.All.SendAsync("EventReceived", appEventEntity);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to send event {eventId} to SignalR clients", appEventEntity.Id);
        }
    }

    /// <summary>
    /// Sends the manual event to all connected SignalR clients; failures are logged and not rethrown.
    /// </summary>
    private async Task NotifyClientsAsync(ManualEvent appEventEntity)
    {
        try
        {
            // Send to all connected clients via the unified AppHub
            await _appHubContext.Clients.All.SendAsync("ManualEventReceived", appEventEntity);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to send event {eventId} to SignalR clients", appEventEntity.Id);
        }
    }

    /// <summary>
    /// Sends a strike summary to all connected SignalR clients; failures are logged and not rethrown.
    /// </summary>
    private async Task BroadcastStrikeAsync(Guid? strikeId, StrikeType strikeType, string hash, string itemName, bool isDryRun)
    {
        try
        {
            var strike = new
            {
                // A missing strike id is sent as Guid.Empty.
                Id = strikeId ?? Guid.Empty,
                Type = strikeType.ToString(),
                CreatedAt = DateTime.UtcNow,
                DownloadId = hash,
                Title = itemName,
                IsDryRun = isDryRun,
            };

            await _appHubContext.Clients.All.SendAsync("StrikeReceived", strike);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to send strike to SignalR clients");
        }
    }
}