fixed event architecture

This commit is contained in:
Flaminel
2025-05-27 14:30:59 +03:00
parent 0da1ef518a
commit 607bebaf0f
12 changed files with 150 additions and 40 deletions

View File

@@ -4,7 +4,9 @@ public enum EventType
{
FailedImportStrike,
StalledStrike,
SlowStrike,
DownloadingMetadataStrike,
SlowSpeedStrike,
SlowTimeStrike,
QueueItemDeleted,
DownloadCleaned,
CategoryChanged

View File

@@ -4,7 +4,7 @@ public enum StrikeType
{
Stalled,
DownloadingMetadata,
ImportFailed,
FailedImport,
SlowSpeed,
SlowTime,
}

View File

@@ -4,6 +4,10 @@ using System.Text.Json;
using Data;
using Data.Enums;
using Data.Models.Events;
using Infrastructure.Verticals.Notifications;
using Infrastructure.Verticals.Context;
using Infrastructure.Interceptors;
using Common.Attributes;
namespace Infrastructure.Events;
@@ -15,19 +19,25 @@ public class EventPublisher
private readonly DataContext _context;
private readonly IHubContext<EventHub> _hubContext;
private readonly ILogger<EventPublisher> _logger;
private readonly INotificationPublisher _notificationPublisher;
private readonly IDryRunInterceptor _dryRunInterceptor;
public EventPublisher(
DataContext context,
IHubContext<EventHub> hubContext,
ILogger<EventPublisher> logger)
ILogger<EventPublisher> logger,
INotificationPublisher notificationPublisher,
IDryRunInterceptor dryRunInterceptor)
{
_context = context;
_hubContext = hubContext;
_logger = logger;
_notificationPublisher = notificationPublisher;
_dryRunInterceptor = dryRunInterceptor;
}
/// <summary>
/// Publishes an event to database and SignalR clients
/// Generic method for publishing events to database and SignalR clients
/// </summary>
public async Task PublishAsync(EventType eventType, string message, EventSeverity severity, object? data = null, Guid? trackingId = null)
{
@@ -40,16 +50,108 @@ public class EventPublisher
TrackingId = trackingId
};
// Save to database
_context.Events.Add(eventEntity);
await _context.SaveChangesAsync();
// Save to database with dry run interception
await _dryRunInterceptor.InterceptAsync(SaveEventToDatabase, eventEntity);
// Send to SignalR clients
// Always send to SignalR clients (not affected by dry run)
await NotifyClientsAsync(eventEntity);
_logger.LogTrace("Published event: {eventType}", eventType);
}
/// <summary>
/// Publishes a strike event with context data and notifications
/// </summary>
public async Task PublishStrike(StrikeType strikeType, int strikeCount, string hash, string itemName)
{
// Determine the appropriate EventType based on StrikeType
EventType eventType = strikeType switch
{
StrikeType.Stalled => EventType.StalledStrike,
StrikeType.DownloadingMetadata => EventType.DownloadingMetadataStrike,
StrikeType.FailedImport => EventType.FailedImportStrike,
StrikeType.SlowSpeed => EventType.SlowSpeedStrike,
StrikeType.SlowTime => EventType.SlowTimeStrike,
};
// Publish the event
await PublishAsync(
eventType,
$"Item '{itemName}' has been struck {strikeCount} times for reason '{strikeType}'",
EventSeverity.Important,
data: new { hash, itemName, strikeCount, strikeType });
// Send notification (uses ContextProvider internally)
await _notificationPublisher.NotifyStrike(strikeType, strikeCount);
}
/// <summary>
/// Publishes a queue item deleted event with context data and notifications
/// </summary>
public async Task PublishQueueItemDeleted(bool removeFromClient, DeleteReason deleteReason)
{
// Get context data for the event
string downloadName = ContextProvider.Get<string>("downloadName") ?? "Unknown";
string hash = ContextProvider.Get<string>("hash") ?? "Unknown";
// Publish the event
await PublishAsync(
EventType.QueueItemDeleted,
$"Deleting item from queue with reason: {deleteReason}",
EventSeverity.Important,
data: new { downloadName, hash, removeFromClient, deleteReason });
// Send notification (uses ContextProvider internally)
await _notificationPublisher.NotifyQueueItemDeleted(removeFromClient, deleteReason);
}
/// <summary>
/// Publishes a download cleaned event with context data and notifications
/// </summary>
public async Task PublishDownloadCleaned(double ratio, TimeSpan seedingTime, string categoryName, CleanReason reason)
{
// Get context data for the event
string downloadName = ContextProvider.Get<string>("downloadName") ?? "Unknown";
string hash = ContextProvider.Get<string>("hash") ?? "Unknown";
// Publish the event
await PublishAsync(
EventType.DownloadCleaned,
$"Cleaned item from download client with reason: {reason}",
EventSeverity.Important,
data: new { downloadName, hash, categoryName, ratio, seedingTime = seedingTime.TotalHours, reason });
// Send notification (uses ContextProvider internally)
await _notificationPublisher.NotifyDownloadCleaned(ratio, seedingTime, categoryName, reason);
}
/// <summary>
/// Publishes a category changed event with context data and notifications
/// </summary>
public async Task PublishCategoryChanged(string oldCategory, string newCategory, bool isTag = false)
{
// Get context data for the event
string downloadName = ContextProvider.Get<string>("downloadName") ?? "Unknown";
string hash = ContextProvider.Get<string>("hash") ?? "Unknown";
// Publish the event
await PublishAsync(
EventType.CategoryChanged,
isTag ? $"Tag '{newCategory}' added to download" : $"Category changed from '{oldCategory}' to '{newCategory}'",
EventSeverity.Information,
data: new { downloadName, hash, oldCategory, newCategory, isTag });
// Send notification (uses ContextProvider internally)
await _notificationPublisher.NotifyCategoryChanged(oldCategory, newCategory, isTag);
}
[DryRunSafeguard]
private async Task SaveEventToDatabase(AppEvent eventEntity)
{
_context.Events.Add(eventEntity);
await _context.SaveChangesAsync();
}
private async Task NotifyClientsAsync(AppEvent appEventEntity)
{
try

View File

@@ -24,7 +24,7 @@ public class LoggingInitializer : BackgroundService
try
{
await _eventPublisher.PublishAsync(
EventType.SlowStrike,
EventType.SlowSpeedStrike,
"test",
EventSeverity.Important,
data: new { Hash = "hash", Name = "name", StrikeCount = "1", Type = "stalled" });

View File

@@ -114,7 +114,7 @@ public abstract class ArrClient : IArrClient
record.DownloadId,
record.Title,
maxStrikes,
StrikeType.ImportFailed
StrikeType.FailedImport
);
}

