added handling for items that are already downloading

This commit is contained in:
Flaminel
2026-03-24 02:10:38 +02:00
parent 34348aff0f
commit 45006aa8b7
2 changed files with 366 additions and 29 deletions

View File

@@ -1,3 +1,5 @@
using Cleanuparr.Domain.Entities.Arr;
using Cleanuparr.Domain.Entities.Arr.Queue;
using Cleanuparr.Domain.Enums;
using Cleanuparr.Infrastructure.Events.Interfaces;
using Cleanuparr.Infrastructure.Features.Arr.Interfaces;
@@ -78,6 +80,7 @@ public class SeekerTests : IDisposable
_radarrClient.Object,
_sonarrClient.Object,
_fixture.ArrClientFactory.Object,
_fixture.ArrQueueIterator.Object,
_fixture.EventPublisher.Object,
_dryRunInterceptor.Object,
_hostingEnvironment.Object,
@@ -255,10 +258,16 @@ public class SeekerTests : IDisposable
await _fixture.DataContext.SaveChangesAsync();
var mockArrClient = new Mock<IArrClient>();
// Current active downloads = 2, which meets the limit
mockArrClient
.Setup(x => x.GetActiveDownloadCountAsync(radarrInstance))
.ReturnsAsync(2);
// Return 2 queue items with SizeLeft > 0 (actively downloading), which meets the limit
QueueRecord[] activeDownloads =
[
new() { Id = 1, Title = "Download 1", DownloadId = "hash1", Protocol = "torrent", SizeLeft = 1000, MovieId = 10, TrackedDownloadState = "downloading" },
new() { Id = 2, Title = "Download 2", DownloadId = "hash2", Protocol = "torrent", SizeLeft = 2000, MovieId = 20, TrackedDownloadState = "downloading" }
];
_fixture.ArrQueueIterator
.Setup(x => x.Iterate(mockArrClient.Object, It.IsAny<ArrInstance>(), It.IsAny<Func<IReadOnlyList<QueueRecord>, Task>>()))
.Returns<IArrClient, ArrInstance, Func<IReadOnlyList<QueueRecord>, Task>>((_, _, action) => action(activeDownloads));
_fixture.ArrClientFactory
.Setup(x => x.GetClient(InstanceType.Radarr, It.IsAny<float>()))
@@ -284,5 +293,248 @@ public class SeekerTests : IDisposable
Times.Never);
}
[Fact]
public async Task ExecuteAsync_Radarr_ExcludesMoviesAlreadyInQueue()
{
// Arrange — proactive search enabled with 3 movies, one already in queue
var config = await _fixture.DataContext.SeekerConfigs.FirstAsync();
config.SearchEnabled = true;
config.ProactiveSearchEnabled = true;
await _fixture.DataContext.SaveChangesAsync();
var radarrInstance = TestDataContextFactory.AddRadarrInstance(_fixture.DataContext);
_fixture.DataContext.SeekerInstanceConfigs.Add(new SeekerInstanceConfig
{
ArrInstanceId = radarrInstance.Id,
ArrInstance = radarrInstance,
Enabled = true
});
await _fixture.DataContext.SaveChangesAsync();
var mockArrClient = new Mock<IArrClient>();
// Movie 2 is already in the download queue
QueueRecord[] queuedRecords =
[
new() { Id = 1, Title = "Movie 2 Download", DownloadId = "hash1", Protocol = "torrent", SizeLeft = 1000, MovieId = 2, TrackedDownloadState = "downloading" }
];
_fixture.ArrQueueIterator
.Setup(x => x.Iterate(mockArrClient.Object, It.IsAny<ArrInstance>(), It.IsAny<Func<IReadOnlyList<QueueRecord>, Task>>()))
.Returns<IArrClient, ArrInstance, Func<IReadOnlyList<QueueRecord>, Task>>((_, _, action) => action(queuedRecords));
_radarrClient
.Setup(x => x.GetAllMoviesAsync(radarrInstance))
.ReturnsAsync(
[
new SearchableMovie { Id = 1, Title = "Movie 1", Status = "released", Monitored = true, Tags = [] },
new SearchableMovie { Id = 2, Title = "Movie 2", Status = "released", Monitored = true, Tags = [] },
new SearchableMovie { Id = 3, Title = "Movie 3", Status = "released", Monitored = true, Tags = [] }
]);
HashSet<SearchItem>? capturedSearchItems = null;
mockArrClient
.Setup(x => x.SearchItemsAsync(radarrInstance, It.IsAny<HashSet<SearchItem>>()))
.Callback<ArrInstance, HashSet<SearchItem>>((_, items) => capturedSearchItems = items)
.ReturnsAsync([100L]);
_fixture.ArrClientFactory
.Setup(x => x.GetClient(InstanceType.Radarr, It.IsAny<float>()))
.Returns(mockArrClient.Object);
var sut = CreateSut();
// Act
await sut.ExecuteAsync();
// Assert — search was triggered, but NOT for movie 2
mockArrClient.Verify(
x => x.SearchItemsAsync(radarrInstance, It.IsAny<HashSet<SearchItem>>()),
Times.Once);
Assert.NotNull(capturedSearchItems);
Assert.DoesNotContain(capturedSearchItems, item => item.Id == 2);
}
[Fact]
public async Task ExecuteAsync_Radarr_DoesNotExcludeImportFailedItems()
{
// Arrange — movie in queue with importFailed state should still be searchable
var config = await _fixture.DataContext.SeekerConfigs.FirstAsync();
config.SearchEnabled = true;
config.ProactiveSearchEnabled = true;
await _fixture.DataContext.SaveChangesAsync();
var radarrInstance = TestDataContextFactory.AddRadarrInstance(_fixture.DataContext);
_fixture.DataContext.SeekerInstanceConfigs.Add(new SeekerInstanceConfig
{
ArrInstanceId = radarrInstance.Id,
ArrInstance = radarrInstance,
Enabled = true
});
await _fixture.DataContext.SaveChangesAsync();
var mockArrClient = new Mock<IArrClient>();
// Movie 1 is in queue but with importFailed state — should NOT be excluded
QueueRecord[] queuedRecords =
[
new() { Id = 1, Title = "Movie 1 Download", DownloadId = "hash1", Protocol = "torrent", SizeLeft = 0, MovieId = 1, TrackedDownloadState = "importFailed" }
];
_fixture.ArrQueueIterator
.Setup(x => x.Iterate(mockArrClient.Object, It.IsAny<ArrInstance>(), It.IsAny<Func<IReadOnlyList<QueueRecord>, Task>>()))
.Returns<IArrClient, ArrInstance, Func<IReadOnlyList<QueueRecord>, Task>>((_, _, action) => action(queuedRecords));
_radarrClient
.Setup(x => x.GetAllMoviesAsync(radarrInstance))
.ReturnsAsync(
[
new SearchableMovie { Id = 1, Title = "Movie 1", Status = "released", Monitored = true, Tags = [] }
]);
mockArrClient
.Setup(x => x.SearchItemsAsync(radarrInstance, It.IsAny<HashSet<SearchItem>>()))
.ReturnsAsync([100L]);
_fixture.ArrClientFactory
.Setup(x => x.GetClient(InstanceType.Radarr, It.IsAny<float>()))
.Returns(mockArrClient.Object);
var sut = CreateSut();
// Act
await sut.ExecuteAsync();
// Assert — search was triggered for movie 1 (importFailed does not exclude)
mockArrClient.Verify(
x => x.SearchItemsAsync(radarrInstance, It.IsAny<HashSet<SearchItem>>()),
Times.Once);
}
[Fact]
public async Task ExecuteAsync_Sonarr_ExcludesSeasonsAlreadyInQueue()
{
// Arrange — series with 2 seasons, season 1 in queue
var config = await _fixture.DataContext.SeekerConfigs.FirstAsync();
config.SearchEnabled = true;
config.ProactiveSearchEnabled = true;
await _fixture.DataContext.SaveChangesAsync();
var sonarrInstance = TestDataContextFactory.AddSonarrInstance(_fixture.DataContext);
_fixture.DataContext.SeekerInstanceConfigs.Add(new SeekerInstanceConfig
{
ArrInstanceId = sonarrInstance.Id,
ArrInstance = sonarrInstance,
Enabled = true
});
await _fixture.DataContext.SaveChangesAsync();
var mockArrClient = new Mock<IArrClient>();
// Season 1 of series 10 is in the queue
QueueRecord[] queuedRecords =
[
new() { Id = 1, Title = "Series Episode", DownloadId = "hash1", Protocol = "torrent", SizeLeft = 1000, SeriesId = 10, SeasonNumber = 1, TrackedDownloadState = "downloading" }
];
_fixture.ArrQueueIterator
.Setup(x => x.Iterate(mockArrClient.Object, It.IsAny<ArrInstance>(), It.IsAny<Func<IReadOnlyList<QueueRecord>, Task>>()))
.Returns<IArrClient, ArrInstance, Func<IReadOnlyList<QueueRecord>, Task>>((_, _, action) => action(queuedRecords));
_sonarrClient
.Setup(x => x.GetAllSeriesAsync(It.IsAny<ArrInstance>()))
.ReturnsAsync(
[
new SearchableSeries { Id = 10, Title = "Test Series", Status = "continuing", Monitored = true, Tags = [], Statistics = new SeriesStatistics { EpisodeCount = 20, EpisodeFileCount = 10 } }
]);
// Use dates relative to FakeTimeProvider (defaults to Jan 1, 2000)
var pastDate = _fixture.TimeProvider.GetUtcNow().UtcDateTime.AddDays(-30);
_sonarrClient
.Setup(x => x.GetEpisodesAsync(It.IsAny<ArrInstance>(), 10))
.ReturnsAsync(
[
new SearchableEpisode { Id = 100, SeasonNumber = 1, EpisodeNumber = 1, Monitored = true, AirDateUtc = pastDate, HasFile = false },
new SearchableEpisode { Id = 101, SeasonNumber = 2, EpisodeNumber = 1, Monitored = true, AirDateUtc = pastDate, HasFile = false }
]);
SeriesSearchItem? capturedSearchItem = null;
mockArrClient
.Setup(x => x.SearchItemsAsync(It.IsAny<ArrInstance>(), It.IsAny<HashSet<SearchItem>>()))
.Callback<ArrInstance, HashSet<SearchItem>>((_, items) => capturedSearchItem = items.OfType<SeriesSearchItem>().FirstOrDefault())
.ReturnsAsync([100L]);
_fixture.ArrClientFactory
.Setup(x => x.GetClient(InstanceType.Sonarr, It.IsAny<float>()))
.Returns(mockArrClient.Object);
var sut = CreateSut();
// Act
await sut.ExecuteAsync();
// Assert — season 2 was searched (season 1 excluded because it's in queue)
mockArrClient.Verify(
x => x.SearchItemsAsync(It.IsAny<ArrInstance>(), It.IsAny<HashSet<SearchItem>>()),
Times.Once);
Assert.NotNull(capturedSearchItem);
Assert.Equal(2, capturedSearchItem.Id); // Season 2
Assert.Equal(10, capturedSearchItem.SeriesId);
}
[Fact]
public async Task ExecuteAsync_QueueFetchFails_ProceedsWithoutFiltering()
{
// Arrange — queue fetch throws, but search should still proceed
var config = await _fixture.DataContext.SeekerConfigs.FirstAsync();
config.SearchEnabled = true;
config.ProactiveSearchEnabled = true;
await _fixture.DataContext.SaveChangesAsync();
var radarrInstance = TestDataContextFactory.AddRadarrInstance(_fixture.DataContext);
_fixture.DataContext.SeekerInstanceConfigs.Add(new SeekerInstanceConfig
{
ArrInstanceId = radarrInstance.Id,
ArrInstance = radarrInstance,
Enabled = true
});
await _fixture.DataContext.SaveChangesAsync();
var mockArrClient = new Mock<IArrClient>();
// Queue fetch fails
_fixture.ArrQueueIterator
.Setup(x => x.Iterate(mockArrClient.Object, It.IsAny<ArrInstance>(), It.IsAny<Func<IReadOnlyList<QueueRecord>, Task>>()))
.ThrowsAsync(new HttpRequestException("Connection refused"));
_radarrClient
.Setup(x => x.GetAllMoviesAsync(radarrInstance))
.ReturnsAsync(
[
new SearchableMovie { Id = 1, Title = "Movie 1", Status = "released", Monitored = true, Tags = [] }
]);
mockArrClient
.Setup(x => x.SearchItemsAsync(radarrInstance, It.IsAny<HashSet<SearchItem>>()))
.ReturnsAsync([100L]);
_fixture.ArrClientFactory
.Setup(x => x.GetClient(InstanceType.Radarr, It.IsAny<float>()))
.Returns(mockArrClient.Object);
var sut = CreateSut();
// Act
await sut.ExecuteAsync();
// Assert — search still proceeded despite queue fetch failure
mockArrClient.Verify(
x => x.SearchItemsAsync(radarrInstance, It.IsAny<HashSet<SearchItem>>()),
Times.Once);
}
#endregion
}

