Compare commits

...

19 Commits

Author SHA1 Message Date
Deluan
a91fe5ab5b refactor(plugins): use migrateDB for task queue schema and fix constant name collision
Replaced the raw db.Exec call in createTaskQueueSchema with migrateDB,
matching the pattern used by createKVStoreSchema. This enables version-tracked
schema migrations via SQLite's PRAGMA user_version, allowing future schema
changes to be appended incrementally. Also renamed cleanupInterval to
taskCleanupInterval to resolve a redeclaration conflict with host_kvstore.go.
2026-02-28 23:17:42 -05:00
Deluan
da9408958e feat(plugins): add ClearQueue function to remove pending tasks from a specified queue
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-28 23:12:43 -05:00
Deluan
f93a869368 feat(plugins): update TaskWorker interface to return status messages and refactor task queue service
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-28 23:12:43 -05:00
Deluan
dc81fcdff4 refactor(plugins): simplify goroutine management in task queue service
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-28 23:12:43 -05:00
Deluan
d40c6f4faa feat(plugins): increase maxConcurrency for task queue and handle budget exhaustion
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-28 23:12:43 -05:00
Deluan
421c0732bd refactor(plugins): streamline task queue configuration and error handling
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-28 23:12:43 -05:00
Deluan
69f5ede61d fix(plugins): use context-aware database execution in TaskQueue host service
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-28 23:12:43 -05:00
Deluan
7bb5e5d38b refactor(plugins): remove capability check for TaskWorker in TaskQueue host service
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-28 23:12:43 -05:00
Deluan
be538fcec8 fix(plugins): harden TaskQueue host service with validation and safety improvements
Add input validation (queue name length, payload size limits), extract
status string constants to eliminate raw SQL literals, make CreateQueue
idempotent via upsert for crash recovery, fix RetentionMs default check
for negative values, cap exponential backoff at 1 hour to prevent
overflow, and replace manual mutex-based delay enforcement with
rate.Limiter from golang.org/x/time/rate for correct concurrent worker
serialization.
2026-02-28 23:12:43 -05:00
Deluan
bf850d600e docs: document TaskQueue module for persistent task queues
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-28 23:12:43 -05:00
Deluan
e9f265d7c5 feat(plugins): add integration tests for TaskQueue host service 2026-02-28 23:12:43 -05:00
Deluan
f076a60def feat(plugins): add test-taskqueue plugin for integration testing 2026-02-28 23:12:43 -05:00
Deluan
ae8bd09bb3 feat(plugins): register TaskQueue host service in manager 2026-02-28 23:12:43 -05:00
Deluan
60a9fd61ba feat(plugins): require TaskWorker capability for taskqueue permission 2026-02-28 23:12:43 -05:00
Deluan
690203f192 feat(plugins): implement TaskQueue service with SQLite persistence and workers
Per-plugin SQLite database with queues and tasks tables. Worker goroutines
dequeue tasks and invoke nd_task_execute callback. Exponential backoff
retries, rate limiting via delayMs, automatic cleanup of terminal tasks.
2026-02-28 23:12:43 -05:00
Deluan
36f17920cd feat(plugins): add taskqueue permission to manifest schema
Add TaskQueuePermission with maxConcurrency option.
2026-02-28 23:12:43 -05:00
Deluan
d666b9ed0e feat(plugins): define TaskWorker capability for task execution callbacks 2026-02-28 23:12:43 -05:00
Deluan
b187312bce feat(plugins): define TaskQueue host service interface
Add the TaskQueueService interface with CreateQueue, Enqueue,
GetTaskStatus, and CancelTask methods plus QueueConfig struct.
2026-02-28 23:12:43 -05:00
Deluan Quintão
2471bb9cf6 feat(plugins): add TTL support, batch operations, and hardening to kvstore (#5127)
* feat(plugins): add expires_at column to kvstore schema

* feat(plugins): filter expired keys in kvstore Get, Has, List

* feat(plugins): add periodic cleanup of expired kvstore keys

* feat(plugins): add SetWithTTL, DeleteByPrefix, and GetMany to kvstore

Add three new methods to the KVStore host service:

- SetWithTTL: store key-value pairs with automatic expiration
- DeleteByPrefix: remove all keys matching a prefix in one operation
- GetMany: retrieve multiple values in a single call

All methods include comprehensive unit tests covering edge cases,
expiration behavior, size tracking, and LIKE-special characters.

* feat(plugins): regenerate code and update test plugin for new kvstore methods

Regenerate host function wrappers and PDK bindings for Go, Python,
and Rust. Update the test-kvstore plugin to exercise SetWithTTL,
DeleteByPrefix, and GetMany.

* feat(plugins): add integration tests for new kvstore methods

Add WASM integration tests for SetWithTTL, DeleteByPrefix, and GetMany
operations through the plugin boundary, verifying end-to-end behavior
including TTL expiration, prefix deletion, and batch retrieval.

* fix(plugins): address lint issues in kvstore implementation

Handle tx.Rollback error return and suppress gosec false positive
for parameterized SQL query construction in GetMany.

* fix(plugins): Set clears expires_at when overwriting a TTL'd key

Previously, calling Set() on a key that was stored with SetWithTTL()
would leave the expires_at value intact, causing the key to silently
expire even though Set implies permanent storage.

Also excludes expired keys from currentSize calculation at startup.

* refactor(plugins): simplify kvstore by removing in-memory size cache

Replaced the in-memory currentSize cache (atomic.Int64), periodic cleanup
timer, and mutex with direct database queries for storage accounting.
This eliminates race conditions and cache drift issues at negligible
performance cost for plugin-sized datasets. Also unified Set and
SetWithTTL into a shared setValue method, simplified DeleteByPrefix to
use RowsAffected instead of a transaction, and added an index on
expires_at for efficient expiration filtering.

* feat(plugins): add generic SQLite migration helper and refactor kvstore schema

Add a reusable migrateDB helper that tracks schema versions via SQLite's
PRAGMA user_version and applies pending migrations transactionally. Replace
the ad-hoc createKVStoreSchema function in kvstore with a declarative
migrations slice, making it easy to add future schema changes. Remove the
now-redundant schema migration test since migrateDB has its own test suite
and every kvstore test exercises the migrations implicitly.

Signed-off-by: Deluan <deluan@navidrome.org>

* fix(plugins): harden kvstore with explicit NULL handling, prefix validation, and cleanup timeout

- Use sql.NullString for expires_at to explicitly send NULL instead of
  relying on datetime('now', '') returning NULL by accident
- Reject empty prefix in DeleteByPrefix to prevent accidental data wipe
- Add 5s timeout context to cleanupExpired on Close
- Replace time.Sleep in unit tests with pre-expired timestamps

Signed-off-by: Deluan <deluan@navidrome.org>

* refactor(plugins): use batch processing in GetMany

Process keys in chunks of 200 using slice.CollectChunks to avoid
hitting SQLite's SQLITE_MAX_VARIABLE_NUMBER limit with large key sets.

* feat(plugins): add periodic cleanup goroutine for expired kvstore keys

Use the manager's context to control a background goroutine that purges
expired keys every hour, stopping naturally on shutdown when the context
is cancelled.

---------

Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-28 23:12:17 -05:00
35 changed files with 5027 additions and 158 deletions

View File

@@ -0,0 +1,27 @@
package capabilities
// TaskWorker provides task execution handling.
// This capability allows plugins to receive callbacks when their queued tasks
// are ready to execute. Plugins that use the taskqueue host service must
// implement this capability.
//
//nd:capability name=taskworker
type TaskWorker interface {
// OnTaskExecute is called when a queued task is ready to run.
// The returned string is a status/result message stored in the tasks table.
// Return an error to trigger retry (if retries are configured).
//nd:export name=nd_task_execute
OnTaskExecute(TaskExecuteRequest) (string, error)
}
// TaskExecuteRequest is the request provided when a task is ready to execute.
type TaskExecuteRequest struct {
// QueueName is the name of the queue this task belongs to.
QueueName string `json:"queueName"`
// TaskID is the unique identifier for this task.
TaskID string `json:"taskId"`
// Payload is the opaque data provided when the task was enqueued.
Payload []byte `json:"payload"`
// Attempt is the current attempt number (1-based: first attempt = 1).
Attempt int32 `json:"attempt"`
}

View File

@@ -0,0 +1,38 @@
version: v1-draft
exports:
nd_task_execute:
description: |-
OnTaskExecute is called when a queued task is ready to run.
The returned string is a status/result message stored in the tasks table.
Return an error to trigger retry (if retries are configured).
input:
$ref: '#/components/schemas/TaskExecuteRequest'
contentType: application/json
output:
type: string
contentType: application/json
components:
schemas:
TaskExecuteRequest:
description: TaskExecuteRequest is the request provided when a task is ready to execute.
properties:
queueName:
type: string
description: QueueName is the name of the queue this task belongs to.
taskId:
type: string
description: TaskID is the unique identifier for this task.
payload:
type: array
description: Payload is the opaque data provided when the task was enqueued.
items:
type: object
attempt:
type: integer
format: int32
description: 'Attempt is the current attempt number (1-based: first attempt = 1).'
required:
- queueName
- taskId
- payload
- attempt

View File

@@ -23,6 +23,20 @@ type KVStoreService interface {
//nd:hostfunc //nd:hostfunc
Set(ctx context.Context, key string, value []byte) error Set(ctx context.Context, key string, value []byte) error
// SetWithTTL stores a byte value with the given key and a time-to-live.
//
// After ttlSeconds, the key is treated as non-existent and will be
// cleaned up lazily. ttlSeconds must be greater than 0.
//
// Parameters:
// - key: The storage key (max 256 bytes, UTF-8)
// - value: The byte slice to store
// - ttlSeconds: Time-to-live in seconds (must be > 0)
//
// Returns an error if the storage limit would be exceeded or the operation fails.
//nd:hostfunc
SetWithTTL(ctx context.Context, key string, value []byte, ttlSeconds int64) error
// Get retrieves a byte value from storage. // Get retrieves a byte value from storage.
// //
// Parameters: // Parameters:
@@ -32,14 +46,15 @@ type KVStoreService interface {
//nd:hostfunc //nd:hostfunc
Get(ctx context.Context, key string) (value []byte, exists bool, err error) Get(ctx context.Context, key string) (value []byte, exists bool, err error)
// Delete removes a value from storage. // GetMany retrieves multiple values in a single call.
// //
// Parameters: // Parameters:
// - key: The storage key // - keys: The storage keys to retrieve
// //
// Returns an error if the operation fails. Does not return an error if the key doesn't exist. // Returns a map of key to value for keys that exist and have not expired.
// Missing or expired keys are omitted from the result.
//nd:hostfunc //nd:hostfunc
Delete(ctx context.Context, key string) error GetMany(ctx context.Context, keys []string) (values map[string][]byte, err error)
// Has checks if a key exists in storage. // Has checks if a key exists in storage.
// //
@@ -59,6 +74,24 @@ type KVStoreService interface {
//nd:hostfunc //nd:hostfunc
List(ctx context.Context, prefix string) (keys []string, err error) List(ctx context.Context, prefix string) (keys []string, err error)
// Delete removes a value from storage.
//
// Parameters:
// - key: The storage key
//
// Returns an error if the operation fails. Does not return an error if the key doesn't exist.
//nd:hostfunc
Delete(ctx context.Context, key string) error
// DeleteByPrefix removes all keys matching the given prefix.
//
// Parameters:
// - prefix: Key prefix to match (must not be empty)
//
// Returns the number of keys deleted. Includes expired keys.
//nd:hostfunc
DeleteByPrefix(ctx context.Context, prefix string) (deletedCount int64, err error)
// GetStorageUsed returns the total storage used by this plugin in bytes. // GetStorageUsed returns the total storage used by this plugin in bytes.
//nd:hostfunc //nd:hostfunc
GetStorageUsed(ctx context.Context) (bytes int64, err error) GetStorageUsed(ctx context.Context) (bytes int64, err error)

View File

@@ -20,6 +20,18 @@ type KVStoreSetResponse struct {
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
} }
// KVStoreSetWithTTLRequest is the request type for KVStore.SetWithTTL.
type KVStoreSetWithTTLRequest struct {
Key string `json:"key"`
Value []byte `json:"value"`
TtlSeconds int64 `json:"ttlSeconds"`
}
// KVStoreSetWithTTLResponse is the response type for KVStore.SetWithTTL.
type KVStoreSetWithTTLResponse struct {
Error string `json:"error,omitempty"`
}
// KVStoreGetRequest is the request type for KVStore.Get. // KVStoreGetRequest is the request type for KVStore.Get.
type KVStoreGetRequest struct { type KVStoreGetRequest struct {
Key string `json:"key"` Key string `json:"key"`
@@ -32,14 +44,15 @@ type KVStoreGetResponse struct {
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
} }
// KVStoreDeleteRequest is the request type for KVStore.Delete. // KVStoreGetManyRequest is the request type for KVStore.GetMany.
type KVStoreDeleteRequest struct { type KVStoreGetManyRequest struct {
Key string `json:"key"` Keys []string `json:"keys"`
} }
// KVStoreDeleteResponse is the response type for KVStore.Delete. // KVStoreGetManyResponse is the response type for KVStore.GetMany.
type KVStoreDeleteResponse struct { type KVStoreGetManyResponse struct {
Error string `json:"error,omitempty"` Values map[string][]byte `json:"values,omitempty"`
Error string `json:"error,omitempty"`
} }
// KVStoreHasRequest is the request type for KVStore.Has. // KVStoreHasRequest is the request type for KVStore.Has.
@@ -64,6 +77,27 @@ type KVStoreListResponse struct {
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
} }
// KVStoreDeleteRequest is the request type for KVStore.Delete.
type KVStoreDeleteRequest struct {
Key string `json:"key"`
}
// KVStoreDeleteResponse is the response type for KVStore.Delete.
type KVStoreDeleteResponse struct {
Error string `json:"error,omitempty"`
}
// KVStoreDeleteByPrefixRequest is the request type for KVStore.DeleteByPrefix.
type KVStoreDeleteByPrefixRequest struct {
Prefix string `json:"prefix"`
}
// KVStoreDeleteByPrefixResponse is the response type for KVStore.DeleteByPrefix.
type KVStoreDeleteByPrefixResponse struct {
DeletedCount int64 `json:"deletedCount,omitempty"`
Error string `json:"error,omitempty"`
}
// KVStoreGetStorageUsedResponse is the response type for KVStore.GetStorageUsed. // KVStoreGetStorageUsedResponse is the response type for KVStore.GetStorageUsed.
type KVStoreGetStorageUsedResponse struct { type KVStoreGetStorageUsedResponse struct {
Bytes int64 `json:"bytes,omitempty"` Bytes int64 `json:"bytes,omitempty"`
@@ -75,10 +109,13 @@ type KVStoreGetStorageUsedResponse struct {
func RegisterKVStoreHostFunctions(service KVStoreService) []extism.HostFunction { func RegisterKVStoreHostFunctions(service KVStoreService) []extism.HostFunction {
return []extism.HostFunction{ return []extism.HostFunction{
newKVStoreSetHostFunction(service), newKVStoreSetHostFunction(service),
newKVStoreSetWithTTLHostFunction(service),
newKVStoreGetHostFunction(service), newKVStoreGetHostFunction(service),
newKVStoreDeleteHostFunction(service), newKVStoreGetManyHostFunction(service),
newKVStoreHasHostFunction(service), newKVStoreHasHostFunction(service),
newKVStoreListHostFunction(service), newKVStoreListHostFunction(service),
newKVStoreDeleteHostFunction(service),
newKVStoreDeleteByPrefixHostFunction(service),
newKVStoreGetStorageUsedHostFunction(service), newKVStoreGetStorageUsedHostFunction(service),
} }
} }
@@ -114,6 +151,37 @@ func newKVStoreSetHostFunction(service KVStoreService) extism.HostFunction {
) )
} }
func newKVStoreSetWithTTLHostFunction(service KVStoreService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"kvstore_setwithttl",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
kvstoreWriteError(p, stack, err)
return
}
var req KVStoreSetWithTTLRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
kvstoreWriteError(p, stack, err)
return
}
// Call the service method
if svcErr := service.SetWithTTL(ctx, req.Key, req.Value, req.TtlSeconds); svcErr != nil {
kvstoreWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := KVStoreSetWithTTLResponse{}
kvstoreWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
func newKVStoreGetHostFunction(service KVStoreService) extism.HostFunction { func newKVStoreGetHostFunction(service KVStoreService) extism.HostFunction {
return extism.NewHostFunctionWithStack( return extism.NewHostFunctionWithStack(
"kvstore_get", "kvstore_get",
@@ -149,9 +217,9 @@ func newKVStoreGetHostFunction(service KVStoreService) extism.HostFunction {
) )
} }
func newKVStoreDeleteHostFunction(service KVStoreService) extism.HostFunction { func newKVStoreGetManyHostFunction(service KVStoreService) extism.HostFunction {
return extism.NewHostFunctionWithStack( return extism.NewHostFunctionWithStack(
"kvstore_delete", "kvstore_getmany",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) { func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory // Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0]) reqBytes, err := p.ReadBytes(stack[0])
@@ -159,20 +227,23 @@ func newKVStoreDeleteHostFunction(service KVStoreService) extism.HostFunction {
kvstoreWriteError(p, stack, err) kvstoreWriteError(p, stack, err)
return return
} }
var req KVStoreDeleteRequest var req KVStoreGetManyRequest
if err := json.Unmarshal(reqBytes, &req); err != nil { if err := json.Unmarshal(reqBytes, &req); err != nil {
kvstoreWriteError(p, stack, err) kvstoreWriteError(p, stack, err)
return return
} }
// Call the service method // Call the service method
if svcErr := service.Delete(ctx, req.Key); svcErr != nil { values, svcErr := service.GetMany(ctx, req.Keys)
if svcErr != nil {
kvstoreWriteError(p, stack, svcErr) kvstoreWriteError(p, stack, svcErr)
return return
} }
// Write JSON response to plugin memory // Write JSON response to plugin memory
resp := KVStoreDeleteResponse{} resp := KVStoreGetManyResponse{
Values: values,
}
kvstoreWriteResponse(p, stack, resp) kvstoreWriteResponse(p, stack, resp)
}, },
[]extism.ValueType{extism.ValueTypePTR}, []extism.ValueType{extism.ValueTypePTR},
@@ -248,6 +319,71 @@ func newKVStoreListHostFunction(service KVStoreService) extism.HostFunction {
) )
} }
func newKVStoreDeleteHostFunction(service KVStoreService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"kvstore_delete",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
kvstoreWriteError(p, stack, err)
return
}
var req KVStoreDeleteRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
kvstoreWriteError(p, stack, err)
return
}
// Call the service method
if svcErr := service.Delete(ctx, req.Key); svcErr != nil {
kvstoreWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := KVStoreDeleteResponse{}
kvstoreWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
func newKVStoreDeleteByPrefixHostFunction(service KVStoreService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"kvstore_deletebyprefix",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
kvstoreWriteError(p, stack, err)
return
}
var req KVStoreDeleteByPrefixRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
kvstoreWriteError(p, stack, err)
return
}
// Call the service method
deletedcount, svcErr := service.DeleteByPrefix(ctx, req.Prefix)
if svcErr != nil {
kvstoreWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := KVStoreDeleteByPrefixResponse{
DeletedCount: deletedcount,
}
kvstoreWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
func newKVStoreGetStorageUsedHostFunction(service KVStoreService) extism.HostFunction { func newKVStoreGetStorageUsedHostFunction(service KVStoreService) extism.HostFunction {
return extism.NewHostFunctionWithStack( return extism.NewHostFunctionWithStack(
"kvstore_getstorageused", "kvstore_getstorageused",

73
plugins/host/task.go Normal file
View File

@@ -0,0 +1,73 @@
package host
import "context"
// TaskInfo holds the current state of a task.
type TaskInfo struct {
// Status is the current task status: "pending", "running",
// "completed", "failed", or "cancelled".
Status string `json:"status"`
// Message is the status/result message returned by the plugin callback.
Message string `json:"message"`
// Attempt is the current or last attempt number (1-based).
Attempt int32 `json:"attempt"`
}
// QueueConfig holds configuration for a task queue.
type QueueConfig struct {
// Concurrency is the max number of parallel workers. Default: 1.
// Capped by the plugin's manifest maxConcurrency.
Concurrency int32 `json:"concurrency"`
// MaxRetries is the number of times to retry a failed task. Default: 0.
MaxRetries int32 `json:"maxRetries"`
// BackoffMs is the initial backoff between retries in milliseconds.
// Doubles each retry (exponential: backoffMs * 2^(attempt-1)). Default: 1000.
BackoffMs int64 `json:"backoffMs"`
// DelayMs is the minimum delay between starting consecutive tasks
// in milliseconds. Useful for rate limiting. Default: 0.
DelayMs int64 `json:"delayMs"`
// RetentionMs is how long completed/failed/cancelled tasks are kept
// in milliseconds. Default: 3600000 (1h). Min: 60000 (1m). Max: 604800000 (1w).
RetentionMs int64 `json:"retentionMs"`
}
// TaskService provides persistent task queues for plugins.
//
// This service allows plugins to create named queues with configurable concurrency,
// retry policies, and rate limiting. Tasks are persisted to SQLite and survive
// server restarts. When a task is ready to execute, the host calls the plugin's
// nd_task_execute callback function.
//
//nd:hostservice name=Task permission=taskqueue
type TaskService interface {
// CreateQueue creates a named task queue with the given configuration.
// Zero-value fields in config use sensible defaults.
// If a queue with the same name already exists, returns an error.
// On startup, this also recovers any stale "running" tasks from a previous crash.
//nd:hostfunc
CreateQueue(ctx context.Context, name string, config QueueConfig) error
// Enqueue adds a task to the named queue. Returns the task ID.
// payload is opaque bytes passed back to the plugin on execution.
//nd:hostfunc
Enqueue(ctx context.Context, queueName string, payload []byte) (string, error)
// Get returns the current state of a task including its status,
// message, and attempt count.
//nd:hostfunc
Get(ctx context.Context, taskID string) (*TaskInfo, error)
// Cancel cancels a pending task. Returns error if already
// running, completed, or failed.
//nd:hostfunc
Cancel(ctx context.Context, taskID string) error
// ClearQueue removes all pending tasks from the named queue.
// Running tasks are not affected. Returns the number of tasks removed.
//nd:hostfunc
ClearQueue(ctx context.Context, queueName string) (int64, error)
}

266
plugins/host/task_gen.go Normal file
View File

@@ -0,0 +1,266 @@
// Code generated by ndpgen. DO NOT EDIT.
package host
import (
"context"
"encoding/json"
extism "github.com/extism/go-sdk"
)
// TaskCreateQueueRequest is the request type for Task.CreateQueue.
type TaskCreateQueueRequest struct {
Name string `json:"name"`
Config QueueConfig `json:"config"`
}
// TaskCreateQueueResponse is the response type for Task.CreateQueue.
type TaskCreateQueueResponse struct {
Error string `json:"error,omitempty"`
}
// TaskEnqueueRequest is the request type for Task.Enqueue.
type TaskEnqueueRequest struct {
QueueName string `json:"queueName"`
Payload []byte `json:"payload"`
}
// TaskEnqueueResponse is the response type for Task.Enqueue.
type TaskEnqueueResponse struct {
Result string `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
// TaskGetRequest is the request type for Task.Get.
type TaskGetRequest struct {
TaskID string `json:"taskId"`
}
// TaskGetResponse is the response type for Task.Get.
type TaskGetResponse struct {
Result *TaskInfo `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
// TaskCancelRequest is the request type for Task.Cancel.
type TaskCancelRequest struct {
TaskID string `json:"taskId"`
}
// TaskCancelResponse is the response type for Task.Cancel.
type TaskCancelResponse struct {
Error string `json:"error,omitempty"`
}
// TaskClearQueueRequest is the request type for Task.ClearQueue.
type TaskClearQueueRequest struct {
QueueName string `json:"queueName"`
}
// TaskClearQueueResponse is the response type for Task.ClearQueue.
type TaskClearQueueResponse struct {
Result int64 `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
// RegisterTaskHostFunctions registers Task service host functions.
// The returned host functions should be added to the plugin's configuration.
func RegisterTaskHostFunctions(service TaskService) []extism.HostFunction {
return []extism.HostFunction{
newTaskCreateQueueHostFunction(service),
newTaskEnqueueHostFunction(service),
newTaskGetHostFunction(service),
newTaskCancelHostFunction(service),
newTaskClearQueueHostFunction(service),
}
}
func newTaskCreateQueueHostFunction(service TaskService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"task_createqueue",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskWriteError(p, stack, err)
return
}
var req TaskCreateQueueRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskWriteError(p, stack, err)
return
}
// Call the service method
if svcErr := service.CreateQueue(ctx, req.Name, req.Config); svcErr != nil {
taskWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskCreateQueueResponse{}
taskWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
func newTaskEnqueueHostFunction(service TaskService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"task_enqueue",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskWriteError(p, stack, err)
return
}
var req TaskEnqueueRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskWriteError(p, stack, err)
return
}
// Call the service method
result, svcErr := service.Enqueue(ctx, req.QueueName, req.Payload)
if svcErr != nil {
taskWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskEnqueueResponse{
Result: result,
}
taskWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
func newTaskGetHostFunction(service TaskService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"task_get",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskWriteError(p, stack, err)
return
}
var req TaskGetRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskWriteError(p, stack, err)
return
}
// Call the service method
result, svcErr := service.Get(ctx, req.TaskID)
if svcErr != nil {
taskWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskGetResponse{
Result: result,
}
taskWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
func newTaskCancelHostFunction(service TaskService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"task_cancel",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskWriteError(p, stack, err)
return
}
var req TaskCancelRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskWriteError(p, stack, err)
return
}
// Call the service method
if svcErr := service.Cancel(ctx, req.TaskID); svcErr != nil {
taskWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskCancelResponse{}
taskWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
func newTaskClearQueueHostFunction(service TaskService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"task_clearqueue",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskWriteError(p, stack, err)
return
}
var req TaskClearQueueRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskWriteError(p, stack, err)
return
}
// Call the service method
result, svcErr := service.ClearQueue(ctx, req.QueueName)
if svcErr != nil {
taskWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskClearQueueResponse{
Result: result,
}
taskWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
// taskWriteResponse writes a JSON response to plugin memory.
func taskWriteResponse(p *extism.CurrentPlugin, stack []uint64, resp any) {
respBytes, err := json.Marshal(resp)
if err != nil {
taskWriteError(p, stack, err)
return
}
respPtr, err := p.WriteBytes(respBytes)
if err != nil {
stack[0] = 0
return
}
stack[0] = respPtr
}
// taskWriteError writes an error response to plugin memory.
func taskWriteError(p *extism.CurrentPlugin, stack []uint64, err error) {
errResp := struct {
Error string `json:"error"`
}{Error: err.Error()}
respBytes, _ := json.Marshal(errResp)
respPtr, _ := p.WriteBytes(respBytes)
stack[0] = respPtr
}

View File

@@ -7,14 +7,16 @@ import (
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"slices"
"strings" "strings"
"sync/atomic" "time"
"github.com/dustin/go-humanize" "github.com/dustin/go-humanize"
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
"github.com/navidrome/navidrome/conf" "github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/log" "github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/plugins/host" "github.com/navidrome/navidrome/plugins/host"
"github.com/navidrome/navidrome/utils/slice"
) )
const ( const (
@@ -22,17 +24,22 @@ const (
maxKeyLength = 256 // Max key length in bytes maxKeyLength = 256 // Max key length in bytes
) )
// notExpiredFilter is the SQL condition to exclude expired keys.
const notExpiredFilter = "(expires_at IS NULL OR expires_at > datetime('now'))"
const cleanupInterval = 1 * time.Hour
// kvstoreServiceImpl implements the host.KVStoreService interface. // kvstoreServiceImpl implements the host.KVStoreService interface.
// Each plugin gets its own SQLite database for isolation. // Each plugin gets its own SQLite database for isolation.
type kvstoreServiceImpl struct { type kvstoreServiceImpl struct {
pluginName string pluginName string
db *sql.DB db *sql.DB
maxSize int64 maxSize int64
currentSize atomic.Int64 // cached total size, updated on Set/Delete
} }
// newKVStoreService creates a new kvstoreServiceImpl instance with its own SQLite database. // newKVStoreService creates a new kvstoreServiceImpl instance with its own SQLite database.
func newKVStoreService(pluginName string, perm *KVStorePermission) (*kvstoreServiceImpl, error) { // The provided context controls the lifetime of the background cleanup goroutine.
func newKVStoreService(ctx context.Context, pluginName string, perm *KVStorePermission) (*kvstoreServiceImpl, error) {
// Parse max size from permission, default to 1MB // Parse max size from permission, default to 1MB
maxSize := int64(defaultMaxKVStoreSize) maxSize := int64(defaultMaxKVStoreSize)
if perm != nil && perm.MaxSize != nil && *perm.MaxSize != "" { if perm != nil && perm.MaxSize != nil && *perm.MaxSize != "" {
@@ -59,46 +66,69 @@ func newKVStoreService(pluginName string, perm *KVStorePermission) (*kvstoreServ
db.SetMaxOpenConns(3) db.SetMaxOpenConns(3)
db.SetMaxIdleConns(1) db.SetMaxIdleConns(1)
// Create schema // Apply schema migrations
if err := createKVStoreSchema(db); err != nil { if err := createKVStoreSchema(db); err != nil {
db.Close() db.Close()
return nil, fmt.Errorf("creating kvstore schema: %w", err) return nil, fmt.Errorf("migrating kvstore schema: %w", err)
} }
// Load current storage size from database log.Debug("Initialized plugin kvstore", "plugin", pluginName, "path", dbPath, "maxSize", humanize.Bytes(uint64(maxSize)))
var currentSize int64
if err := db.QueryRow(`SELECT COALESCE(SUM(size), 0) FROM kvstore`).Scan(&currentSize); err != nil {
db.Close()
return nil, fmt.Errorf("loading storage size: %w", err)
}
log.Debug("Initialized plugin kvstore", "plugin", pluginName, "path", dbPath, "maxSize", humanize.Bytes(uint64(maxSize)), "currentSize", humanize.Bytes(uint64(currentSize)))
svc := &kvstoreServiceImpl{ svc := &kvstoreServiceImpl{
pluginName: pluginName, pluginName: pluginName,
db: db, db: db,
maxSize: maxSize, maxSize: maxSize,
} }
svc.currentSize.Store(currentSize) go svc.cleanupLoop(ctx)
return svc, nil return svc, nil
} }
// createKVStoreSchema applies schema migrations to the kvstore database.
// New migrations must be appended at the end of the slice.
func createKVStoreSchema(db *sql.DB) error { func createKVStoreSchema(db *sql.DB) error {
_, err := db.Exec(` return migrateDB(db, []string{
CREATE TABLE IF NOT EXISTS kvstore ( `CREATE TABLE IF NOT EXISTS kvstore (
key TEXT PRIMARY KEY NOT NULL, key TEXT PRIMARY KEY NOT NULL,
value BLOB NOT NULL, value BLOB NOT NULL,
size INTEGER NOT NULL, size INTEGER NOT NULL,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
) )`,
`) `ALTER TABLE kvstore ADD COLUMN expires_at DATETIME DEFAULT NULL`,
return err `CREATE INDEX idx_kvstore_expires_at ON kvstore(expires_at)`,
})
} }
// Set stores a byte value with the given key. // storageUsed returns the current total storage used by non-expired keys.
func (s *kvstoreServiceImpl) Set(ctx context.Context, key string, value []byte) error { func (s *kvstoreServiceImpl) storageUsed(ctx context.Context) (int64, error) {
// Validate key var used int64
err := s.db.QueryRowContext(ctx, `SELECT COALESCE(SUM(size), 0) FROM kvstore WHERE `+notExpiredFilter).Scan(&used)
if err != nil {
return 0, fmt.Errorf("calculating storage used: %w", err)
}
return used, nil
}
// checkStorageLimit verifies that adding delta bytes would not exceed the storage limit.
func (s *kvstoreServiceImpl) checkStorageLimit(ctx context.Context, delta int64) error {
if delta <= 0 {
return nil
}
used, err := s.storageUsed(ctx)
if err != nil {
return err
}
newTotal := used + delta
if newTotal > s.maxSize {
return fmt.Errorf("storage limit exceeded: would use %s of %s allowed",
humanize.Bytes(uint64(newTotal)), humanize.Bytes(uint64(s.maxSize)))
}
return nil
}
// setValue is the shared implementation for Set and SetWithTTL.
// A ttlSeconds of 0 means no expiration.
func (s *kvstoreServiceImpl) setValue(ctx context.Context, key string, value []byte, ttlSeconds int64) error {
if len(key) == 0 { if len(key) == 0 {
return fmt.Errorf("key cannot be empty") return fmt.Errorf("key cannot be empty")
} }
@@ -108,46 +138,59 @@ func (s *kvstoreServiceImpl) Set(ctx context.Context, key string, value []byte)
newValueSize := int64(len(value)) newValueSize := int64(len(value))
// Get current size of this key (if it exists) to calculate delta // Get current size of this key (if it exists and not expired) to calculate delta
var oldSize int64 var oldSize int64
err := s.db.QueryRowContext(ctx, `SELECT COALESCE(size, 0) FROM kvstore WHERE key = ?`, key).Scan(&oldSize) err := s.db.QueryRowContext(ctx, `SELECT COALESCE(size, 0) FROM kvstore WHERE key = ? AND `+notExpiredFilter, key).Scan(&oldSize)
if err != nil && !errors.Is(err, sql.ErrNoRows) { if err != nil && !errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("checking existing key: %w", err) return fmt.Errorf("checking existing key: %w", err)
} }
// Check size limits using cached total if err := s.checkStorageLimit(ctx, newValueSize-oldSize); err != nil {
delta := newValueSize - oldSize return err
newTotal := s.currentSize.Load() + delta }
if newTotal > s.maxSize {
return fmt.Errorf("storage limit exceeded: would use %s of %s allowed", // Compute expires_at: sql.NullString{Valid:false} sends NULL (no expiration),
humanize.Bytes(uint64(newTotal)), humanize.Bytes(uint64(s.maxSize))) // otherwise we send a concrete timestamp.
var expiresAt sql.NullString
if ttlSeconds > 0 {
expiresAt = sql.NullString{String: fmt.Sprintf("+%d seconds", ttlSeconds), Valid: true}
} }
// Upsert the value
_, err = s.db.ExecContext(ctx, ` _, err = s.db.ExecContext(ctx, `
INSERT INTO kvstore (key, value, size, created_at, updated_at) INSERT INTO kvstore (key, value, size, created_at, updated_at, expires_at)
VALUES (?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) VALUES (?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, datetime('now', ?))
ON CONFLICT(key) DO UPDATE SET ON CONFLICT(key) DO UPDATE SET
value = excluded.value, value = excluded.value,
size = excluded.size, size = excluded.size,
updated_at = CURRENT_TIMESTAMP updated_at = CURRENT_TIMESTAMP,
`, key, value, newValueSize) expires_at = excluded.expires_at
`, key, value, newValueSize, expiresAt)
if err != nil { if err != nil {
return fmt.Errorf("storing value: %w", err) return fmt.Errorf("storing value: %w", err)
} }
// Update cached size log.Trace(ctx, "KVStore.Set", "plugin", s.pluginName, "key", key, "size", newValueSize, "ttlSeconds", ttlSeconds)
s.currentSize.Add(delta)
log.Trace(ctx, "KVStore.Set", "plugin", s.pluginName, "key", key, "size", newValueSize)
return nil return nil
} }
// Set stores a byte value with the given key.
func (s *kvstoreServiceImpl) Set(ctx context.Context, key string, value []byte) error {
return s.setValue(ctx, key, value, 0)
}
// SetWithTTL stores a byte value with the given key and a time-to-live.
func (s *kvstoreServiceImpl) SetWithTTL(ctx context.Context, key string, value []byte, ttlSeconds int64) error {
if ttlSeconds <= 0 {
return fmt.Errorf("ttlSeconds must be greater than 0")
}
return s.setValue(ctx, key, value, ttlSeconds)
}
// Get retrieves a byte value from storage. // Get retrieves a byte value from storage.
func (s *kvstoreServiceImpl) Get(ctx context.Context, key string) ([]byte, bool, error) { func (s *kvstoreServiceImpl) Get(ctx context.Context, key string) ([]byte, bool, error) {
var value []byte var value []byte
err := s.db.QueryRowContext(ctx, `SELECT value FROM kvstore WHERE key = ?`, key).Scan(&value) err := s.db.QueryRowContext(ctx, `SELECT value FROM kvstore WHERE key = ? AND `+notExpiredFilter, key).Scan(&value)
if err == sql.ErrNoRows { if errors.Is(err, sql.ErrNoRows) {
return nil, false, nil return nil, false, nil
} }
if err != nil { if err != nil {
@@ -160,25 +203,11 @@ func (s *kvstoreServiceImpl) Get(ctx context.Context, key string) ([]byte, bool,
// Delete removes a value from storage. // Delete removes a value from storage.
func (s *kvstoreServiceImpl) Delete(ctx context.Context, key string) error { func (s *kvstoreServiceImpl) Delete(ctx context.Context, key string) error {
// Get size of the key being deleted to update cache _, err := s.db.ExecContext(ctx, `DELETE FROM kvstore WHERE key = ?`, key)
var oldSize int64
err := s.db.QueryRowContext(ctx, `SELECT size FROM kvstore WHERE key = ?`, key).Scan(&oldSize)
if errors.Is(err, sql.ErrNoRows) {
// Key doesn't exist, nothing to delete
return nil
}
if err != nil {
return fmt.Errorf("checking key size: %w", err)
}
_, err = s.db.ExecContext(ctx, `DELETE FROM kvstore WHERE key = ?`, key)
if err != nil { if err != nil {
return fmt.Errorf("deleting value: %w", err) return fmt.Errorf("deleting value: %w", err)
} }
// Update cached size
s.currentSize.Add(-oldSize)
log.Trace(ctx, "KVStore.Delete", "plugin", s.pluginName, "key", key) log.Trace(ctx, "KVStore.Delete", "plugin", s.pluginName, "key", key)
return nil return nil
} }
@@ -186,7 +215,7 @@ func (s *kvstoreServiceImpl) Delete(ctx context.Context, key string) error {
// Has checks if a key exists in storage. // Has checks if a key exists in storage.
func (s *kvstoreServiceImpl) Has(ctx context.Context, key string) (bool, error) { func (s *kvstoreServiceImpl) Has(ctx context.Context, key string) (bool, error) {
var count int var count int
err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM kvstore WHERE key = ?`, key).Scan(&count) err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM kvstore WHERE key = ? AND `+notExpiredFilter, key).Scan(&count)
if err != nil { if err != nil {
return false, fmt.Errorf("checking key: %w", err) return false, fmt.Errorf("checking key: %w", err)
} }
@@ -200,12 +229,12 @@ func (s *kvstoreServiceImpl) List(ctx context.Context, prefix string) ([]string,
var err error var err error
if prefix == "" { if prefix == "" {
rows, err = s.db.QueryContext(ctx, `SELECT key FROM kvstore ORDER BY key`) rows, err = s.db.QueryContext(ctx, `SELECT key FROM kvstore WHERE `+notExpiredFilter+` ORDER BY key`)
} else { } else {
// Escape special LIKE characters in prefix // Escape special LIKE characters in prefix
escapedPrefix := strings.ReplaceAll(prefix, "%", "\\%") escapedPrefix := strings.ReplaceAll(prefix, "%", "\\%")
escapedPrefix = strings.ReplaceAll(escapedPrefix, "_", "\\_") escapedPrefix = strings.ReplaceAll(escapedPrefix, "_", "\\_")
rows, err = s.db.QueryContext(ctx, `SELECT key FROM kvstore WHERE key LIKE ? ESCAPE '\' ORDER BY key`, escapedPrefix+"%") rows, err = s.db.QueryContext(ctx, `SELECT key FROM kvstore WHERE key LIKE ? ESCAPE '\' AND `+notExpiredFilter+` ORDER BY key`, escapedPrefix+"%")
} }
if err != nil { if err != nil {
return nil, fmt.Errorf("listing keys: %w", err) return nil, fmt.Errorf("listing keys: %w", err)
@@ -231,16 +260,113 @@ func (s *kvstoreServiceImpl) List(ctx context.Context, prefix string) ([]string,
// GetStorageUsed returns the total storage used by this plugin in bytes. // GetStorageUsed returns the total storage used by this plugin in bytes.
func (s *kvstoreServiceImpl) GetStorageUsed(ctx context.Context) (int64, error) { func (s *kvstoreServiceImpl) GetStorageUsed(ctx context.Context) (int64, error) {
used := s.currentSize.Load() used, err := s.storageUsed(ctx)
if err != nil {
return 0, err
}
log.Trace(ctx, "KVStore.GetStorageUsed", "plugin", s.pluginName, "bytes", used) log.Trace(ctx, "KVStore.GetStorageUsed", "plugin", s.pluginName, "bytes", used)
return used, nil return used, nil
} }
// Close closes the SQLite database connection. // DeleteByPrefix removes all keys matching the given prefix.
// This is called when the plugin is unloaded. func (s *kvstoreServiceImpl) DeleteByPrefix(ctx context.Context, prefix string) (int64, error) {
if prefix == "" {
return 0, fmt.Errorf("prefix cannot be empty")
}
escapedPrefix := strings.ReplaceAll(prefix, "%", "\\%")
escapedPrefix = strings.ReplaceAll(escapedPrefix, "_", "\\_")
result, err := s.db.ExecContext(ctx, `DELETE FROM kvstore WHERE key LIKE ? ESCAPE '\'`, escapedPrefix+"%")
if err != nil {
return 0, fmt.Errorf("deleting keys: %w", err)
}
count, err := result.RowsAffected()
if err != nil {
return 0, fmt.Errorf("getting deleted count: %w", err)
}
log.Trace(ctx, "KVStore.DeleteByPrefix", "plugin", s.pluginName, "prefix", prefix, "deletedCount", count)
return count, nil
}
// GetMany retrieves multiple values in a single call, processing keys in batches.
func (s *kvstoreServiceImpl) GetMany(ctx context.Context, keys []string) (map[string][]byte, error) {
if len(keys) == 0 {
return map[string][]byte{}, nil
}
const batchSize = 200
result := make(map[string][]byte)
for chunk := range slice.CollectChunks(slices.Values(keys), batchSize) {
placeholders := make([]string, len(chunk))
args := make([]any, len(chunk))
for i, key := range chunk {
placeholders[i] = "?"
args[i] = key
}
query := `SELECT key, value FROM kvstore WHERE key IN (` + strings.Join(placeholders, ",") + `) AND ` + notExpiredFilter //nolint:gosec // placeholders are always "?"
rows, err := s.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("querying values: %w", err)
}
for rows.Next() {
var key string
var value []byte
if err := rows.Scan(&key, &value); err != nil {
rows.Close()
return nil, fmt.Errorf("scanning value: %w", err)
}
result[key] = value
}
if err := rows.Err(); err != nil {
rows.Close()
return nil, fmt.Errorf("iterating values: %w", err)
}
rows.Close()
}
log.Trace(ctx, "KVStore.GetMany", "plugin", s.pluginName, "requested", len(keys), "found", len(result))
return result, nil
}
// cleanupLoop periodically removes expired keys from the database.
// It stops when the provided context is cancelled.
func (s *kvstoreServiceImpl) cleanupLoop(ctx context.Context) {
ticker := time.NewTicker(cleanupInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.cleanupExpired(ctx)
}
}
}
// cleanupExpired removes all expired keys from the database to reclaim disk space.
func (s *kvstoreServiceImpl) cleanupExpired(ctx context.Context) {
result, err := s.db.ExecContext(ctx, `DELETE FROM kvstore WHERE expires_at IS NOT NULL AND expires_at <= datetime('now')`)
if err != nil {
log.Error(ctx, "KVStore cleanup: failed to delete expired keys", "plugin", s.pluginName, err)
return
}
if count, err := result.RowsAffected(); err == nil && count > 0 {
log.Debug("KVStore cleanup completed", "plugin", s.pluginName, "deletedKeys", count)
}
}
// Close runs a final cleanup and closes the SQLite database connection.
// The cleanup goroutine is stopped by the context passed to newKVStoreService.
func (s *kvstoreServiceImpl) Close() error { func (s *kvstoreServiceImpl) Close() error {
if s.db != nil { if s.db != nil {
log.Debug("Closing plugin kvstore", "plugin", s.pluginName) log.Debug("Closing plugin kvstore", "plugin", s.pluginName)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
s.cleanupExpired(ctx)
return s.db.Close() return s.db.Close()
} }
return nil return nil

View File

@@ -12,6 +12,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"time"
"github.com/navidrome/navidrome/conf" "github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/conf/configtest" "github.com/navidrome/navidrome/conf/configtest"
@@ -37,7 +38,7 @@ var _ = Describe("KVStoreService", func() {
// Create service with 1KB limit for testing // Create service with 1KB limit for testing
maxSize := "1KB" maxSize := "1KB"
service, err = newKVStoreService("test_plugin", &KVStorePermission{MaxSize: &maxSize}) service, err = newKVStoreService(ctx, "test_plugin", &KVStorePermission{MaxSize: &maxSize})
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })
@@ -253,7 +254,7 @@ var _ = Describe("KVStoreService", func() {
Expect(service.Close()).To(Succeed()) Expect(service.Close()).To(Succeed())
maxSize := "1KB" maxSize := "1KB"
service2, err := newKVStoreService("test_plugin", &KVStorePermission{MaxSize: &maxSize}) service2, err := newKVStoreService(ctx, "test_plugin", &KVStorePermission{MaxSize: &maxSize})
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
defer service2.Close() defer service2.Close()
@@ -302,7 +303,7 @@ var _ = Describe("KVStoreService", func() {
Describe("Plugin Isolation", func() { Describe("Plugin Isolation", func() {
It("isolates data between plugins", func() { It("isolates data between plugins", func() {
service2, err := newKVStoreService("other_plugin", &KVStorePermission{}) service2, err := newKVStoreService(ctx, "other_plugin", &KVStorePermission{})
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
defer service2.Close() defer service2.Close()
@@ -321,7 +322,7 @@ var _ = Describe("KVStoreService", func() {
}) })
It("creates separate database files per plugin", func() { It("creates separate database files per plugin", func() {
service2, err := newKVStoreService("other_plugin", &KVStorePermission{}) service2, err := newKVStoreService(ctx, "other_plugin", &KVStorePermission{})
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
defer service2.Close() defer service2.Close()
@@ -343,6 +344,309 @@ var _ = Describe("KVStoreService", func() {
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })
}) })
Describe("TTL Expiration", func() {
It("Get returns not-exists for expired keys", func() {
_, err := service.db.Exec(`
INSERT INTO kvstore (key, value, size, expires_at)
VALUES ('expired_key', 'old', 3, datetime('now', '-1 seconds'))
`)
Expect(err).ToNot(HaveOccurred())
value, exists, err := service.Get(ctx, "expired_key")
Expect(err).ToNot(HaveOccurred())
Expect(exists).To(BeFalse())
Expect(value).To(BeNil())
})
It("Has returns false for expired keys", func() {
_, err := service.db.Exec(`
INSERT INTO kvstore (key, value, size, expires_at)
VALUES ('expired_has', 'old', 3, datetime('now', '-1 seconds'))
`)
Expect(err).ToNot(HaveOccurred())
exists, err := service.Has(ctx, "expired_has")
Expect(err).ToNot(HaveOccurred())
Expect(exists).To(BeFalse())
})
It("List excludes expired keys", func() {
Expect(service.Set(ctx, "live:1", []byte("alive"))).To(Succeed())
_, err := service.db.Exec(`
INSERT INTO kvstore (key, value, size, expires_at)
VALUES ('live:expired', 'dead', 4, datetime('now', '-1 seconds'))
`)
Expect(err).ToNot(HaveOccurred())
keys, err := service.List(ctx, "live:")
Expect(err).ToNot(HaveOccurred())
Expect(keys).To(HaveLen(1))
Expect(keys).To(ContainElement("live:1"))
})
It("Get returns value for non-expired keys with TTL", func() {
_, err := service.db.Exec(`
INSERT INTO kvstore (key, value, size, expires_at)
VALUES ('future_key', 'still alive', 11, datetime('now', '+3600 seconds'))
`)
Expect(err).ToNot(HaveOccurred())
value, exists, err := service.Get(ctx, "future_key")
Expect(err).ToNot(HaveOccurred())
Expect(exists).To(BeTrue())
Expect(value).To(Equal([]byte("still alive")))
})
It("Set clears expires_at from a key previously set with TTL", func() {
// Insert a key with a TTL that has already expired
_, err := service.db.Exec(`
INSERT INTO kvstore (key, value, size, expires_at)
VALUES ('ttl_then_set', 'temp', 4, datetime('now', '-1 seconds'))
`)
Expect(err).ToNot(HaveOccurred())
// Overwrite with Set (no TTL) — should become permanent
err = service.Set(ctx, "ttl_then_set", []byte("permanent"))
Expect(err).ToNot(HaveOccurred())
// Should exist because Set cleared expires_at
value, exists, err := service.Get(ctx, "ttl_then_set")
Expect(err).ToNot(HaveOccurred())
Expect(exists).To(BeTrue())
Expect(value).To(Equal([]byte("permanent")))
// Verify expires_at is actually NULL
var expiresAt *string
Expect(service.db.QueryRow(`SELECT expires_at FROM kvstore WHERE key = 'ttl_then_set'`).Scan(&expiresAt)).To(Succeed())
Expect(expiresAt).To(BeNil())
})
It("expired keys are not counted in storage used", func() {
_, err := service.db.Exec(`
INSERT INTO kvstore (key, value, size, expires_at)
VALUES ('expired_key', '12345', 5, datetime('now', '-1 seconds'))
`)
Expect(err).ToNot(HaveOccurred())
// Expired keys should not be counted
used, err := service.GetStorageUsed(ctx)
Expect(err).ToNot(HaveOccurred())
Expect(used).To(Equal(int64(0)))
})
It("cleanup removes expired rows from disk", func() {
_, err := service.db.Exec(`
INSERT INTO kvstore (key, value, size, expires_at)
VALUES ('cleanup_me', '12345', 5, datetime('now', '-1 seconds'))
`)
Expect(err).ToNot(HaveOccurred())
// Row exists in DB but is logically expired
var count int
Expect(service.db.QueryRow(`SELECT COUNT(*) FROM kvstore`).Scan(&count)).To(Succeed())
Expect(count).To(Equal(1))
service.cleanupExpired(ctx)
// Row should be physically deleted
Expect(service.db.QueryRow(`SELECT COUNT(*) FROM kvstore`).Scan(&count)).To(Succeed())
Expect(count).To(Equal(0))
})
})
Describe("SetWithTTL", func() {
It("stores value that is retrievable before expiry", func() {
err := service.SetWithTTL(ctx, "ttl_key", []byte("ttl_value"), 3600)
Expect(err).ToNot(HaveOccurred())
value, exists, err := service.Get(ctx, "ttl_key")
Expect(err).ToNot(HaveOccurred())
Expect(exists).To(BeTrue())
Expect(value).To(Equal([]byte("ttl_value")))
})
It("value is not retrievable after expiry", func() {
// Insert a key with an already-expired TTL
_, err := service.db.Exec(`
INSERT INTO kvstore (key, value, size, expires_at)
VALUES ('short_ttl', 'gone_soon', 9, datetime('now', '-1 seconds'))
`)
Expect(err).ToNot(HaveOccurred())
_, exists, err := service.Get(ctx, "short_ttl")
Expect(err).ToNot(HaveOccurred())
Expect(exists).To(BeFalse())
})
It("rejects ttlSeconds <= 0", func() {
err := service.SetWithTTL(ctx, "bad_ttl", []byte("value"), 0)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("ttlSeconds must be greater than 0"))
err = service.SetWithTTL(ctx, "bad_ttl", []byte("value"), -5)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("ttlSeconds must be greater than 0"))
})
It("validates key same as Set", func() {
err := service.SetWithTTL(ctx, "", []byte("value"), 60)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("key cannot be empty"))
})
It("enforces size limits same as Set", func() {
bigValue := make([]byte, 2048)
err := service.SetWithTTL(ctx, "big_ttl", bigValue, 60)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("storage limit exceeded"))
})
It("overwrites existing key and updates TTL", func() {
// Insert a key with an already-expired TTL
_, err := service.db.Exec(`
INSERT INTO kvstore (key, value, size, expires_at)
VALUES ('overwrite_ttl', 'first', 5, datetime('now', '-1 seconds'))
`)
Expect(err).ToNot(HaveOccurred())
// Overwrite with a long TTL — should be retrievable
err = service.SetWithTTL(ctx, "overwrite_ttl", []byte("second"), 3600)
Expect(err).ToNot(HaveOccurred())
value, exists, err := service.Get(ctx, "overwrite_ttl")
Expect(err).ToNot(HaveOccurred())
Expect(exists).To(BeTrue())
Expect(value).To(Equal([]byte("second")))
})
It("tracks storage correctly", func() {
err := service.SetWithTTL(ctx, "sized_ttl", []byte("12345"), 3600)
Expect(err).ToNot(HaveOccurred())
used, err := service.GetStorageUsed(ctx)
Expect(err).ToNot(HaveOccurred())
Expect(used).To(Equal(int64(5)))
})
})
Describe("DeleteByPrefix", func() {
BeforeEach(func() {
Expect(service.Set(ctx, "cache:user:1", []byte("Alice"))).To(Succeed())
Expect(service.Set(ctx, "cache:user:2", []byte("Bob"))).To(Succeed())
Expect(service.Set(ctx, "cache:item:1", []byte("Widget"))).To(Succeed())
Expect(service.Set(ctx, "data:important", []byte("keep"))).To(Succeed())
})
It("deletes all keys with the given prefix", func() {
deleted, err := service.DeleteByPrefix(ctx, "cache:user:")
Expect(err).ToNot(HaveOccurred())
Expect(deleted).To(Equal(int64(2)))
keys, err := service.List(ctx, "")
Expect(err).ToNot(HaveOccurred())
Expect(keys).To(HaveLen(2))
Expect(keys).To(ContainElements("cache:item:1", "data:important"))
})
It("rejects empty prefix", func() {
_, err := service.DeleteByPrefix(ctx, "")
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("prefix cannot be empty"))
})
It("returns 0 when no keys match", func() {
deleted, err := service.DeleteByPrefix(ctx, "nonexistent:")
Expect(err).ToNot(HaveOccurred())
Expect(deleted).To(Equal(int64(0)))
})
It("updates storage size correctly", func() {
usedBefore, _ := service.GetStorageUsed(ctx)
Expect(usedBefore).To(BeNumerically(">", 0))
_, err := service.DeleteByPrefix(ctx, "cache:")
Expect(err).ToNot(HaveOccurred())
usedAfter, _ := service.GetStorageUsed(ctx)
Expect(usedAfter).To(Equal(int64(4)))
})
It("handles special LIKE characters in prefix", func() {
Expect(service.Set(ctx, "test%special", []byte("v1"))).To(Succeed())
Expect(service.Set(ctx, "test_special", []byte("v2"))).To(Succeed())
Expect(service.Set(ctx, "testXspecial", []byte("v3"))).To(Succeed())
deleted, err := service.DeleteByPrefix(ctx, "test%")
Expect(err).ToNot(HaveOccurred())
Expect(deleted).To(Equal(int64(1)))
exists, _ := service.Has(ctx, "test_special")
Expect(exists).To(BeTrue())
exists, _ = service.Has(ctx, "testXspecial")
Expect(exists).To(BeTrue())
})
It("also deletes expired keys matching prefix", func() {
_, err := service.db.Exec(`
INSERT INTO kvstore (key, value, size, expires_at)
VALUES ('cache:expired', 'old', 3, datetime('now', '-1 seconds'))
`)
Expect(err).ToNot(HaveOccurred())
deleted, err := service.DeleteByPrefix(ctx, "cache:")
Expect(err).ToNot(HaveOccurred())
Expect(deleted).To(Equal(int64(4)))
})
})
Describe("GetMany", func() {
BeforeEach(func() {
Expect(service.Set(ctx, "key1", []byte("value1"))).To(Succeed())
Expect(service.Set(ctx, "key2", []byte("value2"))).To(Succeed())
Expect(service.Set(ctx, "key3", []byte("value3"))).To(Succeed())
})
It("retrieves multiple values at once", func() {
values, err := service.GetMany(ctx, []string{"key1", "key2", "key3"})
Expect(err).ToNot(HaveOccurred())
Expect(values).To(HaveLen(3))
Expect(values["key1"]).To(Equal([]byte("value1")))
Expect(values["key2"]).To(Equal([]byte("value2")))
Expect(values["key3"]).To(Equal([]byte("value3")))
})
It("omits missing keys from result", func() {
values, err := service.GetMany(ctx, []string{"key1", "missing", "key3"})
Expect(err).ToNot(HaveOccurred())
Expect(values).To(HaveLen(2))
Expect(values["key1"]).To(Equal([]byte("value1")))
Expect(values["key3"]).To(Equal([]byte("value3")))
_, hasMissing := values["missing"]
Expect(hasMissing).To(BeFalse())
})
It("returns empty map for empty keys slice", func() {
values, err := service.GetMany(ctx, []string{})
Expect(err).ToNot(HaveOccurred())
Expect(values).To(BeEmpty())
})
It("returns empty map for nil keys slice", func() {
values, err := service.GetMany(ctx, nil)
Expect(err).ToNot(HaveOccurred())
Expect(values).To(BeEmpty())
})
It("excludes expired keys", func() {
_, err := service.db.Exec(`
INSERT INTO kvstore (key, value, size, expires_at)
VALUES ('expired_many', 'old', 3, datetime('now', '-1 seconds'))
`)
Expect(err).ToNot(HaveOccurred())
values, err := service.GetMany(ctx, []string{"key1", "expired_many"})
Expect(err).ToNot(HaveOccurred())
Expect(values).To(HaveLen(1))
Expect(values["key1"]).To(Equal([]byte("value1")))
})
It("handles all keys missing", func() {
values, err := service.GetMany(ctx, []string{"nope1", "nope2"})
Expect(err).ToNot(HaveOccurred())
Expect(values).To(BeEmpty())
})
})
}) })
var _ = Describe("KVStoreService Integration", Ordered, func() { var _ = Describe("KVStoreService Integration", Ordered, func() {
@@ -416,17 +720,21 @@ var _ = Describe("KVStoreService Integration", Ordered, func() {
Describe("KVStore Operations via Plugin", func() { Describe("KVStore Operations via Plugin", func() {
type testKVStoreInput struct { type testKVStoreInput struct {
Operation string `json:"operation"` Operation string `json:"operation"`
Key string `json:"key"` Key string `json:"key"`
Value []byte `json:"value,omitempty"` Value []byte `json:"value,omitempty"`
Prefix string `json:"prefix,omitempty"` Prefix string `json:"prefix,omitempty"`
TTLSeconds int64 `json:"ttl_seconds,omitempty"`
Keys []string `json:"keys,omitempty"`
} }
type testKVStoreOutput struct { type testKVStoreOutput struct {
Value []byte `json:"value,omitempty"` Value []byte `json:"value,omitempty"`
Exists bool `json:"exists,omitempty"` Values map[string][]byte `json:"values,omitempty"`
Keys []string `json:"keys,omitempty"` Exists bool `json:"exists,omitempty"`
StorageUsed int64 `json:"storage_used,omitempty"` Keys []string `json:"keys,omitempty"`
Error *string `json:"error,omitempty"` StorageUsed int64 `json:"storage_used,omitempty"`
DeletedCount int64 `json:"deleted_count,omitempty"`
Error *string `json:"error,omitempty"`
} }
callTestKVStore := func(ctx context.Context, input testKVStoreInput) (*testKVStoreOutput, error) { callTestKVStore := func(ctx context.Context, input testKVStoreInput) (*testKVStoreOutput, error) {
@@ -594,6 +902,107 @@ var _ = Describe("KVStoreService Integration", Ordered, func() {
Expect(output.Exists).To(BeTrue()) Expect(output.Exists).To(BeTrue())
Expect(output.Value).To(Equal(binaryData)) Expect(output.Value).To(Equal(binaryData))
}) })
It("should set value with TTL and expire it", func() {
ctx := GinkgoT().Context()
// Set value with 1 second TTL
_, err := callTestKVStore(ctx, testKVStoreInput{
Operation: "set_with_ttl",
Key: "ttl_key",
Value: []byte("temporary"),
TTLSeconds: 1,
})
Expect(err).ToNot(HaveOccurred())
// Immediately should exist
output, err := callTestKVStore(ctx, testKVStoreInput{
Operation: "get",
Key: "ttl_key",
})
Expect(err).ToNot(HaveOccurred())
Expect(output.Exists).To(BeTrue())
Expect(output.Value).To(Equal([]byte("temporary")))
// Wait for expiration
time.Sleep(2 * time.Second)
// Should no longer exist
output, err = callTestKVStore(ctx, testKVStoreInput{
Operation: "get",
Key: "ttl_key",
})
Expect(err).ToNot(HaveOccurred())
Expect(output.Exists).To(BeFalse())
})
It("should delete keys by prefix", func() {
ctx := GinkgoT().Context()
// Set multiple keys with shared prefix
for _, key := range []string{"del_prefix:a", "del_prefix:b", "keep:c"} {
_, err := callTestKVStore(ctx, testKVStoreInput{
Operation: "set",
Key: key,
Value: []byte("value"),
})
Expect(err).ToNot(HaveOccurred())
}
// Delete by prefix
output, err := callTestKVStore(ctx, testKVStoreInput{
Operation: "delete_by_prefix",
Prefix: "del_prefix:",
})
Expect(err).ToNot(HaveOccurred())
Expect(output.DeletedCount).To(Equal(int64(2)))
// Verify remaining key
getOutput, err := callTestKVStore(ctx, testKVStoreInput{
Operation: "has",
Key: "keep:c",
})
Expect(err).ToNot(HaveOccurred())
Expect(getOutput.Exists).To(BeTrue())
// Verify deleted keys are gone
getOutput, err = callTestKVStore(ctx, testKVStoreInput{
Operation: "has",
Key: "del_prefix:a",
})
Expect(err).ToNot(HaveOccurred())
Expect(getOutput.Exists).To(BeFalse())
})
It("should get many values at once", func() {
ctx := GinkgoT().Context()
// Set multiple keys
for _, kv := range []struct{ k, v string }{
{"many:1", "val1"},
{"many:2", "val2"},
{"many:3", "val3"},
} {
_, err := callTestKVStore(ctx, testKVStoreInput{
Operation: "set",
Key: kv.k,
Value: []byte(kv.v),
})
Expect(err).ToNot(HaveOccurred())
}
// Get many, including a missing key
output, err := callTestKVStore(ctx, testKVStoreInput{
Operation: "get_many",
Keys: []string{"many:1", "many:3", "many:missing"},
})
Expect(err).ToNot(HaveOccurred())
Expect(output.Values).To(HaveLen(2))
Expect(output.Values["many:1"]).To(Equal([]byte("val1")))
Expect(output.Values["many:3"]).To(Equal([]byte("val3")))
_, hasMissing := output.Values["many:missing"]
Expect(hasMissing).To(BeFalse())
})
}) })
Describe("Database Isolation", func() { Describe("Database Isolation", func() {

595
plugins/host_taskqueue.go Normal file
View File

@@ -0,0 +1,595 @@
package plugins
import (
"context"
"database/sql"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"time"
_ "github.com/mattn/go-sqlite3"
"github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model/id"
"github.com/navidrome/navidrome/plugins/capabilities"
"github.com/navidrome/navidrome/plugins/host"
"golang.org/x/time/rate"
)
const (
defaultConcurrency int32 = 1
defaultBackoffMs int64 = 1000
defaultRetentionMs int64 = 3_600_000 // 1 hour
minRetentionMs int64 = 60_000 // 1 minute
maxRetentionMs int64 = 604_800_000 // 1 week
maxQueueNameLength = 128
maxPayloadSize = 1 * 1024 * 1024 // 1MB
maxBackoffMs int64 = 3_600_000 // 1 hour
taskCleanupInterval = 5 * time.Minute
pollInterval = 5 * time.Second
shutdownTimeout = 10 * time.Second
taskStatusPending = "pending"
taskStatusRunning = "running"
taskStatusCompleted = "completed"
taskStatusFailed = "failed"
taskStatusCancelled = "cancelled"
)
// CapabilityTaskWorker indicates the plugin can receive task execution callbacks.
const CapabilityTaskWorker Capability = "TaskWorker"
const FuncTaskWorkerCallback = "nd_task_execute"
func init() {
registerCapability(CapabilityTaskWorker, FuncTaskWorkerCallback)
}
type queueState struct {
config host.QueueConfig
signal chan struct{}
limiter *rate.Limiter
}
// notifyWorkers sends a non-blocking signal to wake up queue workers.
func (qs *queueState) notifyWorkers() {
select {
case qs.signal <- struct{}{}:
default:
}
}
// taskQueueServiceImpl implements host.TaskQueueService with SQLite persistence
// and background worker goroutines for task execution.
type taskQueueServiceImpl struct {
pluginName string
manager *Manager
maxConcurrency int32
db *sql.DB
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
mu sync.Mutex
queues map[string]*queueState
// For testing: override how callbacks are invoked
invokeCallbackFn func(ctx context.Context, queueName, taskID string, payload []byte, attempt int32) (string, error)
}
// newTaskQueueService creates a new taskQueueServiceImpl with its own SQLite database.
func newTaskQueueService(pluginName string, manager *Manager, maxConcurrency int32) (*taskQueueServiceImpl, error) {
dataDir := filepath.Join(conf.Server.DataFolder, "plugins", pluginName)
if err := os.MkdirAll(dataDir, 0700); err != nil {
return nil, fmt.Errorf("creating plugin data directory: %w", err)
}
dbPath := filepath.Join(dataDir, "taskqueue.db")
db, err := sql.Open("sqlite3", dbPath+"?_busy_timeout=5000&_journal_mode=WAL&_foreign_keys=off")
if err != nil {
return nil, fmt.Errorf("opening taskqueue database: %w", err)
}
db.SetMaxOpenConns(3)
db.SetMaxIdleConns(1)
if err := createTaskQueueSchema(db); err != nil {
db.Close()
return nil, fmt.Errorf("creating taskqueue schema: %w", err)
}
ctx, cancel := context.WithCancel(manager.ctx)
s := &taskQueueServiceImpl{
pluginName: pluginName,
manager: manager,
maxConcurrency: maxConcurrency,
db: db,
ctx: ctx,
cancel: cancel,
queues: make(map[string]*queueState),
}
s.invokeCallbackFn = s.defaultInvokeCallback
s.wg.Go(s.cleanupLoop)
log.Debug("Initialized plugin taskqueue", "plugin", pluginName, "path", dbPath, "maxConcurrency", maxConcurrency)
return s, nil
}
// createTaskQueueSchema applies schema migrations to the taskqueue database.
// New migrations must be appended at the end of the slice.
func createTaskQueueSchema(db *sql.DB) error {
return migrateDB(db, []string{
`CREATE TABLE IF NOT EXISTS queues (
name TEXT PRIMARY KEY,
concurrency INTEGER NOT NULL DEFAULT 1,
max_retries INTEGER NOT NULL DEFAULT 0,
backoff_ms INTEGER NOT NULL DEFAULT 1000,
delay_ms INTEGER NOT NULL DEFAULT 0,
retention_ms INTEGER NOT NULL DEFAULT 3600000
)`,
`CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
queue_name TEXT NOT NULL REFERENCES queues(name),
payload BLOB NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
attempt INTEGER NOT NULL DEFAULT 0,
max_retries INTEGER NOT NULL,
next_run_at INTEGER NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
message TEXT NOT NULL DEFAULT ''
)`,
`CREATE INDEX IF NOT EXISTS idx_tasks_dequeue ON tasks(queue_name, status, next_run_at)`,
})
}
// applyConfigDefaults fills zero-value config fields with sensible defaults
// and clamps values to valid ranges, logging warnings for clamped values.
func (s *taskQueueServiceImpl) applyConfigDefaults(ctx context.Context, name string, config *host.QueueConfig) {
if config.Concurrency <= 0 {
config.Concurrency = defaultConcurrency
}
if config.BackoffMs <= 0 {
config.BackoffMs = defaultBackoffMs
}
if config.RetentionMs <= 0 {
config.RetentionMs = defaultRetentionMs
}
if config.RetentionMs < minRetentionMs {
log.Warn(ctx, "TaskQueue retention clamped to minimum", "plugin", s.pluginName, "queue", name,
"requested", config.RetentionMs, "min", minRetentionMs)
config.RetentionMs = minRetentionMs
}
if config.RetentionMs > maxRetentionMs {
log.Warn(ctx, "TaskQueue retention clamped to maximum", "plugin", s.pluginName, "queue", name,
"requested", config.RetentionMs, "max", maxRetentionMs)
config.RetentionMs = maxRetentionMs
}
}
// clampConcurrency reduces config.Concurrency if it exceeds the remaining budget.
// Returns an error when the concurrency budget is fully exhausted.
// Must be called with s.mu held.
func (s *taskQueueServiceImpl) clampConcurrency(ctx context.Context, name string, config *host.QueueConfig) error {
var allocated int32
for _, qs := range s.queues {
allocated += qs.config.Concurrency
}
available := s.maxConcurrency - allocated
if available <= 0 {
log.Warn(ctx, "TaskQueue concurrency budget exhausted", "plugin", s.pluginName, "queue", name,
"allocated", allocated, "maxConcurrency", s.maxConcurrency)
return fmt.Errorf("concurrency budget exhausted (%d/%d allocated)", allocated, s.maxConcurrency)
}
if config.Concurrency > available {
log.Warn(ctx, "TaskQueue concurrency clamped", "plugin", s.pluginName, "queue", name,
"requested", config.Concurrency, "available", available, "maxConcurrency", s.maxConcurrency)
config.Concurrency = available
}
return nil
}
func (s *taskQueueServiceImpl) CreateQueue(ctx context.Context, name string, config host.QueueConfig) error {
if len(name) == 0 {
return fmt.Errorf("queue name cannot be empty")
}
if len(name) > maxQueueNameLength {
return fmt.Errorf("queue name exceeds maximum length of %d bytes", maxQueueNameLength)
}
s.applyConfigDefaults(ctx, name, &config)
s.mu.Lock()
defer s.mu.Unlock()
if err := s.clampConcurrency(ctx, name, &config); err != nil {
return err
}
if _, exists := s.queues[name]; exists {
return fmt.Errorf("queue %q already exists", name)
}
// Upsert into queues table (idempotent across restarts)
_, err := s.db.ExecContext(ctx, `
INSERT INTO queues (name, concurrency, max_retries, backoff_ms, delay_ms, retention_ms)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(name) DO UPDATE SET
concurrency = excluded.concurrency,
max_retries = excluded.max_retries,
backoff_ms = excluded.backoff_ms,
delay_ms = excluded.delay_ms,
retention_ms = excluded.retention_ms
`, name, config.Concurrency, config.MaxRetries, config.BackoffMs, config.DelayMs, config.RetentionMs)
if err != nil {
return fmt.Errorf("creating queue: %w", err)
}
// Reset stale running tasks from previous crash
now := time.Now().UnixMilli()
_, err = s.db.ExecContext(ctx, `
UPDATE tasks SET status = ?, updated_at = ? WHERE queue_name = ? AND status = ?
`, taskStatusPending, now, name, taskStatusRunning)
if err != nil {
return fmt.Errorf("resetting stale tasks: %w", err)
}
qs := &queueState{
config: config,
signal: make(chan struct{}, 1),
}
if config.DelayMs > 0 {
// Rate limit dispatches to enforce delay between tasks.
// Burst of 1 allows one immediate dispatch, then enforces the delay interval.
qs.limiter = rate.NewLimiter(rate.Every(time.Duration(config.DelayMs)*time.Millisecond), 1)
}
s.queues[name] = qs
for i := int32(0); i < config.Concurrency; i++ {
s.wg.Go(func() { s.worker(name, qs) })
}
log.Debug(ctx, "Created task queue", "plugin", s.pluginName, "queue", name,
"concurrency", config.Concurrency, "maxRetries", config.MaxRetries,
"backoffMs", config.BackoffMs, "delayMs", config.DelayMs, "retentionMs", config.RetentionMs)
return nil
}
func (s *taskQueueServiceImpl) Enqueue(ctx context.Context, queueName string, payload []byte) (string, error) {
s.mu.Lock()
qs, exists := s.queues[queueName]
s.mu.Unlock()
if !exists {
return "", fmt.Errorf("queue %q does not exist", queueName)
}
if len(payload) > maxPayloadSize {
return "", fmt.Errorf("payload size %d exceeds maximum of %d bytes", len(payload), maxPayloadSize)
}
taskID := id.NewRandom()
now := time.Now().UnixMilli()
_, err := s.db.ExecContext(ctx, `
INSERT INTO tasks (id, queue_name, payload, status, attempt, max_retries, next_run_at, created_at, updated_at)
VALUES (?, ?, ?, ?, 0, ?, ?, ?, ?)
`, taskID, queueName, payload, taskStatusPending, qs.config.MaxRetries, now, now, now)
if err != nil {
return "", fmt.Errorf("enqueuing task: %w", err)
}
qs.notifyWorkers()
log.Trace(ctx, "Enqueued task", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
return taskID, nil
}
// Get returns the current state of a task.
func (s *taskQueueServiceImpl) Get(ctx context.Context, taskID string) (*host.TaskInfo, error) {
var info host.TaskInfo
err := s.db.QueryRowContext(ctx, `SELECT status, message, attempt FROM tasks WHERE id = ?`, taskID).
Scan(&info.Status, &info.Message, &info.Attempt)
if errors.Is(err, sql.ErrNoRows) {
return nil, fmt.Errorf("task %q not found", taskID)
}
if err != nil {
return nil, fmt.Errorf("getting task info: %w", err)
}
return &info, nil
}
// Cancel cancels a pending task.
func (s *taskQueueServiceImpl) Cancel(ctx context.Context, taskID string) error {
now := time.Now().UnixMilli()
result, err := s.db.ExecContext(ctx, `
UPDATE tasks SET status = ?, updated_at = ? WHERE id = ? AND status = ?
`, taskStatusCancelled, now, taskID, taskStatusPending)
if err != nil {
return fmt.Errorf("cancelling task: %w", err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("checking cancel result: %w", err)
}
if rowsAffected == 0 {
// Check if task exists at all
var status string
err := s.db.QueryRowContext(ctx, `SELECT status FROM tasks WHERE id = ?`, taskID).Scan(&status)
if errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("task %q not found", taskID)
}
if err != nil {
return fmt.Errorf("checking task existence: %w", err)
}
return fmt.Errorf("task %q cannot be cancelled (status: %s)", taskID, status)
}
log.Trace(ctx, "Cancelled task", "plugin", s.pluginName, "taskID", taskID)
return nil
}
// ClearQueue removes all pending tasks from the named queue.
// Running tasks are not affected. Returns the number of tasks removed.
func (s *taskQueueServiceImpl) ClearQueue(ctx context.Context, queueName string) (int64, error) {
s.mu.Lock()
_, exists := s.queues[queueName]
s.mu.Unlock()
if !exists {
return 0, fmt.Errorf("queue %q does not exist", queueName)
}
now := time.Now().UnixMilli()
result, err := s.db.ExecContext(ctx, `
UPDATE tasks SET status = ?, updated_at = ? WHERE queue_name = ? AND status = ?
`, taskStatusCancelled, now, queueName, taskStatusPending)
if err != nil {
return 0, fmt.Errorf("clearing queue: %w", err)
}
cleared, err := result.RowsAffected()
if err != nil {
return 0, fmt.Errorf("checking clear result: %w", err)
}
if cleared > 0 {
log.Debug(ctx, "Cleared pending tasks from queue", "plugin", s.pluginName, "queue", queueName, "cleared", cleared)
}
return cleared, nil
}
// worker is the main loop for a single worker goroutine.
func (s *taskQueueServiceImpl) worker(queueName string, qs *queueState) {
// Process any existing pending tasks immediately on startup
s.drainQueue(queueName, qs)
ticker := time.NewTicker(pollInterval)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-qs.signal:
s.drainQueue(queueName, qs)
case <-ticker.C:
s.drainQueue(queueName, qs)
}
}
}
func (s *taskQueueServiceImpl) drainQueue(queueName string, qs *queueState) {
for s.ctx.Err() == nil && s.processTask(queueName, qs) {
}
}
// processTask dequeues and processes a single task. Returns true if a task was processed.
func (s *taskQueueServiceImpl) processTask(queueName string, qs *queueState) bool {
now := time.Now().UnixMilli()
// Atomically dequeue a task
var taskID string
var payload []byte
var attempt, maxRetries int32
err := s.db.QueryRowContext(s.ctx, `
UPDATE tasks SET status = ?, attempt = attempt + 1, updated_at = ?
WHERE id = (
SELECT id FROM tasks
WHERE queue_name = ? AND status = ? AND next_run_at <= ?
ORDER BY next_run_at, created_at LIMIT 1
)
RETURNING id, payload, attempt, max_retries
`, taskStatusRunning, now, queueName, taskStatusPending, now).Scan(&taskID, &payload, &attempt, &maxRetries)
if errors.Is(err, sql.ErrNoRows) {
return false
}
if err != nil {
log.Error(s.ctx, "Failed to dequeue task", "plugin", s.pluginName, "queue", queueName, err)
return false
}
// Enforce delay between task dispatches using a rate limiter.
// This is done after dequeue so that empty polls don't consume rate tokens.
if qs.limiter != nil {
if err := qs.limiter.Wait(s.ctx); err != nil {
// Context cancelled during wait — revert task to pending for recovery
s.revertTaskToPending(taskID)
return false
}
}
// Invoke callback
log.Debug(s.ctx, "Executing task", "plugin", s.pluginName, "queue", queueName, "taskID", taskID, "attempt", attempt)
message, callbackErr := s.invokeCallbackFn(s.ctx, queueName, taskID, payload, attempt)
// If context was cancelled (shutdown), revert task to pending for recovery
if s.ctx.Err() != nil {
s.revertTaskToPending(taskID)
return false
}
if callbackErr == nil {
s.completeTask(queueName, taskID, message)
} else {
s.handleTaskFailure(queueName, taskID, attempt, maxRetries, qs, callbackErr, message)
}
return true
}
func (s *taskQueueServiceImpl) completeTask(queueName, taskID, message string) {
now := time.Now().UnixMilli()
if _, err := s.db.ExecContext(s.ctx, `UPDATE tasks SET status = ?, message = ?, updated_at = ? WHERE id = ?`, taskStatusCompleted, message, now, taskID); err != nil {
log.Error(s.ctx, "Failed to mark task as completed", "plugin", s.pluginName, "taskID", taskID, err)
}
log.Debug(s.ctx, "Task completed", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
}
func (s *taskQueueServiceImpl) handleTaskFailure(queueName, taskID string, attempt, maxRetries int32, qs *queueState, callbackErr error, message string) {
log.Warn(s.ctx, "Task execution failed", "plugin", s.pluginName, "queue", queueName,
"taskID", taskID, "attempt", attempt, "maxRetries", maxRetries, "err", callbackErr)
// Use error message as fallback if no message was provided
if message == "" {
message = callbackErr.Error()
}
now := time.Now().UnixMilli()
if attempt > maxRetries {
if _, err := s.db.ExecContext(s.ctx, `UPDATE tasks SET status = ?, message = ?, updated_at = ? WHERE id = ?`, taskStatusFailed, message, now, taskID); err != nil {
log.Error(s.ctx, "Failed to mark task as failed", "plugin", s.pluginName, "taskID", taskID, err)
}
log.Warn(s.ctx, "Task failed after all retries", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
return
}
// Exponential backoff: backoffMs * 2^(attempt-1)
backoff := qs.config.BackoffMs << (attempt - 1)
if backoff <= 0 || backoff > maxBackoffMs {
backoff = maxBackoffMs
}
nextRunAt := now + backoff
if _, err := s.db.ExecContext(s.ctx, `
UPDATE tasks SET status = ?, next_run_at = ?, updated_at = ? WHERE id = ?
`, taskStatusPending, nextRunAt, now, taskID); err != nil {
log.Error(s.ctx, "Failed to reschedule task for retry", "plugin", s.pluginName, "taskID", taskID, err)
}
// Wake worker after backoff expires
time.AfterFunc(time.Duration(backoff)*time.Millisecond, func() {
qs.notifyWorkers()
})
}
// revertTaskToPending puts a running task back to pending status and decrements the attempt
// counter (used during shutdown to ensure the interrupted attempt doesn't count).
func (s *taskQueueServiceImpl) revertTaskToPending(taskID string) {
now := time.Now().UnixMilli()
_, err := s.db.Exec(`UPDATE tasks SET status = ?, attempt = MAX(attempt - 1, 0), updated_at = ? WHERE id = ? AND status = ?`, taskStatusPending, now, taskID, taskStatusRunning)
if err != nil {
log.Error("Failed to revert task to pending", "plugin", s.pluginName, "taskID", taskID, err)
}
}
// defaultInvokeCallback calls the plugin's nd_task_execute function.
func (s *taskQueueServiceImpl) defaultInvokeCallback(ctx context.Context, queueName, taskID string, payload []byte, attempt int32) (string, error) {
s.manager.mu.RLock()
p, ok := s.manager.plugins[s.pluginName]
s.manager.mu.RUnlock()
if !ok {
return "", fmt.Errorf("plugin %s not loaded", s.pluginName)
}
input := capabilities.TaskExecuteRequest{
QueueName: queueName,
TaskID: taskID,
Payload: payload,
Attempt: attempt,
}
message, err := callPluginFunction[capabilities.TaskExecuteRequest, string](ctx, p, FuncTaskWorkerCallback, input)
if err != nil {
return "", err
}
return message, nil
}
// cleanupLoop periodically removes terminal tasks past their retention period.
func (s *taskQueueServiceImpl) cleanupLoop() {
ticker := time.NewTicker(taskCleanupInterval)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-ticker.C:
s.runCleanup()
}
}
}
// runCleanup deletes terminal tasks past their retention period.
func (s *taskQueueServiceImpl) runCleanup() {
s.mu.Lock()
queues := make(map[string]*queueState, len(s.queues))
for k, v := range s.queues {
queues[k] = v
}
s.mu.Unlock()
now := time.Now().UnixMilli()
for name, qs := range queues {
result, err := s.db.ExecContext(s.ctx, `
DELETE FROM tasks WHERE queue_name = ? AND status IN (?, ?, ?) AND updated_at + ? < ?
`, name, taskStatusCompleted, taskStatusFailed, taskStatusCancelled, qs.config.RetentionMs, now)
if err != nil {
log.Error(s.ctx, "Failed to cleanup tasks", "plugin", s.pluginName, "queue", name, err)
continue
}
if deleted, _ := result.RowsAffected(); deleted > 0 {
log.Debug(s.ctx, "Cleaned up terminal tasks", "plugin", s.pluginName, "queue", name, "deleted", deleted)
}
}
}
// Close shuts down the task queue service, stopping all workers and closing the database.
func (s *taskQueueServiceImpl) Close() error {
// Cancel context to signal all goroutines
s.cancel()
// Wait for goroutines with timeout
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(shutdownTimeout):
log.Warn("TaskQueue shutdown timed out", "plugin", s.pluginName)
}
// Mark running tasks as pending for recovery on next startup
if s.db != nil {
now := time.Now().UnixMilli()
if _, err := s.db.Exec(`UPDATE tasks SET status = ?, updated_at = ? WHERE status = ?`, taskStatusPending, now, taskStatusRunning); err != nil {
log.Error("Failed to reset running tasks on shutdown", "plugin", s.pluginName, err)
}
log.Debug("Closing plugin taskqueue", "plugin", s.pluginName)
return s.db.Close()
}
return nil
}
// Compile-time verification
var _ host.TaskService = (*taskQueueServiceImpl)(nil)
var _ io.Closer = (*taskQueueServiceImpl)(nil)

View File

File diff suppressed because it is too large Load Diff

View File

@@ -103,7 +103,7 @@ var hostServices = []hostServiceEntry{
hasPermission: func(p *Permissions) bool { return p != nil && p.Kvstore != nil }, hasPermission: func(p *Permissions) bool { return p != nil && p.Kvstore != nil },
create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) { create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) {
perm := ctx.permissions.Kvstore perm := ctx.permissions.Kvstore
service, err := newKVStoreService(ctx.pluginName, perm) service, err := newKVStoreService(ctx.manager.ctx, ctx.pluginName, perm)
if err != nil { if err != nil {
log.Error("Failed to create KVStore service", "plugin", ctx.pluginName, err) log.Error("Failed to create KVStore service", "plugin", ctx.pluginName, err)
return nil, nil return nil, nil
@@ -128,6 +128,23 @@ var hostServices = []hostServiceEntry{
return host.RegisterHTTPHostFunctions(service), nil return host.RegisterHTTPHostFunctions(service), nil
}, },
}, },
{
name: "Task",
hasPermission: func(p *Permissions) bool { return p != nil && p.Taskqueue != nil },
create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) {
perm := ctx.permissions.Taskqueue
maxConcurrency := int32(1)
if perm.MaxConcurrency > 0 {
maxConcurrency = int32(perm.MaxConcurrency)
}
service, err := newTaskQueueService(ctx.pluginName, ctx.manager, maxConcurrency)
if err != nil {
log.Error("Failed to create Task service", "plugin", ctx.pluginName, err)
return nil, nil
}
return host.RegisterTaskHostFunctions(service), service
},
},
} }
// extractManifest reads manifest from an .ndp package and computes its SHA-256 hash. // extractManifest reads manifest from an .ndp package and computes its SHA-256 hash.

View File

@@ -110,6 +110,9 @@
}, },
"users": { "users": {
"$ref": "#/$defs/UsersPermission" "$ref": "#/$defs/UsersPermission"
},
"taskqueue": {
"$ref": "#/$defs/TaskQueuePermission"
} }
} }
}, },
@@ -224,6 +227,23 @@
} }
} }
}, },
"TaskQueuePermission": {
"type": "object",
"description": "Task queue permissions for background task processing",
"additionalProperties": false,
"properties": {
"reason": {
"type": "string",
"description": "Explanation for why task queue access is needed"
},
"maxConcurrency": {
"type": "integer",
"description": "Maximum total concurrent workers across all queues. Default: 1",
"minimum": 1,
"default": 1
}
}
},
"UsersPermission": { "UsersPermission": {
"type": "object", "type": "object",
"description": "Users service permissions for accessing user information", "description": "Users service permissions for accessing user information",

View File

@@ -72,6 +72,13 @@ func ValidateWithCapabilities(m *Manifest, capabilities []Capability) error {
} }
} }
// Task (taskqueue) permission requires TaskWorker capability
if m.Permissions != nil && m.Permissions.Taskqueue != nil {
if !hasCapability(capabilities, CapabilityTaskWorker) {
return fmt.Errorf("'taskqueue' permission requires plugin to export '%s' function", FuncTaskWorkerCallback)
}
}
return nil return nil
} }

View File

@@ -181,6 +181,9 @@ type Permissions struct {
// Subsonicapi corresponds to the JSON schema field "subsonicapi". // Subsonicapi corresponds to the JSON schema field "subsonicapi".
Subsonicapi *SubsonicAPIPermission `json:"subsonicapi,omitempty" yaml:"subsonicapi,omitempty" mapstructure:"subsonicapi,omitempty"` Subsonicapi *SubsonicAPIPermission `json:"subsonicapi,omitempty" yaml:"subsonicapi,omitempty" mapstructure:"subsonicapi,omitempty"`
// Taskqueue corresponds to the JSON schema field "taskqueue".
Taskqueue *TaskQueuePermission `json:"taskqueue,omitempty" yaml:"taskqueue,omitempty" mapstructure:"taskqueue,omitempty"`
// Users corresponds to the JSON schema field "users". // Users corresponds to the JSON schema field "users".
Users *UsersPermission `json:"users,omitempty" yaml:"users,omitempty" mapstructure:"users,omitempty"` Users *UsersPermission `json:"users,omitempty" yaml:"users,omitempty" mapstructure:"users,omitempty"`
@@ -200,6 +203,36 @@ type SubsonicAPIPermission struct {
Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"` Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"`
} }
// Task queue permissions for background task processing
type TaskQueuePermission struct {
// Maximum total concurrent workers across all queues. Default: 1
MaxConcurrency int `json:"maxConcurrency,omitempty" yaml:"maxConcurrency,omitempty" mapstructure:"maxConcurrency,omitempty"`
// Explanation for why task queue access is needed
Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"`
}
// UnmarshalJSON implements json.Unmarshaler.
func (j *TaskQueuePermission) UnmarshalJSON(value []byte) error {
var raw map[string]interface{}
if err := json.Unmarshal(value, &raw); err != nil {
return err
}
type Plain TaskQueuePermission
var plain Plain
if err := json.Unmarshal(value, &plain); err != nil {
return err
}
if v, ok := raw["maxConcurrency"]; !ok || v == nil {
plain.MaxConcurrency = 1.0
}
if 1 > plain.MaxConcurrency {
return fmt.Errorf("field %s: must be >= %v", "maxConcurrency", 1)
}
*j = TaskQueuePermission(plain)
return nil
}
// Enable experimental WebAssembly threads support // Enable experimental WebAssembly threads support
type ThreadsFeature struct { type ThreadsFeature struct {
// Explanation for why threads support is needed // Explanation for why threads support is needed

47
plugins/migrate.go Normal file
View File

@@ -0,0 +1,47 @@
package plugins
import (
"database/sql"
"fmt"
)
// migrateDB applies schema migrations to a SQLite database.
//
// Each entry in migrations is a single SQL statement. The current schema version
// is tracked using SQLite's built-in PRAGMA user_version. Only statements after
// the current version are executed, within a single transaction.
func migrateDB(db *sql.DB, migrations []string) error {
var version int
if err := db.QueryRow(`PRAGMA user_version`).Scan(&version); err != nil {
return fmt.Errorf("reading schema version: %w", err)
}
if version >= len(migrations) {
return nil
}
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("starting migration transaction: %w", err)
}
defer func() { _ = tx.Rollback() }()
for i := version; i < len(migrations); i++ {
if _, err := tx.Exec(migrations[i]); err != nil {
return fmt.Errorf("migration %d failed: %w", i+1, err)
}
}
// PRAGMA statements cannot be executed inside a transaction in some SQLite
// drivers, but with mattn/go-sqlite3 this works. We set it inside the tx
// so that a failed commit leaves the version unchanged.
if _, err := tx.Exec(fmt.Sprintf(`PRAGMA user_version = %d`, len(migrations))); err != nil {
return fmt.Errorf("updating schema version: %w", err)
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("committing migrations: %w", err)
}
return nil
}

99
plugins/migrate_test.go Normal file
View File

@@ -0,0 +1,99 @@
//go:build !windows
package plugins
import (
"database/sql"
_ "github.com/mattn/go-sqlite3"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("migrateDB", func() {
var db *sql.DB
BeforeEach(func() {
var err error
db, err = sql.Open("sqlite3", ":memory:")
Expect(err).ToNot(HaveOccurred())
})
AfterEach(func() {
if db != nil {
db.Close()
}
})
getUserVersion := func() int {
var version int
Expect(db.QueryRow(`PRAGMA user_version`).Scan(&version)).To(Succeed())
return version
}
It("applies all migrations on a fresh database", func() {
migrations := []string{
`CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)`,
`ALTER TABLE test ADD COLUMN email TEXT`,
}
Expect(migrateDB(db, migrations)).To(Succeed())
Expect(getUserVersion()).To(Equal(2))
// Verify schema
_, err := db.Exec(`INSERT INTO test (id, name, email) VALUES (1, 'Alice', 'alice@test.com')`)
Expect(err).ToNot(HaveOccurred())
})
It("skips already applied migrations", func() {
migrations1 := []string{
`CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)`,
}
Expect(migrateDB(db, migrations1)).To(Succeed())
Expect(getUserVersion()).To(Equal(1))
// Add a new migration
migrations2 := []string{
`CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)`,
`ALTER TABLE test ADD COLUMN email TEXT`,
}
Expect(migrateDB(db, migrations2)).To(Succeed())
Expect(getUserVersion()).To(Equal(2))
// Verify the new column exists
_, err := db.Exec(`INSERT INTO test (id, name, email) VALUES (1, 'Alice', 'alice@test.com')`)
Expect(err).ToNot(HaveOccurred())
})
It("is a no-op when all migrations are applied", func() {
migrations := []string{
`CREATE TABLE test (id INTEGER PRIMARY KEY)`,
}
Expect(migrateDB(db, migrations)).To(Succeed())
Expect(migrateDB(db, migrations)).To(Succeed())
Expect(getUserVersion()).To(Equal(1))
})
It("is a no-op with empty migrations slice", func() {
Expect(migrateDB(db, nil)).To(Succeed())
Expect(getUserVersion()).To(Equal(0))
})
It("rolls back on failure", func() {
migrations := []string{
`CREATE TABLE test (id INTEGER PRIMARY KEY)`,
`INVALID SQL STATEMENT`,
}
err := migrateDB(db, migrations)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("migration 2 failed"))
// Version should remain 0 (rolled back)
Expect(getUserVersion()).To(Equal(0))
// Table should not exist (rolled back)
_, err = db.Exec(`INSERT INTO test (id) VALUES (1)`)
Expect(err).To(HaveOccurred())
})
})

View File

@@ -43,6 +43,7 @@ The following host services are available:
- Library: provides access to music library metadata for plugins. - Library: provides access to music library metadata for plugins.
- Scheduler: provides task scheduling capabilities for plugins. - Scheduler: provides task scheduling capabilities for plugins.
- SubsonicAPI: provides access to Navidrome's Subsonic API from plugins. - SubsonicAPI: provides access to Navidrome's Subsonic API from plugins.
- Task: provides persistent task queues for plugins.
- Users: provides access to user information for plugins. - Users: provides access to user information for plugins.
- WebSocket: provides WebSocket communication capabilities for plugins. - WebSocket: provides WebSocket communication capabilities for plugins.

View File

@@ -19,15 +19,20 @@ import (
//go:wasmimport extism:host/user kvstore_set //go:wasmimport extism:host/user kvstore_set
func kvstore_set(uint64) uint64 func kvstore_set(uint64) uint64
// kvstore_setwithttl is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user kvstore_setwithttl
func kvstore_setwithttl(uint64) uint64
// kvstore_get is the host function provided by Navidrome. // kvstore_get is the host function provided by Navidrome.
// //
//go:wasmimport extism:host/user kvstore_get //go:wasmimport extism:host/user kvstore_get
func kvstore_get(uint64) uint64 func kvstore_get(uint64) uint64
// kvstore_delete is the host function provided by Navidrome. // kvstore_getmany is the host function provided by Navidrome.
// //
//go:wasmimport extism:host/user kvstore_delete //go:wasmimport extism:host/user kvstore_getmany
func kvstore_delete(uint64) uint64 func kvstore_getmany(uint64) uint64
// kvstore_has is the host function provided by Navidrome. // kvstore_has is the host function provided by Navidrome.
// //
@@ -39,6 +44,16 @@ func kvstore_has(uint64) uint64
//go:wasmimport extism:host/user kvstore_list //go:wasmimport extism:host/user kvstore_list
func kvstore_list(uint64) uint64 func kvstore_list(uint64) uint64
// kvstore_delete is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user kvstore_delete
func kvstore_delete(uint64) uint64
// kvstore_deletebyprefix is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user kvstore_deletebyprefix
func kvstore_deletebyprefix(uint64) uint64
// kvstore_getstorageused is the host function provided by Navidrome. // kvstore_getstorageused is the host function provided by Navidrome.
// //
//go:wasmimport extism:host/user kvstore_getstorageused //go:wasmimport extism:host/user kvstore_getstorageused
@@ -49,6 +64,12 @@ type kVStoreSetRequest struct {
Value []byte `json:"value"` Value []byte `json:"value"`
} }
type kVStoreSetWithTTLRequest struct {
Key string `json:"key"`
Value []byte `json:"value"`
TtlSeconds int64 `json:"ttlSeconds"`
}
type kVStoreGetRequest struct { type kVStoreGetRequest struct {
Key string `json:"key"` Key string `json:"key"`
} }
@@ -59,8 +80,13 @@ type kVStoreGetResponse struct {
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
} }
type kVStoreDeleteRequest struct { type kVStoreGetManyRequest struct {
Key string `json:"key"` Keys []string `json:"keys"`
}
type kVStoreGetManyResponse struct {
Values map[string][]byte `json:"values,omitempty"`
Error string `json:"error,omitempty"`
} }
type kVStoreHasRequest struct { type kVStoreHasRequest struct {
@@ -81,6 +107,19 @@ type kVStoreListResponse struct {
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
} }
type kVStoreDeleteRequest struct {
Key string `json:"key"`
}
type kVStoreDeleteByPrefixRequest struct {
Prefix string `json:"prefix"`
}
type kVStoreDeleteByPrefixResponse struct {
DeletedCount int64 `json:"deletedCount,omitempty"`
Error string `json:"error,omitempty"`
}
type kVStoreGetStorageUsedResponse struct { type kVStoreGetStorageUsedResponse struct {
Bytes int64 `json:"bytes,omitempty"` Bytes int64 `json:"bytes,omitempty"`
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
@@ -127,6 +166,52 @@ func KVStoreSet(key string, value []byte) error {
return nil return nil
} }
// KVStoreSetWithTTL calls the kvstore_setwithttl host function.
// SetWithTTL stores a byte value with the given key and a time-to-live.
//
// After ttlSeconds, the key is treated as non-existent and will be
// cleaned up lazily. ttlSeconds must be greater than 0.
//
// Parameters:
// - key: The storage key (max 256 bytes, UTF-8)
// - value: The byte slice to store
// - ttlSeconds: Time-to-live in seconds (must be > 0)
//
// Returns an error if the storage limit would be exceeded or the operation fails.
func KVStoreSetWithTTL(key string, value []byte, ttlSeconds int64) error {
// Marshal request to JSON
req := kVStoreSetWithTTLRequest{
Key: key,
Value: value,
TtlSeconds: ttlSeconds,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := kvstore_setwithttl(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse error-only response
var response struct {
Error string `json:"error,omitempty"`
}
if err := json.Unmarshal(responseBytes, &response); err != nil {
return err
}
if response.Error != "" {
return errors.New(response.Error)
}
return nil
}
// KVStoreGet calls the kvstore_get host function. // KVStoreGet calls the kvstore_get host function.
// Get retrieves a byte value from storage. // Get retrieves a byte value from storage.
// //
@@ -167,43 +252,45 @@ func KVStoreGet(key string) ([]byte, bool, error) {
return response.Value, response.Exists, nil return response.Value, response.Exists, nil
} }
// KVStoreDelete calls the kvstore_delete host function. // KVStoreGetMany calls the kvstore_getmany host function.
// Delete removes a value from storage. // GetMany retrieves multiple values in a single call.
// //
// Parameters: // Parameters:
// - key: The storage key // - keys: The storage keys to retrieve
// //
// Returns an error if the operation fails. Does not return an error if the key doesn't exist. // Returns a map of key to value for keys that exist and have not expired.
func KVStoreDelete(key string) error { // Missing or expired keys are omitted from the result.
func KVStoreGetMany(keys []string) (map[string][]byte, error) {
// Marshal request to JSON // Marshal request to JSON
req := kVStoreDeleteRequest{ req := kVStoreGetManyRequest{
Key: key, Keys: keys,
} }
reqBytes, err := json.Marshal(req) reqBytes, err := json.Marshal(req)
if err != nil { if err != nil {
return err return nil, err
} }
reqMem := pdk.AllocateBytes(reqBytes) reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free() defer reqMem.Free()
// Call the host function // Call the host function
responsePtr := kvstore_delete(reqMem.Offset()) responsePtr := kvstore_getmany(reqMem.Offset())
// Read the response from memory // Read the response from memory
responseMem := pdk.FindMemory(responsePtr) responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes() responseBytes := responseMem.ReadBytes()
// Parse error-only response // Parse the response
var response struct { var response kVStoreGetManyResponse
Error string `json:"error,omitempty"`
}
if err := json.Unmarshal(responseBytes, &response); err != nil { if err := json.Unmarshal(responseBytes, &response); err != nil {
return err return nil, err
} }
// Convert Error field to Go error
if response.Error != "" { if response.Error != "" {
return errors.New(response.Error) return nil, errors.New(response.Error)
} }
return nil
return response.Values, nil
} }
// KVStoreHas calls the kvstore_has host function. // KVStoreHas calls the kvstore_has host function.
@@ -286,6 +373,85 @@ func KVStoreList(prefix string) ([]string, error) {
return response.Keys, nil return response.Keys, nil
} }
// KVStoreDelete calls the kvstore_delete host function.
// Delete removes a value from storage.
//
// Parameters:
// - key: The storage key
//
// Returns an error if the operation fails. Does not return an error if the key doesn't exist.
func KVStoreDelete(key string) error {
// Marshal request to JSON
req := kVStoreDeleteRequest{
Key: key,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := kvstore_delete(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse error-only response
var response struct {
Error string `json:"error,omitempty"`
}
if err := json.Unmarshal(responseBytes, &response); err != nil {
return err
}
if response.Error != "" {
return errors.New(response.Error)
}
return nil
}
// KVStoreDeleteByPrefix calls the kvstore_deletebyprefix host function.
// DeleteByPrefix removes all keys matching the given prefix.
//
// Parameters:
// - prefix: Key prefix to match (must not be empty)
//
// Returns the number of keys deleted. Includes expired keys.
func KVStoreDeleteByPrefix(prefix string) (int64, error) {
// Marshal request to JSON
req := kVStoreDeleteByPrefixRequest{
Prefix: prefix,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return 0, err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := kvstore_deletebyprefix(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse the response
var response kVStoreDeleteByPrefixResponse
if err := json.Unmarshal(responseBytes, &response); err != nil {
return 0, err
}
// Convert Error field to Go error
if response.Error != "" {
return 0, errors.New(response.Error)
}
return response.DeletedCount, nil
}
// KVStoreGetStorageUsed calls the kvstore_getstorageused host function. // KVStoreGetStorageUsed calls the kvstore_getstorageused host function.
// GetStorageUsed returns the total storage used by this plugin in bytes. // GetStorageUsed returns the total storage used by this plugin in bytes.
func KVStoreGetStorageUsed() (int64, error) { func KVStoreGetStorageUsed() (int64, error) {

View File

@@ -37,6 +37,28 @@ func KVStoreSet(key string, value []byte) error {
return KVStoreMock.Set(key, value) return KVStoreMock.Set(key, value)
} }
// SetWithTTL is the mock method for KVStoreSetWithTTL.
func (m *mockKVStoreService) SetWithTTL(key string, value []byte, ttlSeconds int64) error {
args := m.Called(key, value, ttlSeconds)
return args.Error(0)
}
// KVStoreSetWithTTL delegates to the mock instance.
// SetWithTTL stores a byte value with the given key and a time-to-live.
//
// After ttlSeconds, the key is treated as non-existent and will be
// cleaned up lazily. ttlSeconds must be greater than 0.
//
// Parameters:
// - key: The storage key (max 256 bytes, UTF-8)
// - value: The byte slice to store
// - ttlSeconds: Time-to-live in seconds (must be > 0)
//
// Returns an error if the storage limit would be exceeded or the operation fails.
func KVStoreSetWithTTL(key string, value []byte, ttlSeconds int64) error {
return KVStoreMock.SetWithTTL(key, value, ttlSeconds)
}
// Get is the mock method for KVStoreGet. // Get is the mock method for KVStoreGet.
func (m *mockKVStoreService) Get(key string) ([]byte, bool, error) { func (m *mockKVStoreService) Get(key string) ([]byte, bool, error) {
args := m.Called(key) args := m.Called(key)
@@ -54,21 +76,22 @@ func KVStoreGet(key string) ([]byte, bool, error) {
return KVStoreMock.Get(key) return KVStoreMock.Get(key)
} }
// Delete is the mock method for KVStoreDelete. // GetMany is the mock method for KVStoreGetMany.
func (m *mockKVStoreService) Delete(key string) error { func (m *mockKVStoreService) GetMany(keys []string) (map[string][]byte, error) {
args := m.Called(key) args := m.Called(keys)
return args.Error(0) return args.Get(0).(map[string][]byte), args.Error(1)
} }
// KVStoreDelete delegates to the mock instance. // KVStoreGetMany delegates to the mock instance.
// Delete removes a value from storage. // GetMany retrieves multiple values in a single call.
// //
// Parameters: // Parameters:
// - key: The storage key // - keys: The storage keys to retrieve
// //
// Returns an error if the operation fails. Does not return an error if the key doesn't exist. // Returns a map of key to value for keys that exist and have not expired.
func KVStoreDelete(key string) error { // Missing or expired keys are omitted from the result.
return KVStoreMock.Delete(key) func KVStoreGetMany(keys []string) (map[string][]byte, error) {
return KVStoreMock.GetMany(keys)
} }
// Has is the mock method for KVStoreHas. // Has is the mock method for KVStoreHas.
@@ -105,6 +128,40 @@ func KVStoreList(prefix string) ([]string, error) {
return KVStoreMock.List(prefix) return KVStoreMock.List(prefix)
} }
// Delete is the mock method for KVStoreDelete.
func (m *mockKVStoreService) Delete(key string) error {
args := m.Called(key)
return args.Error(0)
}
// KVStoreDelete delegates to the mock instance.
// Delete removes a value from storage.
//
// Parameters:
// - key: The storage key
//
// Returns an error if the operation fails. Does not return an error if the key doesn't exist.
func KVStoreDelete(key string) error {
return KVStoreMock.Delete(key)
}
// DeleteByPrefix is the mock method for KVStoreDeleteByPrefix.
func (m *mockKVStoreService) DeleteByPrefix(prefix string) (int64, error) {
args := m.Called(prefix)
return args.Get(0).(int64), args.Error(1)
}
// KVStoreDeleteByPrefix delegates to the mock instance.
// DeleteByPrefix removes all keys matching the given prefix.
//
// Parameters:
// - prefix: Key prefix to match (must not be empty)
//
// Returns the number of keys deleted. Includes expired keys.
func KVStoreDeleteByPrefix(prefix string) (int64, error) {
return KVStoreMock.DeleteByPrefix(prefix)
}
// GetStorageUsed is the mock method for KVStoreGetStorageUsed. // GetStorageUsed is the mock method for KVStoreGetStorageUsed.
func (m *mockKVStoreService) GetStorageUsed() (int64, error) { func (m *mockKVStoreService) GetStorageUsed() (int64, error) {
args := m.Called() args := m.Called()

View File

@@ -0,0 +1,277 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains client wrappers for the Task host service.
// It is intended for use in Navidrome plugins built with TinyGo.
//
//go:build wasip1
package host
import (
"encoding/json"
"errors"
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
)
// QueueConfig represents the QueueConfig data structure.
// QueueConfig holds configuration for a task queue.
type QueueConfig struct {
Concurrency int32 `json:"concurrency"`
MaxRetries int32 `json:"maxRetries"`
BackoffMs int64 `json:"backoffMs"`
DelayMs int64 `json:"delayMs"`
RetentionMs int64 `json:"retentionMs"`
}
// TaskInfo represents the TaskInfo data structure.
// TaskInfo holds the current state of a task.
type TaskInfo struct {
Status string `json:"status"`
Message string `json:"message"`
Attempt int32 `json:"attempt"`
}
// task_createqueue is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user task_createqueue
func task_createqueue(uint64) uint64
// task_enqueue is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user task_enqueue
func task_enqueue(uint64) uint64
// task_get is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user task_get
func task_get(uint64) uint64
// task_cancel is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user task_cancel
func task_cancel(uint64) uint64
// task_clearqueue is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user task_clearqueue
func task_clearqueue(uint64) uint64
type taskCreateQueueRequest struct {
Name string `json:"name"`
Config QueueConfig `json:"config"`
}
type taskEnqueueRequest struct {
QueueName string `json:"queueName"`
Payload []byte `json:"payload"`
}
type taskEnqueueResponse struct {
Result string `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
type taskGetRequest struct {
TaskID string `json:"taskId"`
}
type taskGetResponse struct {
Result *TaskInfo `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
type taskCancelRequest struct {
TaskID string `json:"taskId"`
}
type taskClearQueueRequest struct {
QueueName string `json:"queueName"`
}
type taskClearQueueResponse struct {
Result int64 `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
// TaskCreateQueue calls the task_createqueue host function.
// CreateQueue creates a named task queue with the given configuration.
// Zero-value fields in config use sensible defaults.
// If a queue with the same name already exists, returns an error.
// On startup, this also recovers any stale "running" tasks from a previous crash.
func TaskCreateQueue(name string, config QueueConfig) error {
// Marshal request to JSON
req := taskCreateQueueRequest{
Name: name,
Config: config,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := task_createqueue(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse error-only response
var response struct {
Error string `json:"error,omitempty"`
}
if err := json.Unmarshal(responseBytes, &response); err != nil {
return err
}
if response.Error != "" {
return errors.New(response.Error)
}
return nil
}
// TaskEnqueue calls the task_enqueue host function.
// Enqueue adds a task to the named queue. Returns the task ID.
// payload is opaque bytes passed back to the plugin on execution.
func TaskEnqueue(queueName string, payload []byte) (string, error) {
// Marshal request to JSON
req := taskEnqueueRequest{
QueueName: queueName,
Payload: payload,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return "", err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := task_enqueue(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse the response
var response taskEnqueueResponse
if err := json.Unmarshal(responseBytes, &response); err != nil {
return "", err
}
// Convert Error field to Go error
if response.Error != "" {
return "", errors.New(response.Error)
}
return response.Result, nil
}
// TaskGet calls the task_get host function.
// Get returns the current state of a task including its status,
// message, and attempt count.
func TaskGet(taskID string) (*TaskInfo, error) {
// Marshal request to JSON
req := taskGetRequest{
TaskID: taskID,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return nil, err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := task_get(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse the response
var response taskGetResponse
if err := json.Unmarshal(responseBytes, &response); err != nil {
return nil, err
}
// Convert Error field to Go error
if response.Error != "" {
return nil, errors.New(response.Error)
}
return response.Result, nil
}
// TaskCancel calls the task_cancel host function.
// Cancel cancels a pending task. Returns error if already
// running, completed, or failed.
func TaskCancel(taskID string) error {
// Marshal request to JSON
req := taskCancelRequest{
TaskID: taskID,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := task_cancel(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse error-only response
var response struct {
Error string `json:"error,omitempty"`
}
if err := json.Unmarshal(responseBytes, &response); err != nil {
return err
}
if response.Error != "" {
return errors.New(response.Error)
}
return nil
}
// TaskClearQueue calls the task_clearqueue host function.
// ClearQueue removes all pending tasks from the named queue.
// Running tasks are not affected. Returns the number of tasks removed.
func TaskClearQueue(queueName string) (int64, error) {
// Marshal request to JSON
req := taskClearQueueRequest{
QueueName: queueName,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return 0, err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := task_clearqueue(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse the response
var response taskClearQueueResponse
if err := json.Unmarshal(responseBytes, &response); err != nil {
return 0, err
}
// Convert Error field to Go error
if response.Error != "" {
return 0, errors.New(response.Error)
}
return response.Result, nil
}

View File

@@ -0,0 +1,92 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains mock implementations for non-WASM builds.
// These mocks allow IDE support, compilation, and unit testing on non-WASM platforms.
// Plugin authors can use the exported mock instances to set expectations in tests.
//
//go:build !wasip1
package host
import "github.com/stretchr/testify/mock"
// QueueConfig represents the QueueConfig data structure.
// QueueConfig holds configuration for a task queue.
type QueueConfig struct {
Concurrency int32 `json:"concurrency"`
MaxRetries int32 `json:"maxRetries"`
BackoffMs int64 `json:"backoffMs"`
DelayMs int64 `json:"delayMs"`
RetentionMs int64 `json:"retentionMs"`
}
// TaskInfo represents the TaskInfo data structure.
// TaskInfo holds the current state of a task.
type TaskInfo struct {
Status string `json:"status"`
Message string `json:"message"`
Attempt int32 `json:"attempt"`
}
// mockTaskService is the mock implementation for testing.
type mockTaskService struct {
mock.Mock
}
// TaskMock is the auto-instantiated mock instance for testing.
// Use this to set expectations: host.TaskMock.On("MethodName", args...).Return(values...)
var TaskMock = &mockTaskService{}
// CreateQueue is the mock method for TaskCreateQueue.
func (m *mockTaskService) CreateQueue(name string, config QueueConfig) error {
args := m.Called(name, config)
return args.Error(0)
}
// TaskCreateQueue delegates to the mock instance.
// CreateQueue creates a named task queue with the given configuration.
// Zero-value fields in config use sensible defaults.
// If a queue with the same name already exists, returns an error.
// On startup, this also recovers any stale "running" tasks from a previous crash.
func TaskCreateQueue(name string, config QueueConfig) error {
return TaskMock.CreateQueue(name, config)
}
// Enqueue is the mock method for TaskEnqueue.
func (m *mockTaskService) Enqueue(queueName string, payload []byte) (string, error) {
args := m.Called(queueName, payload)
return args.String(0), args.Error(1)
}
// TaskEnqueue delegates to the mock instance.
// Enqueue adds a task to the named queue. Returns the task ID.
// payload is opaque bytes passed back to the plugin on execution.
func TaskEnqueue(queueName string, payload []byte) (string, error) {
return TaskMock.Enqueue(queueName, payload)
}
// Get is the mock method for TaskGet.
func (m *mockTaskService) Get(taskID string) (*TaskInfo, error) {
args := m.Called(taskID)
return args.Get(0).(*TaskInfo), args.Error(1)
}
// TaskGet delegates to the mock instance.
// Get returns the current state of a task including its status,
// message, and attempt count.
func TaskGet(taskID string) (*TaskInfo, error) {
return TaskMock.Get(taskID)
}
// Cancel is the mock method for TaskCancel.
func (m *mockTaskService) Cancel(taskID string) error {
args := m.Called(taskID)
return args.Error(0)
}
// TaskCancel delegates to the mock instance.
// Cancel cancels a pending task. Returns error if already
// running, completed, or failed.
func TaskCancel(taskID string) error {
return TaskMock.Cancel(taskID)
}

View File

@@ -0,0 +1,79 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains export wrappers for the TaskWorker capability.
// It is intended for use in Navidrome plugins built with TinyGo.
//
//go:build wasip1
package taskworker
import (
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
)
// TaskExecuteRequest is the request provided when a task is ready to execute.
type TaskExecuteRequest struct {
// QueueName is the name of the queue this task belongs to.
QueueName string `json:"queueName"`
// TaskID is the unique identifier for this task.
TaskID string `json:"taskId"`
// Payload is the opaque data provided when the task was enqueued.
Payload []byte `json:"payload"`
// Attempt is the current attempt number (1-based: first attempt = 1).
Attempt int32 `json:"attempt"`
}
// TaskWorker is the marker interface for taskworker plugins.
// Implement one or more of the provider interfaces below.
// TaskWorker provides task execution handling.
// This capability allows plugins to receive callbacks when their queued tasks
// are ready to execute. Plugins that use the taskqueue host service must
// implement this capability.
type TaskWorker interface{}
// TaskExecuteProvider provides the OnTaskExecute function.
type TaskExecuteProvider interface {
OnTaskExecute(TaskExecuteRequest) (string, error)
} // Internal implementation holders
var (
taskExecuteImpl func(TaskExecuteRequest) (string, error)
)
// Register registers a taskworker implementation.
// The implementation is checked for optional provider interfaces.
func Register(impl TaskWorker) {
if p, ok := impl.(TaskExecuteProvider); ok {
taskExecuteImpl = p.OnTaskExecute
}
}
// NotImplementedCode is the standard return code for unimplemented functions.
// The host recognizes this and skips the plugin gracefully.
const NotImplementedCode int32 = -2
//go:wasmexport nd_task_execute
func _NdTaskExecute() int32 {
if taskExecuteImpl == nil {
// Return standard code - host will skip this plugin gracefully
return NotImplementedCode
}
var input TaskExecuteRequest
if err := pdk.InputJSON(&input); err != nil {
pdk.SetError(err)
return -1
}
output, err := taskExecuteImpl(input)
if err != nil {
pdk.SetError(err)
return -1
}
if err := pdk.OutputJSON(output); err != nil {
pdk.SetError(err)
return -1
}
return 0
}

View File

@@ -0,0 +1,41 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file provides stub implementations for non-WASM platforms.
// It allows Go plugins to compile and run tests outside of WASM,
// but the actual functionality is only available in WASM builds.
//
//go:build !wasip1
package taskworker
// TaskExecuteRequest is the request provided when a task is ready to execute.
type TaskExecuteRequest struct {
// QueueName is the name of the queue this task belongs to.
QueueName string `json:"queueName"`
// TaskID is the unique identifier for this task.
TaskID string `json:"taskId"`
// Payload is the opaque data provided when the task was enqueued.
Payload []byte `json:"payload"`
// Attempt is the current attempt number (1-based: first attempt = 1).
Attempt int32 `json:"attempt"`
}
// TaskWorker is the marker interface for taskworker plugins.
// Implement one or more of the provider interfaces below.
// TaskWorker provides task execution handling.
// This capability allows plugins to receive callbacks when their queued tasks
// are ready to execute. Plugins that use the taskqueue host service must
// implement this capability.
type TaskWorker interface{}
// TaskExecuteProvider provides the OnTaskExecute function.
type TaskExecuteProvider interface {
OnTaskExecute(TaskExecuteRequest) (string, error)
}
// NotImplementedCode is the standard return code for unimplemented functions.
const NotImplementedCode int32 = -2
// Register is a no-op on non-WASM platforms.
// This stub allows code to compile outside of WASM.
func Register(_ TaskWorker) {}

View File

@@ -26,14 +26,20 @@ def _kvstore_set(offset: int) -> int:
... ...
@extism.import_fn("extism:host/user", "kvstore_setwithttl")
def _kvstore_setwithttl(offset: int) -> int:
"""Raw host function - do not call directly."""
...
@extism.import_fn("extism:host/user", "kvstore_get") @extism.import_fn("extism:host/user", "kvstore_get")
def _kvstore_get(offset: int) -> int: def _kvstore_get(offset: int) -> int:
"""Raw host function - do not call directly.""" """Raw host function - do not call directly."""
... ...
@extism.import_fn("extism:host/user", "kvstore_delete") @extism.import_fn("extism:host/user", "kvstore_getmany")
def _kvstore_delete(offset: int) -> int: def _kvstore_getmany(offset: int) -> int:
"""Raw host function - do not call directly.""" """Raw host function - do not call directly."""
... ...
@@ -50,6 +56,18 @@ def _kvstore_list(offset: int) -> int:
... ...
@extism.import_fn("extism:host/user", "kvstore_delete")
def _kvstore_delete(offset: int) -> int:
"""Raw host function - do not call directly."""
...
@extism.import_fn("extism:host/user", "kvstore_deletebyprefix")
def _kvstore_deletebyprefix(offset: int) -> int:
"""Raw host function - do not call directly."""
...
@extism.import_fn("extism:host/user", "kvstore_getstorageused") @extism.import_fn("extism:host/user", "kvstore_getstorageused")
def _kvstore_getstorageused(offset: int) -> int: def _kvstore_getstorageused(offset: int) -> int:
"""Raw host function - do not call directly.""" """Raw host function - do not call directly."""
@@ -94,6 +112,43 @@ Returns an error if the storage limit would be exceeded or the operation fails.
def kvstore_set_with_ttl(key: str, value: bytes, ttl_seconds: int) -> None:
"""SetWithTTL stores a byte value with the given key and a time-to-live.
After ttlSeconds, the key is treated as non-existent and will be
cleaned up lazily. ttlSeconds must be greater than 0.
Parameters:
- key: The storage key (max 256 bytes, UTF-8)
- value: The byte slice to store
- ttlSeconds: Time-to-live in seconds (must be > 0)
Returns an error if the storage limit would be exceeded or the operation fails.
Args:
key: str parameter.
value: bytes parameter.
ttl_seconds: int parameter.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"key": key,
"value": base64.b64encode(value).decode("ascii"),
"ttlSeconds": ttl_seconds,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _kvstore_setwithttl(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
def kvstore_get(key: str) -> KVStoreGetResult: def kvstore_get(key: str) -> KVStoreGetResult:
"""Get retrieves a byte value from storage. """Get retrieves a byte value from storage.
@@ -129,32 +184,37 @@ Returns the value and whether the key exists.
) )
def kvstore_delete(key: str) -> None: def kvstore_get_many(keys: Any) -> Any:
"""Delete removes a value from storage. """GetMany retrieves multiple values in a single call.
Parameters: Parameters:
- key: The storage key - keys: The storage keys to retrieve
Returns an error if the operation fails. Does not return an error if the key doesn't exist. Returns a map of key to value for keys that exist and have not expired.
Missing or expired keys are omitted from the result.
Args: Args:
key: str parameter. keys: Any parameter.
Returns:
Any: The result value.
Raises: Raises:
HostFunctionError: If the host function returns an error. HostFunctionError: If the host function returns an error.
""" """
request = { request = {
"key": key, "keys": keys,
} }
request_bytes = json.dumps(request).encode("utf-8") request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes) request_mem = extism.memory.alloc(request_bytes)
response_offset = _kvstore_delete(request_mem.offset) response_offset = _kvstore_getmany(request_mem.offset)
response_mem = extism.memory.find(response_offset) response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem)) response = json.loads(extism.memory.string(response_mem))
if response.get("error"): if response.get("error"):
raise HostFunctionError(response["error"]) raise HostFunctionError(response["error"])
return response.get("values", None)
def kvstore_has(key: str) -> bool: def kvstore_has(key: str) -> bool:
@@ -221,6 +281,66 @@ Returns a slice of matching keys.
return response.get("keys", None) return response.get("keys", None)
def kvstore_delete(key: str) -> None:
"""Delete removes a value from storage.
Parameters:
- key: The storage key
Returns an error if the operation fails. Does not return an error if the key doesn't exist.
Args:
key: str parameter.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"key": key,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _kvstore_delete(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
def kvstore_delete_by_prefix(prefix: str) -> int:
"""DeleteByPrefix removes all keys matching the given prefix.
Parameters:
- prefix: Key prefix to match (must not be empty)
Returns the number of keys deleted. Includes expired keys.
Args:
prefix: str parameter.
Returns:
int: The result value.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"prefix": prefix,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _kvstore_deletebyprefix(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
return response.get("deletedCount", 0)
def kvstore_get_storage_used() -> int: def kvstore_get_storage_used() -> int:
"""GetStorageUsed returns the total storage used by this plugin in bytes. """GetStorageUsed returns the total storage used by this plugin in bytes.

View File

@@ -0,0 +1,187 @@
# Code generated by ndpgen. DO NOT EDIT.
#
# This file contains client wrappers for the Task host service.
# It is intended for use in Navidrome plugins built with extism-py.
#
# IMPORTANT: Due to a limitation in extism-py, you cannot import this file directly.
# The @extism.import_fn decorators are only detected when defined in the plugin's
# main __init__.py file. Copy the needed functions from this file into your plugin.
from dataclasses import dataclass
from typing import Any
import extism
import json
import base64
class HostFunctionError(Exception):
"""Raised when a host function returns an error."""
pass
@extism.import_fn("extism:host/user", "task_createqueue")
def _task_createqueue(offset: int) -> int:
"""Raw host function - do not call directly."""
...
@extism.import_fn("extism:host/user", "task_enqueue")
def _task_enqueue(offset: int) -> int:
"""Raw host function - do not call directly."""
...
@extism.import_fn("extism:host/user", "task_get")
def _task_get(offset: int) -> int:
"""Raw host function - do not call directly."""
...
@extism.import_fn("extism:host/user", "task_cancel")
def _task_cancel(offset: int) -> int:
"""Raw host function - do not call directly."""
...
@extism.import_fn("extism:host/user", "task_clearqueue")
def _task_clearqueue(offset: int) -> int:
"""Raw host function - do not call directly."""
...
def task_create_queue(name: str, config: Any) -> None:
"""CreateQueue creates a named task queue with the given configuration.
Zero-value fields in config use sensible defaults.
If a queue with the same name already exists, returns an error.
On startup, this also recovers any stale "running" tasks from a previous crash.
Args:
name: str parameter.
config: Any parameter.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"name": name,
"config": config,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _task_createqueue(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
def task_enqueue(queue_name: str, payload: bytes) -> str:
"""Enqueue adds a task to the named queue. Returns the task ID.
payload is opaque bytes passed back to the plugin on execution.
Args:
queue_name: str parameter.
payload: bytes parameter.
Returns:
str: The result value.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"queueName": queue_name,
"payload": base64.b64encode(payload).decode("ascii"),
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _task_enqueue(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
return response.get("result", "")
def task_get(task_id: str) -> Any:
"""Get returns the current state of a task including its status,
message, and attempt count.
Args:
task_id: str parameter.
Returns:
Any: The result value.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"taskId": task_id,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _task_get(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
return response.get("result", None)
def task_cancel(task_id: str) -> None:
"""Cancel cancels a pending task. Returns error if already
running, completed, or failed.
Args:
task_id: str parameter.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"taskId": task_id,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _task_cancel(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
def task_clear_queue(queue_name: str) -> int:
"""ClearQueue removes all pending tasks from the named queue.
Running tasks are not affected. Returns the number of tasks removed.
Args:
queue_name: str parameter.
Returns:
int: The number of tasks removed.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"queueName": queue_name,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _task_clearqueue(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
return response.get("result", 0)

View File

@@ -9,4 +9,5 @@ pub mod lifecycle;
pub mod metadata; pub mod metadata;
pub mod scheduler; pub mod scheduler;
pub mod scrobbler; pub mod scrobbler;
pub mod taskworker;
pub mod websocket; pub mod websocket;

View File

@@ -0,0 +1,102 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains export wrappers for the TaskWorker capability.
// It is intended for use in Navidrome plugins built with extism-pdk.
use serde::{Deserialize, Serialize};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
mod base64_bytes {
use serde::{self, Deserialize, Deserializer, Serializer};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&BASE64.encode(bytes))
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
BASE64.decode(&s).map_err(serde::de::Error::custom)
}
}
// Helper functions for skip_serializing_if with numeric types
#[allow(dead_code)]
fn is_zero_i32(value: &i32) -> bool { *value == 0 }
#[allow(dead_code)]
fn is_zero_u32(value: &u32) -> bool { *value == 0 }
#[allow(dead_code)]
fn is_zero_i64(value: &i64) -> bool { *value == 0 }
#[allow(dead_code)]
fn is_zero_u64(value: &u64) -> bool { *value == 0 }
#[allow(dead_code)]
fn is_zero_f32(value: &f32) -> bool { *value == 0.0 }
#[allow(dead_code)]
fn is_zero_f64(value: &f64) -> bool { *value == 0.0 }
/// TaskExecuteRequest is the request provided when a task is ready to execute.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskExecuteRequest {
/// QueueName is the name of the queue this task belongs to.
#[serde(default)]
pub queue_name: String,
/// TaskID is the unique identifier for this task.
#[serde(default)]
pub task_id: String,
/// Payload is the opaque data provided when the task was enqueued.
#[serde(default)]
#[serde(with = "base64_bytes")]
pub payload: Vec<u8>,
/// Attempt is the current attempt number (1-based: first attempt = 1).
#[serde(default)]
pub attempt: i32,
}
/// Error represents an error from a capability method.
#[derive(Debug)]
pub struct Error {
pub message: String,
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.message)
}
}
impl std::error::Error for Error {}
impl Error {
pub fn new(message: impl Into<String>) -> Self {
Self { message: message.into() }
}
}
/// TaskExecuteProvider provides the OnTaskExecute function.
pub trait TaskExecuteProvider {
fn on_task_execute(&self, req: TaskExecuteRequest) -> Result<String, Error>;
}
/// Register the on_task_execute export.
/// This macro generates the WASM export function for this method.
#[macro_export]
macro_rules! register_taskworker_task_execute {
($plugin_type:ty) => {
#[extism_pdk::plugin_fn]
pub fn nd_task_execute(
req: extism_pdk::Json<$crate::taskworker::TaskExecuteRequest>
) -> extism_pdk::FnResult<extism_pdk::Json<String>> {
let plugin = <$plugin_type>::default();
let result = $crate::taskworker::TaskExecuteProvider::on_task_execute(&plugin, req.into_inner())?;
Ok(extism_pdk::Json(result))
}
};
}

View File

@@ -40,6 +40,7 @@
//! - [`library`] - provides access to music library metadata for plugins. //! - [`library`] - provides access to music library metadata for plugins.
//! - [`scheduler`] - provides task scheduling capabilities for plugins. //! - [`scheduler`] - provides task scheduling capabilities for plugins.
//! - [`subsonicapi`] - provides access to Navidrome's Subsonic API from plugins. //! - [`subsonicapi`] - provides access to Navidrome's Subsonic API from plugins.
//! - [`task`] - provides persistent task queues for plugins.
//! - [`users`] - provides access to user information for plugins. //! - [`users`] - provides access to user information for plugins.
//! - [`websocket`] - provides WebSocket communication capabilities for plugins. //! - [`websocket`] - provides WebSocket communication capabilities for plugins.
@@ -99,6 +100,13 @@ pub mod subsonicapi {
pub use super::nd_host_subsonicapi::*; pub use super::nd_host_subsonicapi::*;
} }
#[doc(hidden)]
mod nd_host_task;
/// provides persistent task queues for plugins.
pub mod task {
pub use super::nd_host_task::*;
}
#[doc(hidden)] #[doc(hidden)]
mod nd_host_users; mod nd_host_users;
/// provides access to user information for plugins. /// provides access to user information for plugins.

View File

@@ -44,6 +44,22 @@ struct KVStoreSetResponse {
error: Option<String>, error: Option<String>,
} }
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct KVStoreSetWithTTLRequest {
key: String,
#[serde(with = "base64_bytes")]
value: Vec<u8>,
ttl_seconds: i64,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct KVStoreSetWithTTLResponse {
#[serde(default)]
error: Option<String>,
}
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
struct KVStoreGetRequest { struct KVStoreGetRequest {
@@ -64,13 +80,15 @@ struct KVStoreGetResponse {
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
struct KVStoreDeleteRequest { struct KVStoreGetManyRequest {
key: String, keys: Vec<String>,
} }
#[derive(Debug, Clone, Deserialize)] #[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
struct KVStoreDeleteResponse { struct KVStoreGetManyResponse {
#[serde(default)]
values: std::collections::HashMap<String, Vec<u8>>,
#[serde(default)] #[serde(default)]
error: Option<String>, error: Option<String>,
} }
@@ -105,6 +123,34 @@ struct KVStoreListResponse {
error: Option<String>, error: Option<String>,
} }
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct KVStoreDeleteRequest {
key: String,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct KVStoreDeleteResponse {
#[serde(default)]
error: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct KVStoreDeleteByPrefixRequest {
prefix: String,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct KVStoreDeleteByPrefixResponse {
#[serde(default)]
deleted_count: i64,
#[serde(default)]
error: Option<String>,
}
#[derive(Debug, Clone, Deserialize)] #[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
struct KVStoreGetStorageUsedResponse { struct KVStoreGetStorageUsedResponse {
@@ -117,10 +163,13 @@ struct KVStoreGetStorageUsedResponse {
#[host_fn] #[host_fn]
extern "ExtismHost" { extern "ExtismHost" {
fn kvstore_set(input: Json<KVStoreSetRequest>) -> Json<KVStoreSetResponse>; fn kvstore_set(input: Json<KVStoreSetRequest>) -> Json<KVStoreSetResponse>;
fn kvstore_setwithttl(input: Json<KVStoreSetWithTTLRequest>) -> Json<KVStoreSetWithTTLResponse>;
fn kvstore_get(input: Json<KVStoreGetRequest>) -> Json<KVStoreGetResponse>; fn kvstore_get(input: Json<KVStoreGetRequest>) -> Json<KVStoreGetResponse>;
fn kvstore_delete(input: Json<KVStoreDeleteRequest>) -> Json<KVStoreDeleteResponse>; fn kvstore_getmany(input: Json<KVStoreGetManyRequest>) -> Json<KVStoreGetManyResponse>;
fn kvstore_has(input: Json<KVStoreHasRequest>) -> Json<KVStoreHasResponse>; fn kvstore_has(input: Json<KVStoreHasRequest>) -> Json<KVStoreHasResponse>;
fn kvstore_list(input: Json<KVStoreListRequest>) -> Json<KVStoreListResponse>; fn kvstore_list(input: Json<KVStoreListRequest>) -> Json<KVStoreListResponse>;
fn kvstore_delete(input: Json<KVStoreDeleteRequest>) -> Json<KVStoreDeleteResponse>;
fn kvstore_deletebyprefix(input: Json<KVStoreDeleteByPrefixRequest>) -> Json<KVStoreDeleteByPrefixResponse>;
fn kvstore_getstorageused(input: Json<serde_json::Value>) -> Json<KVStoreGetStorageUsedResponse>; fn kvstore_getstorageused(input: Json<serde_json::Value>) -> Json<KVStoreGetStorageUsedResponse>;
} }
@@ -153,6 +202,41 @@ pub fn set(key: &str, value: Vec<u8>) -> Result<(), Error> {
Ok(()) Ok(())
} }
/// SetWithTTL stores a byte value with the given key and a time-to-live.
///
/// After ttlSeconds, the key is treated as non-existent and will be
/// cleaned up lazily. ttlSeconds must be greater than 0.
///
/// Parameters:
/// - key: The storage key (max 256 bytes, UTF-8)
/// - value: The byte slice to store
/// - ttlSeconds: Time-to-live in seconds (must be > 0)
///
/// Returns an error if the storage limit would be exceeded or the operation fails.
///
/// # Arguments
/// * `key` - String parameter.
/// * `value` - Vec<u8> parameter.
/// * `ttl_seconds` - i64 parameter.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn set_with_ttl(key: &str, value: Vec<u8>, ttl_seconds: i64) -> Result<(), Error> {
let response = unsafe {
kvstore_setwithttl(Json(KVStoreSetWithTTLRequest {
key: key.to_owned(),
value: value,
ttl_seconds: ttl_seconds,
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(())
}
/// Get retrieves a byte value from storage. /// Get retrieves a byte value from storage.
/// ///
/// Parameters: /// Parameters:
@@ -186,22 +270,26 @@ pub fn get(key: &str) -> Result<Option<Vec<u8>>, Error> {
} }
} }
/// Delete removes a value from storage. /// GetMany retrieves multiple values in a single call.
/// ///
/// Parameters: /// Parameters:
/// - key: The storage key /// - keys: The storage keys to retrieve
/// ///
/// Returns an error if the operation fails. Does not return an error if the key doesn't exist. /// Returns a map of key to value for keys that exist and have not expired.
/// Missing or expired keys are omitted from the result.
/// ///
/// # Arguments /// # Arguments
/// * `key` - String parameter. /// * `keys` - Vec<String> parameter.
///
/// # Returns
/// The values value.
/// ///
/// # Errors /// # Errors
/// Returns an error if the host function call fails. /// Returns an error if the host function call fails.
pub fn delete(key: &str) -> Result<(), Error> { pub fn get_many(keys: Vec<String>) -> Result<std::collections::HashMap<String, Vec<u8>>, Error> {
let response = unsafe { let response = unsafe {
kvstore_delete(Json(KVStoreDeleteRequest { kvstore_getmany(Json(KVStoreGetManyRequest {
key: key.to_owned(), keys: keys,
}))? }))?
}; };
@@ -209,7 +297,7 @@ pub fn delete(key: &str) -> Result<(), Error> {
return Err(Error::msg(err)); return Err(Error::msg(err));
} }
Ok(()) Ok(response.0.values)
} }
/// Has checks if a key exists in storage. /// Has checks if a key exists in storage.
@@ -270,6 +358,61 @@ pub fn list(prefix: &str) -> Result<Vec<String>, Error> {
Ok(response.0.keys) Ok(response.0.keys)
} }
/// Delete removes a value from storage.
///
/// Parameters:
/// - key: The storage key
///
/// Returns an error if the operation fails. Does not return an error if the key doesn't exist.
///
/// # Arguments
/// * `key` - String parameter.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn delete(key: &str) -> Result<(), Error> {
let response = unsafe {
kvstore_delete(Json(KVStoreDeleteRequest {
key: key.to_owned(),
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(())
}
/// DeleteByPrefix removes all keys matching the given prefix.
///
/// Parameters:
/// - prefix: Key prefix to match (must not be empty)
///
/// Returns the number of keys deleted. Includes expired keys.
///
/// # Arguments
/// * `prefix` - String parameter.
///
/// # Returns
/// The deleted_count value.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn delete_by_prefix(prefix: &str) -> Result<i64, Error> {
let response = unsafe {
kvstore_deletebyprefix(Json(KVStoreDeleteByPrefixRequest {
prefix: prefix.to_owned(),
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(response.0.deleted_count)
}
/// GetStorageUsed returns the total storage used by this plugin in bytes. /// GetStorageUsed returns the total storage used by this plugin in bytes.
/// ///
/// # Returns /// # Returns

View File

@@ -0,0 +1,258 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains client wrappers for the Task host service.
// It is intended for use in Navidrome plugins built with extism-pdk.
use extism_pdk::*;
use serde::{Deserialize, Serialize};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
mod base64_bytes {
use serde::{self, Deserialize, Deserializer, Serializer};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&BASE64.encode(bytes))
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
BASE64.decode(&s).map_err(serde::de::Error::custom)
}
}
/// QueueConfig holds configuration for a task queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct QueueConfig {
pub concurrency: i32,
pub max_retries: i32,
pub backoff_ms: i64,
pub delay_ms: i64,
pub retention_ms: i64,
}
/// TaskInfo holds the current state of a task.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskInfo {
pub status: String,
pub message: String,
pub attempt: i32,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskCreateQueueRequest {
name: String,
config: QueueConfig,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskCreateQueueResponse {
#[serde(default)]
error: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskEnqueueRequest {
queue_name: String,
#[serde(with = "base64_bytes")]
payload: Vec<u8>,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskEnqueueResponse {
#[serde(default)]
result: String,
#[serde(default)]
error: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskGetRequest {
task_id: String,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskGetResponse {
#[serde(default)]
result: Option<TaskInfo>,
#[serde(default)]
error: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskCancelRequest {
task_id: String,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskCancelResponse {
#[serde(default)]
error: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskClearQueueRequest {
queue_name: String,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskClearQueueResponse {
#[serde(default)]
result: i64,
#[serde(default)]
error: Option<String>,
}
#[host_fn]
extern "ExtismHost" {
fn task_createqueue(input: Json<TaskCreateQueueRequest>) -> Json<TaskCreateQueueResponse>;
fn task_enqueue(input: Json<TaskEnqueueRequest>) -> Json<TaskEnqueueResponse>;
fn task_get(input: Json<TaskGetRequest>) -> Json<TaskGetResponse>;
fn task_cancel(input: Json<TaskCancelRequest>) -> Json<TaskCancelResponse>;
fn task_clearqueue(input: Json<TaskClearQueueRequest>) -> Json<TaskClearQueueResponse>;
}
/// CreateQueue creates a named task queue with the given configuration.
/// Zero-value fields in config use sensible defaults.
/// If a queue with the same name already exists, returns an error.
/// On startup, this also recovers any stale "running" tasks from a previous crash.
///
/// # Arguments
/// * `name` - String parameter.
/// * `config` - QueueConfig parameter.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn create_queue(name: &str, config: QueueConfig) -> Result<(), Error> {
let response = unsafe {
task_createqueue(Json(TaskCreateQueueRequest {
name: name.to_owned(),
config: config,
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(())
}
/// Enqueue adds a task to the named queue. Returns the task ID.
/// payload is opaque bytes passed back to the plugin on execution.
///
/// # Arguments
/// * `queue_name` - String parameter.
/// * `payload` - Vec<u8> parameter.
///
/// # Returns
/// The result value.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn enqueue(queue_name: &str, payload: Vec<u8>) -> Result<String, Error> {
let response = unsafe {
task_enqueue(Json(TaskEnqueueRequest {
queue_name: queue_name.to_owned(),
payload: payload,
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(response.0.result)
}
/// Get returns the current state of a task including its status,
/// message, and attempt count.
///
/// # Arguments
/// * `task_id` - String parameter.
///
/// # Returns
/// The result value.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn get(task_id: &str) -> Result<Option<TaskInfo>, Error> {
let response = unsafe {
task_get(Json(TaskGetRequest {
task_id: task_id.to_owned(),
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(response.0.result)
}
/// Cancel cancels a pending task. Returns error if already
/// running, completed, or failed.
///
/// # Arguments
/// * `task_id` - String parameter.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn cancel(task_id: &str) -> Result<(), Error> {
let response = unsafe {
task_cancel(Json(TaskCancelRequest {
task_id: task_id.to_owned(),
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(())
}
/// ClearQueue removes all pending tasks from the named queue.
/// Running tasks are not affected. Returns the number of tasks removed.
///
/// # Arguments
/// * `queue_name` - String parameter.
///
/// # Returns
/// The number of tasks removed.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn clear_queue(queue_name: &str) -> Result<i64, Error> {
let response = unsafe {
task_clearqueue(Json(TaskClearQueueRequest {
queue_name: queue_name.to_owned(),
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(response.0.result)
}

View File

@@ -9,19 +9,23 @@ import (
// TestKVStoreInput is the input for nd_test_kvstore callback. // TestKVStoreInput is the input for nd_test_kvstore callback.
type TestKVStoreInput struct { type TestKVStoreInput struct {
Operation string `json:"operation"` // "set", "get", "delete", "has", "list", "get_storage_used" Operation string `json:"operation"` // "set", "get", "delete", "has", "list", "get_storage_used", "set_with_ttl", "delete_by_prefix", "get_many"
Key string `json:"key"` // Storage key Key string `json:"key"` // Storage key
Value []byte `json:"value"` // For set operations Value []byte `json:"value"` // For set operations
Prefix string `json:"prefix"` // For list operation Prefix string `json:"prefix"` // For list/delete_by_prefix operations
TTLSeconds int64 `json:"ttl_seconds,omitempty"` // For set_with_ttl
Keys []string `json:"keys,omitempty"` // For get_many
} }
// TestKVStoreOutput is the output from nd_test_kvstore callback. // TestKVStoreOutput is the output from nd_test_kvstore callback.
type TestKVStoreOutput struct { type TestKVStoreOutput struct {
Value []byte `json:"value,omitempty"` Value []byte `json:"value,omitempty"`
Exists bool `json:"exists,omitempty"` Values map[string][]byte `json:"values,omitempty"`
Keys []string `json:"keys,omitempty"` Exists bool `json:"exists,omitempty"`
StorageUsed int64 `json:"storage_used,omitempty"` Keys []string `json:"keys,omitempty"`
Error *string `json:"error,omitempty"` StorageUsed int64 `json:"storage_used,omitempty"`
DeletedCount int64 `json:"deleted_count,omitempty"`
Error *string `json:"error,omitempty"`
} }
// nd_test_kvstore is the test callback that tests the kvstore host functions. // nd_test_kvstore is the test callback that tests the kvstore host functions.
@@ -96,6 +100,36 @@ func ndTestKVStore() int32 {
pdk.OutputJSON(TestKVStoreOutput{StorageUsed: bytesUsed}) pdk.OutputJSON(TestKVStoreOutput{StorageUsed: bytesUsed})
return 0 return 0
case "set_with_ttl":
err := host.KVStoreSetWithTTL(input.Key, input.Value, input.TTLSeconds)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestKVStoreOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestKVStoreOutput{})
return 0
case "delete_by_prefix":
deletedCount, err := host.KVStoreDeleteByPrefix(input.Prefix)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestKVStoreOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestKVStoreOutput{DeletedCount: deletedCount})
return 0
case "get_many":
values, err := host.KVStoreGetMany(input.Keys)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestKVStoreOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestKVStoreOutput{Values: values})
return 0
default: default:
errStr := "unknown operation: " + input.Operation errStr := "unknown operation: " + input.Operation
pdk.OutputJSON(TestKVStoreOutput{Error: &errStr}) pdk.OutputJSON(TestKVStoreOutput{Error: &errStr})

16
plugins/testdata/test-taskqueue/go.mod vendored Normal file
View File

@@ -0,0 +1,16 @@
module test-taskqueue
go 1.25
require github.com/navidrome/navidrome/plugins/pdk/go v0.0.0
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/extism/go-pdk v1.1.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/stretchr/testify v1.11.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
replace github.com/navidrome/navidrome/plugins/pdk/go => ../../pdk/go

14
plugins/testdata/test-taskqueue/go.sum vendored Normal file
View File

@@ -0,0 +1,14 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/extism/go-pdk v1.1.3 h1:hfViMPWrqjN6u67cIYRALZTZLk/enSPpNKa+rZ9X2SQ=
github.com/extism/go-pdk v1.1.3/go.mod h1:Gz+LIU/YCKnKXhgge8yo5Yu1F/lbv7KtKFkiCSzW/P4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

114
plugins/testdata/test-taskqueue/main.go vendored Normal file
View File

@@ -0,0 +1,114 @@
// Test TaskQueue plugin for Navidrome plugin system integration tests.
// Build with: tinygo build -o ../test-taskqueue.wasm -target wasip1 -buildmode=c-shared .
package main
import (
"fmt"
"github.com/navidrome/navidrome/plugins/pdk/go/host"
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
"github.com/navidrome/navidrome/plugins/pdk/go/taskworker"
)
func init() {
taskworker.Register(&handler{})
}
type handler struct{}
func (h *handler) OnTaskExecute(req taskworker.TaskExecuteRequest) (string, error) {
payload := string(req.Payload)
if payload == "fail" {
return "", fmt.Errorf("task failed as instructed")
}
if payload == "fail-then-succeed" && req.Attempt < 2 {
return "", fmt.Errorf("transient failure")
}
return "completed successfully", nil
}
// Test helper types
type TestInput struct {
Operation string `json:"operation"`
QueueName string `json:"queueName,omitempty"`
Config *host.QueueConfig `json:"config,omitempty"`
Payload []byte `json:"payload,omitempty"`
TaskID string `json:"taskId,omitempty"`
}
type TestOutput struct {
TaskID string `json:"taskId,omitempty"`
Status string `json:"status,omitempty"`
Message string `json:"message,omitempty"`
Attempt int32 `json:"attempt,omitempty"`
Cleared int64 `json:"cleared,omitempty"`
Error *string `json:"error,omitempty"`
}
//go:wasmexport nd_test_taskqueue
func ndTestTaskQueue() int32 {
var input TestInput
if err := pdk.InputJSON(&input); err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
switch input.Operation {
case "create_queue":
config := host.QueueConfig{}
if input.Config != nil {
config = *input.Config
}
err := host.TaskCreateQueue(input.QueueName, config)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestOutput{})
case "enqueue":
taskID, err := host.TaskEnqueue(input.QueueName, input.Payload)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestOutput{TaskID: taskID})
case "get_task_status":
info, err := host.TaskGet(input.TaskID)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestOutput{Status: info.Status, Message: info.Message, Attempt: info.Attempt})
case "cancel_task":
err := host.TaskCancel(input.TaskID)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestOutput{})
case "clear_queue":
cleared, err := host.TaskClearQueue(input.QueueName)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestOutput{Cleared: cleared})
default:
errStr := "unknown operation: " + input.Operation
pdk.OutputJSON(TestOutput{Error: &errStr})
}
return 0
}
func main() {}

View File

@@ -0,0 +1,12 @@
{
"name": "Test TaskQueue Plugin",
"author": "Navidrome Test",
"version": "1.0.0",
"description": "A test plugin for TaskQueue integration testing",
"permissions": {
"taskqueue": {
"reason": "For testing task queue operations",
"maxConcurrency": 10
}
}
}