using System.Text.Json; using System.Text.Json.Serialization; using Cleanuparr.Domain.Entities.Arr.Queue; using Cleanuparr.Domain.Enums; using Cleanuparr.Infrastructure.Events.Interfaces; using Cleanuparr.Infrastructure.Features.Context; using Cleanuparr.Infrastructure.Features.Notifications; using Cleanuparr.Infrastructure.Hubs; using Cleanuparr.Infrastructure.Interceptors; using Cleanuparr.Persistence; using Cleanuparr.Persistence.Models.Configuration.Arr; using Cleanuparr.Persistence.Models.Events; using Microsoft.AspNetCore.SignalR; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Storage; using Microsoft.Extensions.Logging; namespace Cleanuparr.Infrastructure.Events; /// /// Service for publishing events to database and SignalR hub /// public class EventPublisher : IEventPublisher { private readonly EventsContext _context; private readonly IHubContext _appHubContext; private readonly ILogger _logger; private readonly INotificationPublisher _notificationPublisher; private readonly IDryRunInterceptor _dryRunInterceptor; public EventPublisher( EventsContext context, IHubContext appHubContext, ILogger logger, INotificationPublisher notificationPublisher, IDryRunInterceptor dryRunInterceptor) { _context = context; _appHubContext = appHubContext; _logger = logger; _notificationPublisher = notificationPublisher; _dryRunInterceptor = dryRunInterceptor; } /// /// Generic method for publishing events to database and SignalR clients /// public async Task PublishAsync(EventType eventType, string message, EventSeverity severity, object? data = null, Guid? trackingId = null, Guid? strikeId = null, bool? isDryRun = null) { AppEvent eventEntity = new() { EventType = eventType, Message = message, Severity = severity, Data = data != null ? JsonSerializer.Serialize(data, new JsonSerializerOptions { Converters = { new JsonStringEnumConverter() } }) : null, TrackingId = trackingId, StrikeId = strikeId, JobRunId = ContextProvider.TryGetJobRunId(), ArrInstanceId = ContextProvider.Get(ContextProvider.Keys.ArrInstanceId) as Guid?, DownloadClientId = ContextProvider.Get(ContextProvider.Keys.DownloadClientId) as Guid?, InstanceType = ContextProvider.Get(nameof(InstanceType)) is InstanceType it ? it : null, InstanceUrl = (ContextProvider.Get(ContextProvider.Keys.ArrInstanceUrl) as Uri)?.ToString(), DownloadClientType = ContextProvider.Get(ContextProvider.Keys.DownloadClientType) is DownloadClientTypeName dct ? dct : null, DownloadClientName = ContextProvider.Get(ContextProvider.Keys.DownloadClientName) as string, }; eventEntity.IsDryRun = isDryRun ?? await _dryRunInterceptor.IsDryRunEnabled(); _context.Events.Add(eventEntity); await _context.SaveChangesAsync(); await NotifyClientsAsync(eventEntity); _logger.LogTrace("Published event: {eventType}", eventType); } public async Task PublishManualAsync(string message, EventSeverity severity, object? data = null, bool? isDryRun = null) { ManualEvent eventEntity = new() { Message = message, Severity = severity, Data = data != null ? JsonSerializer.Serialize(data, new JsonSerializerOptions { Converters = { new JsonStringEnumConverter() } }) : null, JobRunId = ContextProvider.TryGetJobRunId(), InstanceType = ContextProvider.Get(nameof(InstanceType)) is InstanceType it ? it : null, InstanceUrl = (ContextProvider.Get(ContextProvider.Keys.ArrInstanceUrl) as Uri)?.ToString(), DownloadClientType = ContextProvider.Get(ContextProvider.Keys.DownloadClientType) is DownloadClientTypeName dct ? dct : null, DownloadClientName = ContextProvider.Get(ContextProvider.Keys.DownloadClientName) as string, }; eventEntity.IsDryRun = isDryRun ?? await _dryRunInterceptor.IsDryRunEnabled(); _context.ManualEvents.Add(eventEntity); await _context.SaveChangesAsync(); await NotifyClientsAsync(eventEntity); _logger.LogTrace("Published manual event: {message}", message); } /// /// Publishes a strike event with context data and notifications /// public async Task PublishStrike(StrikeType strikeType, int strikeCount, string hash, string itemName, Guid? strikeId = null) { // Determine the appropriate EventType based on StrikeType EventType eventType = strikeType switch { StrikeType.Stalled => EventType.StalledStrike, StrikeType.DownloadingMetadata => EventType.DownloadingMetadataStrike, StrikeType.FailedImport => EventType.FailedImportStrike, StrikeType.SlowSpeed => EventType.SlowSpeedStrike, StrikeType.SlowTime => EventType.SlowTimeStrike, _ => throw new ArgumentOutOfRangeException(nameof(strikeType), strikeType, null) }; dynamic data; if (strikeType is StrikeType.FailedImport) { QueueRecord record = ContextProvider.Get(nameof(QueueRecord)); data = new { hash, itemName, strikeCount, strikeType, failedImportReasons = record.StatusMessages ?? [], }; } else { data = new { hash, itemName, strikeCount, strikeType, }; } bool isDryRun = await _dryRunInterceptor.IsDryRunEnabled(); // Publish the event await PublishAsync( eventType, $"Item '{itemName}' has been struck {strikeCount} times for reason '{strikeType}'", EventSeverity.Important, data: data, strikeId: strikeId, isDryRun: isDryRun); // Broadcast strike to SignalR clients for real-time dashboard updates await BroadcastStrikeAsync(strikeId, strikeType, hash, itemName, isDryRun); // Send notification (uses ContextProvider internally) await _notificationPublisher.NotifyStrike(strikeType, strikeCount); } /// /// Publishes a queue item deleted event with context data and notifications /// public async Task PublishQueueItemDeleted(bool removeFromClient, DeleteReason deleteReason) { // Get context data for the event string itemName = ContextProvider.Get(ContextProvider.Keys.ItemName); string hash = ContextProvider.Get(ContextProvider.Keys.Hash); // Publish the event await PublishAsync( EventType.QueueItemDeleted, $"Deleting item from queue with reason: {deleteReason}", EventSeverity.Important, data: new { itemName, hash, removeFromClient, deleteReason }); // Send notification (uses ContextProvider internally) await _notificationPublisher.NotifyQueueItemDeleted(removeFromClient, deleteReason); } /// /// Publishes a download cleaned event with context data and notifications /// public async Task PublishDownloadCleaned(double ratio, TimeSpan seedingTime, string categoryName, CleanReason reason) { // Get context data for the event string itemName = ContextProvider.Get(ContextProvider.Keys.ItemName); string hash = ContextProvider.Get(ContextProvider.Keys.Hash); // Publish the event await PublishAsync( EventType.DownloadCleaned, $"Cleaned item from download client with reason: {reason}", EventSeverity.Important, data: new { itemName, hash, categoryName, ratio, seedingTime = seedingTime.TotalHours, reason }); // Send notification (uses ContextProvider internally) await _notificationPublisher.NotifyDownloadCleaned(ratio, seedingTime, categoryName, reason); } /// /// Publishes a category changed event with context data and notifications /// public async Task PublishCategoryChanged(string oldCategory, string newCategory, bool isTag = false) { // Get context data for the event string itemName = ContextProvider.Get(ContextProvider.Keys.ItemName); string hash = ContextProvider.Get(ContextProvider.Keys.Hash); // Publish the event await PublishAsync( EventType.CategoryChanged, isTag ? $"Tag '{newCategory}' added to download" : $"Category changed from '{oldCategory}' to '{newCategory}'", EventSeverity.Information, data: new { itemName, hash, oldCategory, newCategory, isTag }); // Send notification (uses ContextProvider internally) await _notificationPublisher.NotifyCategoryChanged(oldCategory, newCategory, isTag); } /// /// Publishes an event alerting that an item keeps coming back /// public async Task PublishRecurringItem(string hash, string itemName, int strikeCount) { await PublishManualAsync( "Download keeps coming back after deletion\nTo prevent further issues, please consult the prerequisites: https://cleanuparr.github.io/Cleanuparr/docs/installation/", EventSeverity.Important, data: new { itemName, hash, strikeCount } ); } /// /// Publishes a search triggered event with context data and notifications. /// Returns the event ID so the SeekerCommandMonitor can update it on completion. /// public async Task PublishSearchTriggered(string itemTitle, SeekerSearchType searchType, SeekerSearchReason searchReason, Guid? cycleId = null) { AppEvent eventEntity = new() { EventType = EventType.SearchTriggered, Message = $"Search triggered for {itemTitle}", Severity = EventSeverity.Information, SearchStatus = SearchCommandStatus.Pending, JobRunId = ContextProvider.TryGetJobRunId(), ArrInstanceId = ContextProvider.Get(ContextProvider.Keys.ArrInstanceId) as Guid?, DownloadClientId = ContextProvider.Get(ContextProvider.Keys.DownloadClientId) as Guid?, InstanceType = ContextProvider.Get(nameof(InstanceType)) is InstanceType it ? it : null, InstanceUrl = (ContextProvider.Get(ContextProvider.Keys.ArrInstanceUrl) as Uri)?.ToString(), DownloadClientType = ContextProvider.Get(ContextProvider.Keys.DownloadClientType) is DownloadClientTypeName dct ? dct : null, DownloadClientName = ContextProvider.Get(ContextProvider.Keys.DownloadClientName) as string, CycleId = cycleId, }; eventEntity.IsDryRun = await _dryRunInterceptor.IsDryRunEnabled(); await using IDbContextTransaction transaction = await _context.Database.BeginTransactionAsync(); try { _context.Events.Add(eventEntity); _context.SearchEventData.Add(new SearchEventData { AppEventId = eventEntity.Id, ItemTitle = itemTitle, SearchType = searchType, SearchReason = searchReason, }); await _context.SaveChangesAsync(); await transaction.CommitAsync(); } catch { await transaction.RollbackAsync(); throw; } await NotifyClientsAsync(eventEntity); await _notificationPublisher.NotifySearchTriggered(itemTitle, searchType, searchReason); return eventEntity.Id; } /// /// Updates an existing search event with completion status and optional grabbed item titles /// public async Task PublishSearchCompleted(Guid eventId, SearchCommandStatus status, InstanceType instanceType, string instanceUrl, List? grabbedItems = null) { var existingEvent = await _context.Events .Include(e => e.SearchEventData) .FirstOrDefaultAsync(e => e.Id == eventId); if (existingEvent is null) { _logger.LogWarning("Could not find search event {EventId} to update completion status", eventId); return; } existingEvent.SearchStatus = status; existingEvent.CompletedAt = DateTime.UtcNow; if (grabbedItems is { Count: > 0 } && existingEvent.SearchEventData is not null) { existingEvent.SearchEventData.GrabbedItems = grabbedItems; } await _context.SaveChangesAsync(); await NotifyClientsAsync(existingEvent); if (status is SearchCommandStatus.Completed && grabbedItems is { Count: > 0 } && existingEvent.SearchEventData is not null) { await _notificationPublisher.NotifySearchItemGrabbed(existingEvent.SearchEventData.ItemTitle, grabbedItems, instanceType, instanceUrl); } } /// /// Publishes an event alerting that search was not triggered for an item /// public async Task PublishSearchNotTriggered(string hash, string itemName) { await PublishManualAsync( "Replacement search was not triggered after removal\nPlease trigger a manual search if needed", EventSeverity.Warning, data: new { itemName, hash } ); } private async Task NotifyClientsAsync(AppEvent appEventEntity) { try { // Send to all connected clients via the unified AppHub await _appHubContext.Clients.All.SendAsync("EventReceived", appEventEntity); } catch (Exception ex) { _logger.LogError(ex, "Failed to send event {eventId} to SignalR clients", appEventEntity.Id); } } private async Task NotifyClientsAsync(ManualEvent appEventEntity) { try { // Send to all connected clients via the unified AppHub await _appHubContext.Clients.All.SendAsync("ManualEventReceived", appEventEntity); } catch (Exception ex) { _logger.LogError(ex, "Failed to send event {eventId} to SignalR clients", appEventEntity.Id); } } private async Task BroadcastStrikeAsync(Guid? strikeId, StrikeType strikeType, string hash, string itemName, bool isDryRun) { try { var strike = new { Id = strikeId ?? Guid.Empty, Type = strikeType.ToString(), CreatedAt = DateTime.UtcNow, DownloadId = hash, Title = itemName, IsDryRun = isDryRun, }; await _appHubContext.Clients.All.SendAsync("StrikeReceived", strike); } catch (Exception ex) { _logger.LogError(ex, "Failed to send strike to SignalR clients"); } } }