From 607bebaf0f93c90aa317ef30ca9c55870e2ce24b Mon Sep 17 00:00:00 2001 From: Flaminel Date: Tue, 27 May 2025 14:30:59 +0300 Subject: [PATCH] fixed event architecture --- code/Data/Enums/EventType.cs | 4 +- code/Data/Enums/StrikeType.cs | 2 +- code/Infrastructure/Events/EventPublisher.cs | 114 +++++++++++++++++- .../Logging/LoggingInitializer.cs | 2 +- .../Infrastructure/Verticals/Arr/ArrClient.cs | 2 +- .../DownloadClient/Deluge/DelugeService.cs | 10 +- .../DownloadClient/DownloadService.cs | 6 +- .../DownloadClient/QBittorrent/QBitService.cs | 10 +- .../Transmission/TransmissionService.cs | 10 +- .../DownloadRemover/QueueItemRemover.cs | 16 ++- .../Verticals/ItemStriker/Striker.cs | 12 +- .../Notifications/NotificationPublisher.cs | 2 +- 12 files changed, 150 insertions(+), 40 deletions(-) diff --git a/code/Data/Enums/EventType.cs b/code/Data/Enums/EventType.cs index eed7e0dc..7b2e31e8 100644 --- a/code/Data/Enums/EventType.cs +++ b/code/Data/Enums/EventType.cs @@ -4,7 +4,9 @@ public enum EventType { FailedImportStrike, StalledStrike, - SlowStrike, + DownloadingMetadataStrike, + SlowSpeedStrike, + SlowTimeStrike, QueueItemDeleted, DownloadCleaned, CategoryChanged diff --git a/code/Data/Enums/StrikeType.cs b/code/Data/Enums/StrikeType.cs index 123ec254..8549a02d 100644 --- a/code/Data/Enums/StrikeType.cs +++ b/code/Data/Enums/StrikeType.cs @@ -4,7 +4,7 @@ public enum StrikeType { Stalled, DownloadingMetadata, - ImportFailed, + FailedImport, SlowSpeed, SlowTime, } \ No newline at end of file diff --git a/code/Infrastructure/Events/EventPublisher.cs b/code/Infrastructure/Events/EventPublisher.cs index af4c41e4..c7190b0e 100644 --- a/code/Infrastructure/Events/EventPublisher.cs +++ b/code/Infrastructure/Events/EventPublisher.cs @@ -4,6 +4,10 @@ using System.Text.Json; using Data; using Data.Enums; using Data.Models.Events; +using Infrastructure.Verticals.Notifications; +using Infrastructure.Verticals.Context; +using Infrastructure.Interceptors; +using Common.Attributes; namespace Infrastructure.Events; @@ -15,19 +19,25 @@ public class EventPublisher private readonly DataContext _context; private readonly IHubContext _hubContext; private readonly ILogger _logger; + private readonly INotificationPublisher _notificationPublisher; + private readonly IDryRunInterceptor _dryRunInterceptor; public EventPublisher( DataContext context, IHubContext hubContext, - ILogger logger) + ILogger logger, + INotificationPublisher notificationPublisher, + IDryRunInterceptor dryRunInterceptor) { _context = context; _hubContext = hubContext; _logger = logger; + _notificationPublisher = notificationPublisher; + _dryRunInterceptor = dryRunInterceptor; } /// - /// Publishes an event to database and SignalR clients + /// Generic method for publishing events to database and SignalR clients /// public async Task PublishAsync(EventType eventType, string message, EventSeverity severity, object? data = null, Guid? trackingId = null) { @@ -40,16 +50,108 @@ public class EventPublisher TrackingId = trackingId }; - // Save to database - _context.Events.Add(eventEntity); - await _context.SaveChangesAsync(); + // Save to database with dry run interception + await _dryRunInterceptor.InterceptAsync(SaveEventToDatabase, eventEntity); - // Send to SignalR clients + // Always send to SignalR clients (not affected by dry run) await NotifyClientsAsync(eventEntity); _logger.LogTrace("Published event: {eventType}", eventType); } + /// + /// Publishes a strike event with context data and notifications + /// + public async Task PublishStrike(StrikeType strikeType, int strikeCount, string hash, string itemName) + { + // Determine the appropriate EventType based on StrikeType + EventType eventType = strikeType switch + { + StrikeType.Stalled => EventType.StalledStrike, + StrikeType.DownloadingMetadata => EventType.DownloadingMetadataStrike, + StrikeType.FailedImport => EventType.FailedImportStrike, + StrikeType.SlowSpeed => EventType.SlowSpeedStrike, + StrikeType.SlowTime => EventType.SlowTimeStrike, + }; + + // Publish the event + await PublishAsync( + eventType, + $"Item '{itemName}' has been struck {strikeCount} times for reason '{strikeType}'", + EventSeverity.Important, + data: new { hash, itemName, strikeCount, strikeType }); + + // Send notification (uses ContextProvider internally) + await _notificationPublisher.NotifyStrike(strikeType, strikeCount); + } + + /// + /// Publishes a queue item deleted event with context data and notifications + /// + public async Task PublishQueueItemDeleted(bool removeFromClient, DeleteReason deleteReason) + { + // Get context data for the event + string downloadName = ContextProvider.Get("downloadName") ?? "Unknown"; + string hash = ContextProvider.Get("hash") ?? "Unknown"; + + // Publish the event + await PublishAsync( + EventType.QueueItemDeleted, + $"Deleting item from queue with reason: {deleteReason}", + EventSeverity.Important, + data: new { downloadName, hash, removeFromClient, deleteReason }); + + // Send notification (uses ContextProvider internally) + await _notificationPublisher.NotifyQueueItemDeleted(removeFromClient, deleteReason); + } + + /// + /// Publishes a download cleaned event with context data and notifications + /// + public async Task PublishDownloadCleaned(double ratio, TimeSpan seedingTime, string categoryName, CleanReason reason) + { + // Get context data for the event + string downloadName = ContextProvider.Get("downloadName") ?? "Unknown"; + string hash = ContextProvider.Get("hash") ?? "Unknown"; + + // Publish the event + await PublishAsync( + EventType.DownloadCleaned, + $"Cleaned item from download client with reason: {reason}", + EventSeverity.Important, + data: new { downloadName, hash, categoryName, ratio, seedingTime = seedingTime.TotalHours, reason }); + + // Send notification (uses ContextProvider internally) + await _notificationPublisher.NotifyDownloadCleaned(ratio, seedingTime, categoryName, reason); + } + + /// + /// Publishes a category changed event with context data and notifications + /// + public async Task PublishCategoryChanged(string oldCategory, string newCategory, bool isTag = false) + { + // Get context data for the event + string downloadName = ContextProvider.Get("downloadName") ?? "Unknown"; + string hash = ContextProvider.Get("hash") ?? "Unknown"; + + // Publish the event + await PublishAsync( + EventType.CategoryChanged, + isTag ? $"Tag '{newCategory}' added to download" : $"Category changed from '{oldCategory}' to '{newCategory}'", + EventSeverity.Information, + data: new { downloadName, hash, oldCategory, newCategory, isTag }); + + // Send notification (uses ContextProvider internally) + await _notificationPublisher.NotifyCategoryChanged(oldCategory, newCategory, isTag); + } + + [DryRunSafeguard] + private async Task SaveEventToDatabase(AppEvent eventEntity) + { + _context.Events.Add(eventEntity); + await _context.SaveChangesAsync(); + } + private async Task NotifyClientsAsync(AppEvent appEventEntity) { try diff --git a/code/Infrastructure/Logging/LoggingInitializer.cs b/code/Infrastructure/Logging/LoggingInitializer.cs index 084bf230..a2deebee 100644 --- a/code/Infrastructure/Logging/LoggingInitializer.cs +++ b/code/Infrastructure/Logging/LoggingInitializer.cs @@ -24,7 +24,7 @@ public class LoggingInitializer : BackgroundService try { await _eventPublisher.PublishAsync( - EventType.SlowStrike, + EventType.SlowSpeedStrike, "test", EventSeverity.Important, data: new { Hash = "hash", Name = "name", StrikeCount = "1", Type = "stalled" }); diff --git a/code/Infrastructure/Verticals/Arr/ArrClient.cs b/code/Infrastructure/Verticals/Arr/ArrClient.cs index 777e5eb6..017208b9 100644 --- a/code/Infrastructure/Verticals/Arr/ArrClient.cs +++ b/code/Infrastructure/Verticals/Arr/ArrClient.cs @@ -114,7 +114,7 @@ public abstract class ArrClient : IArrClient record.DownloadId, record.Title, maxStrikes, - StrikeType.ImportFailed + StrikeType.FailedImport ); } diff --git a/code/Infrastructure/Verticals/DownloadClient/Deluge/DelugeService.cs b/code/Infrastructure/Verticals/DownloadClient/Deluge/DelugeService.cs index 90280380..5549651a 100644 --- a/code/Infrastructure/Verticals/DownloadClient/Deluge/DelugeService.cs +++ b/code/Infrastructure/Verticals/DownloadClient/Deluge/DelugeService.cs @@ -8,6 +8,7 @@ using Common.CustomDataTypes; using Common.Exceptions; using Data.Enums; using Data.Models.Deluge.Response; +using Infrastructure.Events; using Infrastructure.Extensions; using Infrastructure.Interceptors; using Infrastructure.Verticals.ContentBlocker; @@ -35,11 +36,12 @@ public class DelugeService : DownloadService, IDelugeService INotificationPublisher notifier, IDryRunInterceptor dryRunInterceptor, IHardLinkFileService hardLinkFileService, - IDynamicHttpClientProvider httpClientProvider + IDynamicHttpClientProvider httpClientProvider, + EventPublisher eventPublisher ) : base( logger, configManager, cache, filenameEvaluator, striker, notifier, dryRunInterceptor, hardLinkFileService, - httpClientProvider + httpClientProvider, eventPublisher ) { // Client will be initialized when Initialize() is called with a specific client configuration @@ -348,7 +350,7 @@ public class DelugeService : DownloadService, IDelugeService download.Name ); - await _notifier.NotifyDownloadCleaned(download.Ratio, seedingTime, category.Name, result.Reason); + await _eventPublisher.PublishDownloadCleaned(download.Ratio, seedingTime, category.Name, result.Reason); } } @@ -446,7 +448,7 @@ public class DelugeService : DownloadService, IDelugeService _logger.LogInformation("category changed for {name}", download.Name); - await _notifier.NotifyCategoryChanged(download.Label, _downloadCleanerConfig.UnlinkedTargetCategory); + await _eventPublisher.PublishCategoryChanged(download.Label, _downloadCleanerConfig.UnlinkedTargetCategory); download.Label = _downloadCleanerConfig.UnlinkedTargetCategory; } diff --git a/code/Infrastructure/Verticals/DownloadClient/DownloadService.cs b/code/Infrastructure/Verticals/DownloadClient/DownloadService.cs index d5f7388b..e770ddc0 100644 --- a/code/Infrastructure/Verticals/DownloadClient/DownloadService.cs +++ b/code/Infrastructure/Verticals/DownloadClient/DownloadService.cs @@ -9,6 +9,7 @@ using Common.Helpers; using Data.Enums; using Data.Models.Cache; using Infrastructure.Configuration; +using Infrastructure.Events; using Infrastructure.Helpers; using Infrastructure.Http; using Infrastructure.Interceptors; @@ -34,6 +35,7 @@ public abstract class DownloadService : IDownloadService protected readonly IDryRunInterceptor _dryRunInterceptor; protected readonly IHardLinkFileService _hardLinkFileService; protected readonly IDynamicHttpClientProvider _httpClientProvider; + protected readonly EventPublisher _eventPublisher; protected readonly QueueCleanerConfig _queueCleanerConfig; protected readonly ContentBlockerConfig _contentBlockerConfig; protected readonly DownloadCleanerConfig _downloadCleanerConfig; @@ -53,7 +55,8 @@ public abstract class DownloadService : IDownloadService INotificationPublisher notifier, IDryRunInterceptor dryRunInterceptor, IHardLinkFileService hardLinkFileService, - IDynamicHttpClientProvider httpClientProvider + IDynamicHttpClientProvider httpClientProvider, + EventPublisher eventPublisher ) { _logger = logger; @@ -65,6 +68,7 @@ public abstract class DownloadService : IDownloadService _dryRunInterceptor = dryRunInterceptor; _hardLinkFileService = hardLinkFileService; _httpClientProvider = httpClientProvider; + _eventPublisher = eventPublisher; _cacheOptions = new MemoryCacheEntryOptions() .SetSlidingExpiration(StaticConfiguration.TriggerValue + Constants.CacheLimitBuffer); diff --git a/code/Infrastructure/Verticals/DownloadClient/QBittorrent/QBitService.cs b/code/Infrastructure/Verticals/DownloadClient/QBittorrent/QBitService.cs index ed8ecbd7..29706c50 100644 --- a/code/Infrastructure/Verticals/DownloadClient/QBittorrent/QBitService.cs +++ b/code/Infrastructure/Verticals/DownloadClient/QBittorrent/QBitService.cs @@ -20,6 +20,7 @@ using Infrastructure.Verticals.Notifications; using Microsoft.Extensions.Caching.Memory; using Microsoft.Extensions.Logging; using QBittorrent.Client; +using Infrastructure.Events; namespace Infrastructure.Verticals.DownloadClient.QBittorrent; @@ -37,10 +38,11 @@ public class QBitService : DownloadService, IQBitService INotificationPublisher notifier, IDryRunInterceptor dryRunInterceptor, IHardLinkFileService hardLinkFileService, - IDynamicHttpClientProvider httpClientProvider + IDynamicHttpClientProvider httpClientProvider, + EventPublisher eventPublisher ) : base( logger, configManager, cache, filenameEvaluator, striker, notifier, dryRunInterceptor, hardLinkFileService, - httpClientProvider + httpClientProvider, eventPublisher ) { // Client will be initialized when Initialize() is called with a specific client configuration @@ -394,7 +396,7 @@ public class QBitService : DownloadService, IQBitService download.Name ); - await _notifier.NotifyDownloadCleaned(download.Ratio, download.SeedingTime ?? TimeSpan.Zero, category.Name, result.Reason); + await _eventPublisher.PublishDownloadCleaned(download.Ratio, download.SeedingTime ?? TimeSpan.Zero, category.Name, result.Reason); } } @@ -517,7 +519,7 @@ public class QBitService : DownloadService, IQBitService download.Category = _downloadCleanerConfig.UnlinkedTargetCategory; } - await _notifier.NotifyCategoryChanged(download.Category, _downloadCleanerConfig.UnlinkedTargetCategory, _downloadCleanerConfig.UnlinkedUseTag); + await _eventPublisher.PublishCategoryChanged(download.Category, _downloadCleanerConfig.UnlinkedTargetCategory, _downloadCleanerConfig.UnlinkedUseTag); } } diff --git a/code/Infrastructure/Verticals/DownloadClient/Transmission/TransmissionService.cs b/code/Infrastructure/Verticals/DownloadClient/Transmission/TransmissionService.cs index b9ea8343..9d787840 100644 --- a/code/Infrastructure/Verticals/DownloadClient/Transmission/TransmissionService.cs +++ b/code/Infrastructure/Verticals/DownloadClient/Transmission/TransmissionService.cs @@ -8,6 +8,7 @@ using Common.Configuration.QueueCleaner; using Common.CustomDataTypes; using Common.Helpers; using Data.Enums; +using Infrastructure.Events; using Infrastructure.Extensions; using Infrastructure.Interceptors; using Infrastructure.Verticals.ContentBlocker; @@ -57,11 +58,12 @@ public class TransmissionService : DownloadService, ITransmissionService INotificationPublisher notifier, IDryRunInterceptor dryRunInterceptor, IHardLinkFileService hardLinkFileService, - IDynamicHttpClientProvider httpClientProvider + IDynamicHttpClientProvider httpClientProvider, + EventPublisher eventPublisher ) : base( logger, configManager, cache, filenameEvaluator, striker, notifier, dryRunInterceptor, hardLinkFileService, - httpClientProvider + httpClientProvider, eventPublisher ) { // Client will be initialized when Initialize() is called with a specific client configuration @@ -360,7 +362,7 @@ public class TransmissionService : DownloadService, ITransmissionService download.Name ); - await _notifier.NotifyDownloadCleaned(download.uploadRatio ?? 0, seedingTime, category.Name, result.Reason); + await _eventPublisher.PublishDownloadCleaned(download.uploadRatio ?? 0, seedingTime, category.Name, result.Reason); } } @@ -452,7 +454,7 @@ public class TransmissionService : DownloadService, ITransmissionService _logger.LogInformation("category changed for {name}", download.Name); - await _notifier.NotifyCategoryChanged(currentCategory, _downloadCleanerConfig.UnlinkedTargetCategory); + await _eventPublisher.PublishCategoryChanged(currentCategory, _downloadCleanerConfig.UnlinkedTargetCategory); download.DownloadDir = newLocation; } diff --git a/code/Infrastructure/Verticals/DownloadRemover/QueueItemRemover.cs b/code/Infrastructure/Verticals/DownloadRemover/QueueItemRemover.cs index 59579ae4..4310c7d3 100644 --- a/code/Infrastructure/Verticals/DownloadRemover/QueueItemRemover.cs +++ b/code/Infrastructure/Verticals/DownloadRemover/QueueItemRemover.cs @@ -3,12 +3,12 @@ using Common.Configuration.General; using Data.Enums; using Data.Models.Arr; using Data.Models.Arr.Queue; +using Infrastructure.Events; using Infrastructure.Helpers; using Infrastructure.Verticals.Arr; using Infrastructure.Verticals.Context; using Infrastructure.Verticals.DownloadRemover.Interfaces; using Infrastructure.Verticals.DownloadRemover.Models; -using Infrastructure.Verticals.Notifications; using Microsoft.Extensions.Caching.Memory; using Infrastructure.Configuration; @@ -19,19 +19,19 @@ public sealed class QueueItemRemover : IQueueItemRemover private readonly GeneralConfig _generalConfig; private readonly IMemoryCache _cache; private readonly ArrClientFactory _arrClientFactory; - private readonly INotificationPublisher _notifier; + private readonly EventPublisher _eventPublisher; public QueueItemRemover( IConfigManager configManager, IMemoryCache cache, ArrClientFactory arrClientFactory, - INotificationPublisher notifier + EventPublisher eventPublisher ) { _generalConfig = configManager.GetConfiguration(); _cache = cache; _arrClientFactory = arrClientFactory; - _notifier = notifier; + _eventPublisher = eventPublisher; } public async Task RemoveQueueItemAsync(QueueItemRemoveRequest request) @@ -42,11 +42,15 @@ public sealed class QueueItemRemover : IQueueItemRemover var arrClient = _arrClientFactory.GetClient(request.InstanceType); await arrClient.DeleteQueueItemAsync(request.Instance, request.Record, request.RemoveFromClient, request.DeleteReason); - // push to context + // Set context for EventPublisher + ContextProvider.Set("downloadName", request.Record.Title); + ContextProvider.Set("hash", request.Record.DownloadId); ContextProvider.Set(nameof(QueueRecord), request.Record); ContextProvider.Set(nameof(ArrInstance) + nameof(ArrInstance.Url), request.Instance.Url); ContextProvider.Set(nameof(InstanceType), request.InstanceType); - await _notifier.NotifyQueueItemDeleted(request.RemoveFromClient, request.DeleteReason); + + // Use the new centralized EventPublisher method + await _eventPublisher.PublishQueueItemDeleted(request.RemoveFromClient, request.DeleteReason); if (!_generalConfig.SearchEnabled) { diff --git a/code/Infrastructure/Verticals/ItemStriker/Striker.cs b/code/Infrastructure/Verticals/ItemStriker/Striker.cs index b3068b8e..e7ce0319 100644 --- a/code/Infrastructure/Verticals/ItemStriker/Striker.cs +++ b/code/Infrastructure/Verticals/ItemStriker/Striker.cs @@ -4,7 +4,6 @@ using Infrastructure.Events; using Infrastructure.Helpers; using Infrastructure.Interceptors; using Infrastructure.Verticals.Context; -using Infrastructure.Verticals.Notifications; using Microsoft.Extensions.Caching.Memory; using Microsoft.Extensions.Logging; @@ -15,14 +14,12 @@ public sealed class Striker : IStriker private readonly ILogger _logger; private readonly IMemoryCache _cache; private readonly MemoryCacheEntryOptions _cacheOptions; - private readonly INotificationPublisher _notifier; private readonly EventPublisher _eventPublisher; - public Striker(ILogger logger, IMemoryCache cache, INotificationPublisher notifier, EventPublisher eventPublisher) + public Striker(ILogger logger, IMemoryCache cache, EventPublisher eventPublisher) { _logger = logger; _cache = cache; - _notifier = notifier; _eventPublisher = eventPublisher; _cacheOptions = new MemoryCacheEntryOptions() .SetSlidingExpiration(StaticConfiguration.TriggerValue + Constants.CacheLimitBuffer); @@ -48,12 +45,7 @@ public sealed class Striker : IStriker _logger.LogInformation("item on strike number {strike} | reason {reason} | {name}", strikeCount, strikeType.ToString(), itemName); - await _notifier.NotifyStrike(strikeType, strikeCount); - await _eventPublisher.PublishAsync( - EventType.SlowStrike, // TODO - $"Item '{itemName}' has been struck {strikeCount} times for reason '{strikeType}'", - EventSeverity.Important, - data: new { hash, itemName, strikeCount, strikeType }); + await _eventPublisher.PublishStrike(strikeType, strikeCount, hash, itemName); _cache.Set(key, strikeCount, _cacheOptions); diff --git a/code/Infrastructure/Verticals/Notifications/NotificationPublisher.cs b/code/Infrastructure/Verticals/Notifications/NotificationPublisher.cs index 33c12e08..44005d03 100644 --- a/code/Infrastructure/Verticals/Notifications/NotificationPublisher.cs +++ b/code/Infrastructure/Verticals/Notifications/NotificationPublisher.cs @@ -51,7 +51,7 @@ public class NotificationPublisher : INotificationPublisher case StrikeType.DownloadingMetadata: await NotifyInternal(notification.Adapt()); break; - case StrikeType.ImportFailed: + case StrikeType.FailedImport: await NotifyInternal(notification.Adapt()); break; case StrikeType.SlowSpeed: