mirror of
https://github.com/navidrome/navidrome.git
synced 2026-03-01 13:29:14 -05:00
Compare commits
19 Commits
feat/lyric
...
feat/plugi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a91fe5ab5b | ||
|
|
da9408958e | ||
|
|
f93a869368 | ||
|
|
dc81fcdff4 | ||
|
|
d40c6f4faa | ||
|
|
421c0732bd | ||
|
|
69f5ede61d | ||
|
|
7bb5e5d38b | ||
|
|
be538fcec8 | ||
|
|
bf850d600e | ||
|
|
e9f265d7c5 | ||
|
|
f076a60def | ||
|
|
ae8bd09bb3 | ||
|
|
60a9fd61ba | ||
|
|
690203f192 | ||
|
|
36f17920cd | ||
|
|
d666b9ed0e | ||
|
|
b187312bce | ||
|
|
2471bb9cf6 |
27
plugins/capabilities/taskworker.go
Normal file
27
plugins/capabilities/taskworker.go
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
package capabilities
|
||||||
|
|
||||||
|
// TaskWorker provides task execution handling.
|
||||||
|
// This capability allows plugins to receive callbacks when their queued tasks
|
||||||
|
// are ready to execute. Plugins that use the taskqueue host service must
|
||||||
|
// implement this capability.
|
||||||
|
//
|
||||||
|
//nd:capability name=taskworker
|
||||||
|
type TaskWorker interface {
|
||||||
|
// OnTaskExecute is called when a queued task is ready to run.
|
||||||
|
// The returned string is a status/result message stored in the tasks table.
|
||||||
|
// Return an error to trigger retry (if retries are configured).
|
||||||
|
//nd:export name=nd_task_execute
|
||||||
|
OnTaskExecute(TaskExecuteRequest) (string, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskExecuteRequest is the request provided when a task is ready to execute.
|
||||||
|
type TaskExecuteRequest struct {
|
||||||
|
// QueueName is the name of the queue this task belongs to.
|
||||||
|
QueueName string `json:"queueName"`
|
||||||
|
// TaskID is the unique identifier for this task.
|
||||||
|
TaskID string `json:"taskId"`
|
||||||
|
// Payload is the opaque data provided when the task was enqueued.
|
||||||
|
Payload []byte `json:"payload"`
|
||||||
|
// Attempt is the current attempt number (1-based: first attempt = 1).
|
||||||
|
Attempt int32 `json:"attempt"`
|
||||||
|
}
|
||||||
38
plugins/capabilities/taskworker.yaml
Normal file
38
plugins/capabilities/taskworker.yaml
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
version: v1-draft
|
||||||
|
exports:
|
||||||
|
nd_task_execute:
|
||||||
|
description: |-
|
||||||
|
OnTaskExecute is called when a queued task is ready to run.
|
||||||
|
The returned string is a status/result message stored in the tasks table.
|
||||||
|
Return an error to trigger retry (if retries are configured).
|
||||||
|
input:
|
||||||
|
$ref: '#/components/schemas/TaskExecuteRequest'
|
||||||
|
contentType: application/json
|
||||||
|
output:
|
||||||
|
type: string
|
||||||
|
contentType: application/json
|
||||||
|
components:
|
||||||
|
schemas:
|
||||||
|
TaskExecuteRequest:
|
||||||
|
description: TaskExecuteRequest is the request provided when a task is ready to execute.
|
||||||
|
properties:
|
||||||
|
queueName:
|
||||||
|
type: string
|
||||||
|
description: QueueName is the name of the queue this task belongs to.
|
||||||
|
taskId:
|
||||||
|
type: string
|
||||||
|
description: TaskID is the unique identifier for this task.
|
||||||
|
payload:
|
||||||
|
type: array
|
||||||
|
description: Payload is the opaque data provided when the task was enqueued.
|
||||||
|
items:
|
||||||
|
type: object
|
||||||
|
attempt:
|
||||||
|
type: integer
|
||||||
|
format: int32
|
||||||
|
description: 'Attempt is the current attempt number (1-based: first attempt = 1).'
|
||||||
|
required:
|
||||||
|
- queueName
|
||||||
|
- taskId
|
||||||
|
- payload
|
||||||
|
- attempt
|
||||||
@@ -23,6 +23,20 @@ type KVStoreService interface {
|
|||||||
//nd:hostfunc
|
//nd:hostfunc
|
||||||
Set(ctx context.Context, key string, value []byte) error
|
Set(ctx context.Context, key string, value []byte) error
|
||||||
|
|
||||||
|
// SetWithTTL stores a byte value with the given key and a time-to-live.
|
||||||
|
//
|
||||||
|
// After ttlSeconds, the key is treated as non-existent and will be
|
||||||
|
// cleaned up lazily. ttlSeconds must be greater than 0.
|
||||||
|
//
|
||||||
|
// Parameters:
|
||||||
|
// - key: The storage key (max 256 bytes, UTF-8)
|
||||||
|
// - value: The byte slice to store
|
||||||
|
// - ttlSeconds: Time-to-live in seconds (must be > 0)
|
||||||
|
//
|
||||||
|
// Returns an error if the storage limit would be exceeded or the operation fails.
|
||||||
|
//nd:hostfunc
|
||||||
|
SetWithTTL(ctx context.Context, key string, value []byte, ttlSeconds int64) error
|
||||||
|
|
||||||
// Get retrieves a byte value from storage.
|
// Get retrieves a byte value from storage.
|
||||||
//
|
//
|
||||||
// Parameters:
|
// Parameters:
|
||||||
@@ -32,14 +46,15 @@ type KVStoreService interface {
|
|||||||
//nd:hostfunc
|
//nd:hostfunc
|
||||||
Get(ctx context.Context, key string) (value []byte, exists bool, err error)
|
Get(ctx context.Context, key string) (value []byte, exists bool, err error)
|
||||||
|
|
||||||
// Delete removes a value from storage.
|
// GetMany retrieves multiple values in a single call.
|
||||||
//
|
//
|
||||||
// Parameters:
|
// Parameters:
|
||||||
// - key: The storage key
|
// - keys: The storage keys to retrieve
|
||||||
//
|
//
|
||||||
// Returns an error if the operation fails. Does not return an error if the key doesn't exist.
|
// Returns a map of key to value for keys that exist and have not expired.
|
||||||
|
// Missing or expired keys are omitted from the result.
|
||||||
//nd:hostfunc
|
//nd:hostfunc
|
||||||
Delete(ctx context.Context, key string) error
|
GetMany(ctx context.Context, keys []string) (values map[string][]byte, err error)
|
||||||
|
|
||||||
// Has checks if a key exists in storage.
|
// Has checks if a key exists in storage.
|
||||||
//
|
//
|
||||||
@@ -59,6 +74,24 @@ type KVStoreService interface {
|
|||||||
//nd:hostfunc
|
//nd:hostfunc
|
||||||
List(ctx context.Context, prefix string) (keys []string, err error)
|
List(ctx context.Context, prefix string) (keys []string, err error)
|
||||||
|
|
||||||
|
// Delete removes a value from storage.
|
||||||
|
//
|
||||||
|
// Parameters:
|
||||||
|
// - key: The storage key
|
||||||
|
//
|
||||||
|
// Returns an error if the operation fails. Does not return an error if the key doesn't exist.
|
||||||
|
//nd:hostfunc
|
||||||
|
Delete(ctx context.Context, key string) error
|
||||||
|
|
||||||
|
// DeleteByPrefix removes all keys matching the given prefix.
|
||||||
|
//
|
||||||
|
// Parameters:
|
||||||
|
// - prefix: Key prefix to match (must not be empty)
|
||||||
|
//
|
||||||
|
// Returns the number of keys deleted. Includes expired keys.
|
||||||
|
//nd:hostfunc
|
||||||
|
DeleteByPrefix(ctx context.Context, prefix string) (deletedCount int64, err error)
|
||||||
|
|
||||||
// GetStorageUsed returns the total storage used by this plugin in bytes.
|
// GetStorageUsed returns the total storage used by this plugin in bytes.
|
||||||
//nd:hostfunc
|
//nd:hostfunc
|
||||||
GetStorageUsed(ctx context.Context) (bytes int64, err error)
|
GetStorageUsed(ctx context.Context) (bytes int64, err error)
|
||||||
|
|||||||
@@ -20,6 +20,18 @@ type KVStoreSetResponse struct {
|
|||||||
Error string `json:"error,omitempty"`
|
Error string `json:"error,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// KVStoreSetWithTTLRequest is the request type for KVStore.SetWithTTL.
|
||||||
|
type KVStoreSetWithTTLRequest struct {
|
||||||
|
Key string `json:"key"`
|
||||||
|
Value []byte `json:"value"`
|
||||||
|
TtlSeconds int64 `json:"ttlSeconds"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// KVStoreSetWithTTLResponse is the response type for KVStore.SetWithTTL.
|
||||||
|
type KVStoreSetWithTTLResponse struct {
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
// KVStoreGetRequest is the request type for KVStore.Get.
|
// KVStoreGetRequest is the request type for KVStore.Get.
|
||||||
type KVStoreGetRequest struct {
|
type KVStoreGetRequest struct {
|
||||||
Key string `json:"key"`
|
Key string `json:"key"`
|
||||||
@@ -32,14 +44,15 @@ type KVStoreGetResponse struct {
|
|||||||
Error string `json:"error,omitempty"`
|
Error string `json:"error,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// KVStoreDeleteRequest is the request type for KVStore.Delete.
|
// KVStoreGetManyRequest is the request type for KVStore.GetMany.
|
||||||
type KVStoreDeleteRequest struct {
|
type KVStoreGetManyRequest struct {
|
||||||
Key string `json:"key"`
|
Keys []string `json:"keys"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// KVStoreDeleteResponse is the response type for KVStore.Delete.
|
// KVStoreGetManyResponse is the response type for KVStore.GetMany.
|
||||||
type KVStoreDeleteResponse struct {
|
type KVStoreGetManyResponse struct {
|
||||||
Error string `json:"error,omitempty"`
|
Values map[string][]byte `json:"values,omitempty"`
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// KVStoreHasRequest is the request type for KVStore.Has.
|
// KVStoreHasRequest is the request type for KVStore.Has.
|
||||||
@@ -64,6 +77,27 @@ type KVStoreListResponse struct {
|
|||||||
Error string `json:"error,omitempty"`
|
Error string `json:"error,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// KVStoreDeleteRequest is the request type for KVStore.Delete.
|
||||||
|
type KVStoreDeleteRequest struct {
|
||||||
|
Key string `json:"key"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// KVStoreDeleteResponse is the response type for KVStore.Delete.
|
||||||
|
type KVStoreDeleteResponse struct {
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// KVStoreDeleteByPrefixRequest is the request type for KVStore.DeleteByPrefix.
|
||||||
|
type KVStoreDeleteByPrefixRequest struct {
|
||||||
|
Prefix string `json:"prefix"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// KVStoreDeleteByPrefixResponse is the response type for KVStore.DeleteByPrefix.
|
||||||
|
type KVStoreDeleteByPrefixResponse struct {
|
||||||
|
DeletedCount int64 `json:"deletedCount,omitempty"`
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
// KVStoreGetStorageUsedResponse is the response type for KVStore.GetStorageUsed.
|
// KVStoreGetStorageUsedResponse is the response type for KVStore.GetStorageUsed.
|
||||||
type KVStoreGetStorageUsedResponse struct {
|
type KVStoreGetStorageUsedResponse struct {
|
||||||
Bytes int64 `json:"bytes,omitempty"`
|
Bytes int64 `json:"bytes,omitempty"`
|
||||||
@@ -75,10 +109,13 @@ type KVStoreGetStorageUsedResponse struct {
|
|||||||
func RegisterKVStoreHostFunctions(service KVStoreService) []extism.HostFunction {
|
func RegisterKVStoreHostFunctions(service KVStoreService) []extism.HostFunction {
|
||||||
return []extism.HostFunction{
|
return []extism.HostFunction{
|
||||||
newKVStoreSetHostFunction(service),
|
newKVStoreSetHostFunction(service),
|
||||||
|
newKVStoreSetWithTTLHostFunction(service),
|
||||||
newKVStoreGetHostFunction(service),
|
newKVStoreGetHostFunction(service),
|
||||||
newKVStoreDeleteHostFunction(service),
|
newKVStoreGetManyHostFunction(service),
|
||||||
newKVStoreHasHostFunction(service),
|
newKVStoreHasHostFunction(service),
|
||||||
newKVStoreListHostFunction(service),
|
newKVStoreListHostFunction(service),
|
||||||
|
newKVStoreDeleteHostFunction(service),
|
||||||
|
newKVStoreDeleteByPrefixHostFunction(service),
|
||||||
newKVStoreGetStorageUsedHostFunction(service),
|
newKVStoreGetStorageUsedHostFunction(service),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -114,6 +151,37 @@ func newKVStoreSetHostFunction(service KVStoreService) extism.HostFunction {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newKVStoreSetWithTTLHostFunction(service KVStoreService) extism.HostFunction {
|
||||||
|
return extism.NewHostFunctionWithStack(
|
||||||
|
"kvstore_setwithttl",
|
||||||
|
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
||||||
|
// Read JSON request from plugin memory
|
||||||
|
reqBytes, err := p.ReadBytes(stack[0])
|
||||||
|
if err != nil {
|
||||||
|
kvstoreWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var req KVStoreSetWithTTLRequest
|
||||||
|
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
||||||
|
kvstoreWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call the service method
|
||||||
|
if svcErr := service.SetWithTTL(ctx, req.Key, req.Value, req.TtlSeconds); svcErr != nil {
|
||||||
|
kvstoreWriteError(p, stack, svcErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write JSON response to plugin memory
|
||||||
|
resp := KVStoreSetWithTTLResponse{}
|
||||||
|
kvstoreWriteResponse(p, stack, resp)
|
||||||
|
},
|
||||||
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
func newKVStoreGetHostFunction(service KVStoreService) extism.HostFunction {
|
func newKVStoreGetHostFunction(service KVStoreService) extism.HostFunction {
|
||||||
return extism.NewHostFunctionWithStack(
|
return extism.NewHostFunctionWithStack(
|
||||||
"kvstore_get",
|
"kvstore_get",
|
||||||
@@ -149,9 +217,9 @@ func newKVStoreGetHostFunction(service KVStoreService) extism.HostFunction {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newKVStoreDeleteHostFunction(service KVStoreService) extism.HostFunction {
|
func newKVStoreGetManyHostFunction(service KVStoreService) extism.HostFunction {
|
||||||
return extism.NewHostFunctionWithStack(
|
return extism.NewHostFunctionWithStack(
|
||||||
"kvstore_delete",
|
"kvstore_getmany",
|
||||||
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
||||||
// Read JSON request from plugin memory
|
// Read JSON request from plugin memory
|
||||||
reqBytes, err := p.ReadBytes(stack[0])
|
reqBytes, err := p.ReadBytes(stack[0])
|
||||||
@@ -159,20 +227,23 @@ func newKVStoreDeleteHostFunction(service KVStoreService) extism.HostFunction {
|
|||||||
kvstoreWriteError(p, stack, err)
|
kvstoreWriteError(p, stack, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
var req KVStoreDeleteRequest
|
var req KVStoreGetManyRequest
|
||||||
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
||||||
kvstoreWriteError(p, stack, err)
|
kvstoreWriteError(p, stack, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call the service method
|
// Call the service method
|
||||||
if svcErr := service.Delete(ctx, req.Key); svcErr != nil {
|
values, svcErr := service.GetMany(ctx, req.Keys)
|
||||||
|
if svcErr != nil {
|
||||||
kvstoreWriteError(p, stack, svcErr)
|
kvstoreWriteError(p, stack, svcErr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write JSON response to plugin memory
|
// Write JSON response to plugin memory
|
||||||
resp := KVStoreDeleteResponse{}
|
resp := KVStoreGetManyResponse{
|
||||||
|
Values: values,
|
||||||
|
}
|
||||||
kvstoreWriteResponse(p, stack, resp)
|
kvstoreWriteResponse(p, stack, resp)
|
||||||
},
|
},
|
||||||
[]extism.ValueType{extism.ValueTypePTR},
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
@@ -248,6 +319,71 @@ func newKVStoreListHostFunction(service KVStoreService) extism.HostFunction {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newKVStoreDeleteHostFunction(service KVStoreService) extism.HostFunction {
|
||||||
|
return extism.NewHostFunctionWithStack(
|
||||||
|
"kvstore_delete",
|
||||||
|
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
||||||
|
// Read JSON request from plugin memory
|
||||||
|
reqBytes, err := p.ReadBytes(stack[0])
|
||||||
|
if err != nil {
|
||||||
|
kvstoreWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var req KVStoreDeleteRequest
|
||||||
|
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
||||||
|
kvstoreWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call the service method
|
||||||
|
if svcErr := service.Delete(ctx, req.Key); svcErr != nil {
|
||||||
|
kvstoreWriteError(p, stack, svcErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write JSON response to plugin memory
|
||||||
|
resp := KVStoreDeleteResponse{}
|
||||||
|
kvstoreWriteResponse(p, stack, resp)
|
||||||
|
},
|
||||||
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newKVStoreDeleteByPrefixHostFunction(service KVStoreService) extism.HostFunction {
|
||||||
|
return extism.NewHostFunctionWithStack(
|
||||||
|
"kvstore_deletebyprefix",
|
||||||
|
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
||||||
|
// Read JSON request from plugin memory
|
||||||
|
reqBytes, err := p.ReadBytes(stack[0])
|
||||||
|
if err != nil {
|
||||||
|
kvstoreWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var req KVStoreDeleteByPrefixRequest
|
||||||
|
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
||||||
|
kvstoreWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call the service method
|
||||||
|
deletedcount, svcErr := service.DeleteByPrefix(ctx, req.Prefix)
|
||||||
|
if svcErr != nil {
|
||||||
|
kvstoreWriteError(p, stack, svcErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write JSON response to plugin memory
|
||||||
|
resp := KVStoreDeleteByPrefixResponse{
|
||||||
|
DeletedCount: deletedcount,
|
||||||
|
}
|
||||||
|
kvstoreWriteResponse(p, stack, resp)
|
||||||
|
},
|
||||||
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
func newKVStoreGetStorageUsedHostFunction(service KVStoreService) extism.HostFunction {
|
func newKVStoreGetStorageUsedHostFunction(service KVStoreService) extism.HostFunction {
|
||||||
return extism.NewHostFunctionWithStack(
|
return extism.NewHostFunctionWithStack(
|
||||||
"kvstore_getstorageused",
|
"kvstore_getstorageused",
|
||||||
|
|||||||
73
plugins/host/task.go
Normal file
73
plugins/host/task.go
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
package host
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
|
// TaskInfo holds the current state of a task.
|
||||||
|
type TaskInfo struct {
|
||||||
|
// Status is the current task status: "pending", "running",
|
||||||
|
// "completed", "failed", or "cancelled".
|
||||||
|
Status string `json:"status"`
|
||||||
|
// Message is the status/result message returned by the plugin callback.
|
||||||
|
Message string `json:"message"`
|
||||||
|
// Attempt is the current or last attempt number (1-based).
|
||||||
|
Attempt int32 `json:"attempt"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueueConfig holds configuration for a task queue.
|
||||||
|
type QueueConfig struct {
|
||||||
|
// Concurrency is the max number of parallel workers. Default: 1.
|
||||||
|
// Capped by the plugin's manifest maxConcurrency.
|
||||||
|
Concurrency int32 `json:"concurrency"`
|
||||||
|
|
||||||
|
// MaxRetries is the number of times to retry a failed task. Default: 0.
|
||||||
|
MaxRetries int32 `json:"maxRetries"`
|
||||||
|
|
||||||
|
// BackoffMs is the initial backoff between retries in milliseconds.
|
||||||
|
// Doubles each retry (exponential: backoffMs * 2^(attempt-1)). Default: 1000.
|
||||||
|
BackoffMs int64 `json:"backoffMs"`
|
||||||
|
|
||||||
|
// DelayMs is the minimum delay between starting consecutive tasks
|
||||||
|
// in milliseconds. Useful for rate limiting. Default: 0.
|
||||||
|
DelayMs int64 `json:"delayMs"`
|
||||||
|
|
||||||
|
// RetentionMs is how long completed/failed/cancelled tasks are kept
|
||||||
|
// in milliseconds. Default: 3600000 (1h). Min: 60000 (1m). Max: 604800000 (1w).
|
||||||
|
RetentionMs int64 `json:"retentionMs"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskService provides persistent task queues for plugins.
|
||||||
|
//
|
||||||
|
// This service allows plugins to create named queues with configurable concurrency,
|
||||||
|
// retry policies, and rate limiting. Tasks are persisted to SQLite and survive
|
||||||
|
// server restarts. When a task is ready to execute, the host calls the plugin's
|
||||||
|
// nd_task_execute callback function.
|
||||||
|
//
|
||||||
|
//nd:hostservice name=Task permission=taskqueue
|
||||||
|
type TaskService interface {
|
||||||
|
// CreateQueue creates a named task queue with the given configuration.
|
||||||
|
// Zero-value fields in config use sensible defaults.
|
||||||
|
// If a queue with the same name already exists, returns an error.
|
||||||
|
// On startup, this also recovers any stale "running" tasks from a previous crash.
|
||||||
|
//nd:hostfunc
|
||||||
|
CreateQueue(ctx context.Context, name string, config QueueConfig) error
|
||||||
|
|
||||||
|
// Enqueue adds a task to the named queue. Returns the task ID.
|
||||||
|
// payload is opaque bytes passed back to the plugin on execution.
|
||||||
|
//nd:hostfunc
|
||||||
|
Enqueue(ctx context.Context, queueName string, payload []byte) (string, error)
|
||||||
|
|
||||||
|
// Get returns the current state of a task including its status,
|
||||||
|
// message, and attempt count.
|
||||||
|
//nd:hostfunc
|
||||||
|
Get(ctx context.Context, taskID string) (*TaskInfo, error)
|
||||||
|
|
||||||
|
// Cancel cancels a pending task. Returns error if already
|
||||||
|
// running, completed, or failed.
|
||||||
|
//nd:hostfunc
|
||||||
|
Cancel(ctx context.Context, taskID string) error
|
||||||
|
|
||||||
|
// ClearQueue removes all pending tasks from the named queue.
|
||||||
|
// Running tasks are not affected. Returns the number of tasks removed.
|
||||||
|
//nd:hostfunc
|
||||||
|
ClearQueue(ctx context.Context, queueName string) (int64, error)
|
||||||
|
}
|
||||||
266
plugins/host/task_gen.go
Normal file
266
plugins/host/task_gen.go
Normal file
@@ -0,0 +1,266 @@
|
|||||||
|
// Code generated by ndpgen. DO NOT EDIT.
|
||||||
|
|
||||||
|
package host
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
|
extism "github.com/extism/go-sdk"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TaskCreateQueueRequest is the request type for Task.CreateQueue.
|
||||||
|
type TaskCreateQueueRequest struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Config QueueConfig `json:"config"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskCreateQueueResponse is the response type for Task.CreateQueue.
|
||||||
|
type TaskCreateQueueResponse struct {
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskEnqueueRequest is the request type for Task.Enqueue.
|
||||||
|
type TaskEnqueueRequest struct {
|
||||||
|
QueueName string `json:"queueName"`
|
||||||
|
Payload []byte `json:"payload"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskEnqueueResponse is the response type for Task.Enqueue.
|
||||||
|
type TaskEnqueueResponse struct {
|
||||||
|
Result string `json:"result,omitempty"`
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskGetRequest is the request type for Task.Get.
|
||||||
|
type TaskGetRequest struct {
|
||||||
|
TaskID string `json:"taskId"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskGetResponse is the response type for Task.Get.
|
||||||
|
type TaskGetResponse struct {
|
||||||
|
Result *TaskInfo `json:"result,omitempty"`
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskCancelRequest is the request type for Task.Cancel.
|
||||||
|
type TaskCancelRequest struct {
|
||||||
|
TaskID string `json:"taskId"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskCancelResponse is the response type for Task.Cancel.
|
||||||
|
type TaskCancelResponse struct {
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskClearQueueRequest is the request type for Task.ClearQueue.
|
||||||
|
type TaskClearQueueRequest struct {
|
||||||
|
QueueName string `json:"queueName"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskClearQueueResponse is the response type for Task.ClearQueue.
|
||||||
|
type TaskClearQueueResponse struct {
|
||||||
|
Result int64 `json:"result,omitempty"`
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterTaskHostFunctions registers Task service host functions.
|
||||||
|
// The returned host functions should be added to the plugin's configuration.
|
||||||
|
func RegisterTaskHostFunctions(service TaskService) []extism.HostFunction {
|
||||||
|
return []extism.HostFunction{
|
||||||
|
newTaskCreateQueueHostFunction(service),
|
||||||
|
newTaskEnqueueHostFunction(service),
|
||||||
|
newTaskGetHostFunction(service),
|
||||||
|
newTaskCancelHostFunction(service),
|
||||||
|
newTaskClearQueueHostFunction(service),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTaskCreateQueueHostFunction(service TaskService) extism.HostFunction {
|
||||||
|
return extism.NewHostFunctionWithStack(
|
||||||
|
"task_createqueue",
|
||||||
|
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
||||||
|
// Read JSON request from plugin memory
|
||||||
|
reqBytes, err := p.ReadBytes(stack[0])
|
||||||
|
if err != nil {
|
||||||
|
taskWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var req TaskCreateQueueRequest
|
||||||
|
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
||||||
|
taskWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call the service method
|
||||||
|
if svcErr := service.CreateQueue(ctx, req.Name, req.Config); svcErr != nil {
|
||||||
|
taskWriteError(p, stack, svcErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write JSON response to plugin memory
|
||||||
|
resp := TaskCreateQueueResponse{}
|
||||||
|
taskWriteResponse(p, stack, resp)
|
||||||
|
},
|
||||||
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTaskEnqueueHostFunction(service TaskService) extism.HostFunction {
|
||||||
|
return extism.NewHostFunctionWithStack(
|
||||||
|
"task_enqueue",
|
||||||
|
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
||||||
|
// Read JSON request from plugin memory
|
||||||
|
reqBytes, err := p.ReadBytes(stack[0])
|
||||||
|
if err != nil {
|
||||||
|
taskWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var req TaskEnqueueRequest
|
||||||
|
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
||||||
|
taskWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call the service method
|
||||||
|
result, svcErr := service.Enqueue(ctx, req.QueueName, req.Payload)
|
||||||
|
if svcErr != nil {
|
||||||
|
taskWriteError(p, stack, svcErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write JSON response to plugin memory
|
||||||
|
resp := TaskEnqueueResponse{
|
||||||
|
Result: result,
|
||||||
|
}
|
||||||
|
taskWriteResponse(p, stack, resp)
|
||||||
|
},
|
||||||
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTaskGetHostFunction(service TaskService) extism.HostFunction {
|
||||||
|
return extism.NewHostFunctionWithStack(
|
||||||
|
"task_get",
|
||||||
|
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
||||||
|
// Read JSON request from plugin memory
|
||||||
|
reqBytes, err := p.ReadBytes(stack[0])
|
||||||
|
if err != nil {
|
||||||
|
taskWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var req TaskGetRequest
|
||||||
|
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
||||||
|
taskWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call the service method
|
||||||
|
result, svcErr := service.Get(ctx, req.TaskID)
|
||||||
|
if svcErr != nil {
|
||||||
|
taskWriteError(p, stack, svcErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write JSON response to plugin memory
|
||||||
|
resp := TaskGetResponse{
|
||||||
|
Result: result,
|
||||||
|
}
|
||||||
|
taskWriteResponse(p, stack, resp)
|
||||||
|
},
|
||||||
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTaskCancelHostFunction(service TaskService) extism.HostFunction {
|
||||||
|
return extism.NewHostFunctionWithStack(
|
||||||
|
"task_cancel",
|
||||||
|
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
||||||
|
// Read JSON request from plugin memory
|
||||||
|
reqBytes, err := p.ReadBytes(stack[0])
|
||||||
|
if err != nil {
|
||||||
|
taskWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var req TaskCancelRequest
|
||||||
|
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
||||||
|
taskWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call the service method
|
||||||
|
if svcErr := service.Cancel(ctx, req.TaskID); svcErr != nil {
|
||||||
|
taskWriteError(p, stack, svcErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write JSON response to plugin memory
|
||||||
|
resp := TaskCancelResponse{}
|
||||||
|
taskWriteResponse(p, stack, resp)
|
||||||
|
},
|
||||||
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTaskClearQueueHostFunction(service TaskService) extism.HostFunction {
|
||||||
|
return extism.NewHostFunctionWithStack(
|
||||||
|
"task_clearqueue",
|
||||||
|
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
||||||
|
// Read JSON request from plugin memory
|
||||||
|
reqBytes, err := p.ReadBytes(stack[0])
|
||||||
|
if err != nil {
|
||||||
|
taskWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var req TaskClearQueueRequest
|
||||||
|
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
||||||
|
taskWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call the service method
|
||||||
|
result, svcErr := service.ClearQueue(ctx, req.QueueName)
|
||||||
|
if svcErr != nil {
|
||||||
|
taskWriteError(p, stack, svcErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write JSON response to plugin memory
|
||||||
|
resp := TaskClearQueueResponse{
|
||||||
|
Result: result,
|
||||||
|
}
|
||||||
|
taskWriteResponse(p, stack, resp)
|
||||||
|
},
|
||||||
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// taskWriteResponse writes a JSON response to plugin memory.
|
||||||
|
func taskWriteResponse(p *extism.CurrentPlugin, stack []uint64, resp any) {
|
||||||
|
respBytes, err := json.Marshal(resp)
|
||||||
|
if err != nil {
|
||||||
|
taskWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
respPtr, err := p.WriteBytes(respBytes)
|
||||||
|
if err != nil {
|
||||||
|
stack[0] = 0
|
||||||
|
return
|
||||||
|
}
|
||||||
|
stack[0] = respPtr
|
||||||
|
}
|
||||||
|
|
||||||
|
// taskWriteError writes an error response to plugin memory.
|
||||||
|
func taskWriteError(p *extism.CurrentPlugin, stack []uint64, err error) {
|
||||||
|
errResp := struct {
|
||||||
|
Error string `json:"error"`
|
||||||
|
}{Error: err.Error()}
|
||||||
|
respBytes, _ := json.Marshal(errResp)
|
||||||
|
respPtr, _ := p.WriteBytes(respBytes)
|
||||||
|
stack[0] = respPtr
|
||||||
|
}
|
||||||
@@ -7,14 +7,16 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"slices"
|
||||||
"strings"
|
"strings"
|
||||||
"sync/atomic"
|
"time"
|
||||||
|
|
||||||
"github.com/dustin/go-humanize"
|
"github.com/dustin/go-humanize"
|
||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
"github.com/navidrome/navidrome/conf"
|
"github.com/navidrome/navidrome/conf"
|
||||||
"github.com/navidrome/navidrome/log"
|
"github.com/navidrome/navidrome/log"
|
||||||
"github.com/navidrome/navidrome/plugins/host"
|
"github.com/navidrome/navidrome/plugins/host"
|
||||||
|
"github.com/navidrome/navidrome/utils/slice"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -22,17 +24,22 @@ const (
|
|||||||
maxKeyLength = 256 // Max key length in bytes
|
maxKeyLength = 256 // Max key length in bytes
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// notExpiredFilter is the SQL condition to exclude expired keys.
|
||||||
|
const notExpiredFilter = "(expires_at IS NULL OR expires_at > datetime('now'))"
|
||||||
|
|
||||||
|
const cleanupInterval = 1 * time.Hour
|
||||||
|
|
||||||
// kvstoreServiceImpl implements the host.KVStoreService interface.
|
// kvstoreServiceImpl implements the host.KVStoreService interface.
|
||||||
// Each plugin gets its own SQLite database for isolation.
|
// Each plugin gets its own SQLite database for isolation.
|
||||||
type kvstoreServiceImpl struct {
|
type kvstoreServiceImpl struct {
|
||||||
pluginName string
|
pluginName string
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
maxSize int64
|
maxSize int64
|
||||||
currentSize atomic.Int64 // cached total size, updated on Set/Delete
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// newKVStoreService creates a new kvstoreServiceImpl instance with its own SQLite database.
|
// newKVStoreService creates a new kvstoreServiceImpl instance with its own SQLite database.
|
||||||
func newKVStoreService(pluginName string, perm *KVStorePermission) (*kvstoreServiceImpl, error) {
|
// The provided context controls the lifetime of the background cleanup goroutine.
|
||||||
|
func newKVStoreService(ctx context.Context, pluginName string, perm *KVStorePermission) (*kvstoreServiceImpl, error) {
|
||||||
// Parse max size from permission, default to 1MB
|
// Parse max size from permission, default to 1MB
|
||||||
maxSize := int64(defaultMaxKVStoreSize)
|
maxSize := int64(defaultMaxKVStoreSize)
|
||||||
if perm != nil && perm.MaxSize != nil && *perm.MaxSize != "" {
|
if perm != nil && perm.MaxSize != nil && *perm.MaxSize != "" {
|
||||||
@@ -59,46 +66,69 @@ func newKVStoreService(pluginName string, perm *KVStorePermission) (*kvstoreServ
|
|||||||
db.SetMaxOpenConns(3)
|
db.SetMaxOpenConns(3)
|
||||||
db.SetMaxIdleConns(1)
|
db.SetMaxIdleConns(1)
|
||||||
|
|
||||||
// Create schema
|
// Apply schema migrations
|
||||||
if err := createKVStoreSchema(db); err != nil {
|
if err := createKVStoreSchema(db); err != nil {
|
||||||
db.Close()
|
db.Close()
|
||||||
return nil, fmt.Errorf("creating kvstore schema: %w", err)
|
return nil, fmt.Errorf("migrating kvstore schema: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load current storage size from database
|
log.Debug("Initialized plugin kvstore", "plugin", pluginName, "path", dbPath, "maxSize", humanize.Bytes(uint64(maxSize)))
|
||||||
var currentSize int64
|
|
||||||
if err := db.QueryRow(`SELECT COALESCE(SUM(size), 0) FROM kvstore`).Scan(¤tSize); err != nil {
|
|
||||||
db.Close()
|
|
||||||
return nil, fmt.Errorf("loading storage size: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Debug("Initialized plugin kvstore", "plugin", pluginName, "path", dbPath, "maxSize", humanize.Bytes(uint64(maxSize)), "currentSize", humanize.Bytes(uint64(currentSize)))
|
|
||||||
|
|
||||||
svc := &kvstoreServiceImpl{
|
svc := &kvstoreServiceImpl{
|
||||||
pluginName: pluginName,
|
pluginName: pluginName,
|
||||||
db: db,
|
db: db,
|
||||||
maxSize: maxSize,
|
maxSize: maxSize,
|
||||||
}
|
}
|
||||||
svc.currentSize.Store(currentSize)
|
go svc.cleanupLoop(ctx)
|
||||||
return svc, nil
|
return svc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// createKVStoreSchema applies schema migrations to the kvstore database.
|
||||||
|
// New migrations must be appended at the end of the slice.
|
||||||
func createKVStoreSchema(db *sql.DB) error {
|
func createKVStoreSchema(db *sql.DB) error {
|
||||||
_, err := db.Exec(`
|
return migrateDB(db, []string{
|
||||||
CREATE TABLE IF NOT EXISTS kvstore (
|
`CREATE TABLE IF NOT EXISTS kvstore (
|
||||||
key TEXT PRIMARY KEY NOT NULL,
|
key TEXT PRIMARY KEY NOT NULL,
|
||||||
value BLOB NOT NULL,
|
value BLOB NOT NULL,
|
||||||
size INTEGER NOT NULL,
|
size INTEGER NOT NULL,
|
||||||
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||||
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
|
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||||
)
|
)`,
|
||||||
`)
|
`ALTER TABLE kvstore ADD COLUMN expires_at DATETIME DEFAULT NULL`,
|
||||||
return err
|
`CREATE INDEX idx_kvstore_expires_at ON kvstore(expires_at)`,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set stores a byte value with the given key.
|
// storageUsed returns the current total storage used by non-expired keys.
|
||||||
func (s *kvstoreServiceImpl) Set(ctx context.Context, key string, value []byte) error {
|
func (s *kvstoreServiceImpl) storageUsed(ctx context.Context) (int64, error) {
|
||||||
// Validate key
|
var used int64
|
||||||
|
err := s.db.QueryRowContext(ctx, `SELECT COALESCE(SUM(size), 0) FROM kvstore WHERE `+notExpiredFilter).Scan(&used)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("calculating storage used: %w", err)
|
||||||
|
}
|
||||||
|
return used, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkStorageLimit verifies that adding delta bytes would not exceed the storage limit.
|
||||||
|
func (s *kvstoreServiceImpl) checkStorageLimit(ctx context.Context, delta int64) error {
|
||||||
|
if delta <= 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
used, err := s.storageUsed(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
newTotal := used + delta
|
||||||
|
if newTotal > s.maxSize {
|
||||||
|
return fmt.Errorf("storage limit exceeded: would use %s of %s allowed",
|
||||||
|
humanize.Bytes(uint64(newTotal)), humanize.Bytes(uint64(s.maxSize)))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// setValue is the shared implementation for Set and SetWithTTL.
|
||||||
|
// A ttlSeconds of 0 means no expiration.
|
||||||
|
func (s *kvstoreServiceImpl) setValue(ctx context.Context, key string, value []byte, ttlSeconds int64) error {
|
||||||
if len(key) == 0 {
|
if len(key) == 0 {
|
||||||
return fmt.Errorf("key cannot be empty")
|
return fmt.Errorf("key cannot be empty")
|
||||||
}
|
}
|
||||||
@@ -108,46 +138,59 @@ func (s *kvstoreServiceImpl) Set(ctx context.Context, key string, value []byte)
|
|||||||
|
|
||||||
newValueSize := int64(len(value))
|
newValueSize := int64(len(value))
|
||||||
|
|
||||||
// Get current size of this key (if it exists) to calculate delta
|
// Get current size of this key (if it exists and not expired) to calculate delta
|
||||||
var oldSize int64
|
var oldSize int64
|
||||||
err := s.db.QueryRowContext(ctx, `SELECT COALESCE(size, 0) FROM kvstore WHERE key = ?`, key).Scan(&oldSize)
|
err := s.db.QueryRowContext(ctx, `SELECT COALESCE(size, 0) FROM kvstore WHERE key = ? AND `+notExpiredFilter, key).Scan(&oldSize)
|
||||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||||
return fmt.Errorf("checking existing key: %w", err)
|
return fmt.Errorf("checking existing key: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check size limits using cached total
|
if err := s.checkStorageLimit(ctx, newValueSize-oldSize); err != nil {
|
||||||
delta := newValueSize - oldSize
|
return err
|
||||||
newTotal := s.currentSize.Load() + delta
|
}
|
||||||
if newTotal > s.maxSize {
|
|
||||||
return fmt.Errorf("storage limit exceeded: would use %s of %s allowed",
|
// Compute expires_at: sql.NullString{Valid:false} sends NULL (no expiration),
|
||||||
humanize.Bytes(uint64(newTotal)), humanize.Bytes(uint64(s.maxSize)))
|
// otherwise we send a concrete timestamp.
|
||||||
|
var expiresAt sql.NullString
|
||||||
|
if ttlSeconds > 0 {
|
||||||
|
expiresAt = sql.NullString{String: fmt.Sprintf("+%d seconds", ttlSeconds), Valid: true}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Upsert the value
|
|
||||||
_, err = s.db.ExecContext(ctx, `
|
_, err = s.db.ExecContext(ctx, `
|
||||||
INSERT INTO kvstore (key, value, size, created_at, updated_at)
|
INSERT INTO kvstore (key, value, size, created_at, updated_at, expires_at)
|
||||||
VALUES (?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
VALUES (?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, datetime('now', ?))
|
||||||
ON CONFLICT(key) DO UPDATE SET
|
ON CONFLICT(key) DO UPDATE SET
|
||||||
value = excluded.value,
|
value = excluded.value,
|
||||||
size = excluded.size,
|
size = excluded.size,
|
||||||
updated_at = CURRENT_TIMESTAMP
|
updated_at = CURRENT_TIMESTAMP,
|
||||||
`, key, value, newValueSize)
|
expires_at = excluded.expires_at
|
||||||
|
`, key, value, newValueSize, expiresAt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("storing value: %w", err)
|
return fmt.Errorf("storing value: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update cached size
|
log.Trace(ctx, "KVStore.Set", "plugin", s.pluginName, "key", key, "size", newValueSize, "ttlSeconds", ttlSeconds)
|
||||||
s.currentSize.Add(delta)
|
|
||||||
|
|
||||||
log.Trace(ctx, "KVStore.Set", "plugin", s.pluginName, "key", key, "size", newValueSize)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set stores a byte value with the given key.
|
||||||
|
func (s *kvstoreServiceImpl) Set(ctx context.Context, key string, value []byte) error {
|
||||||
|
return s.setValue(ctx, key, value, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetWithTTL stores a byte value with the given key and a time-to-live.
|
||||||
|
func (s *kvstoreServiceImpl) SetWithTTL(ctx context.Context, key string, value []byte, ttlSeconds int64) error {
|
||||||
|
if ttlSeconds <= 0 {
|
||||||
|
return fmt.Errorf("ttlSeconds must be greater than 0")
|
||||||
|
}
|
||||||
|
return s.setValue(ctx, key, value, ttlSeconds)
|
||||||
|
}
|
||||||
|
|
||||||
// Get retrieves a byte value from storage.
|
// Get retrieves a byte value from storage.
|
||||||
func (s *kvstoreServiceImpl) Get(ctx context.Context, key string) ([]byte, bool, error) {
|
func (s *kvstoreServiceImpl) Get(ctx context.Context, key string) ([]byte, bool, error) {
|
||||||
var value []byte
|
var value []byte
|
||||||
err := s.db.QueryRowContext(ctx, `SELECT value FROM kvstore WHERE key = ?`, key).Scan(&value)
|
err := s.db.QueryRowContext(ctx, `SELECT value FROM kvstore WHERE key = ? AND `+notExpiredFilter, key).Scan(&value)
|
||||||
if err == sql.ErrNoRows {
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
return nil, false, nil
|
return nil, false, nil
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -160,25 +203,11 @@ func (s *kvstoreServiceImpl) Get(ctx context.Context, key string) ([]byte, bool,
|
|||||||
|
|
||||||
// Delete removes a value from storage.
|
// Delete removes a value from storage.
|
||||||
func (s *kvstoreServiceImpl) Delete(ctx context.Context, key string) error {
|
func (s *kvstoreServiceImpl) Delete(ctx context.Context, key string) error {
|
||||||
// Get size of the key being deleted to update cache
|
_, err := s.db.ExecContext(ctx, `DELETE FROM kvstore WHERE key = ?`, key)
|
||||||
var oldSize int64
|
|
||||||
err := s.db.QueryRowContext(ctx, `SELECT size FROM kvstore WHERE key = ?`, key).Scan(&oldSize)
|
|
||||||
if errors.Is(err, sql.ErrNoRows) {
|
|
||||||
// Key doesn't exist, nothing to delete
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("checking key size: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = s.db.ExecContext(ctx, `DELETE FROM kvstore WHERE key = ?`, key)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("deleting value: %w", err)
|
return fmt.Errorf("deleting value: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update cached size
|
|
||||||
s.currentSize.Add(-oldSize)
|
|
||||||
|
|
||||||
log.Trace(ctx, "KVStore.Delete", "plugin", s.pluginName, "key", key)
|
log.Trace(ctx, "KVStore.Delete", "plugin", s.pluginName, "key", key)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -186,7 +215,7 @@ func (s *kvstoreServiceImpl) Delete(ctx context.Context, key string) error {
|
|||||||
// Has checks if a key exists in storage.
|
// Has checks if a key exists in storage.
|
||||||
func (s *kvstoreServiceImpl) Has(ctx context.Context, key string) (bool, error) {
|
func (s *kvstoreServiceImpl) Has(ctx context.Context, key string) (bool, error) {
|
||||||
var count int
|
var count int
|
||||||
err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM kvstore WHERE key = ?`, key).Scan(&count)
|
err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM kvstore WHERE key = ? AND `+notExpiredFilter, key).Scan(&count)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, fmt.Errorf("checking key: %w", err)
|
return false, fmt.Errorf("checking key: %w", err)
|
||||||
}
|
}
|
||||||
@@ -200,12 +229,12 @@ func (s *kvstoreServiceImpl) List(ctx context.Context, prefix string) ([]string,
|
|||||||
var err error
|
var err error
|
||||||
|
|
||||||
if prefix == "" {
|
if prefix == "" {
|
||||||
rows, err = s.db.QueryContext(ctx, `SELECT key FROM kvstore ORDER BY key`)
|
rows, err = s.db.QueryContext(ctx, `SELECT key FROM kvstore WHERE `+notExpiredFilter+` ORDER BY key`)
|
||||||
} else {
|
} else {
|
||||||
// Escape special LIKE characters in prefix
|
// Escape special LIKE characters in prefix
|
||||||
escapedPrefix := strings.ReplaceAll(prefix, "%", "\\%")
|
escapedPrefix := strings.ReplaceAll(prefix, "%", "\\%")
|
||||||
escapedPrefix = strings.ReplaceAll(escapedPrefix, "_", "\\_")
|
escapedPrefix = strings.ReplaceAll(escapedPrefix, "_", "\\_")
|
||||||
rows, err = s.db.QueryContext(ctx, `SELECT key FROM kvstore WHERE key LIKE ? ESCAPE '\' ORDER BY key`, escapedPrefix+"%")
|
rows, err = s.db.QueryContext(ctx, `SELECT key FROM kvstore WHERE key LIKE ? ESCAPE '\' AND `+notExpiredFilter+` ORDER BY key`, escapedPrefix+"%")
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("listing keys: %w", err)
|
return nil, fmt.Errorf("listing keys: %w", err)
|
||||||
@@ -231,16 +260,113 @@ func (s *kvstoreServiceImpl) List(ctx context.Context, prefix string) ([]string,
|
|||||||
|
|
||||||
// GetStorageUsed returns the total storage used by this plugin in bytes.
|
// GetStorageUsed returns the total storage used by this plugin in bytes.
|
||||||
func (s *kvstoreServiceImpl) GetStorageUsed(ctx context.Context) (int64, error) {
|
func (s *kvstoreServiceImpl) GetStorageUsed(ctx context.Context) (int64, error) {
|
||||||
used := s.currentSize.Load()
|
used, err := s.storageUsed(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
log.Trace(ctx, "KVStore.GetStorageUsed", "plugin", s.pluginName, "bytes", used)
|
log.Trace(ctx, "KVStore.GetStorageUsed", "plugin", s.pluginName, "bytes", used)
|
||||||
return used, nil
|
return used, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes the SQLite database connection.
|
// DeleteByPrefix removes all keys matching the given prefix.
|
||||||
// This is called when the plugin is unloaded.
|
func (s *kvstoreServiceImpl) DeleteByPrefix(ctx context.Context, prefix string) (int64, error) {
|
||||||
|
if prefix == "" {
|
||||||
|
return 0, fmt.Errorf("prefix cannot be empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
escapedPrefix := strings.ReplaceAll(prefix, "%", "\\%")
|
||||||
|
escapedPrefix = strings.ReplaceAll(escapedPrefix, "_", "\\_")
|
||||||
|
result, err := s.db.ExecContext(ctx, `DELETE FROM kvstore WHERE key LIKE ? ESCAPE '\'`, escapedPrefix+"%")
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("deleting keys: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
count, err := result.RowsAffected()
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("getting deleted count: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Trace(ctx, "KVStore.DeleteByPrefix", "plugin", s.pluginName, "prefix", prefix, "deletedCount", count)
|
||||||
|
return count, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetMany retrieves multiple values in a single call, processing keys in batches.
|
||||||
|
func (s *kvstoreServiceImpl) GetMany(ctx context.Context, keys []string) (map[string][]byte, error) {
|
||||||
|
if len(keys) == 0 {
|
||||||
|
return map[string][]byte{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
const batchSize = 200
|
||||||
|
result := make(map[string][]byte)
|
||||||
|
for chunk := range slice.CollectChunks(slices.Values(keys), batchSize) {
|
||||||
|
placeholders := make([]string, len(chunk))
|
||||||
|
args := make([]any, len(chunk))
|
||||||
|
for i, key := range chunk {
|
||||||
|
placeholders[i] = "?"
|
||||||
|
args[i] = key
|
||||||
|
}
|
||||||
|
|
||||||
|
query := `SELECT key, value FROM kvstore WHERE key IN (` + strings.Join(placeholders, ",") + `) AND ` + notExpiredFilter //nolint:gosec // placeholders are always "?"
|
||||||
|
rows, err := s.db.QueryContext(ctx, query, args...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("querying values: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for rows.Next() {
|
||||||
|
var key string
|
||||||
|
var value []byte
|
||||||
|
if err := rows.Scan(&key, &value); err != nil {
|
||||||
|
rows.Close()
|
||||||
|
return nil, fmt.Errorf("scanning value: %w", err)
|
||||||
|
}
|
||||||
|
result[key] = value
|
||||||
|
}
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
rows.Close()
|
||||||
|
return nil, fmt.Errorf("iterating values: %w", err)
|
||||||
|
}
|
||||||
|
rows.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Trace(ctx, "KVStore.GetMany", "plugin", s.pluginName, "requested", len(keys), "found", len(result))
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// cleanupLoop periodically removes expired keys from the database.
|
||||||
|
// It stops when the provided context is cancelled.
|
||||||
|
func (s *kvstoreServiceImpl) cleanupLoop(ctx context.Context) {
|
||||||
|
ticker := time.NewTicker(cleanupInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
s.cleanupExpired(ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// cleanupExpired removes all expired keys from the database to reclaim disk space.
|
||||||
|
func (s *kvstoreServiceImpl) cleanupExpired(ctx context.Context) {
|
||||||
|
result, err := s.db.ExecContext(ctx, `DELETE FROM kvstore WHERE expires_at IS NOT NULL AND expires_at <= datetime('now')`)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(ctx, "KVStore cleanup: failed to delete expired keys", "plugin", s.pluginName, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if count, err := result.RowsAffected(); err == nil && count > 0 {
|
||||||
|
log.Debug("KVStore cleanup completed", "plugin", s.pluginName, "deletedKeys", count)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close runs a final cleanup and closes the SQLite database connection.
|
||||||
|
// The cleanup goroutine is stopped by the context passed to newKVStoreService.
|
||||||
func (s *kvstoreServiceImpl) Close() error {
|
func (s *kvstoreServiceImpl) Close() error {
|
||||||
if s.db != nil {
|
if s.db != nil {
|
||||||
log.Debug("Closing plugin kvstore", "plugin", s.pluginName)
|
log.Debug("Closing plugin kvstore", "plugin", s.pluginName)
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
s.cleanupExpired(ctx)
|
||||||
return s.db.Close()
|
return s.db.Close()
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/navidrome/navidrome/conf"
|
"github.com/navidrome/navidrome/conf"
|
||||||
"github.com/navidrome/navidrome/conf/configtest"
|
"github.com/navidrome/navidrome/conf/configtest"
|
||||||
@@ -37,7 +38,7 @@ var _ = Describe("KVStoreService", func() {
|
|||||||
|
|
||||||
// Create service with 1KB limit for testing
|
// Create service with 1KB limit for testing
|
||||||
maxSize := "1KB"
|
maxSize := "1KB"
|
||||||
service, err = newKVStoreService("test_plugin", &KVStorePermission{MaxSize: &maxSize})
|
service, err = newKVStoreService(ctx, "test_plugin", &KVStorePermission{MaxSize: &maxSize})
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -253,7 +254,7 @@ var _ = Describe("KVStoreService", func() {
|
|||||||
Expect(service.Close()).To(Succeed())
|
Expect(service.Close()).To(Succeed())
|
||||||
|
|
||||||
maxSize := "1KB"
|
maxSize := "1KB"
|
||||||
service2, err := newKVStoreService("test_plugin", &KVStorePermission{MaxSize: &maxSize})
|
service2, err := newKVStoreService(ctx, "test_plugin", &KVStorePermission{MaxSize: &maxSize})
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
defer service2.Close()
|
defer service2.Close()
|
||||||
|
|
||||||
@@ -302,7 +303,7 @@ var _ = Describe("KVStoreService", func() {
|
|||||||
|
|
||||||
Describe("Plugin Isolation", func() {
|
Describe("Plugin Isolation", func() {
|
||||||
It("isolates data between plugins", func() {
|
It("isolates data between plugins", func() {
|
||||||
service2, err := newKVStoreService("other_plugin", &KVStorePermission{})
|
service2, err := newKVStoreService(ctx, "other_plugin", &KVStorePermission{})
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
defer service2.Close()
|
defer service2.Close()
|
||||||
|
|
||||||
@@ -321,7 +322,7 @@ var _ = Describe("KVStoreService", func() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
It("creates separate database files per plugin", func() {
|
It("creates separate database files per plugin", func() {
|
||||||
service2, err := newKVStoreService("other_plugin", &KVStorePermission{})
|
service2, err := newKVStoreService(ctx, "other_plugin", &KVStorePermission{})
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
defer service2.Close()
|
defer service2.Close()
|
||||||
|
|
||||||
@@ -343,6 +344,309 @@ var _ = Describe("KVStoreService", func() {
|
|||||||
Expect(err).To(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Describe("TTL Expiration", func() {
|
||||||
|
It("Get returns not-exists for expired keys", func() {
|
||||||
|
_, err := service.db.Exec(`
|
||||||
|
INSERT INTO kvstore (key, value, size, expires_at)
|
||||||
|
VALUES ('expired_key', 'old', 3, datetime('now', '-1 seconds'))
|
||||||
|
`)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
value, exists, err := service.Get(ctx, "expired_key")
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(exists).To(BeFalse())
|
||||||
|
Expect(value).To(BeNil())
|
||||||
|
})
|
||||||
|
It("Has returns false for expired keys", func() {
|
||||||
|
_, err := service.db.Exec(`
|
||||||
|
INSERT INTO kvstore (key, value, size, expires_at)
|
||||||
|
VALUES ('expired_has', 'old', 3, datetime('now', '-1 seconds'))
|
||||||
|
`)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
exists, err := service.Has(ctx, "expired_has")
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(exists).To(BeFalse())
|
||||||
|
})
|
||||||
|
It("List excludes expired keys", func() {
|
||||||
|
Expect(service.Set(ctx, "live:1", []byte("alive"))).To(Succeed())
|
||||||
|
_, err := service.db.Exec(`
|
||||||
|
INSERT INTO kvstore (key, value, size, expires_at)
|
||||||
|
VALUES ('live:expired', 'dead', 4, datetime('now', '-1 seconds'))
|
||||||
|
`)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
keys, err := service.List(ctx, "live:")
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(keys).To(HaveLen(1))
|
||||||
|
Expect(keys).To(ContainElement("live:1"))
|
||||||
|
})
|
||||||
|
It("Get returns value for non-expired keys with TTL", func() {
|
||||||
|
_, err := service.db.Exec(`
|
||||||
|
INSERT INTO kvstore (key, value, size, expires_at)
|
||||||
|
VALUES ('future_key', 'still alive', 11, datetime('now', '+3600 seconds'))
|
||||||
|
`)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
value, exists, err := service.Get(ctx, "future_key")
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(exists).To(BeTrue())
|
||||||
|
Expect(value).To(Equal([]byte("still alive")))
|
||||||
|
})
|
||||||
|
It("Set clears expires_at from a key previously set with TTL", func() {
|
||||||
|
// Insert a key with a TTL that has already expired
|
||||||
|
_, err := service.db.Exec(`
|
||||||
|
INSERT INTO kvstore (key, value, size, expires_at)
|
||||||
|
VALUES ('ttl_then_set', 'temp', 4, datetime('now', '-1 seconds'))
|
||||||
|
`)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
// Overwrite with Set (no TTL) — should become permanent
|
||||||
|
err = service.Set(ctx, "ttl_then_set", []byte("permanent"))
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
// Should exist because Set cleared expires_at
|
||||||
|
value, exists, err := service.Get(ctx, "ttl_then_set")
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(exists).To(BeTrue())
|
||||||
|
Expect(value).To(Equal([]byte("permanent")))
|
||||||
|
|
||||||
|
// Verify expires_at is actually NULL
|
||||||
|
var expiresAt *string
|
||||||
|
Expect(service.db.QueryRow(`SELECT expires_at FROM kvstore WHERE key = 'ttl_then_set'`).Scan(&expiresAt)).To(Succeed())
|
||||||
|
Expect(expiresAt).To(BeNil())
|
||||||
|
})
|
||||||
|
It("expired keys are not counted in storage used", func() {
|
||||||
|
_, err := service.db.Exec(`
|
||||||
|
INSERT INTO kvstore (key, value, size, expires_at)
|
||||||
|
VALUES ('expired_key', '12345', 5, datetime('now', '-1 seconds'))
|
||||||
|
`)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
// Expired keys should not be counted
|
||||||
|
used, err := service.GetStorageUsed(ctx)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(used).To(Equal(int64(0)))
|
||||||
|
})
|
||||||
|
It("cleanup removes expired rows from disk", func() {
|
||||||
|
_, err := service.db.Exec(`
|
||||||
|
INSERT INTO kvstore (key, value, size, expires_at)
|
||||||
|
VALUES ('cleanup_me', '12345', 5, datetime('now', '-1 seconds'))
|
||||||
|
`)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
// Row exists in DB but is logically expired
|
||||||
|
var count int
|
||||||
|
Expect(service.db.QueryRow(`SELECT COUNT(*) FROM kvstore`).Scan(&count)).To(Succeed())
|
||||||
|
Expect(count).To(Equal(1))
|
||||||
|
|
||||||
|
service.cleanupExpired(ctx)
|
||||||
|
|
||||||
|
// Row should be physically deleted
|
||||||
|
Expect(service.db.QueryRow(`SELECT COUNT(*) FROM kvstore`).Scan(&count)).To(Succeed())
|
||||||
|
Expect(count).To(Equal(0))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
Describe("SetWithTTL", func() {
|
||||||
|
It("stores value that is retrievable before expiry", func() {
|
||||||
|
err := service.SetWithTTL(ctx, "ttl_key", []byte("ttl_value"), 3600)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
value, exists, err := service.Get(ctx, "ttl_key")
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(exists).To(BeTrue())
|
||||||
|
Expect(value).To(Equal([]byte("ttl_value")))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("value is not retrievable after expiry", func() {
|
||||||
|
// Insert a key with an already-expired TTL
|
||||||
|
_, err := service.db.Exec(`
|
||||||
|
INSERT INTO kvstore (key, value, size, expires_at)
|
||||||
|
VALUES ('short_ttl', 'gone_soon', 9, datetime('now', '-1 seconds'))
|
||||||
|
`)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
_, exists, err := service.Get(ctx, "short_ttl")
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(exists).To(BeFalse())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("rejects ttlSeconds <= 0", func() {
|
||||||
|
err := service.SetWithTTL(ctx, "bad_ttl", []byte("value"), 0)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("ttlSeconds must be greater than 0"))
|
||||||
|
|
||||||
|
err = service.SetWithTTL(ctx, "bad_ttl", []byte("value"), -5)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("ttlSeconds must be greater than 0"))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("validates key same as Set", func() {
|
||||||
|
err := service.SetWithTTL(ctx, "", []byte("value"), 60)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("key cannot be empty"))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("enforces size limits same as Set", func() {
|
||||||
|
bigValue := make([]byte, 2048)
|
||||||
|
err := service.SetWithTTL(ctx, "big_ttl", bigValue, 60)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("storage limit exceeded"))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("overwrites existing key and updates TTL", func() {
|
||||||
|
// Insert a key with an already-expired TTL
|
||||||
|
_, err := service.db.Exec(`
|
||||||
|
INSERT INTO kvstore (key, value, size, expires_at)
|
||||||
|
VALUES ('overwrite_ttl', 'first', 5, datetime('now', '-1 seconds'))
|
||||||
|
`)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
// Overwrite with a long TTL — should be retrievable
|
||||||
|
err = service.SetWithTTL(ctx, "overwrite_ttl", []byte("second"), 3600)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
value, exists, err := service.Get(ctx, "overwrite_ttl")
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(exists).To(BeTrue())
|
||||||
|
Expect(value).To(Equal([]byte("second")))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("tracks storage correctly", func() {
|
||||||
|
err := service.SetWithTTL(ctx, "sized_ttl", []byte("12345"), 3600)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
used, err := service.GetStorageUsed(ctx)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(used).To(Equal(int64(5)))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
Describe("DeleteByPrefix", func() {
|
||||||
|
BeforeEach(func() {
|
||||||
|
Expect(service.Set(ctx, "cache:user:1", []byte("Alice"))).To(Succeed())
|
||||||
|
Expect(service.Set(ctx, "cache:user:2", []byte("Bob"))).To(Succeed())
|
||||||
|
Expect(service.Set(ctx, "cache:item:1", []byte("Widget"))).To(Succeed())
|
||||||
|
Expect(service.Set(ctx, "data:important", []byte("keep"))).To(Succeed())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("deletes all keys with the given prefix", func() {
|
||||||
|
deleted, err := service.DeleteByPrefix(ctx, "cache:user:")
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(deleted).To(Equal(int64(2)))
|
||||||
|
|
||||||
|
keys, err := service.List(ctx, "")
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(keys).To(HaveLen(2))
|
||||||
|
Expect(keys).To(ContainElements("cache:item:1", "data:important"))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("rejects empty prefix", func() {
|
||||||
|
_, err := service.DeleteByPrefix(ctx, "")
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("prefix cannot be empty"))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("returns 0 when no keys match", func() {
|
||||||
|
deleted, err := service.DeleteByPrefix(ctx, "nonexistent:")
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(deleted).To(Equal(int64(0)))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("updates storage size correctly", func() {
|
||||||
|
usedBefore, _ := service.GetStorageUsed(ctx)
|
||||||
|
Expect(usedBefore).To(BeNumerically(">", 0))
|
||||||
|
|
||||||
|
_, err := service.DeleteByPrefix(ctx, "cache:")
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
usedAfter, _ := service.GetStorageUsed(ctx)
|
||||||
|
Expect(usedAfter).To(Equal(int64(4)))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("handles special LIKE characters in prefix", func() {
|
||||||
|
Expect(service.Set(ctx, "test%special", []byte("v1"))).To(Succeed())
|
||||||
|
Expect(service.Set(ctx, "test_special", []byte("v2"))).To(Succeed())
|
||||||
|
Expect(service.Set(ctx, "testXspecial", []byte("v3"))).To(Succeed())
|
||||||
|
|
||||||
|
deleted, err := service.DeleteByPrefix(ctx, "test%")
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(deleted).To(Equal(int64(1)))
|
||||||
|
|
||||||
|
exists, _ := service.Has(ctx, "test_special")
|
||||||
|
Expect(exists).To(BeTrue())
|
||||||
|
exists, _ = service.Has(ctx, "testXspecial")
|
||||||
|
Expect(exists).To(BeTrue())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("also deletes expired keys matching prefix", func() {
|
||||||
|
_, err := service.db.Exec(`
|
||||||
|
INSERT INTO kvstore (key, value, size, expires_at)
|
||||||
|
VALUES ('cache:expired', 'old', 3, datetime('now', '-1 seconds'))
|
||||||
|
`)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
deleted, err := service.DeleteByPrefix(ctx, "cache:")
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(deleted).To(Equal(int64(4)))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
Describe("GetMany", func() {
|
||||||
|
BeforeEach(func() {
|
||||||
|
Expect(service.Set(ctx, "key1", []byte("value1"))).To(Succeed())
|
||||||
|
Expect(service.Set(ctx, "key2", []byte("value2"))).To(Succeed())
|
||||||
|
Expect(service.Set(ctx, "key3", []byte("value3"))).To(Succeed())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("retrieves multiple values at once", func() {
|
||||||
|
values, err := service.GetMany(ctx, []string{"key1", "key2", "key3"})
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(values).To(HaveLen(3))
|
||||||
|
Expect(values["key1"]).To(Equal([]byte("value1")))
|
||||||
|
Expect(values["key2"]).To(Equal([]byte("value2")))
|
||||||
|
Expect(values["key3"]).To(Equal([]byte("value3")))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("omits missing keys from result", func() {
|
||||||
|
values, err := service.GetMany(ctx, []string{"key1", "missing", "key3"})
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(values).To(HaveLen(2))
|
||||||
|
Expect(values["key1"]).To(Equal([]byte("value1")))
|
||||||
|
Expect(values["key3"]).To(Equal([]byte("value3")))
|
||||||
|
_, hasMissing := values["missing"]
|
||||||
|
Expect(hasMissing).To(BeFalse())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("returns empty map for empty keys slice", func() {
|
||||||
|
values, err := service.GetMany(ctx, []string{})
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(values).To(BeEmpty())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("returns empty map for nil keys slice", func() {
|
||||||
|
values, err := service.GetMany(ctx, nil)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(values).To(BeEmpty())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("excludes expired keys", func() {
|
||||||
|
_, err := service.db.Exec(`
|
||||||
|
INSERT INTO kvstore (key, value, size, expires_at)
|
||||||
|
VALUES ('expired_many', 'old', 3, datetime('now', '-1 seconds'))
|
||||||
|
`)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
values, err := service.GetMany(ctx, []string{"key1", "expired_many"})
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(values).To(HaveLen(1))
|
||||||
|
Expect(values["key1"]).To(Equal([]byte("value1")))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("handles all keys missing", func() {
|
||||||
|
values, err := service.GetMany(ctx, []string{"nope1", "nope2"})
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(values).To(BeEmpty())
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
var _ = Describe("KVStoreService Integration", Ordered, func() {
|
var _ = Describe("KVStoreService Integration", Ordered, func() {
|
||||||
@@ -416,17 +720,21 @@ var _ = Describe("KVStoreService Integration", Ordered, func() {
|
|||||||
|
|
||||||
Describe("KVStore Operations via Plugin", func() {
|
Describe("KVStore Operations via Plugin", func() {
|
||||||
type testKVStoreInput struct {
|
type testKVStoreInput struct {
|
||||||
Operation string `json:"operation"`
|
Operation string `json:"operation"`
|
||||||
Key string `json:"key"`
|
Key string `json:"key"`
|
||||||
Value []byte `json:"value,omitempty"`
|
Value []byte `json:"value,omitempty"`
|
||||||
Prefix string `json:"prefix,omitempty"`
|
Prefix string `json:"prefix,omitempty"`
|
||||||
|
TTLSeconds int64 `json:"ttl_seconds,omitempty"`
|
||||||
|
Keys []string `json:"keys,omitempty"`
|
||||||
}
|
}
|
||||||
type testKVStoreOutput struct {
|
type testKVStoreOutput struct {
|
||||||
Value []byte `json:"value,omitempty"`
|
Value []byte `json:"value,omitempty"`
|
||||||
Exists bool `json:"exists,omitempty"`
|
Values map[string][]byte `json:"values,omitempty"`
|
||||||
Keys []string `json:"keys,omitempty"`
|
Exists bool `json:"exists,omitempty"`
|
||||||
StorageUsed int64 `json:"storage_used,omitempty"`
|
Keys []string `json:"keys,omitempty"`
|
||||||
Error *string `json:"error,omitempty"`
|
StorageUsed int64 `json:"storage_used,omitempty"`
|
||||||
|
DeletedCount int64 `json:"deleted_count,omitempty"`
|
||||||
|
Error *string `json:"error,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
callTestKVStore := func(ctx context.Context, input testKVStoreInput) (*testKVStoreOutput, error) {
|
callTestKVStore := func(ctx context.Context, input testKVStoreInput) (*testKVStoreOutput, error) {
|
||||||
@@ -594,6 +902,107 @@ var _ = Describe("KVStoreService Integration", Ordered, func() {
|
|||||||
Expect(output.Exists).To(BeTrue())
|
Expect(output.Exists).To(BeTrue())
|
||||||
Expect(output.Value).To(Equal(binaryData))
|
Expect(output.Value).To(Equal(binaryData))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should set value with TTL and expire it", func() {
|
||||||
|
ctx := GinkgoT().Context()
|
||||||
|
|
||||||
|
// Set value with 1 second TTL
|
||||||
|
_, err := callTestKVStore(ctx, testKVStoreInput{
|
||||||
|
Operation: "set_with_ttl",
|
||||||
|
Key: "ttl_key",
|
||||||
|
Value: []byte("temporary"),
|
||||||
|
TTLSeconds: 1,
|
||||||
|
})
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
// Immediately should exist
|
||||||
|
output, err := callTestKVStore(ctx, testKVStoreInput{
|
||||||
|
Operation: "get",
|
||||||
|
Key: "ttl_key",
|
||||||
|
})
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(output.Exists).To(BeTrue())
|
||||||
|
Expect(output.Value).To(Equal([]byte("temporary")))
|
||||||
|
|
||||||
|
// Wait for expiration
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
|
// Should no longer exist
|
||||||
|
output, err = callTestKVStore(ctx, testKVStoreInput{
|
||||||
|
Operation: "get",
|
||||||
|
Key: "ttl_key",
|
||||||
|
})
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(output.Exists).To(BeFalse())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should delete keys by prefix", func() {
|
||||||
|
ctx := GinkgoT().Context()
|
||||||
|
|
||||||
|
// Set multiple keys with shared prefix
|
||||||
|
for _, key := range []string{"del_prefix:a", "del_prefix:b", "keep:c"} {
|
||||||
|
_, err := callTestKVStore(ctx, testKVStoreInput{
|
||||||
|
Operation: "set",
|
||||||
|
Key: key,
|
||||||
|
Value: []byte("value"),
|
||||||
|
})
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete by prefix
|
||||||
|
output, err := callTestKVStore(ctx, testKVStoreInput{
|
||||||
|
Operation: "delete_by_prefix",
|
||||||
|
Prefix: "del_prefix:",
|
||||||
|
})
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(output.DeletedCount).To(Equal(int64(2)))
|
||||||
|
|
||||||
|
// Verify remaining key
|
||||||
|
getOutput, err := callTestKVStore(ctx, testKVStoreInput{
|
||||||
|
Operation: "has",
|
||||||
|
Key: "keep:c",
|
||||||
|
})
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(getOutput.Exists).To(BeTrue())
|
||||||
|
|
||||||
|
// Verify deleted keys are gone
|
||||||
|
getOutput, err = callTestKVStore(ctx, testKVStoreInput{
|
||||||
|
Operation: "has",
|
||||||
|
Key: "del_prefix:a",
|
||||||
|
})
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(getOutput.Exists).To(BeFalse())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should get many values at once", func() {
|
||||||
|
ctx := GinkgoT().Context()
|
||||||
|
|
||||||
|
// Set multiple keys
|
||||||
|
for _, kv := range []struct{ k, v string }{
|
||||||
|
{"many:1", "val1"},
|
||||||
|
{"many:2", "val2"},
|
||||||
|
{"many:3", "val3"},
|
||||||
|
} {
|
||||||
|
_, err := callTestKVStore(ctx, testKVStoreInput{
|
||||||
|
Operation: "set",
|
||||||
|
Key: kv.k,
|
||||||
|
Value: []byte(kv.v),
|
||||||
|
})
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get many, including a missing key
|
||||||
|
output, err := callTestKVStore(ctx, testKVStoreInput{
|
||||||
|
Operation: "get_many",
|
||||||
|
Keys: []string{"many:1", "many:3", "many:missing"},
|
||||||
|
})
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(output.Values).To(HaveLen(2))
|
||||||
|
Expect(output.Values["many:1"]).To(Equal([]byte("val1")))
|
||||||
|
Expect(output.Values["many:3"]).To(Equal([]byte("val3")))
|
||||||
|
_, hasMissing := output.Values["many:missing"]
|
||||||
|
Expect(hasMissing).To(BeFalse())
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
Describe("Database Isolation", func() {
|
Describe("Database Isolation", func() {
|
||||||
|
|||||||
595
plugins/host_taskqueue.go
Normal file
595
plugins/host_taskqueue.go
Normal file
@@ -0,0 +1,595 @@
|
|||||||
|
package plugins
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
_ "github.com/mattn/go-sqlite3"
|
||||||
|
"github.com/navidrome/navidrome/conf"
|
||||||
|
"github.com/navidrome/navidrome/log"
|
||||||
|
"github.com/navidrome/navidrome/model/id"
|
||||||
|
"github.com/navidrome/navidrome/plugins/capabilities"
|
||||||
|
"github.com/navidrome/navidrome/plugins/host"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultConcurrency int32 = 1
|
||||||
|
defaultBackoffMs int64 = 1000
|
||||||
|
defaultRetentionMs int64 = 3_600_000 // 1 hour
|
||||||
|
minRetentionMs int64 = 60_000 // 1 minute
|
||||||
|
maxRetentionMs int64 = 604_800_000 // 1 week
|
||||||
|
maxQueueNameLength = 128
|
||||||
|
maxPayloadSize = 1 * 1024 * 1024 // 1MB
|
||||||
|
maxBackoffMs int64 = 3_600_000 // 1 hour
|
||||||
|
taskCleanupInterval = 5 * time.Minute
|
||||||
|
pollInterval = 5 * time.Second
|
||||||
|
shutdownTimeout = 10 * time.Second
|
||||||
|
|
||||||
|
taskStatusPending = "pending"
|
||||||
|
taskStatusRunning = "running"
|
||||||
|
taskStatusCompleted = "completed"
|
||||||
|
taskStatusFailed = "failed"
|
||||||
|
taskStatusCancelled = "cancelled"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CapabilityTaskWorker indicates the plugin can receive task execution callbacks.
|
||||||
|
const CapabilityTaskWorker Capability = "TaskWorker"
|
||||||
|
|
||||||
|
const FuncTaskWorkerCallback = "nd_task_execute"
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
registerCapability(CapabilityTaskWorker, FuncTaskWorkerCallback)
|
||||||
|
}
|
||||||
|
|
||||||
|
type queueState struct {
|
||||||
|
config host.QueueConfig
|
||||||
|
signal chan struct{}
|
||||||
|
limiter *rate.Limiter
|
||||||
|
}
|
||||||
|
|
||||||
|
// notifyWorkers sends a non-blocking signal to wake up queue workers.
|
||||||
|
func (qs *queueState) notifyWorkers() {
|
||||||
|
select {
|
||||||
|
case qs.signal <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// taskQueueServiceImpl implements host.TaskQueueService with SQLite persistence
|
||||||
|
// and background worker goroutines for task execution.
|
||||||
|
type taskQueueServiceImpl struct {
|
||||||
|
pluginName string
|
||||||
|
manager *Manager
|
||||||
|
maxConcurrency int32
|
||||||
|
db *sql.DB
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
wg sync.WaitGroup
|
||||||
|
mu sync.Mutex
|
||||||
|
queues map[string]*queueState
|
||||||
|
|
||||||
|
// For testing: override how callbacks are invoked
|
||||||
|
invokeCallbackFn func(ctx context.Context, queueName, taskID string, payload []byte, attempt int32) (string, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// newTaskQueueService creates a new taskQueueServiceImpl with its own SQLite database.
|
||||||
|
func newTaskQueueService(pluginName string, manager *Manager, maxConcurrency int32) (*taskQueueServiceImpl, error) {
|
||||||
|
dataDir := filepath.Join(conf.Server.DataFolder, "plugins", pluginName)
|
||||||
|
if err := os.MkdirAll(dataDir, 0700); err != nil {
|
||||||
|
return nil, fmt.Errorf("creating plugin data directory: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
dbPath := filepath.Join(dataDir, "taskqueue.db")
|
||||||
|
db, err := sql.Open("sqlite3", dbPath+"?_busy_timeout=5000&_journal_mode=WAL&_foreign_keys=off")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("opening taskqueue database: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
db.SetMaxOpenConns(3)
|
||||||
|
db.SetMaxIdleConns(1)
|
||||||
|
|
||||||
|
if err := createTaskQueueSchema(db); err != nil {
|
||||||
|
db.Close()
|
||||||
|
return nil, fmt.Errorf("creating taskqueue schema: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(manager.ctx)
|
||||||
|
|
||||||
|
s := &taskQueueServiceImpl{
|
||||||
|
pluginName: pluginName,
|
||||||
|
manager: manager,
|
||||||
|
maxConcurrency: maxConcurrency,
|
||||||
|
db: db,
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
|
queues: make(map[string]*queueState),
|
||||||
|
}
|
||||||
|
s.invokeCallbackFn = s.defaultInvokeCallback
|
||||||
|
|
||||||
|
s.wg.Go(s.cleanupLoop)
|
||||||
|
|
||||||
|
log.Debug("Initialized plugin taskqueue", "plugin", pluginName, "path", dbPath, "maxConcurrency", maxConcurrency)
|
||||||
|
return s, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// createTaskQueueSchema applies schema migrations to the taskqueue database.
|
||||||
|
// New migrations must be appended at the end of the slice.
|
||||||
|
func createTaskQueueSchema(db *sql.DB) error {
|
||||||
|
return migrateDB(db, []string{
|
||||||
|
`CREATE TABLE IF NOT EXISTS queues (
|
||||||
|
name TEXT PRIMARY KEY,
|
||||||
|
concurrency INTEGER NOT NULL DEFAULT 1,
|
||||||
|
max_retries INTEGER NOT NULL DEFAULT 0,
|
||||||
|
backoff_ms INTEGER NOT NULL DEFAULT 1000,
|
||||||
|
delay_ms INTEGER NOT NULL DEFAULT 0,
|
||||||
|
retention_ms INTEGER NOT NULL DEFAULT 3600000
|
||||||
|
)`,
|
||||||
|
`CREATE TABLE IF NOT EXISTS tasks (
|
||||||
|
id TEXT PRIMARY KEY,
|
||||||
|
queue_name TEXT NOT NULL REFERENCES queues(name),
|
||||||
|
payload BLOB NOT NULL,
|
||||||
|
status TEXT NOT NULL DEFAULT 'pending',
|
||||||
|
attempt INTEGER NOT NULL DEFAULT 0,
|
||||||
|
max_retries INTEGER NOT NULL,
|
||||||
|
next_run_at INTEGER NOT NULL,
|
||||||
|
created_at INTEGER NOT NULL,
|
||||||
|
updated_at INTEGER NOT NULL,
|
||||||
|
message TEXT NOT NULL DEFAULT ''
|
||||||
|
)`,
|
||||||
|
`CREATE INDEX IF NOT EXISTS idx_tasks_dequeue ON tasks(queue_name, status, next_run_at)`,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// applyConfigDefaults fills zero-value config fields with sensible defaults
|
||||||
|
// and clamps values to valid ranges, logging warnings for clamped values.
|
||||||
|
func (s *taskQueueServiceImpl) applyConfigDefaults(ctx context.Context, name string, config *host.QueueConfig) {
|
||||||
|
if config.Concurrency <= 0 {
|
||||||
|
config.Concurrency = defaultConcurrency
|
||||||
|
}
|
||||||
|
if config.BackoffMs <= 0 {
|
||||||
|
config.BackoffMs = defaultBackoffMs
|
||||||
|
}
|
||||||
|
if config.RetentionMs <= 0 {
|
||||||
|
config.RetentionMs = defaultRetentionMs
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.RetentionMs < minRetentionMs {
|
||||||
|
log.Warn(ctx, "TaskQueue retention clamped to minimum", "plugin", s.pluginName, "queue", name,
|
||||||
|
"requested", config.RetentionMs, "min", minRetentionMs)
|
||||||
|
config.RetentionMs = minRetentionMs
|
||||||
|
}
|
||||||
|
if config.RetentionMs > maxRetentionMs {
|
||||||
|
log.Warn(ctx, "TaskQueue retention clamped to maximum", "plugin", s.pluginName, "queue", name,
|
||||||
|
"requested", config.RetentionMs, "max", maxRetentionMs)
|
||||||
|
config.RetentionMs = maxRetentionMs
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// clampConcurrency reduces config.Concurrency if it exceeds the remaining budget.
|
||||||
|
// Returns an error when the concurrency budget is fully exhausted.
|
||||||
|
// Must be called with s.mu held.
|
||||||
|
func (s *taskQueueServiceImpl) clampConcurrency(ctx context.Context, name string, config *host.QueueConfig) error {
|
||||||
|
var allocated int32
|
||||||
|
for _, qs := range s.queues {
|
||||||
|
allocated += qs.config.Concurrency
|
||||||
|
}
|
||||||
|
available := s.maxConcurrency - allocated
|
||||||
|
if available <= 0 {
|
||||||
|
log.Warn(ctx, "TaskQueue concurrency budget exhausted", "plugin", s.pluginName, "queue", name,
|
||||||
|
"allocated", allocated, "maxConcurrency", s.maxConcurrency)
|
||||||
|
return fmt.Errorf("concurrency budget exhausted (%d/%d allocated)", allocated, s.maxConcurrency)
|
||||||
|
}
|
||||||
|
if config.Concurrency > available {
|
||||||
|
log.Warn(ctx, "TaskQueue concurrency clamped", "plugin", s.pluginName, "queue", name,
|
||||||
|
"requested", config.Concurrency, "available", available, "maxConcurrency", s.maxConcurrency)
|
||||||
|
config.Concurrency = available
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *taskQueueServiceImpl) CreateQueue(ctx context.Context, name string, config host.QueueConfig) error {
|
||||||
|
if len(name) == 0 {
|
||||||
|
return fmt.Errorf("queue name cannot be empty")
|
||||||
|
}
|
||||||
|
if len(name) > maxQueueNameLength {
|
||||||
|
return fmt.Errorf("queue name exceeds maximum length of %d bytes", maxQueueNameLength)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.applyConfigDefaults(ctx, name, &config)
|
||||||
|
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
if err := s.clampConcurrency(ctx, name, &config); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, exists := s.queues[name]; exists {
|
||||||
|
return fmt.Errorf("queue %q already exists", name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Upsert into queues table (idempotent across restarts)
|
||||||
|
_, err := s.db.ExecContext(ctx, `
|
||||||
|
INSERT INTO queues (name, concurrency, max_retries, backoff_ms, delay_ms, retention_ms)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?)
|
||||||
|
ON CONFLICT(name) DO UPDATE SET
|
||||||
|
concurrency = excluded.concurrency,
|
||||||
|
max_retries = excluded.max_retries,
|
||||||
|
backoff_ms = excluded.backoff_ms,
|
||||||
|
delay_ms = excluded.delay_ms,
|
||||||
|
retention_ms = excluded.retention_ms
|
||||||
|
`, name, config.Concurrency, config.MaxRetries, config.BackoffMs, config.DelayMs, config.RetentionMs)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating queue: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset stale running tasks from previous crash
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
|
_, err = s.db.ExecContext(ctx, `
|
||||||
|
UPDATE tasks SET status = ?, updated_at = ? WHERE queue_name = ? AND status = ?
|
||||||
|
`, taskStatusPending, now, name, taskStatusRunning)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("resetting stale tasks: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
qs := &queueState{
|
||||||
|
config: config,
|
||||||
|
signal: make(chan struct{}, 1),
|
||||||
|
}
|
||||||
|
if config.DelayMs > 0 {
|
||||||
|
// Rate limit dispatches to enforce delay between tasks.
|
||||||
|
// Burst of 1 allows one immediate dispatch, then enforces the delay interval.
|
||||||
|
qs.limiter = rate.NewLimiter(rate.Every(time.Duration(config.DelayMs)*time.Millisecond), 1)
|
||||||
|
}
|
||||||
|
s.queues[name] = qs
|
||||||
|
|
||||||
|
for i := int32(0); i < config.Concurrency; i++ {
|
||||||
|
s.wg.Go(func() { s.worker(name, qs) })
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debug(ctx, "Created task queue", "plugin", s.pluginName, "queue", name,
|
||||||
|
"concurrency", config.Concurrency, "maxRetries", config.MaxRetries,
|
||||||
|
"backoffMs", config.BackoffMs, "delayMs", config.DelayMs, "retentionMs", config.RetentionMs)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *taskQueueServiceImpl) Enqueue(ctx context.Context, queueName string, payload []byte) (string, error) {
|
||||||
|
s.mu.Lock()
|
||||||
|
qs, exists := s.queues[queueName]
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
return "", fmt.Errorf("queue %q does not exist", queueName)
|
||||||
|
}
|
||||||
|
if len(payload) > maxPayloadSize {
|
||||||
|
return "", fmt.Errorf("payload size %d exceeds maximum of %d bytes", len(payload), maxPayloadSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
taskID := id.NewRandom()
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
|
|
||||||
|
_, err := s.db.ExecContext(ctx, `
|
||||||
|
INSERT INTO tasks (id, queue_name, payload, status, attempt, max_retries, next_run_at, created_at, updated_at)
|
||||||
|
VALUES (?, ?, ?, ?, 0, ?, ?, ?, ?)
|
||||||
|
`, taskID, queueName, payload, taskStatusPending, qs.config.MaxRetries, now, now, now)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("enqueuing task: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
qs.notifyWorkers()
|
||||||
|
log.Trace(ctx, "Enqueued task", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
|
||||||
|
return taskID, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get returns the current state of a task.
|
||||||
|
func (s *taskQueueServiceImpl) Get(ctx context.Context, taskID string) (*host.TaskInfo, error) {
|
||||||
|
var info host.TaskInfo
|
||||||
|
err := s.db.QueryRowContext(ctx, `SELECT status, message, attempt FROM tasks WHERE id = ?`, taskID).
|
||||||
|
Scan(&info.Status, &info.Message, &info.Attempt)
|
||||||
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
|
return nil, fmt.Errorf("task %q not found", taskID)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("getting task info: %w", err)
|
||||||
|
}
|
||||||
|
return &info, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cancel cancels a pending task.
|
||||||
|
func (s *taskQueueServiceImpl) Cancel(ctx context.Context, taskID string) error {
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
|
result, err := s.db.ExecContext(ctx, `
|
||||||
|
UPDATE tasks SET status = ?, updated_at = ? WHERE id = ? AND status = ?
|
||||||
|
`, taskStatusCancelled, now, taskID, taskStatusPending)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cancelling task: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rowsAffected, err := result.RowsAffected()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("checking cancel result: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if rowsAffected == 0 {
|
||||||
|
// Check if task exists at all
|
||||||
|
var status string
|
||||||
|
err := s.db.QueryRowContext(ctx, `SELECT status FROM tasks WHERE id = ?`, taskID).Scan(&status)
|
||||||
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
|
return fmt.Errorf("task %q not found", taskID)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("checking task existence: %w", err)
|
||||||
|
}
|
||||||
|
return fmt.Errorf("task %q cannot be cancelled (status: %s)", taskID, status)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Trace(ctx, "Cancelled task", "plugin", s.pluginName, "taskID", taskID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ClearQueue removes all pending tasks from the named queue.
|
||||||
|
// Running tasks are not affected. Returns the number of tasks removed.
|
||||||
|
func (s *taskQueueServiceImpl) ClearQueue(ctx context.Context, queueName string) (int64, error) {
|
||||||
|
s.mu.Lock()
|
||||||
|
_, exists := s.queues[queueName]
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
return 0, fmt.Errorf("queue %q does not exist", queueName)
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
|
result, err := s.db.ExecContext(ctx, `
|
||||||
|
UPDATE tasks SET status = ?, updated_at = ? WHERE queue_name = ? AND status = ?
|
||||||
|
`, taskStatusCancelled, now, queueName, taskStatusPending)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("clearing queue: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cleared, err := result.RowsAffected()
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("checking clear result: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if cleared > 0 {
|
||||||
|
log.Debug(ctx, "Cleared pending tasks from queue", "plugin", s.pluginName, "queue", queueName, "cleared", cleared)
|
||||||
|
}
|
||||||
|
return cleared, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// worker is the main loop for a single worker goroutine.
|
||||||
|
func (s *taskQueueServiceImpl) worker(queueName string, qs *queueState) {
|
||||||
|
// Process any existing pending tasks immediately on startup
|
||||||
|
s.drainQueue(queueName, qs)
|
||||||
|
|
||||||
|
ticker := time.NewTicker(pollInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
return
|
||||||
|
case <-qs.signal:
|
||||||
|
s.drainQueue(queueName, qs)
|
||||||
|
case <-ticker.C:
|
||||||
|
s.drainQueue(queueName, qs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *taskQueueServiceImpl) drainQueue(queueName string, qs *queueState) {
|
||||||
|
for s.ctx.Err() == nil && s.processTask(queueName, qs) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// processTask dequeues and processes a single task. Returns true if a task was processed.
|
||||||
|
func (s *taskQueueServiceImpl) processTask(queueName string, qs *queueState) bool {
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
|
|
||||||
|
// Atomically dequeue a task
|
||||||
|
var taskID string
|
||||||
|
var payload []byte
|
||||||
|
var attempt, maxRetries int32
|
||||||
|
err := s.db.QueryRowContext(s.ctx, `
|
||||||
|
UPDATE tasks SET status = ?, attempt = attempt + 1, updated_at = ?
|
||||||
|
WHERE id = (
|
||||||
|
SELECT id FROM tasks
|
||||||
|
WHERE queue_name = ? AND status = ? AND next_run_at <= ?
|
||||||
|
ORDER BY next_run_at, created_at LIMIT 1
|
||||||
|
)
|
||||||
|
RETURNING id, payload, attempt, max_retries
|
||||||
|
`, taskStatusRunning, now, queueName, taskStatusPending, now).Scan(&taskID, &payload, &attempt, &maxRetries)
|
||||||
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
log.Error(s.ctx, "Failed to dequeue task", "plugin", s.pluginName, "queue", queueName, err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enforce delay between task dispatches using a rate limiter.
|
||||||
|
// This is done after dequeue so that empty polls don't consume rate tokens.
|
||||||
|
if qs.limiter != nil {
|
||||||
|
if err := qs.limiter.Wait(s.ctx); err != nil {
|
||||||
|
// Context cancelled during wait — revert task to pending for recovery
|
||||||
|
s.revertTaskToPending(taskID)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Invoke callback
|
||||||
|
log.Debug(s.ctx, "Executing task", "plugin", s.pluginName, "queue", queueName, "taskID", taskID, "attempt", attempt)
|
||||||
|
message, callbackErr := s.invokeCallbackFn(s.ctx, queueName, taskID, payload, attempt)
|
||||||
|
|
||||||
|
// If context was cancelled (shutdown), revert task to pending for recovery
|
||||||
|
if s.ctx.Err() != nil {
|
||||||
|
s.revertTaskToPending(taskID)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if callbackErr == nil {
|
||||||
|
s.completeTask(queueName, taskID, message)
|
||||||
|
} else {
|
||||||
|
s.handleTaskFailure(queueName, taskID, attempt, maxRetries, qs, callbackErr, message)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *taskQueueServiceImpl) completeTask(queueName, taskID, message string) {
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
|
if _, err := s.db.ExecContext(s.ctx, `UPDATE tasks SET status = ?, message = ?, updated_at = ? WHERE id = ?`, taskStatusCompleted, message, now, taskID); err != nil {
|
||||||
|
log.Error(s.ctx, "Failed to mark task as completed", "plugin", s.pluginName, "taskID", taskID, err)
|
||||||
|
}
|
||||||
|
log.Debug(s.ctx, "Task completed", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *taskQueueServiceImpl) handleTaskFailure(queueName, taskID string, attempt, maxRetries int32, qs *queueState, callbackErr error, message string) {
|
||||||
|
log.Warn(s.ctx, "Task execution failed", "plugin", s.pluginName, "queue", queueName,
|
||||||
|
"taskID", taskID, "attempt", attempt, "maxRetries", maxRetries, "err", callbackErr)
|
||||||
|
|
||||||
|
// Use error message as fallback if no message was provided
|
||||||
|
if message == "" {
|
||||||
|
message = callbackErr.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
|
if attempt > maxRetries {
|
||||||
|
if _, err := s.db.ExecContext(s.ctx, `UPDATE tasks SET status = ?, message = ?, updated_at = ? WHERE id = ?`, taskStatusFailed, message, now, taskID); err != nil {
|
||||||
|
log.Error(s.ctx, "Failed to mark task as failed", "plugin", s.pluginName, "taskID", taskID, err)
|
||||||
|
}
|
||||||
|
log.Warn(s.ctx, "Task failed after all retries", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exponential backoff: backoffMs * 2^(attempt-1)
|
||||||
|
backoff := qs.config.BackoffMs << (attempt - 1)
|
||||||
|
if backoff <= 0 || backoff > maxBackoffMs {
|
||||||
|
backoff = maxBackoffMs
|
||||||
|
}
|
||||||
|
nextRunAt := now + backoff
|
||||||
|
if _, err := s.db.ExecContext(s.ctx, `
|
||||||
|
UPDATE tasks SET status = ?, next_run_at = ?, updated_at = ? WHERE id = ?
|
||||||
|
`, taskStatusPending, nextRunAt, now, taskID); err != nil {
|
||||||
|
log.Error(s.ctx, "Failed to reschedule task for retry", "plugin", s.pluginName, "taskID", taskID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wake worker after backoff expires
|
||||||
|
time.AfterFunc(time.Duration(backoff)*time.Millisecond, func() {
|
||||||
|
qs.notifyWorkers()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// revertTaskToPending puts a running task back to pending status and decrements the attempt
|
||||||
|
// counter (used during shutdown to ensure the interrupted attempt doesn't count).
|
||||||
|
func (s *taskQueueServiceImpl) revertTaskToPending(taskID string) {
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
|
_, err := s.db.Exec(`UPDATE tasks SET status = ?, attempt = MAX(attempt - 1, 0), updated_at = ? WHERE id = ? AND status = ?`, taskStatusPending, now, taskID, taskStatusRunning)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Failed to revert task to pending", "plugin", s.pluginName, "taskID", taskID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// defaultInvokeCallback calls the plugin's nd_task_execute function.
|
||||||
|
func (s *taskQueueServiceImpl) defaultInvokeCallback(ctx context.Context, queueName, taskID string, payload []byte, attempt int32) (string, error) {
|
||||||
|
s.manager.mu.RLock()
|
||||||
|
p, ok := s.manager.plugins[s.pluginName]
|
||||||
|
s.manager.mu.RUnlock()
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return "", fmt.Errorf("plugin %s not loaded", s.pluginName)
|
||||||
|
}
|
||||||
|
|
||||||
|
input := capabilities.TaskExecuteRequest{
|
||||||
|
QueueName: queueName,
|
||||||
|
TaskID: taskID,
|
||||||
|
Payload: payload,
|
||||||
|
Attempt: attempt,
|
||||||
|
}
|
||||||
|
|
||||||
|
message, err := callPluginFunction[capabilities.TaskExecuteRequest, string](ctx, p, FuncTaskWorkerCallback, input)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return message, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// cleanupLoop periodically removes terminal tasks past their retention period.
|
||||||
|
func (s *taskQueueServiceImpl) cleanupLoop() {
|
||||||
|
ticker := time.NewTicker(taskCleanupInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
s.runCleanup()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// runCleanup deletes terminal tasks past their retention period.
|
||||||
|
func (s *taskQueueServiceImpl) runCleanup() {
|
||||||
|
s.mu.Lock()
|
||||||
|
queues := make(map[string]*queueState, len(s.queues))
|
||||||
|
for k, v := range s.queues {
|
||||||
|
queues[k] = v
|
||||||
|
}
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
|
for name, qs := range queues {
|
||||||
|
result, err := s.db.ExecContext(s.ctx, `
|
||||||
|
DELETE FROM tasks WHERE queue_name = ? AND status IN (?, ?, ?) AND updated_at + ? < ?
|
||||||
|
`, name, taskStatusCompleted, taskStatusFailed, taskStatusCancelled, qs.config.RetentionMs, now)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(s.ctx, "Failed to cleanup tasks", "plugin", s.pluginName, "queue", name, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if deleted, _ := result.RowsAffected(); deleted > 0 {
|
||||||
|
log.Debug(s.ctx, "Cleaned up terminal tasks", "plugin", s.pluginName, "queue", name, "deleted", deleted)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close shuts down the task queue service, stopping all workers and closing the database.
|
||||||
|
func (s *taskQueueServiceImpl) Close() error {
|
||||||
|
// Cancel context to signal all goroutines
|
||||||
|
s.cancel()
|
||||||
|
|
||||||
|
// Wait for goroutines with timeout
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
s.wg.Wait()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(shutdownTimeout):
|
||||||
|
log.Warn("TaskQueue shutdown timed out", "plugin", s.pluginName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mark running tasks as pending for recovery on next startup
|
||||||
|
if s.db != nil {
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
|
if _, err := s.db.Exec(`UPDATE tasks SET status = ?, updated_at = ? WHERE status = ?`, taskStatusPending, now, taskStatusRunning); err != nil {
|
||||||
|
log.Error("Failed to reset running tasks on shutdown", "plugin", s.pluginName, err)
|
||||||
|
}
|
||||||
|
log.Debug("Closing plugin taskqueue", "plugin", s.pluginName)
|
||||||
|
return s.db.Close()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compile-time verification
|
||||||
|
var _ host.TaskService = (*taskQueueServiceImpl)(nil)
|
||||||
|
var _ io.Closer = (*taskQueueServiceImpl)(nil)
|
||||||
1221
plugins/host_taskqueue_test.go
Normal file
1221
plugins/host_taskqueue_test.go
Normal file
File diff suppressed because it is too large
Load Diff
@@ -103,7 +103,7 @@ var hostServices = []hostServiceEntry{
|
|||||||
hasPermission: func(p *Permissions) bool { return p != nil && p.Kvstore != nil },
|
hasPermission: func(p *Permissions) bool { return p != nil && p.Kvstore != nil },
|
||||||
create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) {
|
create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) {
|
||||||
perm := ctx.permissions.Kvstore
|
perm := ctx.permissions.Kvstore
|
||||||
service, err := newKVStoreService(ctx.pluginName, perm)
|
service, err := newKVStoreService(ctx.manager.ctx, ctx.pluginName, perm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Failed to create KVStore service", "plugin", ctx.pluginName, err)
|
log.Error("Failed to create KVStore service", "plugin", ctx.pluginName, err)
|
||||||
return nil, nil
|
return nil, nil
|
||||||
@@ -128,6 +128,23 @@ var hostServices = []hostServiceEntry{
|
|||||||
return host.RegisterHTTPHostFunctions(service), nil
|
return host.RegisterHTTPHostFunctions(service), nil
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "Task",
|
||||||
|
hasPermission: func(p *Permissions) bool { return p != nil && p.Taskqueue != nil },
|
||||||
|
create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) {
|
||||||
|
perm := ctx.permissions.Taskqueue
|
||||||
|
maxConcurrency := int32(1)
|
||||||
|
if perm.MaxConcurrency > 0 {
|
||||||
|
maxConcurrency = int32(perm.MaxConcurrency)
|
||||||
|
}
|
||||||
|
service, err := newTaskQueueService(ctx.pluginName, ctx.manager, maxConcurrency)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Failed to create Task service", "plugin", ctx.pluginName, err)
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return host.RegisterTaskHostFunctions(service), service
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// extractManifest reads manifest from an .ndp package and computes its SHA-256 hash.
|
// extractManifest reads manifest from an .ndp package and computes its SHA-256 hash.
|
||||||
|
|||||||
@@ -110,6 +110,9 @@
|
|||||||
},
|
},
|
||||||
"users": {
|
"users": {
|
||||||
"$ref": "#/$defs/UsersPermission"
|
"$ref": "#/$defs/UsersPermission"
|
||||||
|
},
|
||||||
|
"taskqueue": {
|
||||||
|
"$ref": "#/$defs/TaskQueuePermission"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@@ -224,6 +227,23 @@
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"TaskQueuePermission": {
|
||||||
|
"type": "object",
|
||||||
|
"description": "Task queue permissions for background task processing",
|
||||||
|
"additionalProperties": false,
|
||||||
|
"properties": {
|
||||||
|
"reason": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "Explanation for why task queue access is needed"
|
||||||
|
},
|
||||||
|
"maxConcurrency": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "Maximum total concurrent workers across all queues. Default: 1",
|
||||||
|
"minimum": 1,
|
||||||
|
"default": 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
"UsersPermission": {
|
"UsersPermission": {
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"description": "Users service permissions for accessing user information",
|
"description": "Users service permissions for accessing user information",
|
||||||
|
|||||||
@@ -72,6 +72,13 @@ func ValidateWithCapabilities(m *Manifest, capabilities []Capability) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Task (taskqueue) permission requires TaskWorker capability
|
||||||
|
if m.Permissions != nil && m.Permissions.Taskqueue != nil {
|
||||||
|
if !hasCapability(capabilities, CapabilityTaskWorker) {
|
||||||
|
return fmt.Errorf("'taskqueue' permission requires plugin to export '%s' function", FuncTaskWorkerCallback)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -181,6 +181,9 @@ type Permissions struct {
|
|||||||
// Subsonicapi corresponds to the JSON schema field "subsonicapi".
|
// Subsonicapi corresponds to the JSON schema field "subsonicapi".
|
||||||
Subsonicapi *SubsonicAPIPermission `json:"subsonicapi,omitempty" yaml:"subsonicapi,omitempty" mapstructure:"subsonicapi,omitempty"`
|
Subsonicapi *SubsonicAPIPermission `json:"subsonicapi,omitempty" yaml:"subsonicapi,omitempty" mapstructure:"subsonicapi,omitempty"`
|
||||||
|
|
||||||
|
// Taskqueue corresponds to the JSON schema field "taskqueue".
|
||||||
|
Taskqueue *TaskQueuePermission `json:"taskqueue,omitempty" yaml:"taskqueue,omitempty" mapstructure:"taskqueue,omitempty"`
|
||||||
|
|
||||||
// Users corresponds to the JSON schema field "users".
|
// Users corresponds to the JSON schema field "users".
|
||||||
Users *UsersPermission `json:"users,omitempty" yaml:"users,omitempty" mapstructure:"users,omitempty"`
|
Users *UsersPermission `json:"users,omitempty" yaml:"users,omitempty" mapstructure:"users,omitempty"`
|
||||||
|
|
||||||
@@ -200,6 +203,36 @@ type SubsonicAPIPermission struct {
|
|||||||
Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"`
|
Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Task queue permissions for background task processing
|
||||||
|
type TaskQueuePermission struct {
|
||||||
|
// Maximum total concurrent workers across all queues. Default: 1
|
||||||
|
MaxConcurrency int `json:"maxConcurrency,omitempty" yaml:"maxConcurrency,omitempty" mapstructure:"maxConcurrency,omitempty"`
|
||||||
|
|
||||||
|
// Explanation for why task queue access is needed
|
||||||
|
Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalJSON implements json.Unmarshaler.
|
||||||
|
func (j *TaskQueuePermission) UnmarshalJSON(value []byte) error {
|
||||||
|
var raw map[string]interface{}
|
||||||
|
if err := json.Unmarshal(value, &raw); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
type Plain TaskQueuePermission
|
||||||
|
var plain Plain
|
||||||
|
if err := json.Unmarshal(value, &plain); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if v, ok := raw["maxConcurrency"]; !ok || v == nil {
|
||||||
|
plain.MaxConcurrency = 1.0
|
||||||
|
}
|
||||||
|
if 1 > plain.MaxConcurrency {
|
||||||
|
return fmt.Errorf("field %s: must be >= %v", "maxConcurrency", 1)
|
||||||
|
}
|
||||||
|
*j = TaskQueuePermission(plain)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Enable experimental WebAssembly threads support
|
// Enable experimental WebAssembly threads support
|
||||||
type ThreadsFeature struct {
|
type ThreadsFeature struct {
|
||||||
// Explanation for why threads support is needed
|
// Explanation for why threads support is needed
|
||||||
|
|||||||
47
plugins/migrate.go
Normal file
47
plugins/migrate.go
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
package plugins
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
// migrateDB applies schema migrations to a SQLite database.
|
||||||
|
//
|
||||||
|
// Each entry in migrations is a single SQL statement. The current schema version
|
||||||
|
// is tracked using SQLite's built-in PRAGMA user_version. Only statements after
|
||||||
|
// the current version are executed, within a single transaction.
|
||||||
|
func migrateDB(db *sql.DB, migrations []string) error {
|
||||||
|
var version int
|
||||||
|
if err := db.QueryRow(`PRAGMA user_version`).Scan(&version); err != nil {
|
||||||
|
return fmt.Errorf("reading schema version: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if version >= len(migrations) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
tx, err := db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("starting migration transaction: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = tx.Rollback() }()
|
||||||
|
|
||||||
|
for i := version; i < len(migrations); i++ {
|
||||||
|
if _, err := tx.Exec(migrations[i]); err != nil {
|
||||||
|
return fmt.Errorf("migration %d failed: %w", i+1, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// PRAGMA statements cannot be executed inside a transaction in some SQLite
|
||||||
|
// drivers, but with mattn/go-sqlite3 this works. We set it inside the tx
|
||||||
|
// so that a failed commit leaves the version unchanged.
|
||||||
|
if _, err := tx.Exec(fmt.Sprintf(`PRAGMA user_version = %d`, len(migrations))); err != nil {
|
||||||
|
return fmt.Errorf("updating schema version: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tx.Commit(); err != nil {
|
||||||
|
return fmt.Errorf("committing migrations: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
99
plugins/migrate_test.go
Normal file
99
plugins/migrate_test.go
Normal file
@@ -0,0 +1,99 @@
|
|||||||
|
//go:build !windows
|
||||||
|
|
||||||
|
package plugins
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
|
||||||
|
_ "github.com/mattn/go-sqlite3"
|
||||||
|
. "github.com/onsi/ginkgo/v2"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ = Describe("migrateDB", func() {
|
||||||
|
var db *sql.DB
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
var err error
|
||||||
|
db, err = sql.Open("sqlite3", ":memory:")
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
if db != nil {
|
||||||
|
db.Close()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
getUserVersion := func() int {
|
||||||
|
var version int
|
||||||
|
Expect(db.QueryRow(`PRAGMA user_version`).Scan(&version)).To(Succeed())
|
||||||
|
return version
|
||||||
|
}
|
||||||
|
|
||||||
|
It("applies all migrations on a fresh database", func() {
|
||||||
|
migrations := []string{
|
||||||
|
`CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)`,
|
||||||
|
`ALTER TABLE test ADD COLUMN email TEXT`,
|
||||||
|
}
|
||||||
|
|
||||||
|
Expect(migrateDB(db, migrations)).To(Succeed())
|
||||||
|
Expect(getUserVersion()).To(Equal(2))
|
||||||
|
|
||||||
|
// Verify schema
|
||||||
|
_, err := db.Exec(`INSERT INTO test (id, name, email) VALUES (1, 'Alice', 'alice@test.com')`)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("skips already applied migrations", func() {
|
||||||
|
migrations1 := []string{
|
||||||
|
`CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)`,
|
||||||
|
}
|
||||||
|
Expect(migrateDB(db, migrations1)).To(Succeed())
|
||||||
|
Expect(getUserVersion()).To(Equal(1))
|
||||||
|
|
||||||
|
// Add a new migration
|
||||||
|
migrations2 := []string{
|
||||||
|
`CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)`,
|
||||||
|
`ALTER TABLE test ADD COLUMN email TEXT`,
|
||||||
|
}
|
||||||
|
Expect(migrateDB(db, migrations2)).To(Succeed())
|
||||||
|
Expect(getUserVersion()).To(Equal(2))
|
||||||
|
|
||||||
|
// Verify the new column exists
|
||||||
|
_, err := db.Exec(`INSERT INTO test (id, name, email) VALUES (1, 'Alice', 'alice@test.com')`)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("is a no-op when all migrations are applied", func() {
|
||||||
|
migrations := []string{
|
||||||
|
`CREATE TABLE test (id INTEGER PRIMARY KEY)`,
|
||||||
|
}
|
||||||
|
Expect(migrateDB(db, migrations)).To(Succeed())
|
||||||
|
Expect(migrateDB(db, migrations)).To(Succeed())
|
||||||
|
Expect(getUserVersion()).To(Equal(1))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("is a no-op with empty migrations slice", func() {
|
||||||
|
Expect(migrateDB(db, nil)).To(Succeed())
|
||||||
|
Expect(getUserVersion()).To(Equal(0))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("rolls back on failure", func() {
|
||||||
|
migrations := []string{
|
||||||
|
`CREATE TABLE test (id INTEGER PRIMARY KEY)`,
|
||||||
|
`INVALID SQL STATEMENT`,
|
||||||
|
}
|
||||||
|
|
||||||
|
err := migrateDB(db, migrations)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("migration 2 failed"))
|
||||||
|
|
||||||
|
// Version should remain 0 (rolled back)
|
||||||
|
Expect(getUserVersion()).To(Equal(0))
|
||||||
|
|
||||||
|
// Table should not exist (rolled back)
|
||||||
|
_, err = db.Exec(`INSERT INTO test (id) VALUES (1)`)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
})
|
||||||
|
})
|
||||||
@@ -43,6 +43,7 @@ The following host services are available:
|
|||||||
- Library: provides access to music library metadata for plugins.
|
- Library: provides access to music library metadata for plugins.
|
||||||
- Scheduler: provides task scheduling capabilities for plugins.
|
- Scheduler: provides task scheduling capabilities for plugins.
|
||||||
- SubsonicAPI: provides access to Navidrome's Subsonic API from plugins.
|
- SubsonicAPI: provides access to Navidrome's Subsonic API from plugins.
|
||||||
|
- Task: provides persistent task queues for plugins.
|
||||||
- Users: provides access to user information for plugins.
|
- Users: provides access to user information for plugins.
|
||||||
- WebSocket: provides WebSocket communication capabilities for plugins.
|
- WebSocket: provides WebSocket communication capabilities for plugins.
|
||||||
|
|
||||||
|
|||||||
@@ -19,15 +19,20 @@ import (
|
|||||||
//go:wasmimport extism:host/user kvstore_set
|
//go:wasmimport extism:host/user kvstore_set
|
||||||
func kvstore_set(uint64) uint64
|
func kvstore_set(uint64) uint64
|
||||||
|
|
||||||
|
// kvstore_setwithttl is the host function provided by Navidrome.
|
||||||
|
//
|
||||||
|
//go:wasmimport extism:host/user kvstore_setwithttl
|
||||||
|
func kvstore_setwithttl(uint64) uint64
|
||||||
|
|
||||||
// kvstore_get is the host function provided by Navidrome.
|
// kvstore_get is the host function provided by Navidrome.
|
||||||
//
|
//
|
||||||
//go:wasmimport extism:host/user kvstore_get
|
//go:wasmimport extism:host/user kvstore_get
|
||||||
func kvstore_get(uint64) uint64
|
func kvstore_get(uint64) uint64
|
||||||
|
|
||||||
// kvstore_delete is the host function provided by Navidrome.
|
// kvstore_getmany is the host function provided by Navidrome.
|
||||||
//
|
//
|
||||||
//go:wasmimport extism:host/user kvstore_delete
|
//go:wasmimport extism:host/user kvstore_getmany
|
||||||
func kvstore_delete(uint64) uint64
|
func kvstore_getmany(uint64) uint64
|
||||||
|
|
||||||
// kvstore_has is the host function provided by Navidrome.
|
// kvstore_has is the host function provided by Navidrome.
|
||||||
//
|
//
|
||||||
@@ -39,6 +44,16 @@ func kvstore_has(uint64) uint64
|
|||||||
//go:wasmimport extism:host/user kvstore_list
|
//go:wasmimport extism:host/user kvstore_list
|
||||||
func kvstore_list(uint64) uint64
|
func kvstore_list(uint64) uint64
|
||||||
|
|
||||||
|
// kvstore_delete is the host function provided by Navidrome.
|
||||||
|
//
|
||||||
|
//go:wasmimport extism:host/user kvstore_delete
|
||||||
|
func kvstore_delete(uint64) uint64
|
||||||
|
|
||||||
|
// kvstore_deletebyprefix is the host function provided by Navidrome.
|
||||||
|
//
|
||||||
|
//go:wasmimport extism:host/user kvstore_deletebyprefix
|
||||||
|
func kvstore_deletebyprefix(uint64) uint64
|
||||||
|
|
||||||
// kvstore_getstorageused is the host function provided by Navidrome.
|
// kvstore_getstorageused is the host function provided by Navidrome.
|
||||||
//
|
//
|
||||||
//go:wasmimport extism:host/user kvstore_getstorageused
|
//go:wasmimport extism:host/user kvstore_getstorageused
|
||||||
@@ -49,6 +64,12 @@ type kVStoreSetRequest struct {
|
|||||||
Value []byte `json:"value"`
|
Value []byte `json:"value"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type kVStoreSetWithTTLRequest struct {
|
||||||
|
Key string `json:"key"`
|
||||||
|
Value []byte `json:"value"`
|
||||||
|
TtlSeconds int64 `json:"ttlSeconds"`
|
||||||
|
}
|
||||||
|
|
||||||
type kVStoreGetRequest struct {
|
type kVStoreGetRequest struct {
|
||||||
Key string `json:"key"`
|
Key string `json:"key"`
|
||||||
}
|
}
|
||||||
@@ -59,8 +80,13 @@ type kVStoreGetResponse struct {
|
|||||||
Error string `json:"error,omitempty"`
|
Error string `json:"error,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type kVStoreDeleteRequest struct {
|
type kVStoreGetManyRequest struct {
|
||||||
Key string `json:"key"`
|
Keys []string `json:"keys"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type kVStoreGetManyResponse struct {
|
||||||
|
Values map[string][]byte `json:"values,omitempty"`
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type kVStoreHasRequest struct {
|
type kVStoreHasRequest struct {
|
||||||
@@ -81,6 +107,19 @@ type kVStoreListResponse struct {
|
|||||||
Error string `json:"error,omitempty"`
|
Error string `json:"error,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type kVStoreDeleteRequest struct {
|
||||||
|
Key string `json:"key"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type kVStoreDeleteByPrefixRequest struct {
|
||||||
|
Prefix string `json:"prefix"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type kVStoreDeleteByPrefixResponse struct {
|
||||||
|
DeletedCount int64 `json:"deletedCount,omitempty"`
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
type kVStoreGetStorageUsedResponse struct {
|
type kVStoreGetStorageUsedResponse struct {
|
||||||
Bytes int64 `json:"bytes,omitempty"`
|
Bytes int64 `json:"bytes,omitempty"`
|
||||||
Error string `json:"error,omitempty"`
|
Error string `json:"error,omitempty"`
|
||||||
@@ -127,6 +166,52 @@ func KVStoreSet(key string, value []byte) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// KVStoreSetWithTTL calls the kvstore_setwithttl host function.
|
||||||
|
// SetWithTTL stores a byte value with the given key and a time-to-live.
|
||||||
|
//
|
||||||
|
// After ttlSeconds, the key is treated as non-existent and will be
|
||||||
|
// cleaned up lazily. ttlSeconds must be greater than 0.
|
||||||
|
//
|
||||||
|
// Parameters:
|
||||||
|
// - key: The storage key (max 256 bytes, UTF-8)
|
||||||
|
// - value: The byte slice to store
|
||||||
|
// - ttlSeconds: Time-to-live in seconds (must be > 0)
|
||||||
|
//
|
||||||
|
// Returns an error if the storage limit would be exceeded or the operation fails.
|
||||||
|
func KVStoreSetWithTTL(key string, value []byte, ttlSeconds int64) error {
|
||||||
|
// Marshal request to JSON
|
||||||
|
req := kVStoreSetWithTTLRequest{
|
||||||
|
Key: key,
|
||||||
|
Value: value,
|
||||||
|
TtlSeconds: ttlSeconds,
|
||||||
|
}
|
||||||
|
reqBytes, err := json.Marshal(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
reqMem := pdk.AllocateBytes(reqBytes)
|
||||||
|
defer reqMem.Free()
|
||||||
|
|
||||||
|
// Call the host function
|
||||||
|
responsePtr := kvstore_setwithttl(reqMem.Offset())
|
||||||
|
|
||||||
|
// Read the response from memory
|
||||||
|
responseMem := pdk.FindMemory(responsePtr)
|
||||||
|
responseBytes := responseMem.ReadBytes()
|
||||||
|
|
||||||
|
// Parse error-only response
|
||||||
|
var response struct {
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if response.Error != "" {
|
||||||
|
return errors.New(response.Error)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// KVStoreGet calls the kvstore_get host function.
|
// KVStoreGet calls the kvstore_get host function.
|
||||||
// Get retrieves a byte value from storage.
|
// Get retrieves a byte value from storage.
|
||||||
//
|
//
|
||||||
@@ -167,43 +252,45 @@ func KVStoreGet(key string) ([]byte, bool, error) {
|
|||||||
return response.Value, response.Exists, nil
|
return response.Value, response.Exists, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// KVStoreDelete calls the kvstore_delete host function.
|
// KVStoreGetMany calls the kvstore_getmany host function.
|
||||||
// Delete removes a value from storage.
|
// GetMany retrieves multiple values in a single call.
|
||||||
//
|
//
|
||||||
// Parameters:
|
// Parameters:
|
||||||
// - key: The storage key
|
// - keys: The storage keys to retrieve
|
||||||
//
|
//
|
||||||
// Returns an error if the operation fails. Does not return an error if the key doesn't exist.
|
// Returns a map of key to value for keys that exist and have not expired.
|
||||||
func KVStoreDelete(key string) error {
|
// Missing or expired keys are omitted from the result.
|
||||||
|
func KVStoreGetMany(keys []string) (map[string][]byte, error) {
|
||||||
// Marshal request to JSON
|
// Marshal request to JSON
|
||||||
req := kVStoreDeleteRequest{
|
req := kVStoreGetManyRequest{
|
||||||
Key: key,
|
Keys: keys,
|
||||||
}
|
}
|
||||||
reqBytes, err := json.Marshal(req)
|
reqBytes, err := json.Marshal(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
reqMem := pdk.AllocateBytes(reqBytes)
|
reqMem := pdk.AllocateBytes(reqBytes)
|
||||||
defer reqMem.Free()
|
defer reqMem.Free()
|
||||||
|
|
||||||
// Call the host function
|
// Call the host function
|
||||||
responsePtr := kvstore_delete(reqMem.Offset())
|
responsePtr := kvstore_getmany(reqMem.Offset())
|
||||||
|
|
||||||
// Read the response from memory
|
// Read the response from memory
|
||||||
responseMem := pdk.FindMemory(responsePtr)
|
responseMem := pdk.FindMemory(responsePtr)
|
||||||
responseBytes := responseMem.ReadBytes()
|
responseBytes := responseMem.ReadBytes()
|
||||||
|
|
||||||
// Parse error-only response
|
// Parse the response
|
||||||
var response struct {
|
var response kVStoreGetManyResponse
|
||||||
Error string `json:"error,omitempty"`
|
|
||||||
}
|
|
||||||
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Convert Error field to Go error
|
||||||
if response.Error != "" {
|
if response.Error != "" {
|
||||||
return errors.New(response.Error)
|
return nil, errors.New(response.Error)
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
|
return response.Values, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// KVStoreHas calls the kvstore_has host function.
|
// KVStoreHas calls the kvstore_has host function.
|
||||||
@@ -286,6 +373,85 @@ func KVStoreList(prefix string) ([]string, error) {
|
|||||||
return response.Keys, nil
|
return response.Keys, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// KVStoreDelete calls the kvstore_delete host function.
|
||||||
|
// Delete removes a value from storage.
|
||||||
|
//
|
||||||
|
// Parameters:
|
||||||
|
// - key: The storage key
|
||||||
|
//
|
||||||
|
// Returns an error if the operation fails. Does not return an error if the key doesn't exist.
|
||||||
|
func KVStoreDelete(key string) error {
|
||||||
|
// Marshal request to JSON
|
||||||
|
req := kVStoreDeleteRequest{
|
||||||
|
Key: key,
|
||||||
|
}
|
||||||
|
reqBytes, err := json.Marshal(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
reqMem := pdk.AllocateBytes(reqBytes)
|
||||||
|
defer reqMem.Free()
|
||||||
|
|
||||||
|
// Call the host function
|
||||||
|
responsePtr := kvstore_delete(reqMem.Offset())
|
||||||
|
|
||||||
|
// Read the response from memory
|
||||||
|
responseMem := pdk.FindMemory(responsePtr)
|
||||||
|
responseBytes := responseMem.ReadBytes()
|
||||||
|
|
||||||
|
// Parse error-only response
|
||||||
|
var response struct {
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if response.Error != "" {
|
||||||
|
return errors.New(response.Error)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// KVStoreDeleteByPrefix calls the kvstore_deletebyprefix host function.
|
||||||
|
// DeleteByPrefix removes all keys matching the given prefix.
|
||||||
|
//
|
||||||
|
// Parameters:
|
||||||
|
// - prefix: Key prefix to match (must not be empty)
|
||||||
|
//
|
||||||
|
// Returns the number of keys deleted. Includes expired keys.
|
||||||
|
func KVStoreDeleteByPrefix(prefix string) (int64, error) {
|
||||||
|
// Marshal request to JSON
|
||||||
|
req := kVStoreDeleteByPrefixRequest{
|
||||||
|
Prefix: prefix,
|
||||||
|
}
|
||||||
|
reqBytes, err := json.Marshal(req)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
reqMem := pdk.AllocateBytes(reqBytes)
|
||||||
|
defer reqMem.Free()
|
||||||
|
|
||||||
|
// Call the host function
|
||||||
|
responsePtr := kvstore_deletebyprefix(reqMem.Offset())
|
||||||
|
|
||||||
|
// Read the response from memory
|
||||||
|
responseMem := pdk.FindMemory(responsePtr)
|
||||||
|
responseBytes := responseMem.ReadBytes()
|
||||||
|
|
||||||
|
// Parse the response
|
||||||
|
var response kVStoreDeleteByPrefixResponse
|
||||||
|
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert Error field to Go error
|
||||||
|
if response.Error != "" {
|
||||||
|
return 0, errors.New(response.Error)
|
||||||
|
}
|
||||||
|
|
||||||
|
return response.DeletedCount, nil
|
||||||
|
}
|
||||||
|
|
||||||
// KVStoreGetStorageUsed calls the kvstore_getstorageused host function.
|
// KVStoreGetStorageUsed calls the kvstore_getstorageused host function.
|
||||||
// GetStorageUsed returns the total storage used by this plugin in bytes.
|
// GetStorageUsed returns the total storage used by this plugin in bytes.
|
||||||
func KVStoreGetStorageUsed() (int64, error) {
|
func KVStoreGetStorageUsed() (int64, error) {
|
||||||
|
|||||||
@@ -37,6 +37,28 @@ func KVStoreSet(key string, value []byte) error {
|
|||||||
return KVStoreMock.Set(key, value)
|
return KVStoreMock.Set(key, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetWithTTL is the mock method for KVStoreSetWithTTL.
|
||||||
|
func (m *mockKVStoreService) SetWithTTL(key string, value []byte, ttlSeconds int64) error {
|
||||||
|
args := m.Called(key, value, ttlSeconds)
|
||||||
|
return args.Error(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// KVStoreSetWithTTL delegates to the mock instance.
|
||||||
|
// SetWithTTL stores a byte value with the given key and a time-to-live.
|
||||||
|
//
|
||||||
|
// After ttlSeconds, the key is treated as non-existent and will be
|
||||||
|
// cleaned up lazily. ttlSeconds must be greater than 0.
|
||||||
|
//
|
||||||
|
// Parameters:
|
||||||
|
// - key: The storage key (max 256 bytes, UTF-8)
|
||||||
|
// - value: The byte slice to store
|
||||||
|
// - ttlSeconds: Time-to-live in seconds (must be > 0)
|
||||||
|
//
|
||||||
|
// Returns an error if the storage limit would be exceeded or the operation fails.
|
||||||
|
func KVStoreSetWithTTL(key string, value []byte, ttlSeconds int64) error {
|
||||||
|
return KVStoreMock.SetWithTTL(key, value, ttlSeconds)
|
||||||
|
}
|
||||||
|
|
||||||
// Get is the mock method for KVStoreGet.
|
// Get is the mock method for KVStoreGet.
|
||||||
func (m *mockKVStoreService) Get(key string) ([]byte, bool, error) {
|
func (m *mockKVStoreService) Get(key string) ([]byte, bool, error) {
|
||||||
args := m.Called(key)
|
args := m.Called(key)
|
||||||
@@ -54,21 +76,22 @@ func KVStoreGet(key string) ([]byte, bool, error) {
|
|||||||
return KVStoreMock.Get(key)
|
return KVStoreMock.Get(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete is the mock method for KVStoreDelete.
|
// GetMany is the mock method for KVStoreGetMany.
|
||||||
func (m *mockKVStoreService) Delete(key string) error {
|
func (m *mockKVStoreService) GetMany(keys []string) (map[string][]byte, error) {
|
||||||
args := m.Called(key)
|
args := m.Called(keys)
|
||||||
return args.Error(0)
|
return args.Get(0).(map[string][]byte), args.Error(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// KVStoreDelete delegates to the mock instance.
|
// KVStoreGetMany delegates to the mock instance.
|
||||||
// Delete removes a value from storage.
|
// GetMany retrieves multiple values in a single call.
|
||||||
//
|
//
|
||||||
// Parameters:
|
// Parameters:
|
||||||
// - key: The storage key
|
// - keys: The storage keys to retrieve
|
||||||
//
|
//
|
||||||
// Returns an error if the operation fails. Does not return an error if the key doesn't exist.
|
// Returns a map of key to value for keys that exist and have not expired.
|
||||||
func KVStoreDelete(key string) error {
|
// Missing or expired keys are omitted from the result.
|
||||||
return KVStoreMock.Delete(key)
|
func KVStoreGetMany(keys []string) (map[string][]byte, error) {
|
||||||
|
return KVStoreMock.GetMany(keys)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Has is the mock method for KVStoreHas.
|
// Has is the mock method for KVStoreHas.
|
||||||
@@ -105,6 +128,40 @@ func KVStoreList(prefix string) ([]string, error) {
|
|||||||
return KVStoreMock.List(prefix)
|
return KVStoreMock.List(prefix)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Delete is the mock method for KVStoreDelete.
|
||||||
|
func (m *mockKVStoreService) Delete(key string) error {
|
||||||
|
args := m.Called(key)
|
||||||
|
return args.Error(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// KVStoreDelete delegates to the mock instance.
|
||||||
|
// Delete removes a value from storage.
|
||||||
|
//
|
||||||
|
// Parameters:
|
||||||
|
// - key: The storage key
|
||||||
|
//
|
||||||
|
// Returns an error if the operation fails. Does not return an error if the key doesn't exist.
|
||||||
|
func KVStoreDelete(key string) error {
|
||||||
|
return KVStoreMock.Delete(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteByPrefix is the mock method for KVStoreDeleteByPrefix.
|
||||||
|
func (m *mockKVStoreService) DeleteByPrefix(prefix string) (int64, error) {
|
||||||
|
args := m.Called(prefix)
|
||||||
|
return args.Get(0).(int64), args.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// KVStoreDeleteByPrefix delegates to the mock instance.
|
||||||
|
// DeleteByPrefix removes all keys matching the given prefix.
|
||||||
|
//
|
||||||
|
// Parameters:
|
||||||
|
// - prefix: Key prefix to match (must not be empty)
|
||||||
|
//
|
||||||
|
// Returns the number of keys deleted. Includes expired keys.
|
||||||
|
func KVStoreDeleteByPrefix(prefix string) (int64, error) {
|
||||||
|
return KVStoreMock.DeleteByPrefix(prefix)
|
||||||
|
}
|
||||||
|
|
||||||
// GetStorageUsed is the mock method for KVStoreGetStorageUsed.
|
// GetStorageUsed is the mock method for KVStoreGetStorageUsed.
|
||||||
func (m *mockKVStoreService) GetStorageUsed() (int64, error) {
|
func (m *mockKVStoreService) GetStorageUsed() (int64, error) {
|
||||||
args := m.Called()
|
args := m.Called()
|
||||||
|
|||||||
277
plugins/pdk/go/host/nd_host_task.go
Normal file
277
plugins/pdk/go/host/nd_host_task.go
Normal file
@@ -0,0 +1,277 @@
|
|||||||
|
// Code generated by ndpgen. DO NOT EDIT.
|
||||||
|
//
|
||||||
|
// This file contains client wrappers for the Task host service.
|
||||||
|
// It is intended for use in Navidrome plugins built with TinyGo.
|
||||||
|
//
|
||||||
|
//go:build wasip1
|
||||||
|
|
||||||
|
package host
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
|
||||||
|
)
|
||||||
|
|
||||||
|
// QueueConfig represents the QueueConfig data structure.
|
||||||
|
// QueueConfig holds configuration for a task queue.
|
||||||
|
type QueueConfig struct {
|
||||||
|
Concurrency int32 `json:"concurrency"`
|
||||||
|
MaxRetries int32 `json:"maxRetries"`
|
||||||
|
BackoffMs int64 `json:"backoffMs"`
|
||||||
|
DelayMs int64 `json:"delayMs"`
|
||||||
|
RetentionMs int64 `json:"retentionMs"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskInfo represents the TaskInfo data structure.
|
||||||
|
// TaskInfo holds the current state of a task.
|
||||||
|
type TaskInfo struct {
|
||||||
|
Status string `json:"status"`
|
||||||
|
Message string `json:"message"`
|
||||||
|
Attempt int32 `json:"attempt"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// task_createqueue is the host function provided by Navidrome.
|
||||||
|
//
|
||||||
|
//go:wasmimport extism:host/user task_createqueue
|
||||||
|
func task_createqueue(uint64) uint64
|
||||||
|
|
||||||
|
// task_enqueue is the host function provided by Navidrome.
|
||||||
|
//
|
||||||
|
//go:wasmimport extism:host/user task_enqueue
|
||||||
|
func task_enqueue(uint64) uint64
|
||||||
|
|
||||||
|
// task_get is the host function provided by Navidrome.
|
||||||
|
//
|
||||||
|
//go:wasmimport extism:host/user task_get
|
||||||
|
func task_get(uint64) uint64
|
||||||
|
|
||||||
|
// task_cancel is the host function provided by Navidrome.
|
||||||
|
//
|
||||||
|
//go:wasmimport extism:host/user task_cancel
|
||||||
|
func task_cancel(uint64) uint64
|
||||||
|
|
||||||
|
// task_clearqueue is the host function provided by Navidrome.
|
||||||
|
//
|
||||||
|
//go:wasmimport extism:host/user task_clearqueue
|
||||||
|
func task_clearqueue(uint64) uint64
|
||||||
|
|
||||||
|
type taskCreateQueueRequest struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Config QueueConfig `json:"config"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type taskEnqueueRequest struct {
|
||||||
|
QueueName string `json:"queueName"`
|
||||||
|
Payload []byte `json:"payload"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type taskEnqueueResponse struct {
|
||||||
|
Result string `json:"result,omitempty"`
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type taskGetRequest struct {
|
||||||
|
TaskID string `json:"taskId"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type taskGetResponse struct {
|
||||||
|
Result *TaskInfo `json:"result,omitempty"`
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type taskCancelRequest struct {
|
||||||
|
TaskID string `json:"taskId"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type taskClearQueueRequest struct {
|
||||||
|
QueueName string `json:"queueName"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type taskClearQueueResponse struct {
|
||||||
|
Result int64 `json:"result,omitempty"`
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskCreateQueue calls the task_createqueue host function.
|
||||||
|
// CreateQueue creates a named task queue with the given configuration.
|
||||||
|
// Zero-value fields in config use sensible defaults.
|
||||||
|
// If a queue with the same name already exists, returns an error.
|
||||||
|
// On startup, this also recovers any stale "running" tasks from a previous crash.
|
||||||
|
func TaskCreateQueue(name string, config QueueConfig) error {
|
||||||
|
// Marshal request to JSON
|
||||||
|
req := taskCreateQueueRequest{
|
||||||
|
Name: name,
|
||||||
|
Config: config,
|
||||||
|
}
|
||||||
|
reqBytes, err := json.Marshal(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
reqMem := pdk.AllocateBytes(reqBytes)
|
||||||
|
defer reqMem.Free()
|
||||||
|
|
||||||
|
// Call the host function
|
||||||
|
responsePtr := task_createqueue(reqMem.Offset())
|
||||||
|
|
||||||
|
// Read the response from memory
|
||||||
|
responseMem := pdk.FindMemory(responsePtr)
|
||||||
|
responseBytes := responseMem.ReadBytes()
|
||||||
|
|
||||||
|
// Parse error-only response
|
||||||
|
var response struct {
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if response.Error != "" {
|
||||||
|
return errors.New(response.Error)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskEnqueue calls the task_enqueue host function.
|
||||||
|
// Enqueue adds a task to the named queue. Returns the task ID.
|
||||||
|
// payload is opaque bytes passed back to the plugin on execution.
|
||||||
|
func TaskEnqueue(queueName string, payload []byte) (string, error) {
|
||||||
|
// Marshal request to JSON
|
||||||
|
req := taskEnqueueRequest{
|
||||||
|
QueueName: queueName,
|
||||||
|
Payload: payload,
|
||||||
|
}
|
||||||
|
reqBytes, err := json.Marshal(req)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
reqMem := pdk.AllocateBytes(reqBytes)
|
||||||
|
defer reqMem.Free()
|
||||||
|
|
||||||
|
// Call the host function
|
||||||
|
responsePtr := task_enqueue(reqMem.Offset())
|
||||||
|
|
||||||
|
// Read the response from memory
|
||||||
|
responseMem := pdk.FindMemory(responsePtr)
|
||||||
|
responseBytes := responseMem.ReadBytes()
|
||||||
|
|
||||||
|
// Parse the response
|
||||||
|
var response taskEnqueueResponse
|
||||||
|
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert Error field to Go error
|
||||||
|
if response.Error != "" {
|
||||||
|
return "", errors.New(response.Error)
|
||||||
|
}
|
||||||
|
|
||||||
|
return response.Result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskGet calls the task_get host function.
|
||||||
|
// Get returns the current state of a task including its status,
|
||||||
|
// message, and attempt count.
|
||||||
|
func TaskGet(taskID string) (*TaskInfo, error) {
|
||||||
|
// Marshal request to JSON
|
||||||
|
req := taskGetRequest{
|
||||||
|
TaskID: taskID,
|
||||||
|
}
|
||||||
|
reqBytes, err := json.Marshal(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
reqMem := pdk.AllocateBytes(reqBytes)
|
||||||
|
defer reqMem.Free()
|
||||||
|
|
||||||
|
// Call the host function
|
||||||
|
responsePtr := task_get(reqMem.Offset())
|
||||||
|
|
||||||
|
// Read the response from memory
|
||||||
|
responseMem := pdk.FindMemory(responsePtr)
|
||||||
|
responseBytes := responseMem.ReadBytes()
|
||||||
|
|
||||||
|
// Parse the response
|
||||||
|
var response taskGetResponse
|
||||||
|
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert Error field to Go error
|
||||||
|
if response.Error != "" {
|
||||||
|
return nil, errors.New(response.Error)
|
||||||
|
}
|
||||||
|
|
||||||
|
return response.Result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskCancel calls the task_cancel host function.
|
||||||
|
// Cancel cancels a pending task. Returns error if already
|
||||||
|
// running, completed, or failed.
|
||||||
|
func TaskCancel(taskID string) error {
|
||||||
|
// Marshal request to JSON
|
||||||
|
req := taskCancelRequest{
|
||||||
|
TaskID: taskID,
|
||||||
|
}
|
||||||
|
reqBytes, err := json.Marshal(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
reqMem := pdk.AllocateBytes(reqBytes)
|
||||||
|
defer reqMem.Free()
|
||||||
|
|
||||||
|
// Call the host function
|
||||||
|
responsePtr := task_cancel(reqMem.Offset())
|
||||||
|
|
||||||
|
// Read the response from memory
|
||||||
|
responseMem := pdk.FindMemory(responsePtr)
|
||||||
|
responseBytes := responseMem.ReadBytes()
|
||||||
|
|
||||||
|
// Parse error-only response
|
||||||
|
var response struct {
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if response.Error != "" {
|
||||||
|
return errors.New(response.Error)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskClearQueue calls the task_clearqueue host function.
|
||||||
|
// ClearQueue removes all pending tasks from the named queue.
|
||||||
|
// Running tasks are not affected. Returns the number of tasks removed.
|
||||||
|
func TaskClearQueue(queueName string) (int64, error) {
|
||||||
|
// Marshal request to JSON
|
||||||
|
req := taskClearQueueRequest{
|
||||||
|
QueueName: queueName,
|
||||||
|
}
|
||||||
|
reqBytes, err := json.Marshal(req)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
reqMem := pdk.AllocateBytes(reqBytes)
|
||||||
|
defer reqMem.Free()
|
||||||
|
|
||||||
|
// Call the host function
|
||||||
|
responsePtr := task_clearqueue(reqMem.Offset())
|
||||||
|
|
||||||
|
// Read the response from memory
|
||||||
|
responseMem := pdk.FindMemory(responsePtr)
|
||||||
|
responseBytes := responseMem.ReadBytes()
|
||||||
|
|
||||||
|
// Parse the response
|
||||||
|
var response taskClearQueueResponse
|
||||||
|
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert Error field to Go error
|
||||||
|
if response.Error != "" {
|
||||||
|
return 0, errors.New(response.Error)
|
||||||
|
}
|
||||||
|
|
||||||
|
return response.Result, nil
|
||||||
|
}
|
||||||
92
plugins/pdk/go/host/nd_host_task_stub.go
Normal file
92
plugins/pdk/go/host/nd_host_task_stub.go
Normal file
@@ -0,0 +1,92 @@
|
|||||||
|
// Code generated by ndpgen. DO NOT EDIT.
|
||||||
|
//
|
||||||
|
// This file contains mock implementations for non-WASM builds.
|
||||||
|
// These mocks allow IDE support, compilation, and unit testing on non-WASM platforms.
|
||||||
|
// Plugin authors can use the exported mock instances to set expectations in tests.
|
||||||
|
//
|
||||||
|
//go:build !wasip1
|
||||||
|
|
||||||
|
package host
|
||||||
|
|
||||||
|
import "github.com/stretchr/testify/mock"
|
||||||
|
|
||||||
|
// QueueConfig represents the QueueConfig data structure.
|
||||||
|
// QueueConfig holds configuration for a task queue.
|
||||||
|
type QueueConfig struct {
|
||||||
|
Concurrency int32 `json:"concurrency"`
|
||||||
|
MaxRetries int32 `json:"maxRetries"`
|
||||||
|
BackoffMs int64 `json:"backoffMs"`
|
||||||
|
DelayMs int64 `json:"delayMs"`
|
||||||
|
RetentionMs int64 `json:"retentionMs"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskInfo represents the TaskInfo data structure.
|
||||||
|
// TaskInfo holds the current state of a task.
|
||||||
|
type TaskInfo struct {
|
||||||
|
Status string `json:"status"`
|
||||||
|
Message string `json:"message"`
|
||||||
|
Attempt int32 `json:"attempt"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// mockTaskService is the mock implementation for testing.
|
||||||
|
type mockTaskService struct {
|
||||||
|
mock.Mock
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskMock is the auto-instantiated mock instance for testing.
|
||||||
|
// Use this to set expectations: host.TaskMock.On("MethodName", args...).Return(values...)
|
||||||
|
var TaskMock = &mockTaskService{}
|
||||||
|
|
||||||
|
// CreateQueue is the mock method for TaskCreateQueue.
|
||||||
|
func (m *mockTaskService) CreateQueue(name string, config QueueConfig) error {
|
||||||
|
args := m.Called(name, config)
|
||||||
|
return args.Error(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskCreateQueue delegates to the mock instance.
|
||||||
|
// CreateQueue creates a named task queue with the given configuration.
|
||||||
|
// Zero-value fields in config use sensible defaults.
|
||||||
|
// If a queue with the same name already exists, returns an error.
|
||||||
|
// On startup, this also recovers any stale "running" tasks from a previous crash.
|
||||||
|
func TaskCreateQueue(name string, config QueueConfig) error {
|
||||||
|
return TaskMock.CreateQueue(name, config)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enqueue is the mock method for TaskEnqueue.
|
||||||
|
func (m *mockTaskService) Enqueue(queueName string, payload []byte) (string, error) {
|
||||||
|
args := m.Called(queueName, payload)
|
||||||
|
return args.String(0), args.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskEnqueue delegates to the mock instance.
|
||||||
|
// Enqueue adds a task to the named queue. Returns the task ID.
|
||||||
|
// payload is opaque bytes passed back to the plugin on execution.
|
||||||
|
func TaskEnqueue(queueName string, payload []byte) (string, error) {
|
||||||
|
return TaskMock.Enqueue(queueName, payload)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get is the mock method for TaskGet.
|
||||||
|
func (m *mockTaskService) Get(taskID string) (*TaskInfo, error) {
|
||||||
|
args := m.Called(taskID)
|
||||||
|
return args.Get(0).(*TaskInfo), args.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskGet delegates to the mock instance.
|
||||||
|
// Get returns the current state of a task including its status,
|
||||||
|
// message, and attempt count.
|
||||||
|
func TaskGet(taskID string) (*TaskInfo, error) {
|
||||||
|
return TaskMock.Get(taskID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cancel is the mock method for TaskCancel.
|
||||||
|
func (m *mockTaskService) Cancel(taskID string) error {
|
||||||
|
args := m.Called(taskID)
|
||||||
|
return args.Error(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskCancel delegates to the mock instance.
|
||||||
|
// Cancel cancels a pending task. Returns error if already
|
||||||
|
// running, completed, or failed.
|
||||||
|
func TaskCancel(taskID string) error {
|
||||||
|
return TaskMock.Cancel(taskID)
|
||||||
|
}
|
||||||
79
plugins/pdk/go/taskworker/taskworker.go
Normal file
79
plugins/pdk/go/taskworker/taskworker.go
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
// Code generated by ndpgen. DO NOT EDIT.
|
||||||
|
//
|
||||||
|
// This file contains export wrappers for the TaskWorker capability.
|
||||||
|
// It is intended for use in Navidrome plugins built with TinyGo.
|
||||||
|
//
|
||||||
|
//go:build wasip1
|
||||||
|
|
||||||
|
package taskworker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TaskExecuteRequest is the request provided when a task is ready to execute.
|
||||||
|
type TaskExecuteRequest struct {
|
||||||
|
// QueueName is the name of the queue this task belongs to.
|
||||||
|
QueueName string `json:"queueName"`
|
||||||
|
// TaskID is the unique identifier for this task.
|
||||||
|
TaskID string `json:"taskId"`
|
||||||
|
// Payload is the opaque data provided when the task was enqueued.
|
||||||
|
Payload []byte `json:"payload"`
|
||||||
|
// Attempt is the current attempt number (1-based: first attempt = 1).
|
||||||
|
Attempt int32 `json:"attempt"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskWorker is the marker interface for taskworker plugins.
|
||||||
|
// Implement one or more of the provider interfaces below.
|
||||||
|
// TaskWorker provides task execution handling.
|
||||||
|
// This capability allows plugins to receive callbacks when their queued tasks
|
||||||
|
// are ready to execute. Plugins that use the taskqueue host service must
|
||||||
|
// implement this capability.
|
||||||
|
type TaskWorker interface{}
|
||||||
|
|
||||||
|
// TaskExecuteProvider provides the OnTaskExecute function.
|
||||||
|
type TaskExecuteProvider interface {
|
||||||
|
OnTaskExecute(TaskExecuteRequest) (string, error)
|
||||||
|
} // Internal implementation holders
|
||||||
|
var (
|
||||||
|
taskExecuteImpl func(TaskExecuteRequest) (string, error)
|
||||||
|
)
|
||||||
|
|
||||||
|
// Register registers a taskworker implementation.
|
||||||
|
// The implementation is checked for optional provider interfaces.
|
||||||
|
func Register(impl TaskWorker) {
|
||||||
|
if p, ok := impl.(TaskExecuteProvider); ok {
|
||||||
|
taskExecuteImpl = p.OnTaskExecute
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotImplementedCode is the standard return code for unimplemented functions.
|
||||||
|
// The host recognizes this and skips the plugin gracefully.
|
||||||
|
const NotImplementedCode int32 = -2
|
||||||
|
|
||||||
|
//go:wasmexport nd_task_execute
|
||||||
|
func _NdTaskExecute() int32 {
|
||||||
|
if taskExecuteImpl == nil {
|
||||||
|
// Return standard code - host will skip this plugin gracefully
|
||||||
|
return NotImplementedCode
|
||||||
|
}
|
||||||
|
|
||||||
|
var input TaskExecuteRequest
|
||||||
|
if err := pdk.InputJSON(&input); err != nil {
|
||||||
|
pdk.SetError(err)
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
|
||||||
|
output, err := taskExecuteImpl(input)
|
||||||
|
if err != nil {
|
||||||
|
pdk.SetError(err)
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := pdk.OutputJSON(output); err != nil {
|
||||||
|
pdk.SetError(err)
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0
|
||||||
|
}
|
||||||
41
plugins/pdk/go/taskworker/taskworker_stub.go
Normal file
41
plugins/pdk/go/taskworker/taskworker_stub.go
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
// Code generated by ndpgen. DO NOT EDIT.
|
||||||
|
//
|
||||||
|
// This file provides stub implementations for non-WASM platforms.
|
||||||
|
// It allows Go plugins to compile and run tests outside of WASM,
|
||||||
|
// but the actual functionality is only available in WASM builds.
|
||||||
|
//
|
||||||
|
//go:build !wasip1
|
||||||
|
|
||||||
|
package taskworker
|
||||||
|
|
||||||
|
// TaskExecuteRequest is the request provided when a task is ready to execute.
|
||||||
|
type TaskExecuteRequest struct {
|
||||||
|
// QueueName is the name of the queue this task belongs to.
|
||||||
|
QueueName string `json:"queueName"`
|
||||||
|
// TaskID is the unique identifier for this task.
|
||||||
|
TaskID string `json:"taskId"`
|
||||||
|
// Payload is the opaque data provided when the task was enqueued.
|
||||||
|
Payload []byte `json:"payload"`
|
||||||
|
// Attempt is the current attempt number (1-based: first attempt = 1).
|
||||||
|
Attempt int32 `json:"attempt"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskWorker is the marker interface for taskworker plugins.
|
||||||
|
// Implement one or more of the provider interfaces below.
|
||||||
|
// TaskWorker provides task execution handling.
|
||||||
|
// This capability allows plugins to receive callbacks when their queued tasks
|
||||||
|
// are ready to execute. Plugins that use the taskqueue host service must
|
||||||
|
// implement this capability.
|
||||||
|
type TaskWorker interface{}
|
||||||
|
|
||||||
|
// TaskExecuteProvider provides the OnTaskExecute function.
|
||||||
|
type TaskExecuteProvider interface {
|
||||||
|
OnTaskExecute(TaskExecuteRequest) (string, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotImplementedCode is the standard return code for unimplemented functions.
|
||||||
|
const NotImplementedCode int32 = -2
|
||||||
|
|
||||||
|
// Register is a no-op on non-WASM platforms.
|
||||||
|
// This stub allows code to compile outside of WASM.
|
||||||
|
func Register(_ TaskWorker) {}
|
||||||
@@ -26,14 +26,20 @@ def _kvstore_set(offset: int) -> int:
|
|||||||
...
|
...
|
||||||
|
|
||||||
|
|
||||||
|
@extism.import_fn("extism:host/user", "kvstore_setwithttl")
|
||||||
|
def _kvstore_setwithttl(offset: int) -> int:
|
||||||
|
"""Raw host function - do not call directly."""
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
@extism.import_fn("extism:host/user", "kvstore_get")
|
@extism.import_fn("extism:host/user", "kvstore_get")
|
||||||
def _kvstore_get(offset: int) -> int:
|
def _kvstore_get(offset: int) -> int:
|
||||||
"""Raw host function - do not call directly."""
|
"""Raw host function - do not call directly."""
|
||||||
...
|
...
|
||||||
|
|
||||||
|
|
||||||
@extism.import_fn("extism:host/user", "kvstore_delete")
|
@extism.import_fn("extism:host/user", "kvstore_getmany")
|
||||||
def _kvstore_delete(offset: int) -> int:
|
def _kvstore_getmany(offset: int) -> int:
|
||||||
"""Raw host function - do not call directly."""
|
"""Raw host function - do not call directly."""
|
||||||
...
|
...
|
||||||
|
|
||||||
@@ -50,6 +56,18 @@ def _kvstore_list(offset: int) -> int:
|
|||||||
...
|
...
|
||||||
|
|
||||||
|
|
||||||
|
@extism.import_fn("extism:host/user", "kvstore_delete")
|
||||||
|
def _kvstore_delete(offset: int) -> int:
|
||||||
|
"""Raw host function - do not call directly."""
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
@extism.import_fn("extism:host/user", "kvstore_deletebyprefix")
|
||||||
|
def _kvstore_deletebyprefix(offset: int) -> int:
|
||||||
|
"""Raw host function - do not call directly."""
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
@extism.import_fn("extism:host/user", "kvstore_getstorageused")
|
@extism.import_fn("extism:host/user", "kvstore_getstorageused")
|
||||||
def _kvstore_getstorageused(offset: int) -> int:
|
def _kvstore_getstorageused(offset: int) -> int:
|
||||||
"""Raw host function - do not call directly."""
|
"""Raw host function - do not call directly."""
|
||||||
@@ -94,6 +112,43 @@ Returns an error if the storage limit would be exceeded or the operation fails.
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def kvstore_set_with_ttl(key: str, value: bytes, ttl_seconds: int) -> None:
|
||||||
|
"""SetWithTTL stores a byte value with the given key and a time-to-live.
|
||||||
|
|
||||||
|
After ttlSeconds, the key is treated as non-existent and will be
|
||||||
|
cleaned up lazily. ttlSeconds must be greater than 0.
|
||||||
|
|
||||||
|
Parameters:
|
||||||
|
- key: The storage key (max 256 bytes, UTF-8)
|
||||||
|
- value: The byte slice to store
|
||||||
|
- ttlSeconds: Time-to-live in seconds (must be > 0)
|
||||||
|
|
||||||
|
Returns an error if the storage limit would be exceeded or the operation fails.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
key: str parameter.
|
||||||
|
value: bytes parameter.
|
||||||
|
ttl_seconds: int parameter.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
HostFunctionError: If the host function returns an error.
|
||||||
|
"""
|
||||||
|
request = {
|
||||||
|
"key": key,
|
||||||
|
"value": base64.b64encode(value).decode("ascii"),
|
||||||
|
"ttlSeconds": ttl_seconds,
|
||||||
|
}
|
||||||
|
request_bytes = json.dumps(request).encode("utf-8")
|
||||||
|
request_mem = extism.memory.alloc(request_bytes)
|
||||||
|
response_offset = _kvstore_setwithttl(request_mem.offset)
|
||||||
|
response_mem = extism.memory.find(response_offset)
|
||||||
|
response = json.loads(extism.memory.string(response_mem))
|
||||||
|
|
||||||
|
if response.get("error"):
|
||||||
|
raise HostFunctionError(response["error"])
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def kvstore_get(key: str) -> KVStoreGetResult:
|
def kvstore_get(key: str) -> KVStoreGetResult:
|
||||||
"""Get retrieves a byte value from storage.
|
"""Get retrieves a byte value from storage.
|
||||||
|
|
||||||
@@ -129,32 +184,37 @@ Returns the value and whether the key exists.
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def kvstore_delete(key: str) -> None:
|
def kvstore_get_many(keys: Any) -> Any:
|
||||||
"""Delete removes a value from storage.
|
"""GetMany retrieves multiple values in a single call.
|
||||||
|
|
||||||
Parameters:
|
Parameters:
|
||||||
- key: The storage key
|
- keys: The storage keys to retrieve
|
||||||
|
|
||||||
Returns an error if the operation fails. Does not return an error if the key doesn't exist.
|
Returns a map of key to value for keys that exist and have not expired.
|
||||||
|
Missing or expired keys are omitted from the result.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
key: str parameter.
|
keys: Any parameter.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Any: The result value.
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
HostFunctionError: If the host function returns an error.
|
HostFunctionError: If the host function returns an error.
|
||||||
"""
|
"""
|
||||||
request = {
|
request = {
|
||||||
"key": key,
|
"keys": keys,
|
||||||
}
|
}
|
||||||
request_bytes = json.dumps(request).encode("utf-8")
|
request_bytes = json.dumps(request).encode("utf-8")
|
||||||
request_mem = extism.memory.alloc(request_bytes)
|
request_mem = extism.memory.alloc(request_bytes)
|
||||||
response_offset = _kvstore_delete(request_mem.offset)
|
response_offset = _kvstore_getmany(request_mem.offset)
|
||||||
response_mem = extism.memory.find(response_offset)
|
response_mem = extism.memory.find(response_offset)
|
||||||
response = json.loads(extism.memory.string(response_mem))
|
response = json.loads(extism.memory.string(response_mem))
|
||||||
|
|
||||||
if response.get("error"):
|
if response.get("error"):
|
||||||
raise HostFunctionError(response["error"])
|
raise HostFunctionError(response["error"])
|
||||||
|
|
||||||
|
return response.get("values", None)
|
||||||
|
|
||||||
|
|
||||||
def kvstore_has(key: str) -> bool:
|
def kvstore_has(key: str) -> bool:
|
||||||
@@ -221,6 +281,66 @@ Returns a slice of matching keys.
|
|||||||
return response.get("keys", None)
|
return response.get("keys", None)
|
||||||
|
|
||||||
|
|
||||||
|
def kvstore_delete(key: str) -> None:
|
||||||
|
"""Delete removes a value from storage.
|
||||||
|
|
||||||
|
Parameters:
|
||||||
|
- key: The storage key
|
||||||
|
|
||||||
|
Returns an error if the operation fails. Does not return an error if the key doesn't exist.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
key: str parameter.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
HostFunctionError: If the host function returns an error.
|
||||||
|
"""
|
||||||
|
request = {
|
||||||
|
"key": key,
|
||||||
|
}
|
||||||
|
request_bytes = json.dumps(request).encode("utf-8")
|
||||||
|
request_mem = extism.memory.alloc(request_bytes)
|
||||||
|
response_offset = _kvstore_delete(request_mem.offset)
|
||||||
|
response_mem = extism.memory.find(response_offset)
|
||||||
|
response = json.loads(extism.memory.string(response_mem))
|
||||||
|
|
||||||
|
if response.get("error"):
|
||||||
|
raise HostFunctionError(response["error"])
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def kvstore_delete_by_prefix(prefix: str) -> int:
|
||||||
|
"""DeleteByPrefix removes all keys matching the given prefix.
|
||||||
|
|
||||||
|
Parameters:
|
||||||
|
- prefix: Key prefix to match (must not be empty)
|
||||||
|
|
||||||
|
Returns the number of keys deleted. Includes expired keys.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
prefix: str parameter.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int: The result value.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
HostFunctionError: If the host function returns an error.
|
||||||
|
"""
|
||||||
|
request = {
|
||||||
|
"prefix": prefix,
|
||||||
|
}
|
||||||
|
request_bytes = json.dumps(request).encode("utf-8")
|
||||||
|
request_mem = extism.memory.alloc(request_bytes)
|
||||||
|
response_offset = _kvstore_deletebyprefix(request_mem.offset)
|
||||||
|
response_mem = extism.memory.find(response_offset)
|
||||||
|
response = json.loads(extism.memory.string(response_mem))
|
||||||
|
|
||||||
|
if response.get("error"):
|
||||||
|
raise HostFunctionError(response["error"])
|
||||||
|
|
||||||
|
return response.get("deletedCount", 0)
|
||||||
|
|
||||||
|
|
||||||
def kvstore_get_storage_used() -> int:
|
def kvstore_get_storage_used() -> int:
|
||||||
"""GetStorageUsed returns the total storage used by this plugin in bytes.
|
"""GetStorageUsed returns the total storage used by this plugin in bytes.
|
||||||
|
|
||||||
|
|||||||
187
plugins/pdk/python/host/nd_host_task.py
Normal file
187
plugins/pdk/python/host/nd_host_task.py
Normal file
@@ -0,0 +1,187 @@
|
|||||||
|
# Code generated by ndpgen. DO NOT EDIT.
|
||||||
|
#
|
||||||
|
# This file contains client wrappers for the Task host service.
|
||||||
|
# It is intended for use in Navidrome plugins built with extism-py.
|
||||||
|
#
|
||||||
|
# IMPORTANT: Due to a limitation in extism-py, you cannot import this file directly.
|
||||||
|
# The @extism.import_fn decorators are only detected when defined in the plugin's
|
||||||
|
# main __init__.py file. Copy the needed functions from this file into your plugin.
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import extism
|
||||||
|
import json
|
||||||
|
import base64
|
||||||
|
|
||||||
|
|
||||||
|
class HostFunctionError(Exception):
|
||||||
|
"""Raised when a host function returns an error."""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@extism.import_fn("extism:host/user", "task_createqueue")
|
||||||
|
def _task_createqueue(offset: int) -> int:
|
||||||
|
"""Raw host function - do not call directly."""
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
@extism.import_fn("extism:host/user", "task_enqueue")
|
||||||
|
def _task_enqueue(offset: int) -> int:
|
||||||
|
"""Raw host function - do not call directly."""
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
@extism.import_fn("extism:host/user", "task_get")
|
||||||
|
def _task_get(offset: int) -> int:
|
||||||
|
"""Raw host function - do not call directly."""
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
@extism.import_fn("extism:host/user", "task_cancel")
|
||||||
|
def _task_cancel(offset: int) -> int:
|
||||||
|
"""Raw host function - do not call directly."""
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
@extism.import_fn("extism:host/user", "task_clearqueue")
|
||||||
|
def _task_clearqueue(offset: int) -> int:
|
||||||
|
"""Raw host function - do not call directly."""
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
def task_create_queue(name: str, config: Any) -> None:
|
||||||
|
"""CreateQueue creates a named task queue with the given configuration.
|
||||||
|
Zero-value fields in config use sensible defaults.
|
||||||
|
If a queue with the same name already exists, returns an error.
|
||||||
|
On startup, this also recovers any stale "running" tasks from a previous crash.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name: str parameter.
|
||||||
|
config: Any parameter.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
HostFunctionError: If the host function returns an error.
|
||||||
|
"""
|
||||||
|
request = {
|
||||||
|
"name": name,
|
||||||
|
"config": config,
|
||||||
|
}
|
||||||
|
request_bytes = json.dumps(request).encode("utf-8")
|
||||||
|
request_mem = extism.memory.alloc(request_bytes)
|
||||||
|
response_offset = _task_createqueue(request_mem.offset)
|
||||||
|
response_mem = extism.memory.find(response_offset)
|
||||||
|
response = json.loads(extism.memory.string(response_mem))
|
||||||
|
|
||||||
|
if response.get("error"):
|
||||||
|
raise HostFunctionError(response["error"])
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def task_enqueue(queue_name: str, payload: bytes) -> str:
|
||||||
|
"""Enqueue adds a task to the named queue. Returns the task ID.
|
||||||
|
payload is opaque bytes passed back to the plugin on execution.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
queue_name: str parameter.
|
||||||
|
payload: bytes parameter.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: The result value.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
HostFunctionError: If the host function returns an error.
|
||||||
|
"""
|
||||||
|
request = {
|
||||||
|
"queueName": queue_name,
|
||||||
|
"payload": base64.b64encode(payload).decode("ascii"),
|
||||||
|
}
|
||||||
|
request_bytes = json.dumps(request).encode("utf-8")
|
||||||
|
request_mem = extism.memory.alloc(request_bytes)
|
||||||
|
response_offset = _task_enqueue(request_mem.offset)
|
||||||
|
response_mem = extism.memory.find(response_offset)
|
||||||
|
response = json.loads(extism.memory.string(response_mem))
|
||||||
|
|
||||||
|
if response.get("error"):
|
||||||
|
raise HostFunctionError(response["error"])
|
||||||
|
|
||||||
|
return response.get("result", "")
|
||||||
|
|
||||||
|
|
||||||
|
def task_get(task_id: str) -> Any:
|
||||||
|
"""Get returns the current state of a task including its status,
|
||||||
|
message, and attempt count.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
task_id: str parameter.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Any: The result value.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
HostFunctionError: If the host function returns an error.
|
||||||
|
"""
|
||||||
|
request = {
|
||||||
|
"taskId": task_id,
|
||||||
|
}
|
||||||
|
request_bytes = json.dumps(request).encode("utf-8")
|
||||||
|
request_mem = extism.memory.alloc(request_bytes)
|
||||||
|
response_offset = _task_get(request_mem.offset)
|
||||||
|
response_mem = extism.memory.find(response_offset)
|
||||||
|
response = json.loads(extism.memory.string(response_mem))
|
||||||
|
|
||||||
|
if response.get("error"):
|
||||||
|
raise HostFunctionError(response["error"])
|
||||||
|
|
||||||
|
return response.get("result", None)
|
||||||
|
|
||||||
|
|
||||||
|
def task_cancel(task_id: str) -> None:
|
||||||
|
"""Cancel cancels a pending task. Returns error if already
|
||||||
|
running, completed, or failed.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
task_id: str parameter.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
HostFunctionError: If the host function returns an error.
|
||||||
|
"""
|
||||||
|
request = {
|
||||||
|
"taskId": task_id,
|
||||||
|
}
|
||||||
|
request_bytes = json.dumps(request).encode("utf-8")
|
||||||
|
request_mem = extism.memory.alloc(request_bytes)
|
||||||
|
response_offset = _task_cancel(request_mem.offset)
|
||||||
|
response_mem = extism.memory.find(response_offset)
|
||||||
|
response = json.loads(extism.memory.string(response_mem))
|
||||||
|
|
||||||
|
if response.get("error"):
|
||||||
|
raise HostFunctionError(response["error"])
|
||||||
|
|
||||||
|
|
||||||
|
def task_clear_queue(queue_name: str) -> int:
|
||||||
|
"""ClearQueue removes all pending tasks from the named queue.
|
||||||
|
Running tasks are not affected. Returns the number of tasks removed.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
queue_name: str parameter.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int: The number of tasks removed.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
HostFunctionError: If the host function returns an error.
|
||||||
|
"""
|
||||||
|
request = {
|
||||||
|
"queueName": queue_name,
|
||||||
|
}
|
||||||
|
request_bytes = json.dumps(request).encode("utf-8")
|
||||||
|
request_mem = extism.memory.alloc(request_bytes)
|
||||||
|
response_offset = _task_clearqueue(request_mem.offset)
|
||||||
|
response_mem = extism.memory.find(response_offset)
|
||||||
|
response = json.loads(extism.memory.string(response_mem))
|
||||||
|
|
||||||
|
if response.get("error"):
|
||||||
|
raise HostFunctionError(response["error"])
|
||||||
|
|
||||||
|
return response.get("result", 0)
|
||||||
@@ -9,4 +9,5 @@ pub mod lifecycle;
|
|||||||
pub mod metadata;
|
pub mod metadata;
|
||||||
pub mod scheduler;
|
pub mod scheduler;
|
||||||
pub mod scrobbler;
|
pub mod scrobbler;
|
||||||
|
pub mod taskworker;
|
||||||
pub mod websocket;
|
pub mod websocket;
|
||||||
|
|||||||
102
plugins/pdk/rust/nd-pdk-capabilities/src/taskworker.rs
Normal file
102
plugins/pdk/rust/nd-pdk-capabilities/src/taskworker.rs
Normal file
@@ -0,0 +1,102 @@
|
|||||||
|
// Code generated by ndpgen. DO NOT EDIT.
|
||||||
|
//
|
||||||
|
// This file contains export wrappers for the TaskWorker capability.
|
||||||
|
// It is intended for use in Navidrome plugins built with extism-pdk.
|
||||||
|
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use base64::Engine as _;
|
||||||
|
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||||
|
|
||||||
|
mod base64_bytes {
|
||||||
|
use serde::{self, Deserialize, Deserializer, Serializer};
|
||||||
|
use base64::Engine as _;
|
||||||
|
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||||
|
|
||||||
|
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
serializer.serialize_str(&BASE64.encode(bytes))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
|
||||||
|
where
|
||||||
|
D: Deserializer<'de>,
|
||||||
|
{
|
||||||
|
let s = String::deserialize(deserializer)?;
|
||||||
|
BASE64.decode(&s).map_err(serde::de::Error::custom)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper functions for skip_serializing_if with numeric types
|
||||||
|
#[allow(dead_code)]
|
||||||
|
fn is_zero_i32(value: &i32) -> bool { *value == 0 }
|
||||||
|
#[allow(dead_code)]
|
||||||
|
fn is_zero_u32(value: &u32) -> bool { *value == 0 }
|
||||||
|
#[allow(dead_code)]
|
||||||
|
fn is_zero_i64(value: &i64) -> bool { *value == 0 }
|
||||||
|
#[allow(dead_code)]
|
||||||
|
fn is_zero_u64(value: &u64) -> bool { *value == 0 }
|
||||||
|
#[allow(dead_code)]
|
||||||
|
fn is_zero_f32(value: &f32) -> bool { *value == 0.0 }
|
||||||
|
#[allow(dead_code)]
|
||||||
|
fn is_zero_f64(value: &f64) -> bool { *value == 0.0 }
|
||||||
|
/// TaskExecuteRequest is the request provided when a task is ready to execute.
|
||||||
|
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct TaskExecuteRequest {
|
||||||
|
/// QueueName is the name of the queue this task belongs to.
|
||||||
|
#[serde(default)]
|
||||||
|
pub queue_name: String,
|
||||||
|
/// TaskID is the unique identifier for this task.
|
||||||
|
#[serde(default)]
|
||||||
|
pub task_id: String,
|
||||||
|
/// Payload is the opaque data provided when the task was enqueued.
|
||||||
|
#[serde(default)]
|
||||||
|
#[serde(with = "base64_bytes")]
|
||||||
|
pub payload: Vec<u8>,
|
||||||
|
/// Attempt is the current attempt number (1-based: first attempt = 1).
|
||||||
|
#[serde(default)]
|
||||||
|
pub attempt: i32,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Error represents an error from a capability method.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Error {
|
||||||
|
pub message: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for Error {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
write!(f, "{}", self.message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::error::Error for Error {}
|
||||||
|
|
||||||
|
impl Error {
|
||||||
|
pub fn new(message: impl Into<String>) -> Self {
|
||||||
|
Self { message: message.into() }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// TaskExecuteProvider provides the OnTaskExecute function.
|
||||||
|
pub trait TaskExecuteProvider {
|
||||||
|
fn on_task_execute(&self, req: TaskExecuteRequest) -> Result<String, Error>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Register the on_task_execute export.
|
||||||
|
/// This macro generates the WASM export function for this method.
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! register_taskworker_task_execute {
|
||||||
|
($plugin_type:ty) => {
|
||||||
|
#[extism_pdk::plugin_fn]
|
||||||
|
pub fn nd_task_execute(
|
||||||
|
req: extism_pdk::Json<$crate::taskworker::TaskExecuteRequest>
|
||||||
|
) -> extism_pdk::FnResult<extism_pdk::Json<String>> {
|
||||||
|
let plugin = <$plugin_type>::default();
|
||||||
|
let result = $crate::taskworker::TaskExecuteProvider::on_task_execute(&plugin, req.into_inner())?;
|
||||||
|
Ok(extism_pdk::Json(result))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
@@ -40,6 +40,7 @@
|
|||||||
//! - [`library`] - provides access to music library metadata for plugins.
|
//! - [`library`] - provides access to music library metadata for plugins.
|
||||||
//! - [`scheduler`] - provides task scheduling capabilities for plugins.
|
//! - [`scheduler`] - provides task scheduling capabilities for plugins.
|
||||||
//! - [`subsonicapi`] - provides access to Navidrome's Subsonic API from plugins.
|
//! - [`subsonicapi`] - provides access to Navidrome's Subsonic API from plugins.
|
||||||
|
//! - [`task`] - provides persistent task queues for plugins.
|
||||||
//! - [`users`] - provides access to user information for plugins.
|
//! - [`users`] - provides access to user information for plugins.
|
||||||
//! - [`websocket`] - provides WebSocket communication capabilities for plugins.
|
//! - [`websocket`] - provides WebSocket communication capabilities for plugins.
|
||||||
|
|
||||||
@@ -99,6 +100,13 @@ pub mod subsonicapi {
|
|||||||
pub use super::nd_host_subsonicapi::*;
|
pub use super::nd_host_subsonicapi::*;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[doc(hidden)]
|
||||||
|
mod nd_host_task;
|
||||||
|
/// provides persistent task queues for plugins.
|
||||||
|
pub mod task {
|
||||||
|
pub use super::nd_host_task::*;
|
||||||
|
}
|
||||||
|
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
mod nd_host_users;
|
mod nd_host_users;
|
||||||
/// provides access to user information for plugins.
|
/// provides access to user information for plugins.
|
||||||
|
|||||||
@@ -44,6 +44,22 @@ struct KVStoreSetResponse {
|
|||||||
error: Option<String>,
|
error: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct KVStoreSetWithTTLRequest {
|
||||||
|
key: String,
|
||||||
|
#[serde(with = "base64_bytes")]
|
||||||
|
value: Vec<u8>,
|
||||||
|
ttl_seconds: i64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct KVStoreSetWithTTLResponse {
|
||||||
|
#[serde(default)]
|
||||||
|
error: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
struct KVStoreGetRequest {
|
struct KVStoreGetRequest {
|
||||||
@@ -64,13 +80,15 @@ struct KVStoreGetResponse {
|
|||||||
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
struct KVStoreDeleteRequest {
|
struct KVStoreGetManyRequest {
|
||||||
key: String,
|
keys: Vec<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
struct KVStoreDeleteResponse {
|
struct KVStoreGetManyResponse {
|
||||||
|
#[serde(default)]
|
||||||
|
values: std::collections::HashMap<String, Vec<u8>>,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
error: Option<String>,
|
error: Option<String>,
|
||||||
}
|
}
|
||||||
@@ -105,6 +123,34 @@ struct KVStoreListResponse {
|
|||||||
error: Option<String>,
|
error: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct KVStoreDeleteRequest {
|
||||||
|
key: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct KVStoreDeleteResponse {
|
||||||
|
#[serde(default)]
|
||||||
|
error: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct KVStoreDeleteByPrefixRequest {
|
||||||
|
prefix: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct KVStoreDeleteByPrefixResponse {
|
||||||
|
#[serde(default)]
|
||||||
|
deleted_count: i64,
|
||||||
|
#[serde(default)]
|
||||||
|
error: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
struct KVStoreGetStorageUsedResponse {
|
struct KVStoreGetStorageUsedResponse {
|
||||||
@@ -117,10 +163,13 @@ struct KVStoreGetStorageUsedResponse {
|
|||||||
#[host_fn]
|
#[host_fn]
|
||||||
extern "ExtismHost" {
|
extern "ExtismHost" {
|
||||||
fn kvstore_set(input: Json<KVStoreSetRequest>) -> Json<KVStoreSetResponse>;
|
fn kvstore_set(input: Json<KVStoreSetRequest>) -> Json<KVStoreSetResponse>;
|
||||||
|
fn kvstore_setwithttl(input: Json<KVStoreSetWithTTLRequest>) -> Json<KVStoreSetWithTTLResponse>;
|
||||||
fn kvstore_get(input: Json<KVStoreGetRequest>) -> Json<KVStoreGetResponse>;
|
fn kvstore_get(input: Json<KVStoreGetRequest>) -> Json<KVStoreGetResponse>;
|
||||||
fn kvstore_delete(input: Json<KVStoreDeleteRequest>) -> Json<KVStoreDeleteResponse>;
|
fn kvstore_getmany(input: Json<KVStoreGetManyRequest>) -> Json<KVStoreGetManyResponse>;
|
||||||
fn kvstore_has(input: Json<KVStoreHasRequest>) -> Json<KVStoreHasResponse>;
|
fn kvstore_has(input: Json<KVStoreHasRequest>) -> Json<KVStoreHasResponse>;
|
||||||
fn kvstore_list(input: Json<KVStoreListRequest>) -> Json<KVStoreListResponse>;
|
fn kvstore_list(input: Json<KVStoreListRequest>) -> Json<KVStoreListResponse>;
|
||||||
|
fn kvstore_delete(input: Json<KVStoreDeleteRequest>) -> Json<KVStoreDeleteResponse>;
|
||||||
|
fn kvstore_deletebyprefix(input: Json<KVStoreDeleteByPrefixRequest>) -> Json<KVStoreDeleteByPrefixResponse>;
|
||||||
fn kvstore_getstorageused(input: Json<serde_json::Value>) -> Json<KVStoreGetStorageUsedResponse>;
|
fn kvstore_getstorageused(input: Json<serde_json::Value>) -> Json<KVStoreGetStorageUsedResponse>;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -153,6 +202,41 @@ pub fn set(key: &str, value: Vec<u8>) -> Result<(), Error> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// SetWithTTL stores a byte value with the given key and a time-to-live.
|
||||||
|
///
|
||||||
|
/// After ttlSeconds, the key is treated as non-existent and will be
|
||||||
|
/// cleaned up lazily. ttlSeconds must be greater than 0.
|
||||||
|
///
|
||||||
|
/// Parameters:
|
||||||
|
/// - key: The storage key (max 256 bytes, UTF-8)
|
||||||
|
/// - value: The byte slice to store
|
||||||
|
/// - ttlSeconds: Time-to-live in seconds (must be > 0)
|
||||||
|
///
|
||||||
|
/// Returns an error if the storage limit would be exceeded or the operation fails.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
/// * `key` - String parameter.
|
||||||
|
/// * `value` - Vec<u8> parameter.
|
||||||
|
/// * `ttl_seconds` - i64 parameter.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns an error if the host function call fails.
|
||||||
|
pub fn set_with_ttl(key: &str, value: Vec<u8>, ttl_seconds: i64) -> Result<(), Error> {
|
||||||
|
let response = unsafe {
|
||||||
|
kvstore_setwithttl(Json(KVStoreSetWithTTLRequest {
|
||||||
|
key: key.to_owned(),
|
||||||
|
value: value,
|
||||||
|
ttl_seconds: ttl_seconds,
|
||||||
|
}))?
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(err) = response.0.error {
|
||||||
|
return Err(Error::msg(err));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Get retrieves a byte value from storage.
|
/// Get retrieves a byte value from storage.
|
||||||
///
|
///
|
||||||
/// Parameters:
|
/// Parameters:
|
||||||
@@ -186,22 +270,26 @@ pub fn get(key: &str) -> Result<Option<Vec<u8>>, Error> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Delete removes a value from storage.
|
/// GetMany retrieves multiple values in a single call.
|
||||||
///
|
///
|
||||||
/// Parameters:
|
/// Parameters:
|
||||||
/// - key: The storage key
|
/// - keys: The storage keys to retrieve
|
||||||
///
|
///
|
||||||
/// Returns an error if the operation fails. Does not return an error if the key doesn't exist.
|
/// Returns a map of key to value for keys that exist and have not expired.
|
||||||
|
/// Missing or expired keys are omitted from the result.
|
||||||
///
|
///
|
||||||
/// # Arguments
|
/// # Arguments
|
||||||
/// * `key` - String parameter.
|
/// * `keys` - Vec<String> parameter.
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
/// The values value.
|
||||||
///
|
///
|
||||||
/// # Errors
|
/// # Errors
|
||||||
/// Returns an error if the host function call fails.
|
/// Returns an error if the host function call fails.
|
||||||
pub fn delete(key: &str) -> Result<(), Error> {
|
pub fn get_many(keys: Vec<String>) -> Result<std::collections::HashMap<String, Vec<u8>>, Error> {
|
||||||
let response = unsafe {
|
let response = unsafe {
|
||||||
kvstore_delete(Json(KVStoreDeleteRequest {
|
kvstore_getmany(Json(KVStoreGetManyRequest {
|
||||||
key: key.to_owned(),
|
keys: keys,
|
||||||
}))?
|
}))?
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -209,7 +297,7 @@ pub fn delete(key: &str) -> Result<(), Error> {
|
|||||||
return Err(Error::msg(err));
|
return Err(Error::msg(err));
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(response.0.values)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Has checks if a key exists in storage.
|
/// Has checks if a key exists in storage.
|
||||||
@@ -270,6 +358,61 @@ pub fn list(prefix: &str) -> Result<Vec<String>, Error> {
|
|||||||
Ok(response.0.keys)
|
Ok(response.0.keys)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Delete removes a value from storage.
|
||||||
|
///
|
||||||
|
/// Parameters:
|
||||||
|
/// - key: The storage key
|
||||||
|
///
|
||||||
|
/// Returns an error if the operation fails. Does not return an error if the key doesn't exist.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
/// * `key` - String parameter.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns an error if the host function call fails.
|
||||||
|
pub fn delete(key: &str) -> Result<(), Error> {
|
||||||
|
let response = unsafe {
|
||||||
|
kvstore_delete(Json(KVStoreDeleteRequest {
|
||||||
|
key: key.to_owned(),
|
||||||
|
}))?
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(err) = response.0.error {
|
||||||
|
return Err(Error::msg(err));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// DeleteByPrefix removes all keys matching the given prefix.
|
||||||
|
///
|
||||||
|
/// Parameters:
|
||||||
|
/// - prefix: Key prefix to match (must not be empty)
|
||||||
|
///
|
||||||
|
/// Returns the number of keys deleted. Includes expired keys.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
/// * `prefix` - String parameter.
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
/// The deleted_count value.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns an error if the host function call fails.
|
||||||
|
pub fn delete_by_prefix(prefix: &str) -> Result<i64, Error> {
|
||||||
|
let response = unsafe {
|
||||||
|
kvstore_deletebyprefix(Json(KVStoreDeleteByPrefixRequest {
|
||||||
|
prefix: prefix.to_owned(),
|
||||||
|
}))?
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(err) = response.0.error {
|
||||||
|
return Err(Error::msg(err));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(response.0.deleted_count)
|
||||||
|
}
|
||||||
|
|
||||||
/// GetStorageUsed returns the total storage used by this plugin in bytes.
|
/// GetStorageUsed returns the total storage used by this plugin in bytes.
|
||||||
///
|
///
|
||||||
/// # Returns
|
/// # Returns
|
||||||
|
|||||||
258
plugins/pdk/rust/nd-pdk-host/src/nd_host_task.rs
Normal file
258
plugins/pdk/rust/nd-pdk-host/src/nd_host_task.rs
Normal file
@@ -0,0 +1,258 @@
|
|||||||
|
// Code generated by ndpgen. DO NOT EDIT.
|
||||||
|
//
|
||||||
|
// This file contains client wrappers for the Task host service.
|
||||||
|
// It is intended for use in Navidrome plugins built with extism-pdk.
|
||||||
|
|
||||||
|
use extism_pdk::*;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use base64::Engine as _;
|
||||||
|
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||||
|
|
||||||
|
mod base64_bytes {
|
||||||
|
use serde::{self, Deserialize, Deserializer, Serializer};
|
||||||
|
use base64::Engine as _;
|
||||||
|
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||||
|
|
||||||
|
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
serializer.serialize_str(&BASE64.encode(bytes))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
|
||||||
|
where
|
||||||
|
D: Deserializer<'de>,
|
||||||
|
{
|
||||||
|
let s = String::deserialize(deserializer)?;
|
||||||
|
BASE64.decode(&s).map_err(serde::de::Error::custom)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// QueueConfig holds configuration for a task queue.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct QueueConfig {
|
||||||
|
pub concurrency: i32,
|
||||||
|
pub max_retries: i32,
|
||||||
|
pub backoff_ms: i64,
|
||||||
|
pub delay_ms: i64,
|
||||||
|
pub retention_ms: i64,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// TaskInfo holds the current state of a task.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct TaskInfo {
|
||||||
|
pub status: String,
|
||||||
|
pub message: String,
|
||||||
|
pub attempt: i32,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct TaskCreateQueueRequest {
|
||||||
|
name: String,
|
||||||
|
config: QueueConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct TaskCreateQueueResponse {
|
||||||
|
#[serde(default)]
|
||||||
|
error: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct TaskEnqueueRequest {
|
||||||
|
queue_name: String,
|
||||||
|
#[serde(with = "base64_bytes")]
|
||||||
|
payload: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct TaskEnqueueResponse {
|
||||||
|
#[serde(default)]
|
||||||
|
result: String,
|
||||||
|
#[serde(default)]
|
||||||
|
error: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct TaskGetRequest {
|
||||||
|
task_id: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct TaskGetResponse {
|
||||||
|
#[serde(default)]
|
||||||
|
result: Option<TaskInfo>,
|
||||||
|
#[serde(default)]
|
||||||
|
error: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct TaskCancelRequest {
|
||||||
|
task_id: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct TaskCancelResponse {
|
||||||
|
#[serde(default)]
|
||||||
|
error: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct TaskClearQueueRequest {
|
||||||
|
queue_name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct TaskClearQueueResponse {
|
||||||
|
#[serde(default)]
|
||||||
|
result: i64,
|
||||||
|
#[serde(default)]
|
||||||
|
error: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[host_fn]
|
||||||
|
extern "ExtismHost" {
|
||||||
|
fn task_createqueue(input: Json<TaskCreateQueueRequest>) -> Json<TaskCreateQueueResponse>;
|
||||||
|
fn task_enqueue(input: Json<TaskEnqueueRequest>) -> Json<TaskEnqueueResponse>;
|
||||||
|
fn task_get(input: Json<TaskGetRequest>) -> Json<TaskGetResponse>;
|
||||||
|
fn task_cancel(input: Json<TaskCancelRequest>) -> Json<TaskCancelResponse>;
|
||||||
|
fn task_clearqueue(input: Json<TaskClearQueueRequest>) -> Json<TaskClearQueueResponse>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// CreateQueue creates a named task queue with the given configuration.
|
||||||
|
/// Zero-value fields in config use sensible defaults.
|
||||||
|
/// If a queue with the same name already exists, returns an error.
|
||||||
|
/// On startup, this also recovers any stale "running" tasks from a previous crash.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
/// * `name` - String parameter.
|
||||||
|
/// * `config` - QueueConfig parameter.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns an error if the host function call fails.
|
||||||
|
pub fn create_queue(name: &str, config: QueueConfig) -> Result<(), Error> {
|
||||||
|
let response = unsafe {
|
||||||
|
task_createqueue(Json(TaskCreateQueueRequest {
|
||||||
|
name: name.to_owned(),
|
||||||
|
config: config,
|
||||||
|
}))?
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(err) = response.0.error {
|
||||||
|
return Err(Error::msg(err));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Enqueue adds a task to the named queue. Returns the task ID.
|
||||||
|
/// payload is opaque bytes passed back to the plugin on execution.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
/// * `queue_name` - String parameter.
|
||||||
|
/// * `payload` - Vec<u8> parameter.
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
/// The result value.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns an error if the host function call fails.
|
||||||
|
pub fn enqueue(queue_name: &str, payload: Vec<u8>) -> Result<String, Error> {
|
||||||
|
let response = unsafe {
|
||||||
|
task_enqueue(Json(TaskEnqueueRequest {
|
||||||
|
queue_name: queue_name.to_owned(),
|
||||||
|
payload: payload,
|
||||||
|
}))?
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(err) = response.0.error {
|
||||||
|
return Err(Error::msg(err));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(response.0.result)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get returns the current state of a task including its status,
|
||||||
|
/// message, and attempt count.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
/// * `task_id` - String parameter.
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
/// The result value.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns an error if the host function call fails.
|
||||||
|
pub fn get(task_id: &str) -> Result<Option<TaskInfo>, Error> {
|
||||||
|
let response = unsafe {
|
||||||
|
task_get(Json(TaskGetRequest {
|
||||||
|
task_id: task_id.to_owned(),
|
||||||
|
}))?
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(err) = response.0.error {
|
||||||
|
return Err(Error::msg(err));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(response.0.result)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Cancel cancels a pending task. Returns error if already
|
||||||
|
/// running, completed, or failed.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
/// * `task_id` - String parameter.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns an error if the host function call fails.
|
||||||
|
pub fn cancel(task_id: &str) -> Result<(), Error> {
|
||||||
|
let response = unsafe {
|
||||||
|
task_cancel(Json(TaskCancelRequest {
|
||||||
|
task_id: task_id.to_owned(),
|
||||||
|
}))?
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(err) = response.0.error {
|
||||||
|
return Err(Error::msg(err));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// ClearQueue removes all pending tasks from the named queue.
|
||||||
|
/// Running tasks are not affected. Returns the number of tasks removed.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
/// * `queue_name` - String parameter.
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
/// The number of tasks removed.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns an error if the host function call fails.
|
||||||
|
pub fn clear_queue(queue_name: &str) -> Result<i64, Error> {
|
||||||
|
let response = unsafe {
|
||||||
|
task_clearqueue(Json(TaskClearQueueRequest {
|
||||||
|
queue_name: queue_name.to_owned(),
|
||||||
|
}))?
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(err) = response.0.error {
|
||||||
|
return Err(Error::msg(err));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(response.0.result)
|
||||||
|
}
|
||||||
52
plugins/testdata/test-kvstore/main.go
vendored
52
plugins/testdata/test-kvstore/main.go
vendored
@@ -9,19 +9,23 @@ import (
|
|||||||
|
|
||||||
// TestKVStoreInput is the input for nd_test_kvstore callback.
|
// TestKVStoreInput is the input for nd_test_kvstore callback.
|
||||||
type TestKVStoreInput struct {
|
type TestKVStoreInput struct {
|
||||||
Operation string `json:"operation"` // "set", "get", "delete", "has", "list", "get_storage_used"
|
Operation string `json:"operation"` // "set", "get", "delete", "has", "list", "get_storage_used", "set_with_ttl", "delete_by_prefix", "get_many"
|
||||||
Key string `json:"key"` // Storage key
|
Key string `json:"key"` // Storage key
|
||||||
Value []byte `json:"value"` // For set operations
|
Value []byte `json:"value"` // For set operations
|
||||||
Prefix string `json:"prefix"` // For list operation
|
Prefix string `json:"prefix"` // For list/delete_by_prefix operations
|
||||||
|
TTLSeconds int64 `json:"ttl_seconds,omitempty"` // For set_with_ttl
|
||||||
|
Keys []string `json:"keys,omitempty"` // For get_many
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestKVStoreOutput is the output from nd_test_kvstore callback.
|
// TestKVStoreOutput is the output from nd_test_kvstore callback.
|
||||||
type TestKVStoreOutput struct {
|
type TestKVStoreOutput struct {
|
||||||
Value []byte `json:"value,omitempty"`
|
Value []byte `json:"value,omitempty"`
|
||||||
Exists bool `json:"exists,omitempty"`
|
Values map[string][]byte `json:"values,omitempty"`
|
||||||
Keys []string `json:"keys,omitempty"`
|
Exists bool `json:"exists,omitempty"`
|
||||||
StorageUsed int64 `json:"storage_used,omitempty"`
|
Keys []string `json:"keys,omitempty"`
|
||||||
Error *string `json:"error,omitempty"`
|
StorageUsed int64 `json:"storage_used,omitempty"`
|
||||||
|
DeletedCount int64 `json:"deleted_count,omitempty"`
|
||||||
|
Error *string `json:"error,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// nd_test_kvstore is the test callback that tests the kvstore host functions.
|
// nd_test_kvstore is the test callback that tests the kvstore host functions.
|
||||||
@@ -96,6 +100,36 @@ func ndTestKVStore() int32 {
|
|||||||
pdk.OutputJSON(TestKVStoreOutput{StorageUsed: bytesUsed})
|
pdk.OutputJSON(TestKVStoreOutput{StorageUsed: bytesUsed})
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
case "set_with_ttl":
|
||||||
|
err := host.KVStoreSetWithTTL(input.Key, input.Value, input.TTLSeconds)
|
||||||
|
if err != nil {
|
||||||
|
errStr := err.Error()
|
||||||
|
pdk.OutputJSON(TestKVStoreOutput{Error: &errStr})
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
pdk.OutputJSON(TestKVStoreOutput{})
|
||||||
|
return 0
|
||||||
|
|
||||||
|
case "delete_by_prefix":
|
||||||
|
deletedCount, err := host.KVStoreDeleteByPrefix(input.Prefix)
|
||||||
|
if err != nil {
|
||||||
|
errStr := err.Error()
|
||||||
|
pdk.OutputJSON(TestKVStoreOutput{Error: &errStr})
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
pdk.OutputJSON(TestKVStoreOutput{DeletedCount: deletedCount})
|
||||||
|
return 0
|
||||||
|
|
||||||
|
case "get_many":
|
||||||
|
values, err := host.KVStoreGetMany(input.Keys)
|
||||||
|
if err != nil {
|
||||||
|
errStr := err.Error()
|
||||||
|
pdk.OutputJSON(TestKVStoreOutput{Error: &errStr})
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
pdk.OutputJSON(TestKVStoreOutput{Values: values})
|
||||||
|
return 0
|
||||||
|
|
||||||
default:
|
default:
|
||||||
errStr := "unknown operation: " + input.Operation
|
errStr := "unknown operation: " + input.Operation
|
||||||
pdk.OutputJSON(TestKVStoreOutput{Error: &errStr})
|
pdk.OutputJSON(TestKVStoreOutput{Error: &errStr})
|
||||||
|
|||||||
16
plugins/testdata/test-taskqueue/go.mod
vendored
Normal file
16
plugins/testdata/test-taskqueue/go.mod
vendored
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
module test-taskqueue
|
||||||
|
|
||||||
|
go 1.25
|
||||||
|
|
||||||
|
require github.com/navidrome/navidrome/plugins/pdk/go v0.0.0
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||||
|
github.com/extism/go-pdk v1.1.3 // indirect
|
||||||
|
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||||
|
github.com/stretchr/objx v0.5.2 // indirect
|
||||||
|
github.com/stretchr/testify v1.11.1 // indirect
|
||||||
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
|
)
|
||||||
|
|
||||||
|
replace github.com/navidrome/navidrome/plugins/pdk/go => ../../pdk/go
|
||||||
14
plugins/testdata/test-taskqueue/go.sum
vendored
Normal file
14
plugins/testdata/test-taskqueue/go.sum
vendored
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/extism/go-pdk v1.1.3 h1:hfViMPWrqjN6u67cIYRALZTZLk/enSPpNKa+rZ9X2SQ=
|
||||||
|
github.com/extism/go-pdk v1.1.3/go.mod h1:Gz+LIU/YCKnKXhgge8yo5Yu1F/lbv7KtKFkiCSzW/P4=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
|
||||||
|
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
|
||||||
|
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||||
|
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||||
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||||
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
114
plugins/testdata/test-taskqueue/main.go
vendored
Normal file
114
plugins/testdata/test-taskqueue/main.go
vendored
Normal file
@@ -0,0 +1,114 @@
|
|||||||
|
// Test TaskQueue plugin for Navidrome plugin system integration tests.
|
||||||
|
// Build with: tinygo build -o ../test-taskqueue.wasm -target wasip1 -buildmode=c-shared .
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/navidrome/navidrome/plugins/pdk/go/host"
|
||||||
|
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
|
||||||
|
"github.com/navidrome/navidrome/plugins/pdk/go/taskworker"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
taskworker.Register(&handler{})
|
||||||
|
}
|
||||||
|
|
||||||
|
type handler struct{}
|
||||||
|
|
||||||
|
func (h *handler) OnTaskExecute(req taskworker.TaskExecuteRequest) (string, error) {
|
||||||
|
payload := string(req.Payload)
|
||||||
|
if payload == "fail" {
|
||||||
|
return "", fmt.Errorf("task failed as instructed")
|
||||||
|
}
|
||||||
|
if payload == "fail-then-succeed" && req.Attempt < 2 {
|
||||||
|
return "", fmt.Errorf("transient failure")
|
||||||
|
}
|
||||||
|
return "completed successfully", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test helper types
|
||||||
|
type TestInput struct {
|
||||||
|
Operation string `json:"operation"`
|
||||||
|
QueueName string `json:"queueName,omitempty"`
|
||||||
|
Config *host.QueueConfig `json:"config,omitempty"`
|
||||||
|
Payload []byte `json:"payload,omitempty"`
|
||||||
|
TaskID string `json:"taskId,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type TestOutput struct {
|
||||||
|
TaskID string `json:"taskId,omitempty"`
|
||||||
|
Status string `json:"status,omitempty"`
|
||||||
|
Message string `json:"message,omitempty"`
|
||||||
|
Attempt int32 `json:"attempt,omitempty"`
|
||||||
|
Cleared int64 `json:"cleared,omitempty"`
|
||||||
|
Error *string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
//go:wasmexport nd_test_taskqueue
|
||||||
|
func ndTestTaskQueue() int32 {
|
||||||
|
var input TestInput
|
||||||
|
if err := pdk.InputJSON(&input); err != nil {
|
||||||
|
errStr := err.Error()
|
||||||
|
pdk.OutputJSON(TestOutput{Error: &errStr})
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
switch input.Operation {
|
||||||
|
case "create_queue":
|
||||||
|
config := host.QueueConfig{}
|
||||||
|
if input.Config != nil {
|
||||||
|
config = *input.Config
|
||||||
|
}
|
||||||
|
err := host.TaskCreateQueue(input.QueueName, config)
|
||||||
|
if err != nil {
|
||||||
|
errStr := err.Error()
|
||||||
|
pdk.OutputJSON(TestOutput{Error: &errStr})
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
pdk.OutputJSON(TestOutput{})
|
||||||
|
|
||||||
|
case "enqueue":
|
||||||
|
taskID, err := host.TaskEnqueue(input.QueueName, input.Payload)
|
||||||
|
if err != nil {
|
||||||
|
errStr := err.Error()
|
||||||
|
pdk.OutputJSON(TestOutput{Error: &errStr})
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
pdk.OutputJSON(TestOutput{TaskID: taskID})
|
||||||
|
|
||||||
|
case "get_task_status":
|
||||||
|
info, err := host.TaskGet(input.TaskID)
|
||||||
|
if err != nil {
|
||||||
|
errStr := err.Error()
|
||||||
|
pdk.OutputJSON(TestOutput{Error: &errStr})
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
pdk.OutputJSON(TestOutput{Status: info.Status, Message: info.Message, Attempt: info.Attempt})
|
||||||
|
|
||||||
|
case "cancel_task":
|
||||||
|
err := host.TaskCancel(input.TaskID)
|
||||||
|
if err != nil {
|
||||||
|
errStr := err.Error()
|
||||||
|
pdk.OutputJSON(TestOutput{Error: &errStr})
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
pdk.OutputJSON(TestOutput{})
|
||||||
|
|
||||||
|
case "clear_queue":
|
||||||
|
cleared, err := host.TaskClearQueue(input.QueueName)
|
||||||
|
if err != nil {
|
||||||
|
errStr := err.Error()
|
||||||
|
pdk.OutputJSON(TestOutput{Error: &errStr})
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
pdk.OutputJSON(TestOutput{Cleared: cleared})
|
||||||
|
|
||||||
|
default:
|
||||||
|
errStr := "unknown operation: " + input.Operation
|
||||||
|
pdk.OutputJSON(TestOutput{Error: &errStr})
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {}
|
||||||
12
plugins/testdata/test-taskqueue/manifest.json
vendored
Normal file
12
plugins/testdata/test-taskqueue/manifest.json
vendored
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
{
|
||||||
|
"name": "Test TaskQueue Plugin",
|
||||||
|
"author": "Navidrome Test",
|
||||||
|
"version": "1.0.0",
|
||||||
|
"description": "A test plugin for TaskQueue integration testing",
|
||||||
|
"permissions": {
|
||||||
|
"taskqueue": {
|
||||||
|
"reason": "For testing task queue operations",
|
||||||
|
"maxConcurrency": 10
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user