View File

@@ -8,6 +8,7 @@ using Common.CustomDataTypes;
using Common.Exceptions;
using Data.Enums;
using Data.Models.Deluge.Response;
using Infrastructure.Events;
using Infrastructure.Extensions;
using Infrastructure.Interceptors;
using Infrastructure.Verticals.ContentBlocker;
@@ -35,11 +36,12 @@ public class DelugeService : DownloadService, IDelugeService
INotificationPublisher notifier,
IDryRunInterceptor dryRunInterceptor,
IHardLinkFileService hardLinkFileService,
IDynamicHttpClientProvider httpClientProvider
IDynamicHttpClientProvider httpClientProvider,
EventPublisher eventPublisher
) : base(
logger, configManager, cache,
filenameEvaluator, striker, notifier, dryRunInterceptor, hardLinkFileService,
httpClientProvider
httpClientProvider, eventPublisher
)
{
// Client will be initialized when Initialize() is called with a specific client configuration
@@ -348,7 +350,7 @@ public class DelugeService : DownloadService, IDelugeService
download.Name
);
await _notifier.NotifyDownloadCleaned(download.Ratio, seedingTime, category.Name, result.Reason);
await _eventPublisher.PublishDownloadCleaned(download.Ratio, seedingTime, category.Name, result.Reason);
}
}
@@ -446,7 +448,7 @@ public class DelugeService : DownloadService, IDelugeService
_logger.LogInformation("category changed for {name}", download.Name);
await _notifier.NotifyCategoryChanged(download.Label, _downloadCleanerConfig.UnlinkedTargetCategory);
await _eventPublisher.PublishCategoryChanged(download.Label, _downloadCleanerConfig.UnlinkedTargetCategory);
download.Label = _downloadCleanerConfig.UnlinkedTargetCategory;
}

View File

