From e65a1356761abe8f72fbf608f56f650e30a5eb1c Mon Sep 17 00:00:00 2001 From: Nico <47644445+nicotsx@users.noreply.github.com> Date: Tue, 5 May 2026 19:34:10 +0200 Subject: [PATCH] feat(agents): create agent registry and service (#849) * feat(agents): create agent registry and service * fix: mark agent offline only if the session was removed properly * refactor: centralize agent backup lifecycle state * refactor: simplify session management * refactor: move effect / async boundary in one place * chore: regen migration * refactor: improve error handling * chore: pr feedback --- .../migration.sql | 16 + .../snapshot.json | 2521 +++++++++++++++++ app/server/db/relations.ts | 8 + app/server/db/schema.ts | 30 + .../__tests__/agents-manager.backups.test.ts | 5 +- .../__tests__/agents-manager.events.test.ts | 235 ++ .../agents/__tests__/agents.service.test.ts | 72 + .../__tests__/controller-runtime.test.ts | 280 ++ .../modules/agents/__tests__/session.test.ts | 208 +- app/server/modules/agents/agents-manager.ts | 154 +- app/server/modules/agents/agents.service.ts | 129 + app/server/modules/agents/constants.ts | 6 + .../modules/agents/controller/server.ts | 375 +-- .../modules/agents/controller/session.ts | 173 +- .../modules/agents/helpers/runtime-state.ts | 1 + app/server/modules/agents/helpers/tokens.ts | 3 +- app/server/modules/backups/backup-executor.ts | 3 +- app/server/modules/lifecycle/bootstrap.ts | 2 + app/test/setup-shared.ts | 7 + apps/agent/src/commands/backup-run.ts | 2 +- packages/core/src/node/logger.ts | 7 + packages/core/test/setup.ts | 7 + 22 files changed, 3780 insertions(+), 464 deletions(-) create mode 100644 app/drizzle/20260505165117_early_purple_man/migration.sql create mode 100644 app/drizzle/20260505165117_early_purple_man/snapshot.json create mode 100644 app/server/modules/agents/__tests__/agents-manager.events.test.ts create mode 100644 app/server/modules/agents/__tests__/agents.service.test.ts create mode 100644 app/server/modules/agents/__tests__/controller-runtime.test.ts create mode 100644 app/server/modules/agents/agents.service.ts create mode 100644 app/server/modules/agents/constants.ts diff --git a/app/drizzle/20260505165117_early_purple_man/migration.sql b/app/drizzle/20260505165117_early_purple_man/migration.sql new file mode 100644 index 00000000..db99a6c0 --- /dev/null +++ b/app/drizzle/20260505165117_early_purple_man/migration.sql @@ -0,0 +1,16 @@ +CREATE TABLE `agents_table` ( + `id` text PRIMARY KEY, + `organization_id` text, + `name` text NOT NULL, + `kind` text NOT NULL, + `status` text DEFAULT 'offline' NOT NULL, + `capabilities` text DEFAULT '{}' NOT NULL, + `last_seen_at` integer, + `last_ready_at` integer, + `created_at` integer DEFAULT (unixepoch() * 1000) NOT NULL, + `updated_at` integer DEFAULT (unixepoch() * 1000) NOT NULL, + CONSTRAINT `fk_agents_table_organization_id_organization_id_fk` FOREIGN KEY (`organization_id`) REFERENCES `organization`(`id`) ON DELETE CASCADE +); +--> statement-breakpoint +CREATE INDEX `agents_table_organization_id_idx` ON `agents_table` (`organization_id`);--> statement-breakpoint +CREATE INDEX `agents_table_status_idx` ON `agents_table` (`status`); \ No newline at end of file diff --git a/app/drizzle/20260505165117_early_purple_man/snapshot.json b/app/drizzle/20260505165117_early_purple_man/snapshot.json new file mode 100644 index 00000000..524d847f --- /dev/null +++ b/app/drizzle/20260505165117_early_purple_man/snapshot.json @@ -0,0 +1,2521 @@ +{ + "version": "7", + "dialect": "sqlite", + "id": "cea82b0d-bdb9-4d89-b190-614b90efd1f5", + "prevIds": ["b644a331-6c76-4d56-b4c4-c5b0701183f2"], + "ddl": [ + { + "name": "account", + "entityType": "tables" + }, + { + "name": "agents_table", + "entityType": "tables" + }, + { + "name": "app_metadata", + "entityType": "tables" + }, + { + "name": "backup_schedule_mirrors_table", + "entityType": "tables" + }, + { + "name": "backup_schedule_notifications_table", + "entityType": "tables" + }, + { + "name": "backup_schedules_table", + "entityType": "tables" + }, + { + "name": "invitation", + "entityType": "tables" + }, + { + "name": "member", + "entityType": "tables" + }, + { + "name": "notification_destinations_table", + "entityType": "tables" + }, + { + "name": "organization", + "entityType": "tables" + }, + { + "name": "repositories_table", + "entityType": "tables" + }, + { + "name": "sessions_table", + "entityType": "tables" + }, + { + "name": "sso_provider", + "entityType": "tables" + }, + { + "name": "two_factor", + "entityType": "tables" + }, + { + "name": "users_table", + "entityType": "tables" + }, + { + "name": "verification", + "entityType": "tables" + }, + { + "name": "volumes_table", + "entityType": "tables" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "id", + "entityType": "columns", + "table": "account" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "account_id", + "entityType": "columns", + "table": "account" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "provider_id", + "entityType": "columns", + "table": "account" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "user_id", + "entityType": "columns", + "table": "account" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "access_token", + "entityType": "columns", + "table": "account" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "refresh_token", + "entityType": "columns", + "table": "account" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "id_token", + "entityType": "columns", + "table": "account" + }, + { + "type": "integer", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "access_token_expires_at", + "entityType": "columns", + "table": "account" + }, + { + "type": "integer", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "refresh_token_expires_at", + "entityType": "columns", + "table": "account" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "scope", + "entityType": "columns", + "table": "account" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "password", + "entityType": "columns", + "table": "account" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "created_at", + "entityType": "columns", + "table": "account" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "updated_at", + "entityType": "columns", + "table": "account" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "id", + "entityType": "columns", + "table": "agents_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "organization_id", + "entityType": "columns", + "table": "agents_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "name", + "entityType": "columns", + "table": "agents_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "kind", + "entityType": "columns", + "table": "agents_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": "'offline'", + "generated": null, + "name": "status", + "entityType": "columns", + "table": "agents_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": "'{}'", + "generated": null, + "name": "capabilities", + "entityType": "columns", + "table": "agents_table" + }, + { + "type": "integer", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "last_seen_at", + "entityType": "columns", + "table": "agents_table" + }, + { + "type": "integer", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "last_ready_at", + "entityType": "columns", + "table": "agents_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "created_at", + "entityType": "columns", + "table": "agents_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "updated_at", + "entityType": "columns", + "table": "agents_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "key", + "entityType": "columns", + "table": "app_metadata" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "value", + "entityType": "columns", + "table": "app_metadata" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "created_at", + "entityType": "columns", + "table": "app_metadata" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "updated_at", + "entityType": "columns", + "table": "app_metadata" + }, + { + "type": "integer", + "notNull": false, + "autoincrement": true, + "default": null, + "generated": null, + "name": "id", + "entityType": "columns", + "table": "backup_schedule_mirrors_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "schedule_id", + "entityType": "columns", + "table": "backup_schedule_mirrors_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "repository_id", + "entityType": "columns", + "table": "backup_schedule_mirrors_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "true", + "generated": null, + "name": "enabled", + "entityType": "columns", + "table": "backup_schedule_mirrors_table" + }, + { + "type": "integer", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "last_copy_at", + "entityType": "columns", + "table": "backup_schedule_mirrors_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "last_copy_status", + "entityType": "columns", + "table": "backup_schedule_mirrors_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "last_copy_error", + "entityType": "columns", + "table": "backup_schedule_mirrors_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "created_at", + "entityType": "columns", + "table": "backup_schedule_mirrors_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "schedule_id", + "entityType": "columns", + "table": "backup_schedule_notifications_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "destination_id", + "entityType": "columns", + "table": "backup_schedule_notifications_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "false", + "generated": null, + "name": "notify_on_start", + "entityType": "columns", + "table": "backup_schedule_notifications_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "false", + "generated": null, + "name": "notify_on_success", + "entityType": "columns", + "table": "backup_schedule_notifications_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "true", + "generated": null, + "name": "notify_on_warning", + "entityType": "columns", + "table": "backup_schedule_notifications_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "true", + "generated": null, + "name": "notify_on_failure", + "entityType": "columns", + "table": "backup_schedule_notifications_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "created_at", + "entityType": "columns", + "table": "backup_schedule_notifications_table" + }, + { + "type": "integer", + "notNull": false, + "autoincrement": true, + "default": null, + "generated": null, + "name": "id", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "short_id", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "name", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "volume_id", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "repository_id", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "true", + "generated": null, + "name": "enabled", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "cron_expression", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "retention_policy", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": "'[]'", + "generated": null, + "name": "exclude_patterns", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": "'[]'", + "generated": null, + "name": "exclude_if_present", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": "'[]'", + "generated": null, + "name": "include_paths", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": "'[]'", + "generated": null, + "name": "include_patterns", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "integer", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "last_backup_at", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "last_backup_status", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "last_backup_error", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "integer", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "next_backup_at", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "false", + "generated": null, + "name": "one_file_system", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": "'[]'", + "generated": null, + "name": "custom_restic_params", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "backup_webhooks", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "0", + "generated": null, + "name": "sort_order", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "0", + "generated": null, + "name": "failure_retry_count", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "2", + "generated": null, + "name": "max_retries", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "900000", + "generated": null, + "name": "retry_delay", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "created_at", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "updated_at", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "organization_id", + "entityType": "columns", + "table": "backup_schedules_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "id", + "entityType": "columns", + "table": "invitation" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "organization_id", + "entityType": "columns", + "table": "invitation" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "email", + "entityType": "columns", + "table": "invitation" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "role", + "entityType": "columns", + "table": "invitation" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": "'pending'", + "generated": null, + "name": "status", + "entityType": "columns", + "table": "invitation" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "expires_at", + "entityType": "columns", + "table": "invitation" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "created_at", + "entityType": "columns", + "table": "invitation" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "inviter_id", + "entityType": "columns", + "table": "invitation" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "id", + "entityType": "columns", + "table": "member" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "organization_id", + "entityType": "columns", + "table": "member" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "user_id", + "entityType": "columns", + "table": "member" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": "'member'", + "generated": null, + "name": "role", + "entityType": "columns", + "table": "member" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "created_at", + "entityType": "columns", + "table": "member" + }, + { + "type": "integer", + "notNull": false, + "autoincrement": true, + "default": null, + "generated": null, + "name": "id", + "entityType": "columns", + "table": "notification_destinations_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "name", + "entityType": "columns", + "table": "notification_destinations_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "true", + "generated": null, + "name": "enabled", + "entityType": "columns", + "table": "notification_destinations_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": "'unknown'", + "generated": null, + "name": "status", + "entityType": "columns", + "table": "notification_destinations_table" + }, + { + "type": "integer", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "last_checked", + "entityType": "columns", + "table": "notification_destinations_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "last_error", + "entityType": "columns", + "table": "notification_destinations_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "type", + "entityType": "columns", + "table": "notification_destinations_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "config", + "entityType": "columns", + "table": "notification_destinations_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "created_at", + "entityType": "columns", + "table": "notification_destinations_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "updated_at", + "entityType": "columns", + "table": "notification_destinations_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "organization_id", + "entityType": "columns", + "table": "notification_destinations_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "id", + "entityType": "columns", + "table": "organization" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "name", + "entityType": "columns", + "table": "organization" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "slug", + "entityType": "columns", + "table": "organization" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "logo", + "entityType": "columns", + "table": "organization" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "created_at", + "entityType": "columns", + "table": "organization" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "metadata", + "entityType": "columns", + "table": "organization" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "id", + "entityType": "columns", + "table": "repositories_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "short_id", + "entityType": "columns", + "table": "repositories_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "provisioning_id", + "entityType": "columns", + "table": "repositories_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "name", + "entityType": "columns", + "table": "repositories_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "type", + "entityType": "columns", + "table": "repositories_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "config", + "entityType": "columns", + "table": "repositories_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": "'auto'", + "generated": null, + "name": "compression_mode", + "entityType": "columns", + "table": "repositories_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": "'unknown'", + "generated": null, + "name": "status", + "entityType": "columns", + "table": "repositories_table" + }, + { + "type": "integer", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "last_checked", + "entityType": "columns", + "table": "repositories_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "last_error", + "entityType": "columns", + "table": "repositories_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "doctor_result", + "entityType": "columns", + "table": "repositories_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "stats", + "entityType": "columns", + "table": "repositories_table" + }, + { + "type": "integer", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "stats_updated_at", + "entityType": "columns", + "table": "repositories_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "false", + "generated": null, + "name": "upload_limit_enabled", + "entityType": "columns", + "table": "repositories_table" + }, + { + "type": "real", + "notNull": true, + "autoincrement": false, + "default": "1", + "generated": null, + "name": "upload_limit_value", + "entityType": "columns", + "table": "repositories_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": "'Mbps'", + "generated": null, + "name": "upload_limit_unit", + "entityType": "columns", + "table": "repositories_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "false", + "generated": null, + "name": "download_limit_enabled", + "entityType": "columns", + "table": "repositories_table" + }, + { + "type": "real", + "notNull": true, + "autoincrement": false, + "default": "1", + "generated": null, + "name": "download_limit_value", + "entityType": "columns", + "table": "repositories_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": "'Mbps'", + "generated": null, + "name": "download_limit_unit", + "entityType": "columns", + "table": "repositories_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "created_at", + "entityType": "columns", + "table": "repositories_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "updated_at", + "entityType": "columns", + "table": "repositories_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "organization_id", + "entityType": "columns", + "table": "repositories_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "id", + "entityType": "columns", + "table": "sessions_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "user_id", + "entityType": "columns", + "table": "sessions_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "token", + "entityType": "columns", + "table": "sessions_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "expires_at", + "entityType": "columns", + "table": "sessions_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "created_at", + "entityType": "columns", + "table": "sessions_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "updated_at", + "entityType": "columns", + "table": "sessions_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "ip_address", + "entityType": "columns", + "table": "sessions_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "user_agent", + "entityType": "columns", + "table": "sessions_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "impersonated_by", + "entityType": "columns", + "table": "sessions_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "active_organization_id", + "entityType": "columns", + "table": "sessions_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "id", + "entityType": "columns", + "table": "sso_provider" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "provider_id", + "entityType": "columns", + "table": "sso_provider" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "organization_id", + "entityType": "columns", + "table": "sso_provider" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "user_id", + "entityType": "columns", + "table": "sso_provider" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "issuer", + "entityType": "columns", + "table": "sso_provider" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "domain", + "entityType": "columns", + "table": "sso_provider" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "false", + "generated": null, + "name": "auto_link_matching_emails", + "entityType": "columns", + "table": "sso_provider" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "oidc_config", + "entityType": "columns", + "table": "sso_provider" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "saml_config", + "entityType": "columns", + "table": "sso_provider" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "created_at", + "entityType": "columns", + "table": "sso_provider" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "updated_at", + "entityType": "columns", + "table": "sso_provider" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "id", + "entityType": "columns", + "table": "two_factor" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "secret", + "entityType": "columns", + "table": "two_factor" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "backup_codes", + "entityType": "columns", + "table": "two_factor" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "user_id", + "entityType": "columns", + "table": "two_factor" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "id", + "entityType": "columns", + "table": "users_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "username", + "entityType": "columns", + "table": "users_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "password_hash", + "entityType": "columns", + "table": "users_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "false", + "generated": null, + "name": "has_downloaded_restic_password", + "entityType": "columns", + "table": "users_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": "'MM/DD/YYYY'", + "generated": null, + "name": "date_format", + "entityType": "columns", + "table": "users_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": "'12h'", + "generated": null, + "name": "time_format", + "entityType": "columns", + "table": "users_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "created_at", + "entityType": "columns", + "table": "users_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "updated_at", + "entityType": "columns", + "table": "users_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "name", + "entityType": "columns", + "table": "users_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "email", + "entityType": "columns", + "table": "users_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "false", + "generated": null, + "name": "email_verified", + "entityType": "columns", + "table": "users_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "image", + "entityType": "columns", + "table": "users_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "display_username", + "entityType": "columns", + "table": "users_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "false", + "generated": null, + "name": "two_factor_enabled", + "entityType": "columns", + "table": "users_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": "'user'", + "generated": null, + "name": "role", + "entityType": "columns", + "table": "users_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "false", + "generated": null, + "name": "banned", + "entityType": "columns", + "table": "users_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "ban_reason", + "entityType": "columns", + "table": "users_table" + }, + { + "type": "integer", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "ban_expires", + "entityType": "columns", + "table": "users_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "id", + "entityType": "columns", + "table": "verification" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "identifier", + "entityType": "columns", + "table": "verification" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "value", + "entityType": "columns", + "table": "verification" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "expires_at", + "entityType": "columns", + "table": "verification" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "created_at", + "entityType": "columns", + "table": "verification" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "updated_at", + "entityType": "columns", + "table": "verification" + }, + { + "type": "integer", + "notNull": false, + "autoincrement": true, + "default": null, + "generated": null, + "name": "id", + "entityType": "columns", + "table": "volumes_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "short_id", + "entityType": "columns", + "table": "volumes_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "provisioning_id", + "entityType": "columns", + "table": "volumes_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "name", + "entityType": "columns", + "table": "volumes_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "type", + "entityType": "columns", + "table": "volumes_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": "'unmounted'", + "generated": null, + "name": "status", + "entityType": "columns", + "table": "volumes_table" + }, + { + "type": "text", + "notNull": false, + "autoincrement": false, + "default": null, + "generated": null, + "name": "last_error", + "entityType": "columns", + "table": "volumes_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "last_health_check", + "entityType": "columns", + "table": "volumes_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "created_at", + "entityType": "columns", + "table": "volumes_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "(unixepoch() * 1000)", + "generated": null, + "name": "updated_at", + "entityType": "columns", + "table": "volumes_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "config", + "entityType": "columns", + "table": "volumes_table" + }, + { + "type": "integer", + "notNull": true, + "autoincrement": false, + "default": "true", + "generated": null, + "name": "auto_remount", + "entityType": "columns", + "table": "volumes_table" + }, + { + "type": "text", + "notNull": true, + "autoincrement": false, + "default": null, + "generated": null, + "name": "organization_id", + "entityType": "columns", + "table": "volumes_table" + }, + { + "columns": ["user_id"], + "tableTo": "users_table", + "columnsTo": ["id"], + "onUpdate": "NO ACTION", + "onDelete": "CASCADE", + "nameExplicit": false, + "name": "account_user_id_users_table_id_fk", + "entityType": "fks", + "table": "account" + }, + { + "columns": ["organization_id"], + "tableTo": "organization", + "columnsTo": ["id"], + "onUpdate": "NO ACTION", + "onDelete": "CASCADE", + "nameExplicit": false, + "name": "fk_agents_table_organization_id_organization_id_fk", + "entityType": "fks", + "table": "agents_table" + }, + { + "columns": ["schedule_id"], + "tableTo": "backup_schedules_table", + "columnsTo": ["id"], + "onUpdate": "NO ACTION", + "onDelete": "CASCADE", + "nameExplicit": false, + "name": "backup_schedule_mirrors_table_schedule_id_backup_schedules_table_id_fk", + "entityType": "fks", + "table": "backup_schedule_mirrors_table" + }, + { + "columns": ["repository_id"], + "tableTo": "repositories_table", + "columnsTo": ["id"], + "onUpdate": "NO ACTION", + "onDelete": "CASCADE", + "nameExplicit": false, + "name": "backup_schedule_mirrors_table_repository_id_repositories_table_id_fk", + "entityType": "fks", + "table": "backup_schedule_mirrors_table" + }, + { + "columns": ["schedule_id"], + "tableTo": "backup_schedules_table", + "columnsTo": ["id"], + "onUpdate": "NO ACTION", + "onDelete": "CASCADE", + "nameExplicit": false, + "name": "backup_schedule_notifications_table_schedule_id_backup_schedules_table_id_fk", + "entityType": "fks", + "table": "backup_schedule_notifications_table" + }, + { + "columns": ["destination_id"], + "tableTo": "notification_destinations_table", + "columnsTo": ["id"], + "onUpdate": "NO ACTION", + "onDelete": "CASCADE", + "nameExplicit": false, + "name": "backup_schedule_notifications_table_destination_id_notification_destinations_table_id_fk", + "entityType": "fks", + "table": "backup_schedule_notifications_table" + }, + { + "columns": ["volume_id"], + "tableTo": "volumes_table", + "columnsTo": ["id"], + "onUpdate": "NO ACTION", + "onDelete": "CASCADE", + "nameExplicit": false, + "name": "backup_schedules_table_volume_id_volumes_table_id_fk", + "entityType": "fks", + "table": "backup_schedules_table" + }, + { + "columns": ["repository_id"], + "tableTo": "repositories_table", + "columnsTo": ["id"], + "onUpdate": "NO ACTION", + "onDelete": "CASCADE", + "nameExplicit": false, + "name": "backup_schedules_table_repository_id_repositories_table_id_fk", + "entityType": "fks", + "table": "backup_schedules_table" + }, + { + "columns": ["organization_id"], + "tableTo": "organization", + "columnsTo": ["id"], + "onUpdate": "NO ACTION", + "onDelete": "CASCADE", + "nameExplicit": false, + "name": "backup_schedules_table_organization_id_organization_id_fk", + "entityType": "fks", + "table": "backup_schedules_table" + }, + { + "columns": ["organization_id"], + "tableTo": "organization", + "columnsTo": ["id"], + "onUpdate": "NO ACTION", + "onDelete": "CASCADE", + "nameExplicit": false, + "name": "invitation_organization_id_organization_id_fk", + "entityType": "fks", + "table": "invitation" + }, + { + "columns": ["inviter_id"], + "tableTo": "users_table", + "columnsTo": ["id"], + "onUpdate": "NO ACTION", + "onDelete": "CASCADE", + "nameExplicit": false, + "name": "invitation_inviter_id_users_table_id_fk", + "entityType": "fks", + "table": "invitation" + }, + { + "columns": ["organization_id"], + "tableTo": "organization", + "columnsTo": ["id"], + "onUpdate": "NO ACTION", + "onDelete": "CASCADE", + "nameExplicit": false, + "name": "member_organization_id_organization_id_fk", + "entityType": "fks", + "table": "member" + }, + { + "columns": ["user_id"], + "tableTo": "users_table", + "columnsTo": ["id"], + "onUpdate": "NO ACTION", + "onDelete": "CASCADE", + "nameExplicit": false, + "name": "member_user_id_users_table_id_fk", + "entityType": "fks", + "table": "member" + }, + { + "columns": ["organization_id"], + "tableTo": "organization", + "columnsTo": ["id"], + "onUpdate": "NO ACTION", + "onDelete": "CASCADE", + "nameExplicit": false, + "name": "notification_destinations_table_organization_id_organization_id_fk", + "entityType": "fks", + "table": "notification_destinations_table" + }, + { + "columns": ["organization_id"], + "tableTo": "organization", + "columnsTo": ["id"], + "onUpdate": "NO ACTION", + "onDelete": "CASCADE", + "nameExplicit": false, + "name": "repositories_table_organization_id_organization_id_fk", + "entityType": "fks", + "table": "repositories_table" + }, + { + "columns": ["user_id"], + "tableTo": "users_table", + "columnsTo": ["id"], + "onUpdate": "NO ACTION", + "onDelete": "CASCADE", + "nameExplicit": false, + "name": "sessions_table_user_id_users_table_id_fk", + "entityType": "fks", + "table": "sessions_table" + }, + { + "columns": ["organization_id"], + "tableTo": "organization", + "columnsTo": ["id"], + "onUpdate": "NO ACTION", + "onDelete": "CASCADE", + "nameExplicit": false, + "name": "fk_sso_provider_organization_id_organization_id_fk", + "entityType": "fks", + "table": "sso_provider" + }, + { + "columns": ["user_id"], + "tableTo": "users_table", + "columnsTo": ["id"], + "onUpdate": "NO ACTION", + "onDelete": "SET NULL", + "nameExplicit": false, + "name": "fk_sso_provider_user_id_users_table_id_fk", + "entityType": "fks", + "table": "sso_provider" + }, + { + "columns": ["user_id"], + "tableTo": "users_table", + "columnsTo": ["id"], + "onUpdate": "NO ACTION", + "onDelete": "CASCADE", + "nameExplicit": false, + "name": "two_factor_user_id_users_table_id_fk", + "entityType": "fks", + "table": "two_factor" + }, + { + "columns": ["organization_id"], + "tableTo": "organization", + "columnsTo": ["id"], + "onUpdate": "NO ACTION", + "onDelete": "CASCADE", + "nameExplicit": false, + "name": "volumes_table_organization_id_organization_id_fk", + "entityType": "fks", + "table": "volumes_table" + }, + { + "columns": ["schedule_id", "destination_id"], + "nameExplicit": false, + "name": "backup_schedule_notifications_table_schedule_id_destination_id_pk", + "entityType": "pks", + "table": "backup_schedule_notifications_table" + }, + { + "columns": ["id"], + "nameExplicit": false, + "name": "account_pk", + "table": "account", + "entityType": "pks" + }, + { + "columns": ["id"], + "nameExplicit": false, + "name": "agents_table_pk", + "table": "agents_table", + "entityType": "pks" + }, + { + "columns": ["key"], + "nameExplicit": false, + "name": "app_metadata_pk", + "table": "app_metadata", + "entityType": "pks" + }, + { + "columns": ["id"], + "nameExplicit": false, + "name": "backup_schedule_mirrors_table_pk", + "table": "backup_schedule_mirrors_table", + "entityType": "pks" + }, + { + "columns": ["id"], + "nameExplicit": false, + "name": "backup_schedules_table_pk", + "table": "backup_schedules_table", + "entityType": "pks" + }, + { + "columns": ["id"], + "nameExplicit": false, + "name": "invitation_pk", + "table": "invitation", + "entityType": "pks" + }, + { + "columns": ["id"], + "nameExplicit": false, + "name": "member_pk", + "table": "member", + "entityType": "pks" + }, + { + "columns": ["id"], + "nameExplicit": false, + "name": "notification_destinations_table_pk", + "table": "notification_destinations_table", + "entityType": "pks" + }, + { + "columns": ["id"], + "nameExplicit": false, + "name": "organization_pk", + "table": "organization", + "entityType": "pks" + }, + { + "columns": ["id"], + "nameExplicit": false, + "name": "repositories_table_pk", + "table": "repositories_table", + "entityType": "pks" + }, + { + "columns": ["id"], + "nameExplicit": false, + "name": "sessions_table_pk", + "table": "sessions_table", + "entityType": "pks" + }, + { + "columns": ["id"], + "nameExplicit": false, + "name": "sso_provider_pk", + "table": "sso_provider", + "entityType": "pks" + }, + { + "columns": ["id"], + "nameExplicit": false, + "name": "two_factor_pk", + "table": "two_factor", + "entityType": "pks" + }, + { + "columns": ["id"], + "nameExplicit": false, + "name": "users_table_pk", + "table": "users_table", + "entityType": "pks" + }, + { + "columns": ["id"], + "nameExplicit": false, + "name": "verification_pk", + "table": "verification", + "entityType": "pks" + }, + { + "columns": ["id"], + "nameExplicit": false, + "name": "volumes_table_pk", + "table": "volumes_table", + "entityType": "pks" + }, + { + "columns": [ + { + "value": "user_id", + "isExpression": false + } + ], + "isUnique": false, + "where": null, + "origin": "manual", + "name": "account_userId_idx", + "entityType": "indexes", + "table": "account" + }, + { + "columns": [ + { + "value": "organization_id", + "isExpression": false + } + ], + "isUnique": false, + "where": null, + "origin": "manual", + "name": "agents_table_organization_id_idx", + "entityType": "indexes", + "table": "agents_table" + }, + { + "columns": [ + { + "value": "status", + "isExpression": false + } + ], + "isUnique": false, + "where": null, + "origin": "manual", + "name": "agents_table_status_idx", + "entityType": "indexes", + "table": "agents_table" + }, + { + "columns": [ + { + "value": "organization_id", + "isExpression": false + } + ], + "isUnique": false, + "where": null, + "origin": "manual", + "name": "invitation_organizationId_idx", + "entityType": "indexes", + "table": "invitation" + }, + { + "columns": [ + { + "value": "email", + "isExpression": false + } + ], + "isUnique": false, + "where": null, + "origin": "manual", + "name": "invitation_email_idx", + "entityType": "indexes", + "table": "invitation" + }, + { + "columns": [ + { + "value": "organization_id", + "isExpression": false + } + ], + "isUnique": false, + "where": null, + "origin": "manual", + "name": "member_organizationId_idx", + "entityType": "indexes", + "table": "member" + }, + { + "columns": [ + { + "value": "user_id", + "isExpression": false + } + ], + "isUnique": false, + "where": null, + "origin": "manual", + "name": "member_userId_idx", + "entityType": "indexes", + "table": "member" + }, + { + "columns": [ + { + "value": "organization_id", + "isExpression": false + }, + { + "value": "user_id", + "isExpression": false + } + ], + "isUnique": true, + "where": null, + "origin": "manual", + "name": "member_org_user_uidx", + "entityType": "indexes", + "table": "member" + }, + { + "columns": [ + { + "value": "slug", + "isExpression": false + } + ], + "isUnique": true, + "where": null, + "origin": "manual", + "name": "organization_slug_uidx", + "entityType": "indexes", + "table": "organization" + }, + { + "columns": [ + { + "value": "organization_id", + "isExpression": false + }, + { + "value": "provisioning_id", + "isExpression": false + } + ], + "isUnique": true, + "where": null, + "origin": "manual", + "name": "repositories_table_org_provisioning_id_uidx", + "entityType": "indexes", + "table": "repositories_table" + }, + { + "columns": [ + { + "value": "user_id", + "isExpression": false + } + ], + "isUnique": false, + "where": null, + "origin": "manual", + "name": "sessionsTable_userId_idx", + "entityType": "indexes", + "table": "sessions_table" + }, + { + "columns": [ + { + "value": "secret", + "isExpression": false + } + ], + "isUnique": false, + "where": null, + "origin": "manual", + "name": "twoFactor_secret_idx", + "entityType": "indexes", + "table": "two_factor" + }, + { + "columns": [ + { + "value": "user_id", + "isExpression": false + } + ], + "isUnique": false, + "where": null, + "origin": "manual", + "name": "twoFactor_userId_idx", + "entityType": "indexes", + "table": "two_factor" + }, + { + "columns": [ + { + "value": "identifier", + "isExpression": false + } + ], + "isUnique": false, + "where": null, + "origin": "manual", + "name": "verification_identifier_idx", + "entityType": "indexes", + "table": "verification" + }, + { + "columns": [ + { + "value": "organization_id", + "isExpression": false + }, + { + "value": "provisioning_id", + "isExpression": false + } + ], + "isUnique": true, + "where": null, + "origin": "manual", + "name": "volumes_table_org_provisioning_id_uidx", + "entityType": "indexes", + "table": "volumes_table" + }, + { + "columns": ["schedule_id", "repository_id"], + "nameExplicit": false, + "name": "backup_schedule_mirrors_table_schedule_id_repository_id_unique", + "entityType": "uniques", + "table": "backup_schedule_mirrors_table" + }, + { + "columns": ["name", "organization_id"], + "nameExplicit": false, + "name": "volumes_table_name_organization_id_unique", + "entityType": "uniques", + "table": "volumes_table" + }, + { + "columns": ["short_id"], + "nameExplicit": false, + "name": "backup_schedules_table_short_id_unique", + "entityType": "uniques", + "table": "backup_schedules_table" + }, + { + "columns": ["short_id"], + "nameExplicit": false, + "name": "repositories_table_short_id_unique", + "entityType": "uniques", + "table": "repositories_table" + }, + { + "columns": ["token"], + "nameExplicit": false, + "name": "sessions_table_token_unique", + "entityType": "uniques", + "table": "sessions_table" + }, + { + "columns": ["provider_id"], + "nameExplicit": false, + "name": "sso_provider_provider_id_unique", + "entityType": "uniques", + "table": "sso_provider" + }, + { + "columns": ["username"], + "nameExplicit": false, + "name": "users_table_username_unique", + "entityType": "uniques", + "table": "users_table" + }, + { + "columns": ["email"], + "nameExplicit": false, + "name": "users_table_email_unique", + "entityType": "uniques", + "table": "users_table" + }, + { + "columns": ["short_id"], + "nameExplicit": false, + "name": "volumes_table_short_id_unique", + "entityType": "uniques", + "table": "volumes_table" + } + ], + "renames": [] +} diff --git a/app/server/db/relations.ts b/app/server/db/relations.ts index 708d9356..adda025b 100644 --- a/app/server/db/relations.ts +++ b/app/server/db/relations.ts @@ -99,6 +99,7 @@ export const relations = defineRelations(schema, (r) => ({ users: r.many.usersTable({ alias: "usersTable_id_organization_id_via_member", }), + agents: r.many.agentsTable(), backupSchedules: r.many.backupSchedulesTable(), notificationDestinations: r.many.notificationDestinationsTable(), repositories: r.many.repositoriesTable(), @@ -119,6 +120,13 @@ export const relations = defineRelations(schema, (r) => ({ optional: false, }), }, + agentsTable: { + organization: r.one.organization({ + from: r.agentsTable.organizationId, + to: r.organization.id, + optional: true, + }), + }, volumesTable: { backupSchedules: r.many.backupSchedulesTable(), organization: r.one.organization({ diff --git a/app/server/db/schema.ts b/app/server/db/schema.ts index ae1cabf6..ca3f24eb 100644 --- a/app/server/db/schema.ts +++ b/app/server/db/schema.ts @@ -198,6 +198,36 @@ export const ssoProvider = sqliteTable("sso_provider", { .default(sql`(unixepoch() * 1000)`), }); +export type AgentKind = "local" | "remote"; +export type AgentStatus = "offline" | "connecting" | "online" | "degraded"; +export type AgentCapabilities = Record; + +export const agentsTable = sqliteTable( + "agents_table", + { + id: text("id").primaryKey(), + organizationId: text("organization_id").references(() => organization.id, { onDelete: "cascade" }), + name: text("name").notNull(), + kind: text("kind").$type().notNull(), + status: text("status").$type().notNull().default("offline"), + capabilities: text("capabilities", { mode: "json" }).$type().notNull().default({}), + lastSeenAt: int("last_seen_at", { mode: "number" }), + lastReadyAt: int("last_ready_at", { mode: "number" }), + createdAt: int("created_at", { mode: "number" }) + .notNull() + .default(sql`(unixepoch() * 1000)`), + updatedAt: int("updated_at", { mode: "number" }) + .notNull() + .$onUpdate(() => Date.now()) + .default(sql`(unixepoch() * 1000)`), + }, + (table) => [ + index("agents_table_organization_id_idx").on(table.organizationId), + index("agents_table_status_idx").on(table.status), + ], +); +export type Agent = typeof agentsTable.$inferSelect; + /** * Volumes Table */ diff --git a/app/server/modules/agents/__tests__/agents-manager.backups.test.ts b/app/server/modules/agents/__tests__/agents-manager.backups.test.ts index 279f9b2e..25830203 100644 --- a/app/server/modules/agents/__tests__/agents-manager.backups.test.ts +++ b/app/server/modules/agents/__tests__/agents-manager.backups.test.ts @@ -1,6 +1,7 @@ import { afterEach, expect, test, vi } from "vitest"; import waitForExpect from "wait-for-expect"; import { fromAny, fromPartial } from "@total-typescript/shoehorn"; +import { Effect } from "effect"; import { agentManager, type ProcessWithAgentRuntime } from "../agents-manager"; import type { AgentManagerRuntime } from "../controller/server"; import type { BackupRunPayload } from "@zerobyte/contracts/agent-protocol"; @@ -20,8 +21,8 @@ afterEach(() => { }); test("cancelBackup resolves a running backup when the cancel command cannot be delivered", async () => { - const sendBackup = vi.fn().mockResolvedValue(true); - const cancelBackup = vi.fn().mockResolvedValue(false); + const sendBackup = vi.fn(() => Effect.succeed(true)); + const cancelBackup = vi.fn(() => Effect.succeed(false)); setAgentRuntime({ sendBackup, cancelBackup }); const resultPromise = agentManager.runBackup("local", { diff --git a/app/server/modules/agents/__tests__/agents-manager.events.test.ts b/app/server/modules/agents/__tests__/agents-manager.events.test.ts new file mode 100644 index 00000000..2b12ef5f --- /dev/null +++ b/app/server/modules/agents/__tests__/agents-manager.events.test.ts @@ -0,0 +1,235 @@ +import { afterEach, expect, test, vi } from "vitest"; +import { Effect } from "effect"; +import { fromAny, fromPartial } from "@total-typescript/shoehorn"; +import type { BackupRunPayload } from "@zerobyte/contracts/agent-protocol"; +import type { AgentManagerEvent } from "../controller/server"; +import type { ProcessWithAgentRuntime } from "../agents-manager"; + +const controllerMock = vi.hoisted(() => ({ + onEvent: null as null | ((event: AgentManagerEvent) => void), + sendBackup: vi.fn(), + cancelBackup: vi.fn(), + stop: vi.fn(), +})); + +vi.mock("../controller/server", async () => { + const { Effect } = await import("effect"); + return { + createAgentManagerRuntime: vi.fn((onEvent: (event: AgentManagerEvent) => void) => { + controllerMock.onEvent = onEvent; + return { + start: Effect.void, + stop: Effect.sync(controllerMock.stop), + sendBackup: controllerMock.sendBackup, + cancelBackup: controllerMock.cancelBackup, + }; + }), + }; +}); + +const processWithAgentRuntime = process as ProcessWithAgentRuntime; + +const resetAgentRuntime = () => { + processWithAgentRuntime.__zerobyteAgentRuntime = { + agentManager: null, + localAgent: null, + isStoppingLocalAgent: false, + localAgentRestartTimeout: null, + activeBackupsByScheduleId: new Map(), + activeBackupScheduleIdsByJobId: new Map(), + }; +}; + +const backupPayload = fromPartial({ + jobId: "job-1", + scheduleId: "schedule-1", +}); + +afterEach(() => { + delete processWithAgentRuntime.__zerobyteAgentRuntime; + controllerMock.onEvent = null; + controllerMock.sendBackup.mockReset(); + controllerMock.cancelBackup.mockReset(); + controllerMock.stop.mockReset(); + vi.resetModules(); + vi.restoreAllMocks(); +}); + +test("backup progress is delivered to the running backup callback", async () => { + resetAgentRuntime(); + controllerMock.sendBackup.mockImplementation(() => Effect.succeed(true)); + const { agentManager, startAgentController, stopAgentController } = await import("../agents-manager"); + const onProgress = vi.fn(); + + await startAgentController(); + const resultPromise = agentManager.runBackup("local", { + scheduleId: 42, + payload: backupPayload, + signal: new AbortController().signal, + onProgress, + }); + + controllerMock.onEvent?.({ + type: "backup.progress", + agentId: "local", + agentName: "Local Agent", + payload: fromAny({ jobId: "job-1", scheduleId: "schedule-1", progress: { percentDone: 0.5 } }), + }); + controllerMock.onEvent?.({ + type: "backup.completed", + agentId: "local", + agentName: "Local Agent", + payload: { jobId: "job-1", scheduleId: "schedule-1", exitCode: 0, result: null }, + }); + + await expect(resultPromise).resolves.toEqual({ + status: "completed", + exitCode: 0, + result: null, + warningDetails: null, + }); + expect(onProgress).toHaveBeenCalledWith({ percentDone: 0.5 }); + await stopAgentController(); +}); + +test("backup failed and cancelled events resolve the matching running backup", async () => { + resetAgentRuntime(); + controllerMock.sendBackup.mockImplementation(() => Effect.succeed(true)); + const { agentManager, startAgentController, stopAgentController } = await import("../agents-manager"); + + await startAgentController(); + const failedPromise = agentManager.runBackup("local", { + scheduleId: 42, + payload: backupPayload, + signal: new AbortController().signal, + onProgress: vi.fn(), + }); + controllerMock.onEvent?.({ + type: "backup.failed", + agentId: "local", + agentName: "Local Agent", + payload: { jobId: "job-1", scheduleId: "schedule-1", error: "failed", errorDetails: "restic failed" }, + }); + await expect(failedPromise).resolves.toEqual({ status: "failed", error: "restic failed" }); + + const cancelledPromise = agentManager.runBackup("local", { + scheduleId: 43, + payload: fromPartial({ jobId: "job-2", scheduleId: "schedule-2" }), + signal: new AbortController().signal, + onProgress: vi.fn(), + }); + controllerMock.onEvent?.({ + type: "backup.cancelled", + agentId: "local", + agentName: "Local Agent", + payload: { jobId: "job-2", scheduleId: "schedule-2", message: "cancelled remotely" }, + }); + await expect(cancelledPromise).resolves.toEqual({ status: "cancelled", message: "cancelled remotely" }); + await stopAgentController(); +}); + +test("agent disconnect cancels only backups owned by that agent", async () => { + resetAgentRuntime(); + controllerMock.sendBackup.mockImplementation(() => Effect.succeed(true)); + const { agentManager, startAgentController, stopAgentController } = await import("../agents-manager"); + + await startAgentController(); + const localPromise = agentManager.runBackup("local", { + scheduleId: 42, + payload: backupPayload, + signal: new AbortController().signal, + onProgress: vi.fn(), + }); + const remotePromise = agentManager.runBackup("remote", { + scheduleId: 43, + payload: fromPartial({ jobId: "job-2", scheduleId: "schedule-2" }), + signal: new AbortController().signal, + onProgress: vi.fn(), + }); + + controllerMock.onEvent?.({ type: "agent.disconnected", agentId: "local", agentName: "Local Agent" }); + controllerMock.onEvent?.({ + type: "backup.completed", + agentId: "remote", + agentName: "Remote Agent", + payload: { jobId: "job-2", scheduleId: "schedule-2", exitCode: 0, result: null }, + }); + + await expect(localPromise).resolves.toEqual({ + status: "cancelled", + message: "The connection to the backup agent was lost. Restart the backup to ensure it completes.", + }); + await expect(remotePromise).resolves.toEqual({ + status: "completed", + exitCode: 0, + result: null, + warningDetails: null, + }); + await stopAgentController(); +}); + +test("runBackup returns unavailable and clears the active run when the command cannot be sent", async () => { + resetAgentRuntime(); + controllerMock.sendBackup.mockImplementation(() => Effect.succeed(false)); + const { agentManager, startAgentController, stopAgentController } = await import("../agents-manager"); + + await startAgentController(); + const result = await agentManager.runBackup("local", { + scheduleId: 42, + payload: backupPayload, + signal: new AbortController().signal, + onProgress: vi.fn(), + }); + + expect(result).toEqual({ + status: "unavailable", + error: new Error("Failed to send backup command to agent local"), + }); + await expect(agentManager.cancelBackup("local", 42)).resolves.toBe(false); + await stopAgentController(); +}); + +test("runBackup rejects before sending when the abort signal is already aborted", async () => { + resetAgentRuntime(); + controllerMock.sendBackup.mockImplementation(() => Effect.succeed(true)); + const { agentManager, startAgentController, stopAgentController } = await import("../agents-manager"); + const abortController = new AbortController(); + abortController.abort(new Error("cancelled before send")); + + await startAgentController(); + await expect( + agentManager.runBackup("local", { + scheduleId: 42, + payload: backupPayload, + signal: abortController.signal, + onProgress: vi.fn(), + }), + ).rejects.toThrow("cancelled before send"); + expect(controllerMock.sendBackup).not.toHaveBeenCalled(); + await stopAgentController(); +}); + +test("runBackup requests cancellation when the abort signal fires while sending", async () => { + resetAgentRuntime(); + const abortController = new AbortController(); + controllerMock.sendBackup.mockImplementation(() => + Effect.sync(() => { + abortController.abort(); + return true; + }), + ); + controllerMock.cancelBackup.mockImplementation(() => Effect.succeed(false)); + const { agentManager, startAgentController, stopAgentController } = await import("../agents-manager"); + + await startAgentController(); + const result = await agentManager.runBackup("local", { + scheduleId: 42, + payload: backupPayload, + signal: abortController.signal, + onProgress: vi.fn(), + }); + + expect(result).toEqual({ status: "cancelled" }); + expect(controllerMock.cancelBackup).toHaveBeenCalledWith("local", { jobId: "job-1", scheduleId: "schedule-1" }); + await stopAgentController(); +}); diff --git a/app/server/modules/agents/__tests__/agents.service.test.ts b/app/server/modules/agents/__tests__/agents.service.test.ts new file mode 100644 index 00000000..f6f87c8a --- /dev/null +++ b/app/server/modules/agents/__tests__/agents.service.test.ts @@ -0,0 +1,72 @@ +import { beforeEach, expect, test } from "vitest"; +import { db } from "~/server/db/db"; +import { agentsTable } from "~/server/db/schema"; +import { LOCAL_AGENT_ID, LOCAL_AGENT_KIND, LOCAL_AGENT_NAME } from "../constants"; +import { agentsService } from "../agents.service"; + +beforeEach(async () => { + await db.delete(agentsTable); +}); + +test("ensureLocalAgent seeds the built-in local agent once", async () => { + await agentsService.ensureLocalAgent(); + await agentsService.ensureLocalAgent(); + + const agents = await agentsService.listAgents(); + + expect(agents).toHaveLength(1); +}); + +test("markAgentConnecting creates and updates connection metadata", async () => { + await agentsService.markAgentConnecting({ + agentId: "remote-agent", + organizationId: null, + agentName: "Remote Agent", + agentKind: "remote", + capabilities: { restic: true }, + connectedAt: 1_000, + }); + await agentsService.markAgentConnecting({ + agentId: "remote-agent", + organizationId: null, + agentName: "Renamed Agent", + agentKind: "remote", + capabilities: { restic: true, webdav: true }, + connectedAt: 2_000, + }); + + const agent = await agentsService.getAgent("remote-agent"); + + expect(agent).toMatchObject({ + id: "remote-agent", + name: "Renamed Agent", + kind: "remote", + status: "connecting", + capabilities: { restic: true, webdav: true }, + lastSeenAt: 2_000, + updatedAt: 2_000, + }); +}); + +test("agent runtime status moves from connecting to online, seen, and offline", async () => { + await agentsService.markAgentConnecting({ + agentId: LOCAL_AGENT_ID, + organizationId: null, + agentName: LOCAL_AGENT_NAME, + agentKind: LOCAL_AGENT_KIND, + connectedAt: 1_000, + }); + await agentsService.markAgentOnline(LOCAL_AGENT_ID, 2_000); + await agentsService.markAgentSeen(LOCAL_AGENT_ID, 3_000); + await agentsService.markAgentOffline(LOCAL_AGENT_ID, 4_000); + + const agent = await agentsService.getAgent(LOCAL_AGENT_ID); + + expect(agent).toMatchObject({ + id: LOCAL_AGENT_ID, + status: "offline", + lastSeenAt: 3_000, + lastReadyAt: 2_000, + updatedAt: 4_000, + }); +}); diff --git a/app/server/modules/agents/__tests__/controller-runtime.test.ts b/app/server/modules/agents/__tests__/controller-runtime.test.ts new file mode 100644 index 00000000..66523716 --- /dev/null +++ b/app/server/modules/agents/__tests__/controller-runtime.test.ts @@ -0,0 +1,280 @@ +import { Effect } from "effect"; +import { afterEach, expect, test, vi } from "vitest"; +import waitForExpect from "wait-for-expect"; +import { fromPartial } from "@total-typescript/shoehorn"; +import { createAgentMessage } from "@zerobyte/contracts/agent-protocol"; +import { LOCAL_AGENT_ID, LOCAL_AGENT_KIND, LOCAL_AGENT_NAME } from "../constants"; + +const agentsServiceMocks = vi.hoisted(() => ({ + markAgentConnecting: vi.fn(() => Promise.resolve()), + markAgentOnline: vi.fn(() => Promise.resolve()), + markAgentSeen: vi.fn(() => Promise.resolve()), + markAgentOffline: vi.fn(() => Promise.resolve()), +})); + +const tokenMocks = vi.hoisted(() => ({ + validateAgentToken: vi.fn(), +})); + +vi.mock("../agents.service", () => ({ + agentsService: agentsServiceMocks, +})); + +vi.mock("../helpers/tokens", () => ({ + validateAgentToken: tokenMocks.validateAgentToken, +})); + +const createSocket = (id: string, agentId = LOCAL_AGENT_ID) => ({ + data: { + id, + agentId, + organizationId: null, + agentName: agentId === LOCAL_AGENT_ID ? LOCAL_AGENT_NAME : `${LOCAL_AGENT_NAME} ${agentId}`, + agentKind: LOCAL_AGENT_KIND, + }, + send: vi.fn(() => 1), + close: vi.fn(), +}); + +type CapturedFetch = NonNullable[0]["fetch"]>; + +const invokeFetch = (fetch: CapturedFetch | undefined, request: Request, srv: Parameters[1]) => { + if (!fetch) { + throw new Error("Bun.serve was not called with a fetch handler"); + } + + return Reflect.apply(fetch, fromPartial>({}), [ + request, + srv, + ]) as ReturnType; +}; + +const startRuntime = async (onEvent = vi.fn()) => { + const { createAgentManagerRuntime } = await import("../controller/server"); + const runtime = createAgentManagerRuntime(onEvent); + await Effect.runPromise(runtime.start); + return { runtime, onEvent }; +}; + +afterEach(() => { + vi.restoreAllMocks(); + tokenMocks.validateAgentToken.mockReset(); + vi.resetModules(); +}); + +test("websocket fetch rejects requests without a bearer token", async () => { + const serve = vi + .spyOn(Bun, "serve") + .mockReturnValue(fromPartial({ port: 3001, stop: vi.fn(() => Promise.resolve()) })); + const { runtime } = await startRuntime(); + const fetch = serve.mock.calls[0]?.[0].fetch; + const upgrade = vi.fn(); + const srv = fromPartial>[1]>({ upgrade }); + + const response = await invokeFetch(fetch, new Request("http://localhost:3001/agent"), srv); + await Effect.runPromise(runtime.stop); + + expect(response?.status).toBe(401); + expect(await response?.text()).toBe("Missing token"); + expect(upgrade).not.toHaveBeenCalled(); +}); + +test("websocket fetch rejects invalid bearer tokens", async () => { + tokenMocks.validateAgentToken.mockResolvedValue(undefined); + const serve = vi + .spyOn(Bun, "serve") + .mockReturnValue(fromPartial({ port: 3001, stop: vi.fn(() => Promise.resolve()) })); + const { runtime } = await startRuntime(); + const fetch = serve.mock.calls[0]?.[0].fetch; + const upgrade = vi.fn(); + const srv = fromPartial>[1]>({ upgrade }); + + const response = await invokeFetch( + fetch, + new Request("http://localhost:3001/agent", { headers: { authorization: "Bearer bad-token" } }), + srv, + ); + await Effect.runPromise(runtime.stop); + + expect(response?.status).toBe(401); + expect(await response?.text()).toBe("Invalid or revoked token"); + expect(tokenMocks.validateAgentToken).toHaveBeenCalledWith("bad-token"); + expect(upgrade).not.toHaveBeenCalled(); +}); + +test("websocket fetch upgrades valid agent tokens with connection metadata", async () => { + tokenMocks.validateAgentToken.mockResolvedValue({ + agentId: LOCAL_AGENT_ID, + organizationId: null, + agentName: LOCAL_AGENT_NAME, + agentKind: LOCAL_AGENT_KIND, + }); + const serve = vi + .spyOn(Bun, "serve") + .mockReturnValue(fromPartial({ port: 3001, stop: vi.fn(() => Promise.resolve()) })); + const { runtime } = await startRuntime(); + const fetch = serve.mock.calls[0]?.[0].fetch; + const upgrade = vi.fn(() => true); + const srv = fromPartial>[1]>({ upgrade }); + + const response = await invokeFetch( + fetch, + new Request("http://localhost:3001/agent", { headers: { authorization: "Bearer valid-token" } }), + srv, + ); + await Effect.runPromise(runtime.stop); + + expect(response).toBeUndefined(); + expect(tokenMocks.validateAgentToken).toHaveBeenCalledWith("valid-token"); + expect(upgrade).toHaveBeenCalledWith(expect.any(Request), { + data: expect.objectContaining({ + agentId: LOCAL_AGENT_ID, + organizationId: null, + agentName: LOCAL_AGENT_NAME, + agentKind: LOCAL_AGENT_KIND, + id: expect.any(String), + }), + }); +}); + +test("websocket lifecycle updates agent connection status", async () => { + const stop = vi.fn(() => Promise.resolve()); + const serve = vi.spyOn(Bun, "serve").mockReturnValue(fromPartial({ port: 3001, stop })); + const { runtime } = await startRuntime(); + const websocket = serve.mock.calls[0]?.[0].websocket; + const socket = createSocket("connection-1"); + + await websocket?.open?.(fromPartial(socket)); + await websocket?.message?.(fromPartial(socket), createAgentMessage("agent.ready", { agentId: LOCAL_AGENT_ID })); + await websocket?.message?.(fromPartial(socket), createAgentMessage("heartbeat.pong", { sentAt: 123 })); + await websocket?.close?.(fromPartial(socket), 1000, "done"); + await Effect.runPromise(runtime.stop); + + expect(agentsServiceMocks.markAgentConnecting).toHaveBeenCalledWith({ + agentId: LOCAL_AGENT_ID, + organizationId: null, + agentName: LOCAL_AGENT_NAME, + agentKind: LOCAL_AGENT_KIND, + }); + expect(agentsServiceMocks.markAgentOnline).toHaveBeenCalledWith(LOCAL_AGENT_ID, expect.any(Number)); + expect(agentsServiceMocks.markAgentSeen).toHaveBeenCalledWith(LOCAL_AGENT_ID, expect.any(Number)); + expect(agentsServiceMocks.markAgentOffline).toHaveBeenCalledWith(LOCAL_AGENT_ID); + expect(stop).toHaveBeenCalledWith(true); +}); + +test("websocket open failure closes the upgraded socket", async () => { + agentsServiceMocks.markAgentConnecting.mockRejectedValueOnce(new Error("db unavailable")); + const serve = vi + .spyOn(Bun, "serve") + .mockReturnValue(fromPartial({ port: 3001, stop: vi.fn(() => Promise.resolve()) })); + const { runtime } = await startRuntime(); + const websocket = serve.mock.calls[0]?.[0].websocket; + const socket = createSocket("connection-1"); + + await websocket?.open?.(fromPartial(socket)); + + expect(socket.close).toHaveBeenCalled(); + await Effect.runPromise(runtime.stop); +}); + +test("shutdown closes all sessions and stops the server when marking one agent offline fails", async () => { + agentsServiceMocks.markAgentOffline.mockRejectedValueOnce(new Error("db unavailable")); + const stop = vi.fn(() => Promise.resolve()); + const serve = vi.spyOn(Bun, "serve").mockReturnValue(fromPartial({ port: 3001, stop })); + const { runtime, onEvent } = await startRuntime(vi.fn()); + const websocket = serve.mock.calls[0]?.[0].websocket; + const firstSocket = createSocket("connection-1", "agent-1"); + const secondSocket = createSocket("connection-2", "agent-2"); + + await websocket?.open?.(fromPartial(firstSocket)); + await websocket?.open?.(fromPartial(secondSocket)); + await Effect.runPromise(runtime.stop); + + expect(agentsServiceMocks.markAgentOffline).toHaveBeenCalledWith("agent-1"); + expect(agentsServiceMocks.markAgentOffline).toHaveBeenCalledWith("agent-2"); + expect(onEvent).toHaveBeenCalledWith(expect.objectContaining({ type: "agent.disconnected", agentId: "agent-1" })); + expect(onEvent).toHaveBeenCalledWith(expect.objectContaining({ type: "agent.disconnected", agentId: "agent-2" })); + expect(stop).toHaveBeenCalledWith(true); +}); + +test("closing a replaced connection reports disconnect without marking the active agent offline", async () => { + const serve = vi + .spyOn(Bun, "serve") + .mockReturnValue(fromPartial({ port: 3001, stop: vi.fn(() => Promise.resolve()) })); + const { runtime, onEvent } = await startRuntime(vi.fn()); + const websocket = serve.mock.calls[0]?.[0].websocket; + const oldSocket = createSocket("connection-1"); + const newSocket = createSocket("connection-2"); + const offlineCallsBeforeClose = agentsServiceMocks.markAgentOffline.mock.calls.length; + + await websocket?.open?.(fromPartial(oldSocket)); + await websocket?.open?.(fromPartial(newSocket)); + await websocket?.message?.(fromPartial(newSocket), createAgentMessage("agent.ready", { agentId: LOCAL_AGENT_ID })); + await websocket?.close?.(fromPartial(oldSocket), 1000, "replaced"); + + expect(onEvent).toHaveBeenCalledWith( + expect.objectContaining({ type: "agent.disconnected", agentId: LOCAL_AGENT_ID }), + ); + expect(agentsServiceMocks.markAgentOffline).toHaveBeenCalledTimes(offlineCallsBeforeClose); + expect( + await Effect.runPromise( + runtime.sendBackup(LOCAL_AGENT_ID, { + jobId: "job-1", + scheduleId: "schedule-1", + organizationId: "org-1", + sourcePath: "/tmp/source", + repositoryConfig: { backend: "local" as const, path: "/tmp/repository" }, + options: {}, + runtime: { + password: "password", + cacheDir: "/tmp/cache", + passFile: "/tmp/pass", + defaultExcludes: [], + rcloneConfigFile: "/tmp/rclone.conf", + }, + webhooks: { pre: null, post: null }, + webhookAllowedOrigins: [], + webhookTimeoutMs: 60_000, + }), + ), + ).toBe(true); + await Effect.runPromise(runtime.stop); +}); + +test("sendBackup is only delivered after the agent is ready", async () => { + const serve = vi + .spyOn(Bun, "serve") + .mockReturnValue(fromPartial({ port: 3001, stop: vi.fn(() => Promise.resolve()) })); + const { runtime } = await startRuntime(); + const websocket = serve.mock.calls[0]?.[0].websocket; + const socket = createSocket("connection-1"); + const payload = { + jobId: "job-1", + scheduleId: "schedule-1", + organizationId: "org-1", + sourcePath: "/tmp/source", + repositoryConfig: { backend: "local" as const, path: "/tmp/repository" }, + options: {}, + runtime: { + password: "password", + cacheDir: "/tmp/cache", + passFile: "/tmp/pass", + defaultExcludes: [], + rcloneConfigFile: "/tmp/rclone.conf", + }, + webhooks: { pre: null, post: null }, + webhookAllowedOrigins: [], + webhookTimeoutMs: 60_000, + }; + + await websocket?.open?.(fromPartial(socket)); + await expect(Effect.runPromise(runtime.sendBackup(LOCAL_AGENT_ID, payload))).resolves.toBe(false); + + await websocket?.message?.(fromPartial(socket), createAgentMessage("agent.ready", { agentId: LOCAL_AGENT_ID })); + await expect(Effect.runPromise(runtime.sendBackup(LOCAL_AGENT_ID, payload))).resolves.toBe(true); + + await waitForExpect(() => { + expect(socket.send).toHaveBeenCalledWith(expect.stringContaining('"type":"backup.run"')); + }); + await Effect.runPromise(runtime.stop); +}); diff --git a/app/server/modules/agents/__tests__/session.test.ts b/app/server/modules/agents/__tests__/session.test.ts index b5c30ffd..5c1766da 100644 --- a/app/server/modules/agents/__tests__/session.test.ts +++ b/app/server/modules/agents/__tests__/session.test.ts @@ -1,29 +1,41 @@ -import { Effect, Exit, Scope } from "effect"; +import { Effect, Exit, Fiber, Scope } from "effect"; import { expect, test, vi } from "vitest"; import waitForExpect from "wait-for-expect"; import { fromPartial } from "@total-typescript/shoehorn"; -import { createAgentMessage } from "@zerobyte/contracts/agent-protocol"; +import { createAgentMessage, type AgentMessage } from "@zerobyte/contracts/agent-protocol"; +import { LOCAL_AGENT_ID, LOCAL_AGENT_KIND, LOCAL_AGENT_NAME } from "../constants"; import { createControllerAgentSession } from "../controller/session"; const createSocket = (overrides: Partial[0]> = {}) => { return { - data: { id: "connection-1", agentId: "local", organizationId: null, agentName: "Local Agent" }, + data: { + id: "connection-1", + agentId: LOCAL_AGENT_ID, + organizationId: null, + agentName: LOCAL_AGENT_NAME, + agentKind: LOCAL_AGENT_KIND, + }, send: vi.fn(() => 1), close: vi.fn(), ...overrides, }; }; -const createSession = (handlers: Parameters[1] = {}, socket = createSocket()) => { +const createSession = ( + onEvent: Parameters[1] = () => Effect.void, + socket = createSocket(), +) => { const scope = Effect.runSync(Scope.make()); try { - const session = Effect.runSync(Scope.extend(createControllerAgentSession(fromPartial(socket), handlers), scope)); + const session = Effect.runSync(Scope.extend(createControllerAgentSession(fromPartial(socket), onEvent), scope)); return { session, run: () => { - Effect.runFork(Scope.extend(session.run, scope)); + const fiber = Effect.runFork(Scope.extend(session.run, scope)); + Effect.runSync(Scope.addFinalizer(scope, Fiber.interrupt(fiber))); + return fiber; }, socket, close: () => { @@ -39,92 +51,33 @@ const createSession = (handlers: Parameters } }; -test("close emits a synthetic backup.cancelled for a started backup", () => { - const onBackupCancelled = vi.fn(); - const { session, close } = createSession({ - onBackupCancelled, - }); +test("closing the session scope interrupts the session runner", async () => { + const { run, closeAsync } = createSession(); + const fiber = run(); - Effect.runSync( - session.handleMessage( - createAgentMessage("backup.started", { - jobId: "job-1", - scheduleId: "schedule-1", - }), - ), - ); + await closeAsync(); + + const exit = await Effect.runPromise(Fiber.await(fiber).pipe(Effect.timeout("100 millis"))); + expect(Exit.isInterrupted(exit)).toBe(true); +}); + +test("close reports a transport disconnect", () => { + const onEvent = vi.fn(() => Effect.void); + const { close } = createSession(onEvent); close(); - expect(onBackupCancelled).toHaveBeenCalledTimes(1); - expect(onBackupCancelled).toHaveBeenCalledWith( + expect(onEvent).toHaveBeenCalledTimes(1); + expect(onEvent).toHaveBeenCalledWith( expect.objectContaining({ - jobId: "job-1", - scheduleId: "schedule-1", + type: "agent.disconnected", }), ); }); -test.each([ - { - name: "backup.completed", - jobId: "job-1", - scheduleId: "schedule-1", - terminalMessage: createAgentMessage("backup.completed", { - jobId: "job-1", - scheduleId: "schedule-1", - exitCode: 0, - result: null, - }), - expectedCancelledCalls: 0, - }, - { - name: "backup.failed", - jobId: "job-2", - scheduleId: "schedule-2", - terminalMessage: createAgentMessage("backup.failed", { - jobId: "job-2", - scheduleId: "schedule-2", - error: "backup failed", - }), - expectedCancelledCalls: 0, - }, - { - name: "backup.cancelled", - jobId: "job-3", - scheduleId: "schedule-3", - terminalMessage: createAgentMessage("backup.cancelled", { - jobId: "job-3", - scheduleId: "schedule-3", - message: "Backup was cancelled", - }), - expectedCancelledCalls: 1, - }, -])("close does not emit an extra synthetic backup.cancelled after $name", (testCase) => { - const onBackupCancelled = vi.fn(); - const { session, close } = createSession({ - onBackupCancelled, - }); - - Effect.runSync( - session.handleMessage( - createAgentMessage("backup.started", { - jobId: testCase.jobId, - scheduleId: testCase.scheduleId, - }), - ), - ); - Effect.runSync(session.handleMessage(testCase.terminalMessage)); - close(); - - expect(onBackupCancelled).toHaveBeenCalledTimes(testCase.expectedCancelledCalls); -}); - -test("close emits a synthetic backup.cancelled for a queued backup", () => { - const onBackupCancelled = vi.fn(); - const { session, close } = createSession({ - onBackupCancelled, - }); +test("sendBackup only queues the transport message", () => { + const onEvent = vi.fn(() => Effect.void); + const { session, close } = createSession(onEvent); Effect.runSync( session.sendBackup({ @@ -152,31 +105,77 @@ test("close emits a synthetic backup.cancelled for a queued backup", () => { close(); - expect(onBackupCancelled).toHaveBeenCalledTimes(1); - expect(onBackupCancelled).toHaveBeenLastCalledWith( - expect.objectContaining({ - jobId: "job-queued", - scheduleId: "schedule-queued", - }), - ); + expect(onEvent).not.toHaveBeenCalledWith(expect.objectContaining({ type: "backup.cancelled" })); }); -test("a dropped backup.cancel closes the session and emits a synthetic backup.cancelled", async () => { +test("invalid inbound messages are ignored", () => { + const onEvent = vi.fn(() => Effect.void); + const { session, close } = createSession(onEvent); + + Effect.runSync(session.handleMessage("not json")); + Effect.runSync(session.handleMessage(JSON.stringify({ type: "backup.progress", payload: {} }))); + + expect(onEvent).not.toHaveBeenCalled(); + close(); +}); + +test("agent.ready marks the session ready and forwards the event", () => { + const onEvent = vi.fn(() => Effect.void); + const { session, close } = createSession(onEvent); + + expect(Effect.runSync(session.isReady())).toBe(false); + Effect.runSync(session.handleMessage(createAgentMessage("agent.ready", { agentId: LOCAL_AGENT_ID }))); + + expect(Effect.runSync(session.isReady())).toBe(true); + expect(onEvent).toHaveBeenCalledWith({ type: "agent.ready", payload: { agentId: LOCAL_AGENT_ID } }); + close(); +}); + +test("backup agent messages are forwarded unchanged", () => { + const onEvent = vi.fn(() => Effect.void); + const { session, close } = createSession(onEvent); + const message = { + type: "backup.progress" as const, + payload: { + jobId: "job-1", + scheduleId: "schedule-1", + progress: { + message_type: "status" as const, + seconds_elapsed: 0, + seconds_remaining: 0, + percent_done: 0.5, + total_files: 0, + files_done: 0, + total_bytes: 0, + bytes_done: 0, + current_files: [], + }, + }, + } satisfies Extract; + + Effect.runSync(session.handleMessage(createAgentMessage(message.type, message.payload))); + + expect(onEvent).toHaveBeenCalledWith( + expect.objectContaining({ + type: message.type, + payload: expect.objectContaining({ + jobId: message.payload.jobId, + scheduleId: message.payload.scheduleId, + progress: expect.objectContaining(message.payload.progress), + }), + }), + ); + close(); +}); + +test("a dropped backup.cancel closes the session and reports a transport disconnect", async () => { const send = vi.fn(() => 0); const socket = createSocket({ send, close: vi.fn() }); - const onBackupCancelled = vi.fn(); - const { session, run, closeAsync } = createSession({ onBackupCancelled }, socket); + const onEvent = vi.fn(() => Effect.void); + const { session, run, closeAsync } = createSession(onEvent, socket); try { run(); - Effect.runSync( - session.handleMessage( - createAgentMessage("backup.started", { - jobId: "job-1", - scheduleId: "schedule-1", - }), - ), - ); Effect.runSync( session.sendBackupCancel({ jobId: "job-1", @@ -187,11 +186,10 @@ test("a dropped backup.cancel closes the session and emits a synthetic backup.ca await waitForExpect(() => { expect(send).toHaveBeenCalledTimes(1); expect(socket.close).toHaveBeenCalledTimes(1); - expect(onBackupCancelled).toHaveBeenCalledTimes(1); - expect(onBackupCancelled).toHaveBeenCalledWith( + expect(onEvent).toHaveBeenCalledTimes(1); + expect(onEvent).toHaveBeenCalledWith( expect.objectContaining({ - jobId: "job-1", - scheduleId: "schedule-1", + type: "agent.disconnected", }), ); }); diff --git a/app/server/modules/agents/agents-manager.ts b/app/server/modules/agents/agents-manager.ts index 9342b072..e82b8dde 100644 --- a/app/server/modules/agents/agents-manager.ts +++ b/app/server/modules/agents/agents-manager.ts @@ -1,7 +1,8 @@ import { logger } from "@zerobyte/core/node"; import type { BackupRunPayload } from "@zerobyte/contracts/agent-protocol"; +import { Effect } from "effect"; import { config } from "../../core/config"; -import type { AgentBackupEventHandlers } from "./controller/server"; +import { createAgentManagerRuntime, type AgentManagerEvent } from "./controller/server"; import { spawnLocalAgentProcess, stopLocalAgentProcess } from "./local/process"; import type { BackupExecutionProgress, BackupExecutionResult } from "./helpers/runtime-state"; import { createAgentRuntimeState } from "./helpers/runtime-state"; @@ -46,6 +47,17 @@ const resolveActiveBackupRun = (scheduleId: number, result: BackupExecutionResul return true; }; +const cancelActiveBackupRunsForAgent = (agentId: string, message: string) => { + const activeBackupsByScheduleId = getActiveBackupsByScheduleId(); + const matchingScheduleIds = [...activeBackupsByScheduleId.values()] + .filter((activeBackupRun) => activeBackupRun.agentId === agentId) + .map((activeBackupRun) => activeBackupRun.scheduleId); + + for (const scheduleId of matchingScheduleIds) { + resolveActiveBackupRun(scheduleId, { status: "cancelled", message }); + } +}; + const getActiveBackupRun = (jobId: string, scheduleId: string, eventName: string, agentId: string) => { const trackedScheduleId = getActiveBackupScheduleIdsByJobId().get(jobId); if (trackedScheduleId === undefined) { @@ -86,10 +98,12 @@ const requestBackupCancellation = async (agentId: string, scheduleId: number) => } if ( - await runtime.cancelBackup(agentId, { - jobId: activeBackupRun.jobId, - scheduleId: activeBackupRun.scheduleShortId, - }) + await Effect.runPromise( + runtime.cancelBackup(agentId, { + jobId: activeBackupRun.jobId, + scheduleId: activeBackupRun.scheduleShortId, + }), + ) ) { return true; } @@ -98,68 +112,99 @@ const requestBackupCancellation = async (agentId: string, scheduleId: number) => return true; }; -const backupEventHandlers: AgentBackupEventHandlers = { - onBackupStarted: ({ agentId, payload }) => { - getActiveBackupRun(payload.jobId, payload.scheduleId, "backup.started", agentId); - }, - onBackupProgress: ({ agentId, payload }) => { - const activeBackupRun = getActiveBackupRun(payload.jobId, payload.scheduleId, "backup.progress", agentId); - if (!activeBackupRun) { - return; +const handleAgentManagerEvent = (event: AgentManagerEvent) => { + switch (event.type) { + case "agent.disconnected": { + cancelActiveBackupRunsForAgent( + event.agentId, + "The connection to the backup agent was lost. Restart the backup to ensure it completes.", + ); + break; } - - activeBackupRun.onProgress(payload.progress); - }, - onBackupCompleted: ({ agentId, payload }) => { - const activeBackupRun = getActiveBackupRun(payload.jobId, payload.scheduleId, "backup.completed", agentId); - if (!activeBackupRun) { - return; + case "backup.started": { + getActiveBackupRun(event.payload.jobId, event.payload.scheduleId, event.type, event.agentId); + break; } + case "backup.progress": { + const activeBackupRun = getActiveBackupRun( + event.payload.jobId, + event.payload.scheduleId, + event.type, + event.agentId, + ); + if (!activeBackupRun) { + break; + } - resolveActiveBackupRun(activeBackupRun.scheduleId, { - status: "completed", - exitCode: payload.exitCode, - result: payload.result, - warningDetails: payload.warningDetails ?? null, - }); - }, - onBackupFailed: ({ agentId, payload }) => { - const activeBackupRun = getActiveBackupRun(payload.jobId, payload.scheduleId, "backup.failed", agentId); - if (!activeBackupRun) { - return; + activeBackupRun.onProgress(event.payload.progress); + break; } + case "backup.completed": { + const activeBackupRun = getActiveBackupRun( + event.payload.jobId, + event.payload.scheduleId, + event.type, + event.agentId, + ); + if (!activeBackupRun) { + break; + } - resolveActiveBackupRun(activeBackupRun.scheduleId, { - status: "failed", - error: payload.errorDetails ?? payload.error, - }); - }, - onBackupCancelled: ({ agentId, payload }) => { - const activeBackupRun = getActiveBackupRun(payload.jobId, payload.scheduleId, "backup.cancelled", agentId); - if (!activeBackupRun) { - return; + resolveActiveBackupRun(activeBackupRun.scheduleId, { + status: "completed", + exitCode: event.payload.exitCode, + result: event.payload.result, + warningDetails: event.payload.warningDetails ?? null, + }); + break; } + case "backup.failed": { + const activeBackupRun = getActiveBackupRun( + event.payload.jobId, + event.payload.scheduleId, + event.type, + event.agentId, + ); + if (!activeBackupRun) { + break; + } - resolveActiveBackupRun(activeBackupRun.scheduleId, { - status: "cancelled", - message: activeBackupRun.cancellationRequested ? undefined : payload.message, - }); - }, + resolveActiveBackupRun(activeBackupRun.scheduleId, { + status: "failed", + error: event.payload.errorDetails ?? event.payload.error, + }); + break; + } + case "backup.cancelled": { + const activeBackupRun = getActiveBackupRun( + event.payload.jobId, + event.payload.scheduleId, + event.type, + event.agentId, + ); + if (!activeBackupRun) { + break; + } + + resolveActiveBackupRun(activeBackupRun.scheduleId, { + status: "cancelled", + message: activeBackupRun.cancellationRequested ? undefined : event.payload.message, + }); + break; + } + } }; export const startAgentController = async () => { const runtime = getAgentRuntimeState(); if (runtime.agentManager) { - await runtime.agentManager.stop(); + await Effect.runPromise(runtime.agentManager.stop); runtime.agentManager = null; } - const { createAgentManagerRuntime } = await import("./controller/server"); - const nextAgentManager = createAgentManagerRuntime(); - nextAgentManager.setBackupEventHandlers(backupEventHandlers); - - await nextAgentManager.start(); + const nextAgentManager = createAgentManagerRuntime(handleAgentManagerEvent); + await Effect.runPromise(nextAgentManager.start); runtime.agentManager = nextAgentManager; }; @@ -167,7 +212,9 @@ export const stopAgentController = async () => { const runtime = getAgentRuntimeState(); const agentManagerRuntime = runtime.agentManager; runtime.agentManager = null; - await agentManagerRuntime?.stop(); + if (agentManagerRuntime) { + await Effect.runPromise(agentManagerRuntime.stop); + } }; export const agentManager = { @@ -186,6 +233,7 @@ export const agentManager = { const completion = new Promise((resolve) => { getActiveBackupsByScheduleId().set(request.scheduleId, { + agentId, scheduleId: request.scheduleId, jobId: request.payload.jobId, scheduleShortId: request.payload.scheduleId, @@ -197,7 +245,7 @@ export const agentManager = { }); try { - if (!(await runtime.sendBackup(agentId, request.payload))) { + if (!(await Effect.runPromise(runtime.sendBackup(agentId, request.payload)))) { clearActiveBackupRun(request.scheduleId); return { status: "unavailable", diff --git a/app/server/modules/agents/agents.service.ts b/app/server/modules/agents/agents.service.ts new file mode 100644 index 00000000..43e093d9 --- /dev/null +++ b/app/server/modules/agents/agents.service.ts @@ -0,0 +1,129 @@ +import { eq } from "drizzle-orm"; +import { db } from "../../db/db"; +import { agentsTable, type Agent, type AgentCapabilities, type AgentKind } from "../../db/schema"; +import { LOCAL_AGENT_CAPABILITIES, LOCAL_AGENT_ID, LOCAL_AGENT_KIND, LOCAL_AGENT_NAME } from "./constants"; + +type AgentConnectionRegistration = { + agentId: string; + organizationId: string | null; + agentName: string; + agentKind: AgentKind; + capabilities?: AgentCapabilities; + connectedAt?: number; +}; + +const listAgents = async (organizationId?: string | null) => { + if (organizationId === undefined) { + return db.query.agentsTable.findMany({ orderBy: { createdAt: "asc" } }); + } + + if (organizationId === null) { + return db.query.agentsTable.findMany({ + where: { organizationId: { isNull: true } }, + orderBy: { createdAt: "asc" }, + }); + } + + return db.query.agentsTable.findMany({ + where: { organizationId }, + orderBy: { createdAt: "asc" }, + }); +}; + +const getAgent = async (agentId: string) => { + return db.query.agentsTable.findFirst({ where: { id: agentId } }); +}; + +const ensureLocalAgent = async () => { + const existing = await getAgent(LOCAL_AGENT_ID); + + if (existing) { + return existing; + } + + await db.insert(agentsTable).values({ + id: LOCAL_AGENT_ID, + organizationId: null, + name: LOCAL_AGENT_NAME, + kind: LOCAL_AGENT_KIND, + status: "offline", + capabilities: LOCAL_AGENT_CAPABILITIES, + updatedAt: Date.now(), + }); + + return getAgent(LOCAL_AGENT_ID); +}; + +const markAgentConnecting = async (params: AgentConnectionRegistration) => { + const { agentId, organizationId, agentName, agentKind, capabilities, connectedAt = Date.now() } = params; + + await db + .insert(agentsTable) + .values({ + id: agentId, + organizationId, + name: agentName, + kind: agentKind, + status: "connecting", + capabilities: capabilities ?? {}, + lastSeenAt: connectedAt, + updatedAt: connectedAt, + }) + .onConflictDoUpdate({ + target: agentsTable.id, + set: { + organizationId, + name: agentName, + kind: agentKind, + status: "connecting", + lastSeenAt: connectedAt, + updatedAt: connectedAt, + capabilities: capabilities ?? {}, + }, + }); + + return getAgent(agentId); +}; + +const updateAgentRuntime = async (agentId: string, values: Partial) => { + const [updatedAgent] = await db.update(agentsTable).set(values).where(eq(agentsTable.id, agentId)).returning(); + + if (!updatedAgent) { + throw new Error(`Agent ${agentId} not found`); + } + + return updatedAgent; +}; + +const markAgentOnline = async (agentId: string, readyAt = Date.now()) => { + return updateAgentRuntime(agentId, { + status: "online", + lastSeenAt: readyAt, + lastReadyAt: readyAt, + updatedAt: readyAt, + }); +}; + +const markAgentSeen = async (agentId: string, seenAt = Date.now()) => { + return updateAgentRuntime(agentId, { + lastSeenAt: seenAt, + updatedAt: seenAt, + }); +}; + +const markAgentOffline = async (agentId: string, disconnectedAt = Date.now()) => { + return updateAgentRuntime(agentId, { + status: "offline", + updatedAt: disconnectedAt, + }); +}; + +export const agentsService = { + listAgents, + getAgent, + ensureLocalAgent, + markAgentConnecting, + markAgentOnline, + markAgentSeen, + markAgentOffline, +}; diff --git a/app/server/modules/agents/constants.ts b/app/server/modules/agents/constants.ts new file mode 100644 index 00000000..7af03331 --- /dev/null +++ b/app/server/modules/agents/constants.ts @@ -0,0 +1,6 @@ +import type { AgentCapabilities, AgentKind } from "../../db/schema"; + +export const LOCAL_AGENT_ID = "local"; +export const LOCAL_AGENT_NAME = "Local Agent"; +export const LOCAL_AGENT_KIND: AgentKind = "local"; +export const LOCAL_AGENT_CAPABILITIES: AgentCapabilities = {}; diff --git a/app/server/modules/agents/controller/server.ts b/app/server/modules/agents/controller/server.ts index ef7663b1..d5201fa4 100644 --- a/app/server/modules/agents/controller/server.ts +++ b/app/server/modules/agents/controller/server.ts @@ -1,40 +1,35 @@ import { Data, Effect, Exit, Fiber, Scope } from "effect"; import { logger } from "@zerobyte/core/node"; import { toMessage } from "@zerobyte/core/utils"; -import type { - BackupCancelPayload, - BackupCancelledPayload, - BackupCompletedPayload, - BackupFailedPayload, - BackupProgressPayload, - BackupRunPayload, - BackupStartedPayload, -} from "@zerobyte/contracts/agent-protocol"; -import { createControllerAgentSession, type AgentConnectionData, type ControllerAgentSession } from "./session"; +import type { AgentMessage, BackupCancelPayload, BackupRunPayload } from "@zerobyte/contracts/agent-protocol"; +import { + createControllerAgentSession, + type AgentConnectionData, + type ControllerAgentSession, + type ControllerAgentSessionEvent, +} from "./session"; +import { agentsService } from "../agents.service"; import { validateAgentToken } from "../helpers/tokens"; -type AgentBackupEventContext = { +type AgentEventContext = { agentId: string; agentName: string; - payload: - | BackupStartedPayload - | BackupProgressPayload - | BackupCompletedPayload - | BackupFailedPayload - | BackupCancelledPayload; }; -export type AgentBackupEventHandlers = { - onBackupStarted?: (context: AgentBackupEventContext & { payload: BackupStartedPayload }) => void; - onBackupProgress?: (context: AgentBackupEventContext & { payload: BackupProgressPayload }) => void; - onBackupCompleted?: (context: AgentBackupEventContext & { payload: BackupCompletedPayload }) => void; - onBackupFailed?: (context: AgentBackupEventContext & { payload: BackupFailedPayload }) => void; - onBackupCancelled?: (context: AgentBackupEventContext & { payload: BackupCancelledPayload }) => void; -}; +type AgentBackupMessage = Extract< + AgentMessage, + { + type: "backup.started" | "backup.progress" | "backup.completed" | "backup.failed" | "backup.cancelled"; + } +>; + +export type AgentManagerEvent = + | (AgentEventContext & { type: "agent.disconnected" }) + | (AgentEventContext & AgentBackupMessage); type ControllerAgentSessionHandle = { + agentId: string; session: ControllerAgentSession; - runFiber: Fiber.RuntimeFiber; scope: Scope.CloseableScope; }; @@ -42,89 +37,163 @@ class StopAgentManagerServerError extends Data.TaggedError("StopAgentManagerServ cause: unknown; }> {} -export function createAgentManagerRuntime() { +export function createAgentManagerRuntime(onEvent: (event: AgentManagerEvent) => void) { let sessions = new Map(); - let backupHandlers: AgentBackupEventHandlers = {}; let runtimeScope: Scope.CloseableScope | null = null; const closeSession = (sessionHandle: ControllerAgentSessionHandle) => Effect.gen(function* () { - yield* Fiber.interrupt(sessionHandle.runFiber); yield* Scope.close(sessionHandle.scope, Exit.succeed(undefined)); + yield* Effect.sync(() => { + if (sessions.get(sessionHandle.agentId) === sessionHandle) { + sessions.delete(sessionHandle.agentId); + } + }); }); + const markAgentOfflineForShutdown = (agentId: string) => + Effect.tryPromise({ + try: () => agentsService.markAgentOffline(agentId), + catch: (error) => new StopAgentManagerServerError({ cause: error }), + }).pipe( + Effect.catchAll((error) => + logger.effect.error(`Failed to mark agent ${agentId} offline during shutdown: ${toMessage(error)}`), + ), + ); + const closeAllSessions = Effect.gen(function* () { - const currentSessions = sessions; - sessions = new Map(); - for (const sessionHandle of currentSessions.values()) { + const currentSessions = [...sessions.entries()]; + for (const [agentId, sessionHandle] of currentSessions) { + yield* markAgentOfflineForShutdown(agentId); yield* closeSession(sessionHandle); } + sessions = new Map(); }); const getSessionHandle = (agentId: string) => sessions.get(agentId); - const getSession = (agentId: string) => getSessionHandle(agentId)?.session; - const createSessionHandlers = (ws: Bun.ServerWebSocket) => { - const agentId = ws.data.agentId; - const agentName = ws.data.agentName; + const handleSessionEvent = (params: { agentId: string; agentName: string; sessionId: string }) => { + const { agentId, agentName } = params; - return { - onBackupStarted: (payload: BackupStartedPayload) => { - backupHandlers.onBackupStarted?.({ agentId, agentName, payload }); - }, - onBackupProgress: (payload: BackupProgressPayload) => { - backupHandlers.onBackupProgress?.({ agentId, agentName, payload }); - }, - onBackupCompleted: (payload: BackupCompletedPayload) => { - backupHandlers.onBackupCompleted?.({ agentId, agentName, payload }); - }, - onBackupFailed: (payload: BackupFailedPayload) => { - backupHandlers.onBackupFailed?.({ agentId, agentName, payload }); - }, - onBackupCancelled: (payload: BackupCancelledPayload) => { - backupHandlers.onBackupCancelled?.({ agentId, agentName, payload }); - }, + return (event: ControllerAgentSessionEvent) => { + switch (event.type) { + case "agent.ready": { + const at = Date.now(); + return Effect.promise(async () => { + await agentsService.markAgentOnline(agentId, at); + }); + } + case "heartbeat.pong": { + const at = Date.now(); + return Effect.promise(() => agentsService.markAgentSeen(agentId, at)); + } + case "agent.disconnected": { + return Effect.sync(() => onEvent({ type: "agent.disconnected", agentId, agentName })); + } + default: { + return Effect.sync(() => onEvent({ ...event, agentId, agentName })); + } + } }; }; - const createSession = (ws: Bun.ServerWebSocket) => { - // Manual scope management because we are out of Effect - const scope = Effect.runSync(Scope.make()); + const createSession = (ws: Bun.ServerWebSocket) => + Effect.gen(function* () { + const scope = yield* Scope.make(); - try { - const session = Effect.runSync(Scope.extend(createControllerAgentSession(ws, createSessionHandlers(ws)), scope)); - const runFiber = Effect.runFork(Scope.extend(session.run, scope)); + const session = yield* Scope.extend( + createControllerAgentSession( + ws, + handleSessionEvent({ + agentId: ws.data.agentId, + agentName: ws.data.agentName, + sessionId: ws.data.id, + }), + ), + scope, + ); + const runFiber = yield* Effect.forkDaemon(Scope.extend(session.run, scope)); + yield* Scope.addFinalizer(scope, Fiber.interrupt(runFiber)); - return { session, runFiber, scope }; - } catch (error) { - Effect.runSync(Scope.close(scope, Exit.fail(error))); - throw error; - } - }; - - const setSession = (agentId: string, sessionHandle: ControllerAgentSessionHandle) => { - const existingSession = getSessionHandle(agentId); - if (existingSession) { - void Effect.runPromise(closeSession(existingSession)).catch((error) => { - logger.error(`Failed to close existing agent session for ${agentId}: ${toMessage(error)}`); - }); - } - - sessions.set(agentId, sessionHandle); - }; - - const removeSession = (agentId: string, connectionId: string) => { - const sessionHandle = getSessionHandle(agentId); - if (!sessionHandle || sessionHandle.session.connectionId !== connectionId) { - return; - } - - sessions.delete(agentId); - void Effect.runPromise(closeSession(sessionHandle)).catch((error) => { - logger.error(`Failed to close agent session for ${agentId}: ${toMessage(error)}`); + return { agentId: ws.data.agentId, session, scope }; }); - }; + + const setSession = (sessionHandle: ControllerAgentSessionHandle) => + Effect.gen(function* () { + const existingSession = sessions.get(sessionHandle.agentId); + sessions.set(sessionHandle.agentId, sessionHandle); + + if (existingSession) { + yield* closeSession(existingSession); + } + }); + + const removeSession = (agentId: string, connectionId: string) => + Effect.gen(function* () { + const handle = sessions.get(agentId); + if (!handle || handle.session.connectionId !== connectionId) { + return false; + } + + yield* closeSession(handle); + + yield* Effect.promise(() => agentsService.markAgentOffline(agentId)); + return true; + }); + + const handleMessage = (ws: Bun.ServerWebSocket, data: unknown) => + Effect.gen(function* () { + if (typeof data !== "string") { + yield* logger.effect.warn(`Ignoring non-text message from agent ${ws.data.agentId}`); + return; + } + + const session = getSession(ws.data.agentId); + if (!session || session.connectionId !== ws.data.id) { + yield* logger.effect.warn(`No active session for agent ${ws.data.agentId} on ${ws.data.id}`); + return; + } + + yield* session.handleMessage(data); + }); + + const handleOpen = (ws: Bun.ServerWebSocket) => + Effect.gen(function* () { + yield* Effect.promise(() => + agentsService.markAgentConnecting({ + agentId: ws.data.agentId, + organizationId: ws.data.organizationId, + agentName: ws.data.agentName, + agentKind: ws.data.agentKind, + }), + ); + + const sessionHandle = yield* createSession(ws); + yield* setSession(sessionHandle); + yield* logger.effect.info(`Agent "${ws.data.agentName}" (${ws.data.agentId}) connected on ${ws.data.id}`); + }); + + const handleClose = (ws: Bun.ServerWebSocket) => + Effect.gen(function* () { + yield* removeSession(ws.data.agentId, ws.data.id); + yield* logger.effect.info(`Agent "${ws.data.agentName}" (${ws.data.agentId}) disconnected`); + }); + + const runWebSocketHandler = ( + ws: Bun.ServerWebSocket, + event: string, + effect: Effect.Effect, + ) => + Effect.runPromise( + effect.pipe( + Effect.catchAllCause((cause) => + logger.effect.error( + `Agent websocket ${event} failed for ${ws.data.agentId} on ${ws.data.id}: ${toMessage(cause)}`, + ), + ), + ), + ); const acquireServer = Effect.acquireRelease( Effect.sync(() => @@ -149,37 +218,24 @@ export function createAgentManagerRuntime() { agentId: result.agentId, organizationId: result.organizationId, agentName: result.agentName, + agentKind: result.agentKind, }, }); if (upgraded) return undefined; return new Response("WebSocket upgrade failed", { status: 400 }); }, websocket: { - open: (ws) => { - setSession(ws.data.agentId, createSession(ws)); - logger.info(`Agent "${ws.data.agentName}" (${ws.data.agentId}) connected on ${ws.data.id}`); - }, - message: (ws, data) => { - if (typeof data !== "string") { - logger.warn(`Ignoring non-text message from agent ${ws.data.agentId}`); - return; + open: async (ws) => { + await runWebSocketHandler(ws, "open", handleOpen(ws)); + if (getSession(ws.data.agentId)?.connectionId !== ws.data.id) { + ws.close(); } - - const session = getSession(ws.data.agentId); - if (!session || session.connectionId !== ws.data.id) { - logger.warn(`No active session for agent ${ws.data.agentId} on ${ws.data.id}`); - return; - } - - void Effect.runPromise(session.handleMessage(data)).catch((error) => { - logger.error( - `Failed to handle message from agent ${ws.data.agentId} on ${ws.data.id}: ${toMessage(error)}`, - ); - }); }, - close: (ws) => { - removeSession(ws.data.agentId, ws.data.id); - logger.info(`Agent "${ws.data.agentName}" (${ws.data.agentId}) disconnected`); + message: async (ws, data) => { + await runWebSocketHandler(ws, "message", handleMessage(ws, data)); + }, + close: async (ws) => { + await runWebSocketHandler(ws, "close", handleClose(ws)); }, }, }), @@ -192,15 +248,13 @@ export function createAgentManagerRuntime() { catch: (error) => new StopAgentManagerServerError({ cause: error }), }), ), - Effect.catchAll((error) => - Effect.sync(() => { - logger.error(`Failed to stop Agent Manager server: ${toMessage(error.cause)}`); - }), - ), + Effect.catchAll((error) => { + return logger.effect.error(`Failed to stop Agent Manager server: ${toMessage(error.cause)}`); + }), ), ); - const stop = async () => { + const stop = Effect.gen(function* () { if (!runtimeScope) { return; } @@ -208,70 +262,67 @@ export function createAgentManagerRuntime() { logger.info("Stopping Agent Manager..."); const scope = runtimeScope; runtimeScope = null; - await Effect.runPromise(Scope.close(scope, Exit.succeed(undefined))); - }; + yield* Scope.close(scope, Exit.succeed(undefined)); + }); - const start = async () => { + const start = Effect.gen(function* () { if (runtimeScope) { - await stop(); + yield* stop; } logger.info("Starting Agent Manager..."); - const scope = Effect.runSync(Scope.make()); + const scope = yield* Scope.make(); - try { - const server = Effect.runSync(Scope.extend(acquireServer, scope)); - runtimeScope = scope; - logger.info(`Agent Manager listening on port ${server.port}`); - } catch (error) { - await Effect.runPromise(Scope.close(scope, Exit.fail(error))); - throw error; - } - }; + const server = yield* Scope.extend(acquireServer, scope).pipe( + Effect.catchAllCause((cause) => + Scope.close(scope, Exit.failCause(cause)).pipe(Effect.andThen(Effect.failCause(cause))), + ), + ); + runtimeScope = scope; + logger.info(`Agent Manager listening on port ${server.port}`); + }); return { start, - sendBackup: async (agentId: string, payload: BackupRunPayload) => { - const session = getSession(agentId); + sendBackup: (agentId: string, payload: BackupRunPayload) => + Effect.gen(function* () { + const session = getSession(agentId); - if (!session) { - logger.warn(`Cannot send backup command. Agent ${agentId} is not connected.`); - return false; - } + if (!session) { + logger.warn(`Cannot send backup command. Agent ${agentId} is not connected.`); + return false; + } - if (!Effect.runSync(session.isReady())) { - logger.warn(`Cannot send backup command. Agent ${agentId} is not ready.`); - return false; - } + if (!(yield* session.isReady())) { + logger.warn(`Cannot send backup command. Agent ${agentId} is not ready.`); + return false; + } - if (!(await Effect.runPromise(session.sendBackup(payload)))) { - logger.warn(`Cannot send backup command. Agent ${agentId} is no longer accepting commands.`); - return false; - } + if (!(yield* session.sendBackup(payload))) { + logger.warn(`Cannot send backup command. Agent ${agentId} is no longer accepting commands.`); + return false; + } - logger.info(`Sent backup command ${payload.jobId} to agent ${agentId} for schedule ${payload.scheduleId}`); - return true; - }, - cancelBackup: async (agentId: string, payload: BackupCancelPayload) => { - const session = getSession(agentId); + logger.info(`Sent backup command ${payload.jobId} to agent ${agentId} for schedule ${payload.scheduleId}`); + return true; + }), + cancelBackup: (agentId: string, payload: BackupCancelPayload) => + Effect.gen(function* () { + const session = getSession(agentId); - if (!session) { - logger.warn(`Cannot cancel backup command. Agent ${agentId} is not connected.`); - return false; - } + if (!session) { + logger.warn(`Cannot cancel backup command. Agent ${agentId} is not connected.`); + return false; + } - if (!(await Effect.runPromise(session.sendBackupCancel(payload)))) { - logger.warn(`Cannot cancel backup command. Agent ${agentId} is no longer accepting commands.`); - return false; - } + if (!(yield* session.sendBackupCancel(payload))) { + logger.warn(`Cannot cancel backup command. Agent ${agentId} is no longer accepting commands.`); + return false; + } - logger.info(`Sent backup cancel for command ${payload.jobId} to agent ${agentId}`); - return true; - }, - setBackupEventHandlers: (handlers: AgentBackupEventHandlers) => { - backupHandlers = handlers; - }, - getBackupEventHandlers: () => backupHandlers, + logger.info(`Sent backup cancel for command ${payload.jobId} to agent ${agentId}`); + return true; + }), stop, }; } diff --git a/app/server/modules/agents/controller/session.ts b/app/server/modules/agents/controller/session.ts index 999cec41..c2c3d9c7 100644 --- a/app/server/modules/agents/controller/session.ts +++ b/app/server/modules/agents/controller/session.ts @@ -1,15 +1,11 @@ import { Effect, Queue, Ref, type Scope } from "effect"; +import type { AgentKind } from "../../../db/schema"; import { createControllerMessage, parseAgentMessage, type AgentMessage, type BackupCancelPayload, - type BackupCancelledPayload, - type BackupCompletedPayload, - type BackupFailedPayload, - type BackupProgressPayload, type BackupRunPayload, - type BackupStartedPayload, type ControllerWireMessage, } from "@zerobyte/contracts/agent-protocol"; import { logger } from "@zerobyte/core/node"; @@ -20,6 +16,7 @@ export type AgentConnectionData = { agentId: string; organizationId: string | null; agentName: string; + agentKind: AgentKind; }; type AgentSocket = Bun.ServerWebSocket; @@ -30,18 +27,7 @@ type SessionState = { lastPongAt: number | null; }; -type TrackedBackupJob = { - scheduleId: string; - state: "pending" | "active"; -}; - -type ControllerAgentSessionHandlers = { - onBackupStarted?: (payload: BackupStartedPayload) => void; - onBackupProgress?: (payload: BackupProgressPayload) => void; - onBackupCompleted?: (payload: BackupCompletedPayload) => void; - onBackupFailed?: (payload: BackupFailedPayload) => void; - onBackupCancelled?: (payload: BackupCancelledPayload) => void; -}; +export type ControllerAgentSessionEvent = AgentMessage | { type: "agent.disconnected" }; export type ControllerAgentSession = { readonly connectionId: string; @@ -54,12 +40,11 @@ export type ControllerAgentSession = { export const createControllerAgentSession = ( socket: AgentSocket, - handlers: ControllerAgentSessionHandlers = {}, + onEvent: (event: ControllerAgentSessionEvent) => Effect.Effect, ): Effect.Effect => Effect.gen(function* () { let isClosed = false; const outboundQueue = yield* Queue.bounded(64); - const trackedBackupJobs = yield* Ref.make>(new Map()); const state = yield* Ref.make({ isReady: false, lastSeenAt: null, @@ -78,37 +63,10 @@ export const createControllerAgentSession = ( const updateState = (update: (current: SessionState) => SessionState) => Ref.update(state, update); - const setTrackedBackupJob = (jobId: string, trackedBackupJob: TrackedBackupJob) => { - return Ref.update(trackedBackupJobs, (current) => { - const next = new Map(current); - next.set(jobId, trackedBackupJob); - return next; - }); - }; - - const deleteTrackedBackupJob = (jobId: string) => { - return Ref.update(trackedBackupJobs, (current) => { - const next = new Map(current); - next.delete(jobId); - return next; - }); - }; - - const takeTrackedBackupJobs = Ref.modify( - trackedBackupJobs, - (current) => [current, new Map()] as const, - ); - const releaseSession = Effect.gen(function* () { - yield* updateState((current) => ({ ...current, isReady: false })); - const trackedJobs = yield* takeTrackedBackupJobs; - for (const [jobId, trackedJob] of trackedJobs) { - const message = "The connection to the backup agent was lost. Restart the backup to ensure it completes."; - - yield* Effect.sync(() => { - handlers.onBackupCancelled?.({ jobId, scheduleId: trackedJob.scheduleId, message }); - }); - } + const disconnectedAt = Date.now(); + yield* updateState((current) => ({ ...current, isReady: false, lastSeenAt: disconnectedAt })); + yield* onEvent({ type: "agent.disconnected" }); yield* Queue.shutdown(outboundQueue); }); @@ -126,16 +84,13 @@ export const createControllerAgentSession = ( yield* Effect.addFinalizer(() => closeSession()); const handleSendFailure = (reason: string) => { - logger.error( - `Closing session for agent ${socket.data.agentId} on ${socket.data.id} after an outbound websocket send failed: ${reason}`, - ); - - socket.close(); - - void Effect.runPromise(closeSession()).catch((error) => { + return Effect.gen(function* () { logger.error( - `Failed to close session for agent ${socket.data.agentId} on ${socket.data.id}: ${toMessage(error)}`, + `Closing session for agent ${socket.data.agentId} on ${socket.data.id} after an outbound websocket send failed: ${reason}`, ); + + yield* Effect.sync(() => socket.close()); + yield* closeSession(); }); }; @@ -144,17 +99,16 @@ export const createControllerAgentSession = ( Effect.forever( Effect.gen(function* () { const message = yield* Queue.take(outboundQueue); - yield* Effect.sync(() => { - try { - const sendResult = socket.send(message); - if (sendResult === 0) { - handleSendFailure("connection issue"); - } - } catch (error) { - handleSendFailure(toMessage(error)); - } + + const sendResult = yield* Effect.try({ + try: () => socket.send(message), + catch: (error) => toMessage(error), }); - }), + + if (sendResult === 0) { + yield* handleSendFailure("connection issue"); + } + }).pipe(Effect.catchAll((reason) => handleSendFailure(reason))), ), ); @@ -177,61 +131,18 @@ export const createControllerAgentSession = ( const handleAgentMessage = (message: AgentMessage) => Effect.gen(function* () { - yield* updateState((current) => ({ ...current, lastSeenAt: Date.now() })); - - switch (message.type) { - case "agent.ready": { - yield* updateState((current) => ({ ...current, isReady: true })); - yield* Effect.sync(() => { - logger.info(`Agent "${socket.data.agentName}" (${socket.data.agentId}) is ready`); - }); - break; - } - case "backup.started": { - yield* setTrackedBackupJob(message.payload.jobId, { - scheduleId: message.payload.scheduleId, - state: "active", - }); - yield* Effect.sync(() => { - logger.info( - `Backup ${message.payload.jobId} started on agent ${socket.data.agentId} for schedule ${message.payload.scheduleId}`, - ); - handlers.onBackupStarted?.(message.payload); - }); - break; - } - case "backup.progress": { - yield* Effect.sync(() => { - handlers.onBackupProgress?.(message.payload); - }); - break; - } - case "backup.completed": { - yield* deleteTrackedBackupJob(message.payload.jobId); - yield* Effect.sync(() => { - handlers.onBackupCompleted?.(message.payload); - }); - break; - } - case "backup.failed": { - yield* deleteTrackedBackupJob(message.payload.jobId); - yield* Effect.sync(() => { - handlers.onBackupFailed?.(message.payload); - }); - break; - } - case "backup.cancelled": { - yield* deleteTrackedBackupJob(message.payload.jobId); - yield* Effect.sync(() => { - handlers.onBackupCancelled?.(message.payload); - }); - break; - } - case "heartbeat.pong": { - yield* updateState((current) => ({ ...current, lastPongAt: message.payload.sentAt })); - break; - } + if (message.type === "agent.ready") { + const readyAt = Date.now(); + yield* updateState((current) => ({ ...current, isReady: true, lastSeenAt: readyAt })); + yield* logger.effect.info(`Agent "${socket.data.agentName}" (${socket.data.agentId}) is ready`); } + + if (message.type === "heartbeat.pong") { + const seenAt = Date.now(); + yield* updateState((current) => ({ ...current, lastSeenAt: seenAt, lastPongAt: message.payload.sentAt })); + } + + yield* onEvent(message); }); return { @@ -241,33 +152,19 @@ export const createControllerAgentSession = ( const parsed = parseAgentMessage(data); if (parsed === null) { - yield* Effect.sync(() => { - logger.warn(`Invalid JSON from agent ${socket.data.agentId}`); - }); + yield* logger.effect.warn(`Invalid JSON from agent ${socket.data.agentId}`); return; } if (!parsed.success) { - yield* Effect.sync(() => { - logger.warn(`Invalid agent message from ${socket.data.agentId}: ${parsed.error.message}`); - }); + yield* logger.effect.warn(`Invalid agent message from ${socket.data.agentId}: ${parsed.error.message}`); return; } yield* handleAgentMessage(parsed.data); }); }, - sendBackup: (payload) => { - return Effect.gen(function* () { - const queued = yield* offerOutbound(createControllerMessage("backup.run", payload)); - - if (queued) { - yield* setTrackedBackupJob(payload.jobId, { scheduleId: payload.scheduleId, state: "pending" }); - } - - return queued; - }); - }, + sendBackup: (payload) => offerOutbound(createControllerMessage("backup.run", payload)), sendBackupCancel: (payload) => offerOutbound(createControllerMessage("backup.cancel", payload)), isReady: () => Ref.get(state).pipe(Effect.map((current) => current.isReady)), run, diff --git a/app/server/modules/agents/helpers/runtime-state.ts b/app/server/modules/agents/helpers/runtime-state.ts index 51de3fb4..13fc2a15 100644 --- a/app/server/modules/agents/helpers/runtime-state.ts +++ b/app/server/modules/agents/helpers/runtime-state.ts @@ -11,6 +11,7 @@ export type BackupExecutionResult = | { status: "cancelled"; message?: string }; type ActiveBackupRun = { + agentId: string; scheduleId: number; jobId: string; scheduleShortId: string; diff --git a/app/server/modules/agents/helpers/tokens.ts b/app/server/modules/agents/helpers/tokens.ts index febb9d94..431c3900 100644 --- a/app/server/modules/agents/helpers/tokens.ts +++ b/app/server/modules/agents/helpers/tokens.ts @@ -1,4 +1,5 @@ import { cryptoUtils } from "~/server/utils/crypto"; +import { LOCAL_AGENT_ID, LOCAL_AGENT_KIND, LOCAL_AGENT_NAME } from "../constants"; export const deriveLocalAgentToken = async () => { return cryptoUtils.deriveSecret("zerobyte:local-agent-token"); @@ -7,6 +8,6 @@ export const deriveLocalAgentToken = async () => { export const validateAgentToken = async (token: string) => { const localToken = await deriveLocalAgentToken(); if (token === localToken) { - return { agentId: "local", organizationId: null, agentName: "local" }; + return { agentId: LOCAL_AGENT_ID, organizationId: null, agentName: LOCAL_AGENT_NAME, agentKind: LOCAL_AGENT_KIND }; } }; diff --git a/app/server/modules/backups/backup-executor.ts b/app/server/modules/backups/backup-executor.ts index 7fce27ea..3a9e0214 100644 --- a/app/server/modules/backups/backup-executor.ts +++ b/app/server/modules/backups/backup-executor.ts @@ -5,15 +5,14 @@ import { config } from "../../core/config"; import { restic, resticDeps } from "../../core/restic"; import type { BackupRunPayload } from "@zerobyte/contracts/agent-protocol"; import { agentManager, type BackupExecutionProgress } from "../agents/agents-manager"; +import { LOCAL_AGENT_ID } from "../agents/constants"; import { getVolumePath } from "../volumes/helpers"; import { decryptRepositoryConfig } from "../repositories/repository-config-secrets"; import { createBackupOptions } from "./backup.helpers"; import { toErrorDetails } from "../../utils/errors"; -const LOCAL_AGENT_ID = "local"; const FUSE_VOLUME_BACKENDS = new Set(["rclone", "sftp", "webdav"]); const IGNORE_INODE_FLAG = "--ignore-inode"; - type BackupExecutionRequest = { scheduleId: number; schedule: BackupSchedule; diff --git a/app/server/modules/lifecycle/bootstrap.ts b/app/server/modules/lifecycle/bootstrap.ts index a1cbb8a2..51efcde9 100644 --- a/app/server/modules/lifecycle/bootstrap.ts +++ b/app/server/modules/lifecycle/bootstrap.ts @@ -1,6 +1,7 @@ import { runDbMigrations } from "../../db/db"; import { config } from "../../core/config"; import { startAgentController, startLocalAgent, stopAgentController, stopLocalAgent } from "../agents/agents-manager"; +import { agentsService } from "../agents/agents.service"; import { runMigrations } from "./migrations"; import { startup } from "./startup"; @@ -9,6 +10,7 @@ let bootstrapPromise: Promise | undefined; const runBootstrap = async () => { await runDbMigrations(); await runMigrations(); + await agentsService.ensureLocalAgent(); try { await startAgentController(); diff --git a/app/test/setup-shared.ts b/app/test/setup-shared.ts index b9e12c55..81320690 100644 --- a/app/test/setup-shared.ts +++ b/app/test/setup-shared.ts @@ -1,4 +1,5 @@ import { vi } from "vitest"; +import { Effect } from "effect"; process.env.BASE_URL = "http://localhost:3000"; process.env.TRUSTED_ORIGINS = "http://localhost:3000"; @@ -13,6 +14,12 @@ vi.mock(import("@zerobyte/core/node"), async () => { info: () => {}, warn: () => {}, error: () => {}, + effect: { + debug: () => Effect.void, + info: () => Effect.void, + warn: () => Effect.void, + error: () => Effect.void, + }, }, }; }); diff --git a/apps/agent/src/commands/backup-run.ts b/apps/agent/src/commands/backup-run.ts index 1112fe2b..500ef2a3 100644 --- a/apps/agent/src/commands/backup-run.ts +++ b/apps/agent/src/commands/backup-run.ts @@ -21,7 +21,7 @@ export const handleBackupRunCommand = (context: ControllerCommandContext, payloa return; } - logger.info(`Starting backup ${payload.jobId} for schedule ${payload.scheduleId}`); + yield* logger.effect.info(`Starting backup ${payload.jobId} for schedule ${payload.scheduleId}`); const abortController = new AbortController(); yield* context.setRunningJob(payload.jobId, { scheduleId: payload.scheduleId, abortController }); diff --git a/packages/core/src/node/logger.ts b/packages/core/src/node/logger.ts index f9d35719..f1f163d2 100644 --- a/packages/core/src/node/logger.ts +++ b/packages/core/src/node/logger.ts @@ -2,6 +2,7 @@ import { format } from "date-fns"; import { createConsola, type ConsolaReporter } from "consola"; import { formatWithOptions } from "node:util"; import { sanitizeSensitiveData } from "../utils/sanitize"; +import { Effect } from "effect"; type LogLevel = "debug" | "info" | "warn" | "error"; @@ -102,4 +103,10 @@ export const logger = { info: (...messages: unknown[]) => consola.info(formatMessages(messages).join(" ")), warn: (...messages: unknown[]) => consola.warn(formatMessages(messages).join(" ")), error: (...messages: unknown[]) => consola.error(formatMessages(messages).join(" ")), + effect: { + debug: (...messages: unknown[]) => Effect.sync(() => consola.debug(formatMessages(messages).join(" "))), + info: (...messages: unknown[]) => Effect.sync(() => consola.info(formatMessages(messages).join(" "))), + warn: (...messages: unknown[]) => Effect.sync(() => consola.warn(formatMessages(messages).join(" "))), + error: (...messages: unknown[]) => Effect.sync(() => consola.error(formatMessages(messages).join(" "))), + }, }; diff --git a/packages/core/test/setup.ts b/packages/core/test/setup.ts index 37612ad0..5ccc1b2d 100644 --- a/packages/core/test/setup.ts +++ b/packages/core/test/setup.ts @@ -1,4 +1,5 @@ import { vi } from "vitest"; +import { Effect } from "effect"; vi.mock(import("../src/node/logger.ts"), () => ({ logger: { @@ -6,5 +7,11 @@ vi.mock(import("../src/node/logger.ts"), () => ({ info: () => {}, warn: () => {}, error: () => {}, + effect: { + debug: () => Effect.void, + info: () => Effect.void, + warn: () => Effect.void, + error: () => Effect.void, + }, }, }));