From 62fc39251b7dd90ab0df5b180366b9d51aaad53a Mon Sep 17 00:00:00 2001 From: Flaminel Date: Wed, 28 May 2025 19:01:54 +0300 Subject: [PATCH] added unified signalr hub --- code/Executable/DependencyInjection/ApiDI.cs | 6 + code/Infrastructure/Events/EventPublisher.cs | 21 +- code/Infrastructure/Hubs/AppHub.cs | 81 ++++++ code/Infrastructure/Logging/SignalRLogSink.cs | 16 +- .../src/app/core/services/app-hub.service.ts | 254 ++++++++++++++++++ .../dashboard-page.component.html | 16 +- .../dashboard-page.component.ts | 55 ++-- .../events-viewer/events-viewer.component.ts | 16 +- .../logs-viewer/logs-viewer.component.ts | 16 +- 9 files changed, 406 insertions(+), 75 deletions(-) create mode 100644 code/Infrastructure/Hubs/AppHub.cs create mode 100644 code/UI/src/app/core/services/app-hub.service.ts diff --git a/code/Executable/DependencyInjection/ApiDI.cs b/code/Executable/DependencyInjection/ApiDI.cs index 896d17ac..e3df4dd9 100644 --- a/code/Executable/DependencyInjection/ApiDI.cs +++ b/code/Executable/DependencyInjection/ApiDI.cs @@ -2,6 +2,7 @@ using System.Text.Json.Serialization; using Infrastructure.Health; using Infrastructure.Logging; using Infrastructure.Events; +using Infrastructure.Hubs; using Microsoft.OpenApi.Models; namespace Executable.DependencyInjection; @@ -73,8 +74,13 @@ public static class ApiDI // Map SignalR hubs app.MapHub("/api/hubs/health"); + + // Legacy hubs (for backward compatibility) app.MapHub("/api/hubs/logs"); app.MapHub("/api/hubs/events"); + + // New unified hub + app.MapHub("/api/hubs/app"); return app; } diff --git a/code/Infrastructure/Events/EventPublisher.cs b/code/Infrastructure/Events/EventPublisher.cs index c7190b0e..221985b6 100644 --- a/code/Infrastructure/Events/EventPublisher.cs +++ b/code/Infrastructure/Events/EventPublisher.cs @@ -8,6 +8,7 @@ using Infrastructure.Verticals.Notifications; using Infrastructure.Verticals.Context; using Infrastructure.Interceptors; using Common.Attributes; +using Infrastructure.Hubs; namespace Infrastructure.Events; @@ -17,20 +18,23 @@ namespace Infrastructure.Events; public class EventPublisher { private readonly DataContext _context; - private readonly IHubContext _hubContext; + private readonly IHubContext _eventHubContext; + private readonly IHubContext _appHubContext; private readonly ILogger _logger; private readonly INotificationPublisher _notificationPublisher; private readonly IDryRunInterceptor _dryRunInterceptor; public EventPublisher( DataContext context, - IHubContext hubContext, + IHubContext eventHubContext, + IHubContext appHubContext, ILogger logger, INotificationPublisher notificationPublisher, IDryRunInterceptor dryRunInterceptor) { _context = context; - _hubContext = hubContext; + _eventHubContext = eventHubContext; + _appHubContext = appHubContext; _logger = logger; _notificationPublisher = notificationPublisher; _dryRunInterceptor = dryRunInterceptor; @@ -111,8 +115,8 @@ public class EventPublisher public async Task PublishDownloadCleaned(double ratio, TimeSpan seedingTime, string categoryName, CleanReason reason) { // Get context data for the event - string downloadName = ContextProvider.Get("downloadName") ?? "Unknown"; - string hash = ContextProvider.Get("hash") ?? "Unknown"; + string downloadName = ContextProvider.Get("downloadName"); + string hash = ContextProvider.Get("hash"); // Publish the event await PublishAsync( @@ -156,8 +160,11 @@ public class EventPublisher { try { - // Send to all connected clients (self-hosted app with single client) - await _hubContext.Clients.All.SendAsync("EventReceived", appEventEntity); + // Send to all connected clients via the legacy EventHub + await _eventHubContext.Clients.All.SendAsync("EventReceived", appEventEntity); + + // Send to all connected clients via the new unified AppHub + await _appHubContext.Clients.All.SendAsync("EventReceived", appEventEntity); } catch (Exception ex) { diff --git a/code/Infrastructure/Hubs/AppHub.cs b/code/Infrastructure/Hubs/AppHub.cs new file mode 100644 index 00000000..51a388bb --- /dev/null +++ b/code/Infrastructure/Hubs/AppHub.cs @@ -0,0 +1,81 @@ +using Data; +using Data.Models.Events; +using Infrastructure.Logging; +using Microsoft.AspNetCore.SignalR; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; + +namespace Infrastructure.Hubs; + +/// +/// Unified SignalR hub for logs and events +/// +public class AppHub : Hub +{ + private readonly DataContext _context; + private readonly ILogger _logger; + private readonly SignalRLogSink _logSink; + + public AppHub(DataContext context, ILogger logger, SignalRLogSink logSink) + { + _context = context; + _logger = logger; + _logSink = logSink; + } + + /// + /// Client requests recent logs + /// + public async Task GetRecentLogs() + { + try + { + var logs = _logSink.GetRecentLogs(); + await Clients.Caller.SendAsync("LogsReceived", logs); + _logger.LogDebug("Sent {count} recent logs to client {connectionId}", logs.Count(), Context.ConnectionId); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to send recent logs to client {connectionId}", Context.ConnectionId); + } + } + + /// + /// Client requests recent events + /// + public async Task GetRecentEvents(int count = 50) + { + try + { + var events = await _context.Events + .OrderByDescending(e => e.Timestamp) + .Take(Math.Min(count, 100)) // Cap at 100 + .ToListAsync(); + + await Clients.Caller.SendAsync("EventsReceived", events); + _logger.LogDebug("Sent {count} recent events to client {connectionId}", events.Count, Context.ConnectionId); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to send recent events to client {connectionId}", Context.ConnectionId); + } + } + + /// + /// Client connection established + /// + public override async Task OnConnectedAsync() + { + _logger.LogInformation("Client connected to AppHub: {ConnectionId}", Context.ConnectionId); + await base.OnConnectedAsync(); + } + + /// + /// Client disconnected + /// + public override async Task OnDisconnectedAsync(Exception? exception) + { + _logger.LogInformation("Client disconnected from AppHub: {ConnectionId}", Context.ConnectionId); + await base.OnDisconnectedAsync(exception); + } +} diff --git a/code/Infrastructure/Logging/SignalRLogSink.cs b/code/Infrastructure/Logging/SignalRLogSink.cs index f4de35dd..2a0bd7e1 100644 --- a/code/Infrastructure/Logging/SignalRLogSink.cs +++ b/code/Infrastructure/Logging/SignalRLogSink.cs @@ -1,5 +1,6 @@ using System.Collections.Concurrent; using System.Globalization; +using Infrastructure.Hubs; using Microsoft.AspNetCore.SignalR; using Microsoft.Extensions.Logging; using Serilog.Core; @@ -16,12 +17,14 @@ public class SignalRLogSink : ILogEventSink private readonly ILogger _logger; private readonly ConcurrentQueue _logBuffer; private readonly int _bufferSize; - private readonly IHubContext _hubContext; + private readonly IHubContext _logHubContext; + private readonly IHubContext _appHubContext; private readonly MessageTemplateTextFormatter _formatter = new("{Message:l}", CultureInfo.InvariantCulture); - public SignalRLogSink(ILogger logger, IHubContext hubContext) + public SignalRLogSink(ILogger logger, IHubContext logHubContext, IHubContext appHubContext) { - _hubContext = hubContext; + _logHubContext = logHubContext; + _appHubContext = appHubContext; _logger = logger; _bufferSize = 100; _logBuffer = new ConcurrentQueue(); @@ -50,8 +53,11 @@ public class SignalRLogSink : ILogEventSink // Add to buffer for new clients AddToBuffer(logData); - // Send to connected clients - _ = _hubContext.Clients.All.SendAsync("ReceiveLog", logData); + // Send to connected clients (legacy hub) + _ = _logHubContext.Clients.All.SendAsync("ReceiveLog", logData); + + // Send to connected clients (new unified hub) + _ = _appHubContext.Clients.All.SendAsync("LogReceived", logData); } catch (Exception ex) { diff --git a/code/UI/src/app/core/services/app-hub.service.ts b/code/UI/src/app/core/services/app-hub.service.ts new file mode 100644 index 00000000..b0c573d6 --- /dev/null +++ b/code/UI/src/app/core/services/app-hub.service.ts @@ -0,0 +1,254 @@ +import { Injectable } from '@angular/core'; +import { BehaviorSubject, Observable } from 'rxjs'; +import * as signalR from '@microsoft/signalr'; +import { SignalRHubConfig } from '../models/signalr.models'; +import { environment } from '../../../environments/environment'; +import { LogEntry } from '../models/signalr.models'; +import { AppEvent } from '../models/event.models'; + +/** + * Unified SignalR hub service + */ +@Injectable({ + providedIn: 'root' +}) +export class AppHubService { + private hubConnection!: signalR.HubConnection; + private connectionStatusSubject = new BehaviorSubject(false); + private logsSubject = new BehaviorSubject([]); + private eventsSubject = new BehaviorSubject([]); + + private logBuffer: LogEntry[] = []; + private eventBuffer: AppEvent[] = []; + private readonly bufferSize = 1000; + + constructor() { } + + /** + * Start the SignalR connection + */ + public startConnection(): Promise { + if (this.hubConnection && + this.hubConnection.state !== signalR.HubConnectionState.Disconnected) { + return Promise.resolve(); + } + + // Build a new connection + this.hubConnection = new signalR.HubConnectionBuilder() + .withUrl(`${environment.apiUrl}/api/hubs/app`) + .withAutomaticReconnect({ + nextRetryDelayInMilliseconds: (retryContext) => { + // Implement exponential backoff with max 30 seconds + return Math.min(2000 * Math.pow(2, retryContext.previousRetryCount), 30000); + } + }) + .build(); + + this.registerSignalREvents(); + + return this.hubConnection.start() + .then(() => { + console.log('AppHub connection started'); + this.connectionStatusSubject.next(true); + this.requestInitialData(); + }) + .catch(err => { + console.error('Error connecting to AppHub:', err); + this.connectionStatusSubject.next(false); + throw err; + }); + } + + /** + * Register SignalR event handlers + */ + private registerSignalREvents(): void { + // Handle connection events + this.hubConnection.onreconnected(() => { + console.log('AppHub reconnected'); + this.connectionStatusSubject.next(true); + this.requestInitialData(); + }); + + this.hubConnection.onreconnecting(() => { + console.log('AppHub reconnecting...'); + this.connectionStatusSubject.next(false); + }); + + this.hubConnection.onclose(() => { + console.log('AppHub connection closed'); + this.connectionStatusSubject.next(false); + }); + + // Handle individual log messages + this.hubConnection.on('LogReceived', (log: LogEntry) => { + this.addLogToBuffer(log); + const currentLogs = this.logsSubject.value; + this.logsSubject.next([...currentLogs, log]); + }); + + // Handle bulk log messages (initial load) + this.hubConnection.on('LogsReceived', (logs: LogEntry[]) => { + if (logs && logs.length > 0) { + // Set all logs at once + this.logsSubject.next(logs); + // Update buffer + this.logBuffer = [...logs]; + this.trimBuffer(this.logBuffer, this.bufferSize); + } + }); + + // Handle individual event messages + this.hubConnection.on('EventReceived', (event: AppEvent) => { + this.addEventToBuffer(event); + const currentEvents = this.eventsSubject.value; + this.eventsSubject.next([...currentEvents, event]); + }); + + // Handle bulk event messages (initial load) + this.hubConnection.on('EventsReceived', (events: AppEvent[]) => { + if (events && events.length > 0) { + // Set all events at once + this.eventsSubject.next(events); + // Update buffer + this.eventBuffer = [...events]; + this.trimBuffer(this.eventBuffer, this.bufferSize); + } + }); + } + + /** + * Request initial data from the server + */ + private requestInitialData(): void { + this.requestRecentLogs(); + this.requestRecentEvents(); + } + + /** + * Request recent logs from the server + */ + public requestRecentLogs(): void { + if (this.isConnected()) { + this.hubConnection.invoke('GetRecentLogs') + .catch(err => console.error('Error requesting recent logs:', err)); + } + } + + /** + * Request recent events from the server + */ + public requestRecentEvents(count: number = 100): void { + if (this.isConnected()) { + this.hubConnection.invoke('GetRecentEvents', count) + .catch(err => console.error('Error requesting recent events:', err)); + } + } + + /** + * Check if the connection is established + */ + private isConnected(): boolean { + return this.hubConnection && + this.hubConnection.state === signalR.HubConnectionState.Connected; + } + + /** + * Stop the SignalR connection + */ + public stopConnection(): Promise { + if (!this.hubConnection) { + return Promise.resolve(); + } + + return this.hubConnection.stop() + .then(() => { + console.log('AppHub connection stopped'); + this.connectionStatusSubject.next(false); + }) + .catch(err => { + console.error('Error stopping AppHub connection:', err); + throw err; + }); + } + + /** + * Add a log to the buffer + */ + private addLogToBuffer(log: LogEntry): void { + this.logBuffer.push(log); + this.trimBuffer(this.logBuffer, this.bufferSize); + } + + /** + * Add an event to the buffer + */ + private addEventToBuffer(event: AppEvent): void { + this.eventBuffer.push(event); + this.trimBuffer(this.eventBuffer, this.bufferSize); + } + + /** + * Trim a buffer to the specified size + */ + private trimBuffer(buffer: T[], maxSize: number): void { + while (buffer.length > maxSize) { + buffer.shift(); + } + } + + // PUBLIC API METHODS + + /** + * Get logs as an observable + */ + public getLogs(): Observable { + return this.logsSubject.asObservable(); + } + + /** + * Get events as an observable + */ + public getEvents(): Observable { + return this.eventsSubject.asObservable(); + } + + /** + * Get connection status as an observable + */ + public getConnectionStatus(): Observable { + return this.connectionStatusSubject.asObservable(); + } + + /** + * Get logs connection status as an observable + * For backward compatibility with components expecting separate connection statuses + */ + public getLogsConnectionStatus(): Observable { + return this.connectionStatusSubject.asObservable(); + } + + /** + * Get events connection status as an observable + * For backward compatibility with components expecting separate connection statuses + */ + public getEventsConnectionStatus(): Observable { + return this.connectionStatusSubject.asObservable(); + } + + /** + * Clear events + */ + public clearEvents(): void { + this.eventsSubject.next([]); + this.eventBuffer = []; + } + + /** + * Clear logs + */ + public clearLogs(): void { + this.logsSubject.next([]); + this.logBuffer = []; + } +} diff --git a/code/UI/src/app/dashboard/dashboard-page/dashboard-page.component.html b/code/UI/src/app/dashboard/dashboard-page/dashboard-page.component.html index 53a58fe2..7ca89ae3 100644 --- a/code/UI/src/app/dashboard/dashboard-page/dashboard-page.component.html +++ b/code/UI/src/app/dashboard/dashboard-page/dashboard-page.component.html @@ -13,9 +13,9 @@