@@ -9,6 +9,7 @@ using Common.Helpers;
using Data.Enums;
using Data.Models.Cache;
using Infrastructure.Configuration;
using Infrastructure.Events;
using Infrastructure.Helpers;
using Infrastructure.Http;
using Infrastructure.Interceptors;
@@ -34,6 +35,7 @@ public abstract class DownloadService : IDownloadService
protected readonly IDryRunInterceptor _dryRunInterceptor;
protected readonly IHardLinkFileService _hardLinkFileService;
protected readonly IDynamicHttpClientProvider _httpClientProvider;
protected readonly EventPublisher _eventPublisher;
protected readonly QueueCleanerConfig _queueCleanerConfig;
protected readonly ContentBlockerConfig _contentBlockerConfig;
protected readonly DownloadCleanerConfig _downloadCleanerConfig;
@@ -53,7 +55,8 @@ public abstract class DownloadService : IDownloadService
INotificationPublisher notifier,
IDryRunInterceptor dryRunInterceptor,
IHardLinkFileService hardLinkFileService,
IDynamicHttpClientProvider httpClientProvider
IDynamicHttpClientProvider httpClientProvider,
EventPublisher eventPublisher
)
{
_logger = logger;
@@ -65,6 +68,7 @@ public abstract class DownloadService : IDownloadService
_dryRunInterceptor = dryRunInterceptor;
_hardLinkFileService = hardLinkFileService;
_httpClientProvider = httpClientProvider;
_eventPublisher = eventPublisher;
_cacheOptions = new MemoryCacheEntryOptions()
.SetSlidingExpiration(StaticConfiguration.TriggerValue + Constants.CacheLimitBuffer);

View File

@@ -20,6 +20,7 @@ using Infrastructure.Verticals.Notifications;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;
using QBittorrent.Client;
using Infrastructure.Events;
namespace Infrastructure.Verticals.DownloadClient.QBittorrent;
@@ -37,10 +38,11 @@ public class QBitService : DownloadService, IQBitService
INotificationPublisher notifier,
IDryRunInterceptor dryRunInterceptor,
IHardLinkFileService hardLinkFileService,
IDynamicHttpClientProvider httpClientProvider
IDynamicHttpClientProvider httpClientProvider,
EventPublisher eventPublisher
) : base(
logger, configManager, cache, filenameEvaluator, striker, notifier, dryRunInterceptor, hardLinkFileService,
httpClientProvider
httpClientProvider, eventPublisher
)
{
// Client will be initialized when Initialize() is called with a specific client configuration
@@ -394,7 +396,7 @@ public class QBitService : DownloadService, IQBitService
download.Name
);
await _notifier.NotifyDownloadCleaned(download.Ratio, download.SeedingTime ?? TimeSpan.Zero, category.Name, result.Reason);
await _eventPublisher.PublishDownloadCleaned(download.Ratio, download.SeedingTime ?? TimeSpan.Zero, category.Name, result.Reason);
}
}
@@ -517,7 +519,7 @@ public class QBitService : DownloadService, IQBitService
download.Category = _downloadCleanerConfig.UnlinkedTargetCategory;
}
await _notifier.NotifyCategoryChanged(download.Category, _downloadCleanerConfig.UnlinkedTargetCategory, _downloadCleanerConfig.UnlinkedUseTag);
await _eventPublisher.PublishCategoryChanged(download.Category, _downloadCleanerConfig.UnlinkedTargetCategory, _downloadCleanerConfig.UnlinkedUseTag);
}
}

View File

@@ -8,6 +8,7 @@ using Common.Configuration.QueueCleaner;
using Common.CustomDataTypes;
using Common.Helpers;
using Data.Enums;
using Infrastructure.Events;
using Infrastructure.Extensions;
using Infrastructure.Interceptors;
using Infrastructure.Verticals.ContentBlocker;
@@ -57,11 +58,12 @@ public class TransmissionService : DownloadService, ITransmissionService
INotificationPublisher notifier,
IDryRunInterceptor dryRunInterceptor,
IHardLinkFileService hardLinkFileService,
IDynamicHttpClientProvider httpClientProvider
IDynamicHttpClientProvider httpClientProvider,
EventPublisher eventPublisher
) : base(
logger, configManager, cache,
filenameEvaluator, striker, notifier, dryRunInterceptor, hardLinkFileService,
httpClientProvider
httpClientProvider, eventPublisher
)
{
// Client will be initialized when Initialize() is called with a specific client configuration
@@ -360,7 +362,7 @@ public class TransmissionService : DownloadService, ITransmissionService
download.Name
);
await _notifier.NotifyDownloadCleaned(download.uploadRatio ?? 0, seedingTime, category.Name, result.Reason);
await _eventPublisher.PublishDownloadCleaned(download.uploadRatio ?? 0, seedingTime, category.Name, result.Reason);
}
}
@@ -452,7 +454,7 @@ public class TransmissionService : DownloadService, ITransmissionService
_logger.LogInformation("category changed for {name}", download.Name);
await _notifier.NotifyCategoryChanged(currentCategory, _downloadCleanerConfig.UnlinkedTargetCategory);
await _eventPublisher.PublishCategoryChanged(currentCategory, _downloadCleanerConfig.UnlinkedTargetCategory);
download.DownloadDir = newLocation;
}

View File