View File

@@ -1,4 +1,5 @@
using Cleanuparr.Domain.Entities.Arr;
using Cleanuparr.Domain.Entities.Arr.Queue;
using Cleanuparr.Domain.Enums;
using Cleanuparr.Infrastructure.Events.Interfaces;
using Cleanuparr.Infrastructure.Features.Arr.Interfaces;
@@ -22,11 +23,25 @@ public sealed class Seeker : IHandler
{
private const double JitterFactor = 0.7;
/// <summary>
/// Queue states that indicate an item is actively being processed.
/// Items in these states are excluded from proactive searches.
/// "importFailed" is intentionally excluded — failed imports should be re-searched.
/// </summary>
private static readonly HashSet<string> ActiveQueueStates = new(StringComparer.OrdinalIgnoreCase)
{
"downloading",
"importing",
"importPending",
"importBlocked"
};
private readonly ILogger<Seeker> _logger;
private readonly DataContext _dataContext;
private readonly IRadarrClient _radarrClient;
private readonly ISonarrClient _sonarrClient;
private readonly IArrClientFactory _arrClientFactory;
private readonly IArrQueueIterator _arrQueueIterator;
private readonly IEventPublisher _eventPublisher;
private readonly IDryRunInterceptor _dryRunInterceptor;
private readonly IHostingEnvironment _environment;
@@ -39,6 +54,7 @@ public sealed class Seeker : IHandler
IRadarrClient radarrClient,
ISonarrClient sonarrClient,
IArrClientFactory arrClientFactory,
IArrQueueIterator arrQueueIterator,
IEventPublisher eventPublisher,
IDryRunInterceptor dryRunInterceptor,
IHostingEnvironment environment,
@@ -50,6 +66,7 @@ public sealed class Seeker : IHandler
_radarrClient = radarrClient;
_sonarrClient = sonarrClient;
_arrClientFactory = arrClientFactory;
_arrQueueIterator = arrQueueIterator;
_eventPublisher = eventPublisher;
_dryRunInterceptor = dryRunInterceptor;
_environment = environment;
@@ -231,30 +248,40 @@ public sealed class Seeker : IHandler
ContextProvider.Set(nameof(InstanceType), instanceType);
ContextProvider.Set(ContextProvider.Keys.ArrInstanceUrl, arrInstance.ExternalUrl ?? arrInstance.Url);
// Fetch queue once for both active download limit check and queue cross-referencing
IArrClient arrClient = _arrClientFactory.GetClient(instanceType, arrInstance.Version);
List<QueueRecord> queueRecords = [];
try
{
await _arrQueueIterator.Iterate(arrClient, arrInstance, records =>
{
queueRecords.AddRange(records);
return Task.CompletedTask;
});
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to fetch queue for {InstanceName}, proceeding without queue cross-referencing",
arrInstance.Name);
}
// Check active download limit using the fetched queue data
if (instanceConfig.ActiveDownloadLimit > 0)
{
try
int activeDownloads = queueRecords.Count(r => r.SizeLeft > 0);
if (activeDownloads >= instanceConfig.ActiveDownloadLimit)
{
IArrClient arrClient = _arrClientFactory.GetClient(instanceType, arrInstance.Version);
int activeDownloads = await arrClient.GetActiveDownloadCountAsync(arrInstance);
if (activeDownloads >= instanceConfig.ActiveDownloadLimit)
{
_logger.LogInformation(
"Skipping proactive search for {InstanceName} — {Count} items actively downloading (limit: {Limit})",
arrInstance.Name, activeDownloads, instanceConfig.ActiveDownloadLimit);
return;
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to check active downloads for {InstanceName}, proceeding anyway",
arrInstance.Name);
_logger.LogInformation(
"Skipping proactive search for {InstanceName} — {Count} items actively downloading (limit: {Limit})",
arrInstance.Name, activeDownloads, instanceConfig.ActiveDownloadLimit);
return;
}
}
try
{
await ProcessInstanceAsync(config, instanceConfig, arrInstance, instanceType, isDryRun);
await ProcessInstanceAsync(config, instanceConfig, arrInstance, instanceType, isDryRun, queueRecords);
}
catch (Exception ex)
{
@@ -273,7 +300,8 @@ public sealed class Seeker : IHandler
SeekerInstanceConfig instanceConfig,
ArrInstance arrInstance,
InstanceType instanceType,
bool isDryRun)
bool isDryRun,
List<QueueRecord> queueRecords)
{
// Load search history for the current cycle
List<SeekerHistory> currentCycleHistory = await _dataContext.SeekerHistory
@@ -292,6 +320,21 @@ public sealed class Seeker : IHandler
.GroupBy(h => h.ExternalItemId)
.ToDictionary(g => g.Key, g => g.Max(h => h.LastSearchedAt));
// Build queued-item lookups from active queue records
var activeQueueRecords = queueRecords
.Where(r => ActiveQueueStates.Contains(r.TrackedDownloadState))
.ToList();
HashSet<long> queuedMovieIds = activeQueueRecords
.Where(r => r.MovieId > 0)
.Select(r => r.MovieId)
.ToHashSet();
HashSet<(long SeriesId, long SeasonNumber)> queuedSeasons = activeQueueRecords
.Where(r => r.SeriesId > 0)
.Select(r => (r.SeriesId, r.SeasonNumber))
.ToHashSet();
HashSet<SearchItem> searchItems;
List<string> selectedNames;
List<long> allLibraryIds;
@@ -301,14 +344,14 @@ public sealed class Seeker : IHandler
if (instanceType == InstanceType.Radarr)
{
List<long> selectedIds;
(selectedIds, selectedNames, allLibraryIds) = await ProcessRadarrAsync(config, arrInstance, instanceConfig, itemSearchHistory, isDryRun);
(selectedIds, selectedNames, allLibraryIds) = await ProcessRadarrAsync(config, arrInstance, instanceConfig, itemSearchHistory, isDryRun, queuedMovieIds);
searchItems = selectedIds.Select(id => new SearchItem { Id = id }).ToHashSet();
historyIds = selectedIds;
}
else
{
(searchItems, selectedNames, allLibraryIds, historyIds, seasonNumber) =
await ProcessSonarrAsync(config, arrInstance, instanceConfig, itemSearchHistory, currentCycleHistory, isDryRun);
await ProcessSonarrAsync(config, arrInstance, instanceConfig, itemSearchHistory, currentCycleHistory, isDryRun, queuedSeasons: queuedSeasons);
}
IEnumerable<long> historyExternalIds = allHistory.Select(h => h.ExternalItemId);
@@ -354,7 +397,8 @@ public sealed class Seeker : IHandler
ArrInstance arrInstance,
SeekerInstanceConfig instanceConfig,
Dictionary<long, DateTime> searchHistory,
bool isDryRun)
bool isDryRun,
HashSet<long> queuedMovieIds)
{
List<SearchableMovie> movies = await _radarrClient.GetAllMoviesAsync(arrInstance);
List<long> allLibraryIds = movies.Select(m => m.Id).ToList();
@@ -388,6 +432,27 @@ public sealed class Seeker : IHandler
return ([], [], allLibraryIds);
}
// Exclude movies already in the download queue
if (queuedMovieIds.Count > 0)
{
int beforeCount = candidates.Count;
candidates = candidates
.Where(m => !queuedMovieIds.Contains(m.Id))
.ToList();
int skipped = beforeCount - candidates.Count;
if (skipped > 0)
{
_logger.LogDebug("Excluded {Count} movies already in queue on {InstanceName}",
skipped, arrInstance.Name);
}
if (candidates.Count == 0)
{
return ([], [], allLibraryIds);
}
}
// Check for cycle completion: all candidates already searched in current cycle
bool cycleComplete = candidates.All(m => searchHistory.ContainsKey(m.Id));
if (cycleComplete)
@@ -439,7 +504,8 @@ public sealed class Seeker : IHandler
Dictionary<long, DateTime> seriesSearchHistory,
List<SeekerHistory> currentCycleHistory,
bool isDryRun,
bool isRetry = false)
bool isRetry = false,
HashSet<(long SeriesId, long SeasonNumber)>? queuedSeasons = null)
{
List<SearchableSeries> series = await _sonarrClient.GetAllSeriesAsync(arrInstance);
List<long> allLibraryIds = series.Select(s => s.Id).ToList();
@@ -486,7 +552,7 @@ public sealed class Seeker : IHandler
seriesTitle = candidates.First(s => s.Id == seriesId).Title;
(SeriesSearchItem? searchItem, SearchableEpisode? selectedEpisode) =
await BuildSonarrSearchItemAsync(config, arrInstance, seriesId, seriesHistory, seriesTitle);
await BuildSonarrSearchItemAsync(config, arrInstance, seriesId, seriesHistory, seriesTitle, queuedSeasons);
if (searchItem is not null)
{
@@ -518,7 +584,7 @@ public sealed class Seeker : IHandler
// Retry with fresh cycle (only once to prevent infinite recursion)
return await ProcessSonarrAsync(config, arrInstance, instanceConfig,
new Dictionary<long, DateTime>(), [], isDryRun, isRetry: true);
new Dictionary<long, DateTime>(), [], isDryRun, isRetry: true, queuedSeasons: queuedSeasons);
}
return ([], [], allLibraryIds, [], 0);
@@ -533,7 +599,8 @@ public sealed class Seeker : IHandler
ArrInstance arrInstance,
long seriesId,
List<SeekerHistory> seriesHistory,
string seriesTitle)
string seriesTitle,
HashSet<(long SeriesId, long SeasonNumber)>? queuedSeasons = null)
{
List<SearchableEpisode> episodes = await _sonarrClient.GetEpisodesAsync(arrInstance, seriesId);
@@ -592,9 +659,26 @@ public sealed class Seeker : IHandler
// Find unsearched seasons first
var unsearched = seasonGroups.Where(s => s.LastSearched is null).ToList();
// Exclude seasons already in the download queue
if (queuedSeasons is { Count: > 0 })
{
int beforeCount = unsearched.Count;
unsearched = unsearched
.Where(s => !queuedSeasons.Contains((seriesId, (long)s.SeasonNumber)))
.ToList();
int skipped = beforeCount - unsearched.Count;
if (skipped > 0)
{
_logger.LogDebug("Excluded {Count} seasons already in queue for '{SeriesTitle}' on {InstanceName}",
skipped, seriesTitle, arrInstance.Name);
}
}
if (unsearched.Count == 0)
{
// All seasons searched in current cycle — this series is done
// All unsearched seasons are either searched or in the queue
return (null, null);
}
@@ -772,4 +856,5 @@ public sealed class Seeker : IHandler
deleted, arrInstanceId);
}
}
}