Recent Logs

@@ -60,7 +60,7 @@

No recent logs available

- +
@@ -79,9 +79,9 @@

Recent Events

@@ -126,7 +126,7 @@

No recent events available

- +
diff --git a/code/UI/src/app/dashboard/dashboard-page/dashboard-page.component.ts b/code/UI/src/app/dashboard/dashboard-page/dashboard-page.component.ts index b4015dfa..7e785d01 100644 --- a/code/UI/src/app/dashboard/dashboard-page/dashboard-page.component.ts +++ b/code/UI/src/app/dashboard/dashboard-page/dashboard-page.component.ts @@ -1,7 +1,7 @@ import { Component, OnInit, OnDestroy, signal, computed, inject } from '@angular/core'; import { CommonModule, NgClass, DatePipe } from '@angular/common'; import { RouterLink } from '@angular/router'; -import { Subject, takeUntil, throttleTime } from 'rxjs'; +import { Subject, takeUntil } from 'rxjs'; // PrimeNG Components import { CardModule } from 'primeng/card'; @@ -11,8 +11,7 @@ import { TooltipModule } from 'primeng/tooltip'; import { ProgressSpinnerModule } from 'primeng/progressspinner'; // Services & Models -import { LogHubService } from '../../core/services/log-hub.service'; -import { EventHubService } from '../../core/services/event-hub.service'; +import { AppHubService } from '../../core/services/app-hub.service'; import { LogEntry } from '../../core/models/signalr.models'; import { AppEvent } from '../../core/models/event.models'; @@ -34,15 +33,13 @@ import { AppEvent } from '../../core/models/event.models'; styleUrl: './dashboard-page.component.scss' }) export class DashboardPageComponent implements OnInit, OnDestroy { - private logHubService = inject(LogHubService); - private eventHubService = inject(EventHubService); + private appHubService = inject(AppHubService); private destroy$ = new Subject(); // Signals for reactive state recentLogs = signal([]); recentEvents = signal([]); - logsConnected = signal(false); - eventsConnected = signal(false); + connected = signal(false); // Computed values for display displayLogs = computed(() => { @@ -58,8 +55,7 @@ export class DashboardPageComponent implements OnInit, OnDestroy { }); ngOnInit() { - this.initializeLogHub(); - this.initializeEventHub(); + this.initializeHub(); } ngOnDestroy(): void { @@ -67,49 +63,30 @@ export class DashboardPageComponent implements OnInit, OnDestroy { this.destroy$.complete(); } - private initializeLogHub(): void { - // Connect to logs hub - this.logHubService.startConnection() - .catch((error: Error) => console.error('Failed to connect to log hub:', error)); + private initializeHub(): void { + // Connect to unified hub + this.appHubService.startConnection() + .catch((error: Error) => console.error('Failed to connect to app hub:', error)); - // Subscribe to logs with throttling to prevent UI overwhelming - this.logHubService.getLogs() - .pipe( - takeUntil(this.destroy$), - throttleTime(1000) // Max 1 update per second - ) + // Subscribe to logs + this.appHubService.getLogs() + .pipe(takeUntil(this.destroy$)) .subscribe((logs: LogEntry[]) => { this.recentLogs.set(logs); }); - // Subscribe to connection status - this.logHubService.getConnectionStatus() + // Subscribe to events + this.appHubService.getEvents() .pipe(takeUntil(this.destroy$)) - .subscribe((status: boolean) => { - this.logsConnected.set(status); - }); - } - - private initializeEventHub(): void { - // Connect to events hub - this.eventHubService.startConnection() - .catch((error: Error) => console.error('Failed to connect to event hub:', error)); - - // Subscribe to events with throttling - this.eventHubService.getEvents() - .pipe( - takeUntil(this.destroy$), - throttleTime(1000) // Max 1 update per second - ) .subscribe((events: AppEvent[]) => { this.recentEvents.set(events); }); // Subscribe to connection status - this.eventHubService.getConnectionStatus() + this.appHubService.getConnectionStatus() .pipe(takeUntil(this.destroy$)) .subscribe((status: boolean) => { - this.eventsConnected.set(status); + this.connected.set(status); }); } diff --git a/code/UI/src/app/events/events-viewer/events-viewer.component.ts b/code/UI/src/app/events/events-viewer/events-viewer.component.ts index f1fa2475..b733a987 100644 --- a/code/UI/src/app/events/events-viewer/events-viewer.component.ts +++ b/code/UI/src/app/events/events-viewer/events-viewer.component.ts @@ -19,7 +19,7 @@ import { MenuModule } from 'primeng/menu'; import { MenuItem } from 'primeng/api'; // Services & Models -import { EventHubService } from '../../core/services/event-hub.service'; +import { AppHubService } from '../../core/services/app-hub.service'; import { AppEvent } from '../../core/models/event.models'; @Component({ @@ -42,12 +42,12 @@ import { AppEvent } from '../../core/models/event.models'; MenuModule, InputSwitchModule ], - providers: [EventHubService], + providers: [AppHubService], templateUrl: './events-viewer.component.html', styleUrl: './events-viewer.component.scss' }) export class EventsViewerComponent implements OnInit, OnDestroy { - private eventHubService = inject(EventHubService); + private appHubService = inject(AppHubService); private destroy$ = new Subject(); private clipboard = inject(Clipboard); private search$ = new Subject(); @@ -111,11 +111,11 @@ export class EventsViewerComponent implements OnInit, OnDestroy { ngOnInit(): void { // Connect to SignalR hub - this.eventHubService.startConnection() - .catch((error: Error) => console.error('Failed to connect to event hub:', error)); + this.appHubService.startConnection() + .catch((error: Error) => console.error('Failed to connect to app hub:', error)); // Subscribe to events - this.eventHubService.getEvents() + this.appHubService.getEvents() .pipe(takeUntil(this.destroy$)) .subscribe((events: AppEvent[]) => { this.events.set(events); @@ -125,7 +125,7 @@ export class EventsViewerComponent implements OnInit, OnDestroy { }); // Subscribe to connection status - this.eventHubService.getConnectionStatus() + this.appHubService.getEventsConnectionStatus() .pipe(takeUntil(this.destroy$)) .subscribe((status: boolean) => { this.isConnected.set(status); @@ -193,7 +193,7 @@ export class EventsViewerComponent implements OnInit, OnDestroy { } refresh(): void { - this.eventHubService.requestRecentEvents(); + this.appHubService.requestRecentEvents(); } hasDataInfo(): boolean { diff --git a/code/UI/src/app/logging/logs-viewer/logs-viewer.component.ts b/code/UI/src/app/logging/logs-viewer/logs-viewer.component.ts index e313d17d..a507266b 100644 --- a/code/UI/src/app/logging/logs-viewer/logs-viewer.component.ts +++ b/code/UI/src/app/logging/logs-viewer/logs-viewer.component.ts @@ -17,7 +17,7 @@ import { ProgressSpinnerModule } from 'primeng/progressspinner'; import { InputSwitchModule } from 'primeng/inputswitch'; // Services & Models -import { LogHubService } from '../../core/services/log-hub.service'; +import { AppHubService } from '../../core/services/app-hub.service'; import { LogEntry } from '../../core/models/signalr.models'; import { MenuModule } from 'primeng/menu'; import { MenuItem } from 'primeng/api'; @@ -43,12 +43,12 @@ import { MenuItem } from 'primeng/api'; MenuModule, InputSwitchModule ], - providers: [LogHubService], + providers: [AppHubService], templateUrl: './logs-viewer.component.html', styleUrl: './logs-viewer.component.scss' }) export class LogsViewerComponent implements OnInit, OnDestroy { - private logHubService = inject(LogHubService); + private appHubService = inject(AppHubService); private destroy$ = new Subject(); private clipboard = inject(Clipboard); private search$ = new Subject(); @@ -110,11 +110,11 @@ export class LogsViewerComponent implements OnInit, OnDestroy { ngOnInit(): void { // Connect to SignalR hub - this.logHubService.startConnection() - .catch((error: Error) => console.error('Failed to connect to log hub:', error)); + this.appHubService.startConnection() + .catch((error: Error) => console.error('Failed to connect to app hub:', error)); // Subscribe to logs - this.logHubService.getLogs() + this.appHubService.getLogs() .pipe(takeUntil(this.destroy$)) .subscribe((logs: LogEntry[]) => { this.logs.set(logs); @@ -124,7 +124,7 @@ export class LogsViewerComponent implements OnInit, OnDestroy { }); // Subscribe to connection status - this.logHubService.getConnectionStatus() + this.appHubService.getLogsConnectionStatus() .pipe(takeUntil(this.destroy$)) .subscribe((status: boolean) => { this.isConnected.set(status); @@ -195,7 +195,7 @@ export class LogsViewerComponent implements OnInit, OnDestroy { } refresh(): void { - this.logHubService.requestRecentLogs(); + this.appHubService.requestRecentLogs(); } hasJobInfo(): boolean {