@@ -3,12 +3,12 @@ using Common.Configuration.General;
using Data.Enums;
using Data.Models.Arr;
using Data.Models.Arr.Queue;
using Infrastructure.Events;
using Infrastructure.Helpers;
using Infrastructure.Verticals.Arr;
using Infrastructure.Verticals.Context;
using Infrastructure.Verticals.DownloadRemover.Interfaces;
using Infrastructure.Verticals.DownloadRemover.Models;
using Infrastructure.Verticals.Notifications;
using Microsoft.Extensions.Caching.Memory;
using Infrastructure.Configuration;
@@ -19,19 +19,19 @@ public sealed class QueueItemRemover : IQueueItemRemover
private readonly GeneralConfig _generalConfig;
private readonly IMemoryCache _cache;
private readonly ArrClientFactory _arrClientFactory;
private readonly INotificationPublisher _notifier;
private readonly EventPublisher _eventPublisher;
public QueueItemRemover(
IConfigManager configManager,
IMemoryCache cache,
ArrClientFactory arrClientFactory,
INotificationPublisher notifier
EventPublisher eventPublisher
)
{
_generalConfig = configManager.GetConfiguration<GeneralConfig>();
_cache = cache;
_arrClientFactory = arrClientFactory;
_notifier = notifier;
_eventPublisher = eventPublisher;
}
public async Task RemoveQueueItemAsync<T>(QueueItemRemoveRequest<T> request)
@@ -42,11 +42,15 @@ public sealed class QueueItemRemover : IQueueItemRemover
var arrClient = _arrClientFactory.GetClient(request.InstanceType);
await arrClient.DeleteQueueItemAsync(request.Instance, request.Record, request.RemoveFromClient, request.DeleteReason);
// push to context
// Set context for EventPublisher
ContextProvider.Set("downloadName", request.Record.Title);
ContextProvider.Set("hash", request.Record.DownloadId);
ContextProvider.Set(nameof(QueueRecord), request.Record);
ContextProvider.Set(nameof(ArrInstance) + nameof(ArrInstance.Url), request.Instance.Url);
ContextProvider.Set(nameof(InstanceType), request.InstanceType);
await _notifier.NotifyQueueItemDeleted(request.RemoveFromClient, request.DeleteReason);
// Use the new centralized EventPublisher method
await _eventPublisher.PublishQueueItemDeleted(request.RemoveFromClient, request.DeleteReason);
if (!_generalConfig.SearchEnabled)
{

View File

@@ -4,7 +4,6 @@ using Infrastructure.Events;
using Infrastructure.Helpers;
using Infrastructure.Interceptors;
using Infrastructure.Verticals.Context;
using Infrastructure.Verticals.Notifications;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;
@@ -15,14 +14,12 @@ public sealed class Striker : IStriker
private readonly ILogger<Striker> _logger;
private readonly IMemoryCache _cache;
private readonly MemoryCacheEntryOptions _cacheOptions;
private readonly INotificationPublisher _notifier;
private readonly EventPublisher _eventPublisher;
public Striker(ILogger<Striker> logger, IMemoryCache cache, INotificationPublisher notifier, EventPublisher eventPublisher)
public Striker(ILogger<Striker> logger, IMemoryCache cache, EventPublisher eventPublisher)
{
_logger = logger;
_cache = cache;
_notifier = notifier;
_eventPublisher = eventPublisher;
_cacheOptions = new MemoryCacheEntryOptions()
.SetSlidingExpiration(StaticConfiguration.TriggerValue + Constants.CacheLimitBuffer);
@@ -48,12 +45,7 @@ public sealed class Striker : IStriker
_logger.LogInformation("item on strike number {strike} | reason {reason} | {name}", strikeCount, strikeType.ToString(), itemName);
await _notifier.NotifyStrike(strikeType, strikeCount);
await _eventPublisher.PublishAsync(
EventType.SlowStrike, // TODO
$"Item '{itemName}' has been struck {strikeCount} times for reason '{strikeType}'",
EventSeverity.Important,
data: new { hash, itemName, strikeCount, strikeType });
await _eventPublisher.PublishStrike(strikeType, strikeCount, hash, itemName);
_cache.Set(key, strikeCount, _cacheOptions);

View File

@@ -51,7 +51,7 @@ public class NotificationPublisher : INotificationPublisher
case StrikeType.DownloadingMetadata:
await NotifyInternal(notification.Adapt<StalledStrikeNotification>());
break;
case StrikeType.ImportFailed:
case StrikeType.FailedImport:
await NotifyInternal(notification.Adapt<FailedImportStrikeNotification>());
break;
case StrikeType.SlowSpeed: