diff --git a/plugins/capabilities/taskworker.go b/plugins/capabilities/taskworker.go index 8fc2dc9fa..c53d50174 100644 --- a/plugins/capabilities/taskworker.go +++ b/plugins/capabilities/taskworker.go @@ -8,9 +8,10 @@ package capabilities //nd:capability name=taskworker type TaskWorker interface { // OnTaskExecute is called when a queued task is ready to run. + // The returned string is a status/result message stored in the tasks table. // Return an error to trigger retry (if retries are configured). //nd:export name=nd_task_execute - OnTaskExecute(TaskExecuteRequest) (TaskExecuteResponse, error) + OnTaskExecute(TaskExecuteRequest) (string, error) } // TaskExecuteRequest is the request provided when a task is ready to execute. @@ -24,10 +25,3 @@ type TaskExecuteRequest struct { // Attempt is the current attempt number (1-based: first attempt = 1). Attempt int32 `json:"attempt"` } - -// TaskExecuteResponse is the response from task execution. -type TaskExecuteResponse struct { - // Error, if non-empty, indicates the task failed. The task will be retried - // if retries are configured and attempts remain. - Error string `json:"error,omitempty"` -} diff --git a/plugins/capabilities/taskworker.yaml b/plugins/capabilities/taskworker.yaml index 760ca420b..f10fd0794 100644 --- a/plugins/capabilities/taskworker.yaml +++ b/plugins/capabilities/taskworker.yaml @@ -3,12 +3,13 @@ exports: nd_task_execute: description: |- OnTaskExecute is called when a queued task is ready to run. + The returned string is a status/result message stored in the tasks table. Return an error to trigger retry (if retries are configured). input: $ref: '#/components/schemas/TaskExecuteRequest' contentType: application/json output: - $ref: '#/components/schemas/TaskExecuteResponse' + type: string contentType: application/json components: schemas: @@ -35,11 +36,3 @@ components: - taskId - payload - attempt - TaskExecuteResponse: - description: TaskExecuteResponse is the response from task execution. - properties: - error: - type: string - description: |- - Error, if non-empty, indicates the task failed. The task will be retried - if retries are configured and attempts remain. diff --git a/plugins/host/taskqueue.go b/plugins/host/task.go similarity index 71% rename from plugins/host/taskqueue.go rename to plugins/host/task.go index 63df8713b..2ec8da20c 100644 --- a/plugins/host/taskqueue.go +++ b/plugins/host/task.go @@ -2,6 +2,17 @@ package host import "context" +// TaskInfo holds the current state of a task. +type TaskInfo struct { + // Status is the current task status: "pending", "running", + // "completed", "failed", or "cancelled". + Status string `json:"status"` + // Message is the status/result message returned by the plugin callback. + Message string `json:"message"` + // Attempt is the current or last attempt number (1-based). + Attempt int32 `json:"attempt"` +} + // QueueConfig holds configuration for a task queue. type QueueConfig struct { // Concurrency is the max number of parallel workers. Default: 1. @@ -24,15 +35,15 @@ type QueueConfig struct { RetentionMs int64 `json:"retentionMs"` } -// TaskQueueService provides persistent task queues for plugins. +// TaskService provides persistent task queues for plugins. // // This service allows plugins to create named queues with configurable concurrency, // retry policies, and rate limiting. Tasks are persisted to SQLite and survive // server restarts. When a task is ready to execute, the host calls the plugin's // nd_task_execute callback function. // -//nd:hostservice name=TaskQueue permission=taskqueue -type TaskQueueService interface { +//nd:hostservice name=Task permission=taskqueue +type TaskService interface { // CreateQueue creates a named task queue with the given configuration. // Zero-value fields in config use sensible defaults. // If a queue with the same name already exists, returns an error. @@ -45,13 +56,13 @@ type TaskQueueService interface { //nd:hostfunc Enqueue(ctx context.Context, queueName string, payload []byte) (string, error) - // GetTaskStatus returns the status of a task: "pending", "running", - // "completed", "failed", or "cancelled". + // Get returns the current state of a task including its status, + // message, and attempt count. //nd:hostfunc - GetTaskStatus(ctx context.Context, taskID string) (string, error) + Get(ctx context.Context, taskID string) (*TaskInfo, error) - // CancelTask cancels a pending task. Returns error if already + // Cancel cancels a pending task. Returns error if already // running, completed, or failed. //nd:hostfunc - CancelTask(ctx context.Context, taskID string) error + Cancel(ctx context.Context, taskID string) error } diff --git a/plugins/host/task_gen.go b/plugins/host/task_gen.go new file mode 100644 index 000000000..1b769e827 --- /dev/null +++ b/plugins/host/task_gen.go @@ -0,0 +1,220 @@ +// Code generated by ndpgen. DO NOT EDIT. + +package host + +import ( + "context" + "encoding/json" + + extism "github.com/extism/go-sdk" +) + +// TaskCreateQueueRequest is the request type for Task.CreateQueue. +type TaskCreateQueueRequest struct { + Name string `json:"name"` + Config QueueConfig `json:"config"` +} + +// TaskCreateQueueResponse is the response type for Task.CreateQueue. +type TaskCreateQueueResponse struct { + Error string `json:"error,omitempty"` +} + +// TaskEnqueueRequest is the request type for Task.Enqueue. +type TaskEnqueueRequest struct { + QueueName string `json:"queueName"` + Payload []byte `json:"payload"` +} + +// TaskEnqueueResponse is the response type for Task.Enqueue. +type TaskEnqueueResponse struct { + Result string `json:"result,omitempty"` + Error string `json:"error,omitempty"` +} + +// TaskGetRequest is the request type for Task.Get. +type TaskGetRequest struct { + TaskID string `json:"taskId"` +} + +// TaskGetResponse is the response type for Task.Get. +type TaskGetResponse struct { + Result *TaskInfo `json:"result,omitempty"` + Error string `json:"error,omitempty"` +} + +// TaskCancelRequest is the request type for Task.Cancel. +type TaskCancelRequest struct { + TaskID string `json:"taskId"` +} + +// TaskCancelResponse is the response type for Task.Cancel. +type TaskCancelResponse struct { + Error string `json:"error,omitempty"` +} + +// RegisterTaskHostFunctions registers Task service host functions. +// The returned host functions should be added to the plugin's configuration. +func RegisterTaskHostFunctions(service TaskService) []extism.HostFunction { + return []extism.HostFunction{ + newTaskCreateQueueHostFunction(service), + newTaskEnqueueHostFunction(service), + newTaskGetHostFunction(service), + newTaskCancelHostFunction(service), + } +} + +func newTaskCreateQueueHostFunction(service TaskService) extism.HostFunction { + return extism.NewHostFunctionWithStack( + "task_createqueue", + func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) { + // Read JSON request from plugin memory + reqBytes, err := p.ReadBytes(stack[0]) + if err != nil { + taskWriteError(p, stack, err) + return + } + var req TaskCreateQueueRequest + if err := json.Unmarshal(reqBytes, &req); err != nil { + taskWriteError(p, stack, err) + return + } + + // Call the service method + if svcErr := service.CreateQueue(ctx, req.Name, req.Config); svcErr != nil { + taskWriteError(p, stack, svcErr) + return + } + + // Write JSON response to plugin memory + resp := TaskCreateQueueResponse{} + taskWriteResponse(p, stack, resp) + }, + []extism.ValueType{extism.ValueTypePTR}, + []extism.ValueType{extism.ValueTypePTR}, + ) +} + +func newTaskEnqueueHostFunction(service TaskService) extism.HostFunction { + return extism.NewHostFunctionWithStack( + "task_enqueue", + func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) { + // Read JSON request from plugin memory + reqBytes, err := p.ReadBytes(stack[0]) + if err != nil { + taskWriteError(p, stack, err) + return + } + var req TaskEnqueueRequest + if err := json.Unmarshal(reqBytes, &req); err != nil { + taskWriteError(p, stack, err) + return + } + + // Call the service method + result, svcErr := service.Enqueue(ctx, req.QueueName, req.Payload) + if svcErr != nil { + taskWriteError(p, stack, svcErr) + return + } + + // Write JSON response to plugin memory + resp := TaskEnqueueResponse{ + Result: result, + } + taskWriteResponse(p, stack, resp) + }, + []extism.ValueType{extism.ValueTypePTR}, + []extism.ValueType{extism.ValueTypePTR}, + ) +} + +func newTaskGetHostFunction(service TaskService) extism.HostFunction { + return extism.NewHostFunctionWithStack( + "task_get", + func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) { + // Read JSON request from plugin memory + reqBytes, err := p.ReadBytes(stack[0]) + if err != nil { + taskWriteError(p, stack, err) + return + } + var req TaskGetRequest + if err := json.Unmarshal(reqBytes, &req); err != nil { + taskWriteError(p, stack, err) + return + } + + // Call the service method + result, svcErr := service.Get(ctx, req.TaskID) + if svcErr != nil { + taskWriteError(p, stack, svcErr) + return + } + + // Write JSON response to plugin memory + resp := TaskGetResponse{ + Result: result, + } + taskWriteResponse(p, stack, resp) + }, + []extism.ValueType{extism.ValueTypePTR}, + []extism.ValueType{extism.ValueTypePTR}, + ) +} + +func newTaskCancelHostFunction(service TaskService) extism.HostFunction { + return extism.NewHostFunctionWithStack( + "task_cancel", + func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) { + // Read JSON request from plugin memory + reqBytes, err := p.ReadBytes(stack[0]) + if err != nil { + taskWriteError(p, stack, err) + return + } + var req TaskCancelRequest + if err := json.Unmarshal(reqBytes, &req); err != nil { + taskWriteError(p, stack, err) + return + } + + // Call the service method + if svcErr := service.Cancel(ctx, req.TaskID); svcErr != nil { + taskWriteError(p, stack, svcErr) + return + } + + // Write JSON response to plugin memory + resp := TaskCancelResponse{} + taskWriteResponse(p, stack, resp) + }, + []extism.ValueType{extism.ValueTypePTR}, + []extism.ValueType{extism.ValueTypePTR}, + ) +} + +// taskWriteResponse writes a JSON response to plugin memory. +func taskWriteResponse(p *extism.CurrentPlugin, stack []uint64, resp any) { + respBytes, err := json.Marshal(resp) + if err != nil { + taskWriteError(p, stack, err) + return + } + respPtr, err := p.WriteBytes(respBytes) + if err != nil { + stack[0] = 0 + return + } + stack[0] = respPtr +} + +// taskWriteError writes an error response to plugin memory. +func taskWriteError(p *extism.CurrentPlugin, stack []uint64, err error) { + errResp := struct { + Error string `json:"error"` + }{Error: err.Error()} + respBytes, _ := json.Marshal(errResp) + respPtr, _ := p.WriteBytes(respBytes) + stack[0] = respPtr +} diff --git a/plugins/host/taskqueue_gen.go b/plugins/host/taskqueue_gen.go deleted file mode 100644 index 8edf6967c..000000000 --- a/plugins/host/taskqueue_gen.go +++ /dev/null @@ -1,220 +0,0 @@ -// Code generated by ndpgen. DO NOT EDIT. - -package host - -import ( - "context" - "encoding/json" - - extism "github.com/extism/go-sdk" -) - -// TaskQueueCreateQueueRequest is the request type for TaskQueue.CreateQueue. -type TaskQueueCreateQueueRequest struct { - Name string `json:"name"` - Config QueueConfig `json:"config"` -} - -// TaskQueueCreateQueueResponse is the response type for TaskQueue.CreateQueue. -type TaskQueueCreateQueueResponse struct { - Error string `json:"error,omitempty"` -} - -// TaskQueueEnqueueRequest is the request type for TaskQueue.Enqueue. -type TaskQueueEnqueueRequest struct { - QueueName string `json:"queueName"` - Payload []byte `json:"payload"` -} - -// TaskQueueEnqueueResponse is the response type for TaskQueue.Enqueue. -type TaskQueueEnqueueResponse struct { - Result string `json:"result,omitempty"` - Error string `json:"error,omitempty"` -} - -// TaskQueueGetTaskStatusRequest is the request type for TaskQueue.GetTaskStatus. -type TaskQueueGetTaskStatusRequest struct { - TaskID string `json:"taskId"` -} - -// TaskQueueGetTaskStatusResponse is the response type for TaskQueue.GetTaskStatus. -type TaskQueueGetTaskStatusResponse struct { - Result string `json:"result,omitempty"` - Error string `json:"error,omitempty"` -} - -// TaskQueueCancelTaskRequest is the request type for TaskQueue.CancelTask. -type TaskQueueCancelTaskRequest struct { - TaskID string `json:"taskId"` -} - -// TaskQueueCancelTaskResponse is the response type for TaskQueue.CancelTask. -type TaskQueueCancelTaskResponse struct { - Error string `json:"error,omitempty"` -} - -// RegisterTaskQueueHostFunctions registers TaskQueue service host functions. -// The returned host functions should be added to the plugin's configuration. -func RegisterTaskQueueHostFunctions(service TaskQueueService) []extism.HostFunction { - return []extism.HostFunction{ - newTaskQueueCreateQueueHostFunction(service), - newTaskQueueEnqueueHostFunction(service), - newTaskQueueGetTaskStatusHostFunction(service), - newTaskQueueCancelTaskHostFunction(service), - } -} - -func newTaskQueueCreateQueueHostFunction(service TaskQueueService) extism.HostFunction { - return extism.NewHostFunctionWithStack( - "taskqueue_createqueue", - func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) { - // Read JSON request from plugin memory - reqBytes, err := p.ReadBytes(stack[0]) - if err != nil { - taskqueueWriteError(p, stack, err) - return - } - var req TaskQueueCreateQueueRequest - if err := json.Unmarshal(reqBytes, &req); err != nil { - taskqueueWriteError(p, stack, err) - return - } - - // Call the service method - if svcErr := service.CreateQueue(ctx, req.Name, req.Config); svcErr != nil { - taskqueueWriteError(p, stack, svcErr) - return - } - - // Write JSON response to plugin memory - resp := TaskQueueCreateQueueResponse{} - taskqueueWriteResponse(p, stack, resp) - }, - []extism.ValueType{extism.ValueTypePTR}, - []extism.ValueType{extism.ValueTypePTR}, - ) -} - -func newTaskQueueEnqueueHostFunction(service TaskQueueService) extism.HostFunction { - return extism.NewHostFunctionWithStack( - "taskqueue_enqueue", - func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) { - // Read JSON request from plugin memory - reqBytes, err := p.ReadBytes(stack[0]) - if err != nil { - taskqueueWriteError(p, stack, err) - return - } - var req TaskQueueEnqueueRequest - if err := json.Unmarshal(reqBytes, &req); err != nil { - taskqueueWriteError(p, stack, err) - return - } - - // Call the service method - result, svcErr := service.Enqueue(ctx, req.QueueName, req.Payload) - if svcErr != nil { - taskqueueWriteError(p, stack, svcErr) - return - } - - // Write JSON response to plugin memory - resp := TaskQueueEnqueueResponse{ - Result: result, - } - taskqueueWriteResponse(p, stack, resp) - }, - []extism.ValueType{extism.ValueTypePTR}, - []extism.ValueType{extism.ValueTypePTR}, - ) -} - -func newTaskQueueGetTaskStatusHostFunction(service TaskQueueService) extism.HostFunction { - return extism.NewHostFunctionWithStack( - "taskqueue_gettaskstatus", - func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) { - // Read JSON request from plugin memory - reqBytes, err := p.ReadBytes(stack[0]) - if err != nil { - taskqueueWriteError(p, stack, err) - return - } - var req TaskQueueGetTaskStatusRequest - if err := json.Unmarshal(reqBytes, &req); err != nil { - taskqueueWriteError(p, stack, err) - return - } - - // Call the service method - result, svcErr := service.GetTaskStatus(ctx, req.TaskID) - if svcErr != nil { - taskqueueWriteError(p, stack, svcErr) - return - } - - // Write JSON response to plugin memory - resp := TaskQueueGetTaskStatusResponse{ - Result: result, - } - taskqueueWriteResponse(p, stack, resp) - }, - []extism.ValueType{extism.ValueTypePTR}, - []extism.ValueType{extism.ValueTypePTR}, - ) -} - -func newTaskQueueCancelTaskHostFunction(service TaskQueueService) extism.HostFunction { - return extism.NewHostFunctionWithStack( - "taskqueue_canceltask", - func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) { - // Read JSON request from plugin memory - reqBytes, err := p.ReadBytes(stack[0]) - if err != nil { - taskqueueWriteError(p, stack, err) - return - } - var req TaskQueueCancelTaskRequest - if err := json.Unmarshal(reqBytes, &req); err != nil { - taskqueueWriteError(p, stack, err) - return - } - - // Call the service method - if svcErr := service.CancelTask(ctx, req.TaskID); svcErr != nil { - taskqueueWriteError(p, stack, svcErr) - return - } - - // Write JSON response to plugin memory - resp := TaskQueueCancelTaskResponse{} - taskqueueWriteResponse(p, stack, resp) - }, - []extism.ValueType{extism.ValueTypePTR}, - []extism.ValueType{extism.ValueTypePTR}, - ) -} - -// taskqueueWriteResponse writes a JSON response to plugin memory. -func taskqueueWriteResponse(p *extism.CurrentPlugin, stack []uint64, resp any) { - respBytes, err := json.Marshal(resp) - if err != nil { - taskqueueWriteError(p, stack, err) - return - } - respPtr, err := p.WriteBytes(respBytes) - if err != nil { - stack[0] = 0 - return - } - stack[0] = respPtr -} - -// taskqueueWriteError writes an error response to plugin memory. -func taskqueueWriteError(p *extism.CurrentPlugin, stack []uint64, err error) { - errResp := struct { - Error string `json:"error"` - }{Error: err.Error()} - respBytes, _ := json.Marshal(errResp) - respPtr, _ := p.WriteBytes(respBytes) - stack[0] = respPtr -} diff --git a/plugins/host_taskqueue.go b/plugins/host_taskqueue.go index f1e422ba0..62d0362b9 100644 --- a/plugins/host_taskqueue.go +++ b/plugins/host_taskqueue.go @@ -77,7 +77,7 @@ type taskQueueServiceImpl struct { queues map[string]*queueState // For testing: override how callbacks are invoked - invokeCallbackFn func(ctx context.Context, queueName, taskID string, payload []byte, attempt int32) error + invokeCallbackFn func(ctx context.Context, queueName, taskID string, payload []byte, attempt int32) (string, error) } // newTaskQueueService creates a new taskQueueServiceImpl with its own SQLite database. @@ -140,7 +140,8 @@ func createTaskQueueSchema(db *sql.DB) error { max_retries INTEGER NOT NULL, next_run_at INTEGER NOT NULL, created_at INTEGER NOT NULL, - updated_at INTEGER NOT NULL + updated_at INTEGER NOT NULL, + message TEXT NOT NULL DEFAULT '' ); CREATE INDEX IF NOT EXISTS idx_tasks_dequeue ON tasks(queue_name, status, next_run_at); @@ -289,21 +290,22 @@ func (s *taskQueueServiceImpl) Enqueue(ctx context.Context, queueName string, pa return taskID, nil } -// GetTaskStatus returns the status of a task. -func (s *taskQueueServiceImpl) GetTaskStatus(ctx context.Context, taskID string) (string, error) { - var status string - err := s.db.QueryRowContext(ctx, `SELECT status FROM tasks WHERE id = ?`, taskID).Scan(&status) +// Get returns the current state of a task. +func (s *taskQueueServiceImpl) Get(ctx context.Context, taskID string) (*host.TaskInfo, error) { + var info host.TaskInfo + err := s.db.QueryRowContext(ctx, `SELECT status, message, attempt FROM tasks WHERE id = ?`, taskID). + Scan(&info.Status, &info.Message, &info.Attempt) if errors.Is(err, sql.ErrNoRows) { - return "", fmt.Errorf("task %q not found", taskID) + return nil, fmt.Errorf("task %q not found", taskID) } if err != nil { - return "", fmt.Errorf("getting task status: %w", err) + return nil, fmt.Errorf("getting task info: %w", err) } - return status, nil + return &info, nil } -// CancelTask cancels a pending task. -func (s *taskQueueServiceImpl) CancelTask(ctx context.Context, taskID string) error { +// Cancel cancels a pending task. +func (s *taskQueueServiceImpl) Cancel(ctx context.Context, taskID string) error { now := time.Now().UnixMilli() result, err := s.db.ExecContext(ctx, ` UPDATE tasks SET status = ?, updated_at = ? WHERE id = ? AND status = ? @@ -396,7 +398,7 @@ func (s *taskQueueServiceImpl) processTask(queueName string, qs *queueState) boo // Invoke callback log.Debug(s.ctx, "Executing task", "plugin", s.pluginName, "queue", queueName, "taskID", taskID, "attempt", attempt) - callbackErr := s.invokeCallbackFn(s.ctx, queueName, taskID, payload, attempt) + message, callbackErr := s.invokeCallbackFn(s.ctx, queueName, taskID, payload, attempt) // If context was cancelled (shutdown), revert task to pending for recovery if s.ctx.Err() != nil { @@ -405,28 +407,33 @@ func (s *taskQueueServiceImpl) processTask(queueName string, qs *queueState) boo } if callbackErr == nil { - s.completeTask(queueName, taskID) + s.completeTask(queueName, taskID, message) } else { - s.handleTaskFailure(queueName, taskID, attempt, maxRetries, qs, callbackErr) + s.handleTaskFailure(queueName, taskID, attempt, maxRetries, qs, callbackErr, message) } return true } -func (s *taskQueueServiceImpl) completeTask(queueName, taskID string) { +func (s *taskQueueServiceImpl) completeTask(queueName, taskID, message string) { now := time.Now().UnixMilli() - if _, err := s.db.ExecContext(s.ctx, `UPDATE tasks SET status = ?, updated_at = ? WHERE id = ?`, taskStatusCompleted, now, taskID); err != nil { + if _, err := s.db.ExecContext(s.ctx, `UPDATE tasks SET status = ?, message = ?, updated_at = ? WHERE id = ?`, taskStatusCompleted, message, now, taskID); err != nil { log.Error(s.ctx, "Failed to mark task as completed", "plugin", s.pluginName, "taskID", taskID, err) } log.Debug(s.ctx, "Task completed", "plugin", s.pluginName, "queue", queueName, "taskID", taskID) } -func (s *taskQueueServiceImpl) handleTaskFailure(queueName, taskID string, attempt, maxRetries int32, qs *queueState, callbackErr error) { +func (s *taskQueueServiceImpl) handleTaskFailure(queueName, taskID string, attempt, maxRetries int32, qs *queueState, callbackErr error, message string) { log.Warn(s.ctx, "Task execution failed", "plugin", s.pluginName, "queue", queueName, "taskID", taskID, "attempt", attempt, "maxRetries", maxRetries, "err", callbackErr) + // Use error message as fallback if no message was provided + if message == "" { + message = callbackErr.Error() + } + now := time.Now().UnixMilli() if attempt > maxRetries { - if _, err := s.db.ExecContext(s.ctx, `UPDATE tasks SET status = ?, updated_at = ? WHERE id = ?`, taskStatusFailed, now, taskID); err != nil { + if _, err := s.db.ExecContext(s.ctx, `UPDATE tasks SET status = ?, message = ?, updated_at = ? WHERE id = ?`, taskStatusFailed, message, now, taskID); err != nil { log.Error(s.ctx, "Failed to mark task as failed", "plugin", s.pluginName, "taskID", taskID, err) } log.Warn(s.ctx, "Task failed after all retries", "plugin", s.pluginName, "queue", queueName, "taskID", taskID) @@ -462,13 +469,13 @@ func (s *taskQueueServiceImpl) revertTaskToPending(taskID string) { } // defaultInvokeCallback calls the plugin's nd_task_execute function. -func (s *taskQueueServiceImpl) defaultInvokeCallback(ctx context.Context, queueName, taskID string, payload []byte, attempt int32) error { +func (s *taskQueueServiceImpl) defaultInvokeCallback(ctx context.Context, queueName, taskID string, payload []byte, attempt int32) (string, error) { s.manager.mu.RLock() p, ok := s.manager.plugins[s.pluginName] s.manager.mu.RUnlock() if !ok { - return fmt.Errorf("plugin %s not loaded", s.pluginName) + return "", fmt.Errorf("plugin %s not loaded", s.pluginName) } input := capabilities.TaskExecuteRequest{ @@ -478,14 +485,11 @@ func (s *taskQueueServiceImpl) defaultInvokeCallback(ctx context.Context, queueN Attempt: attempt, } - result, err := callPluginFunction[capabilities.TaskExecuteRequest, capabilities.TaskExecuteResponse](ctx, p, FuncTaskWorkerCallback, input) + message, err := callPluginFunction[capabilities.TaskExecuteRequest, string](ctx, p, FuncTaskWorkerCallback, input) if err != nil { - return err + return "", err } - if result.Error != "" { - return fmt.Errorf("%s", result.Error) - } - return nil + return message, nil } // cleanupLoop periodically removes terminal tasks past their retention period. @@ -558,5 +562,5 @@ func (s *taskQueueServiceImpl) Close() error { } // Compile-time verification -var _ host.TaskQueueService = (*taskQueueServiceImpl)(nil) +var _ host.TaskService = (*taskQueueServiceImpl)(nil) var _ io.Closer = (*taskQueueServiceImpl)(nil) diff --git a/plugins/host_taskqueue_test.go b/plugins/host_taskqueue_test.go index a66c25085..bcb581cef 100644 --- a/plugins/host_taskqueue_test.go +++ b/plugins/host_taskqueue_test.go @@ -106,8 +106,8 @@ var _ = Describe("TaskQueueService", func() { }) It("accepts queue name at maximum length", func() { - service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error { - return nil + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { + return "", nil } exactName := strings.Repeat("a", maxQueueNameLength) err := service.CreateQueue(ctx, exactName, host.QueueConfig{}) @@ -131,8 +131,8 @@ var _ = Describe("TaskQueueService", func() { Describe("CreateQueue defaults with negative values", func() { It("applies default RetentionMs for negative value", func() { - service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error { - return nil + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { + return "", nil } err := service.CreateQueue(ctx, "neg-retention", host.QueueConfig{ RetentionMs: -500, @@ -203,8 +203,8 @@ var _ = Describe("TaskQueueService", func() { Describe("Enqueue", func() { BeforeEach(func() { // Use a no-op callback to prevent actual execution attempts - service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error { - return nil + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { + return "", nil } err := service.CreateQueue(ctx, "enqueue-test", host.QueueConfig{}) Expect(err).ToNot(HaveOccurred()) @@ -240,9 +240,9 @@ var _ = Describe("TaskQueueService", func() { Describe("GetTaskStatus", func() { BeforeEach(func() { // Use a callback that blocks until context is cancelled so tasks stay pending - service.invokeCallbackFn = func(ctx context.Context, _, _ string, _ []byte, _ int32) error { + service.invokeCallbackFn = func(ctx context.Context, _, _ string, _ []byte, _ int32) (string, error) { <-ctx.Done() - return ctx.Err() + return "", ctx.Err() } }) @@ -255,13 +255,14 @@ var _ = Describe("TaskQueueService", func() { // The task may get picked up quickly; check initial status // Since the callback blocks, it should be either pending or running - status, err := service.GetTaskStatus(ctx, taskID) + info, err := service.Get(ctx, taskID) Expect(err).ToNot(HaveOccurred()) - Expect(status).To(BeElementOf("pending", "running")) + Expect(info).ToNot(BeNil()) + Expect(info.Status).To(BeElementOf("pending", "running")) }) It("returns error for unknown task ID", func() { - _, err := service.GetTaskStatus(ctx, "nonexistent-id") + _, err := service.Get(ctx, "nonexistent-id") Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("not found")) }) @@ -270,19 +271,19 @@ var _ = Describe("TaskQueueService", func() { Describe("CancelTask", func() { BeforeEach(func() { // Block callback so tasks stay in pending/running - service.invokeCallbackFn = func(ctx context.Context, _, _ string, _ []byte, _ int32) error { + service.invokeCallbackFn = func(ctx context.Context, _, _ string, _ []byte, _ int32) (string, error) { <-ctx.Done() - return ctx.Err() + return "", ctx.Err() } }) It("cancels a pending task", func() { // Block the callback so the first task occupies the worker started := make(chan struct{}) - service.invokeCallbackFn = func(ctx context.Context, _, _ string, _ []byte, _ int32) error { + service.invokeCallbackFn = func(ctx context.Context, _, _ string, _ []byte, _ int32) (string, error) { close(started) <-ctx.Done() - return ctx.Err() + return "", ctx.Err() } err := service.CreateQueue(ctx, "cancel-test", host.QueueConfig{ @@ -301,24 +302,24 @@ var _ = Describe("TaskQueueService", func() { taskID, err := service.Enqueue(ctx, "cancel-test", []byte("cancel-me")) Expect(err).ToNot(HaveOccurred()) - err = service.CancelTask(ctx, taskID) + err = service.Cancel(ctx, taskID) Expect(err).ToNot(HaveOccurred()) - status, err := service.GetTaskStatus(ctx, taskID) + info, err := service.Get(ctx, taskID) Expect(err).ToNot(HaveOccurred()) - Expect(status).To(Equal("cancelled")) + Expect(info.Status).To(Equal("cancelled")) }) It("returns error for unknown task ID", func() { - err := service.CancelTask(ctx, "nonexistent-id") + err := service.Cancel(ctx, "nonexistent-id") Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("not found")) }) It("returns error for non-pending task", func() { // Create a queue where tasks complete immediately - service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error { - return nil + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { + return "", nil } err := service.CreateQueue(ctx, "completed-test", host.QueueConfig{}) Expect(err).ToNot(HaveOccurred()) @@ -328,12 +329,15 @@ var _ = Describe("TaskQueueService", func() { // Wait for task to complete Eventually(func() string { - status, _ := service.GetTaskStatus(ctx, taskID) - return status + info, err := service.Get(ctx, taskID) + if err != nil || info == nil { + return "" + } + return info.Status }).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed")) // Try to cancel completed task - err = service.CancelTask(ctx, taskID) + err = service.Cancel(ctx, taskID) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("cannot be cancelled")) }) @@ -346,13 +350,13 @@ var _ = Describe("TaskQueueService", func() { var receivedPayload []byte var receivedAttempt int32 - service.invokeCallbackFn = func(_ context.Context, queueName, taskID string, payload []byte, attempt int32) error { + service.invokeCallbackFn = func(_ context.Context, queueName, taskID string, payload []byte, attempt int32) (string, error) { callCount.Add(1) receivedQueueName = queueName receivedTaskID = taskID receivedPayload = payload receivedAttempt = attempt - return nil + return "", nil } err := service.CreateQueue(ctx, "worker-test", host.QueueConfig{}) @@ -362,8 +366,11 @@ var _ = Describe("TaskQueueService", func() { Expect(err).ToNot(HaveOccurred()) Eventually(func() string { - status, _ := service.GetTaskStatus(ctx, taskID) - return status + info, err := service.Get(ctx, taskID) + if err != nil || info == nil { + return "" + } + return info.Status }).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed")) Expect(callCount.Load()).To(Equal(int32(1))) @@ -374,13 +381,92 @@ var _ = Describe("TaskQueueService", func() { }) }) + Describe("Message storage", func() { + It("stores message on successful completion", func() { + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { + return "task completed successfully", nil + } + + err := service.CreateQueue(ctx, "msg-success", host.QueueConfig{}) + Expect(err).ToNot(HaveOccurred()) + + taskID, err := service.Enqueue(ctx, "msg-success", []byte("data")) + Expect(err).ToNot(HaveOccurred()) + + Eventually(func() string { + info, err := service.Get(ctx, taskID) + if err != nil || info == nil { + return "" + } + return info.Status + }).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed")) + + info, err := service.Get(ctx, taskID) + Expect(err).ToNot(HaveOccurred()) + Expect(info.Message).To(Equal("task completed successfully")) + Expect(info.Attempt).To(Equal(int32(1))) + }) + + It("stores error message on failure", func() { + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { + return "", fmt.Errorf("something went wrong") + } + + err := service.CreateQueue(ctx, "msg-fail", host.QueueConfig{ + MaxRetries: 0, + }) + Expect(err).ToNot(HaveOccurred()) + + taskID, err := service.Enqueue(ctx, "msg-fail", []byte("data")) + Expect(err).ToNot(HaveOccurred()) + + Eventually(func() string { + info, err := service.Get(ctx, taskID) + if err != nil || info == nil { + return "" + } + return info.Status + }).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("failed")) + + info, err := service.Get(ctx, taskID) + Expect(err).ToNot(HaveOccurred()) + Expect(info.Message).To(Equal("something went wrong")) + }) + + It("uses explicit message over error message on failure", func() { + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { + return "partial progress made", fmt.Errorf("timeout exceeded") + } + + err := service.CreateQueue(ctx, "msg-fail-with-msg", host.QueueConfig{ + MaxRetries: 0, + }) + Expect(err).ToNot(HaveOccurred()) + + taskID, err := service.Enqueue(ctx, "msg-fail-with-msg", []byte("data")) + Expect(err).ToNot(HaveOccurred()) + + Eventually(func() string { + info, err := service.Get(ctx, taskID) + if err != nil || info == nil { + return "" + } + return info.Status + }).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("failed")) + + info, err := service.Get(ctx, taskID) + Expect(err).ToNot(HaveOccurred()) + Expect(info.Message).To(Equal("partial progress made")) + }) + }) + Describe("Retry on failure", func() { It("retries and eventually fails after exhausting retries", func() { var callCount atomic.Int32 - service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error { + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { callCount.Add(1) - return fmt.Errorf("task failed") + return "", fmt.Errorf("task failed") } err := service.CreateQueue(ctx, "retry-test", host.QueueConfig{ @@ -393,8 +479,11 @@ var _ = Describe("TaskQueueService", func() { Expect(err).ToNot(HaveOccurred()) Eventually(func() string { - status, _ := service.GetTaskStatus(ctx, taskID) - return status + info, err := service.Get(ctx, taskID) + if err != nil || info == nil { + return "" + } + return info.Status }).WithTimeout(10 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("failed")) // 1 initial attempt + 2 retries = 3 total calls @@ -406,12 +495,12 @@ var _ = Describe("TaskQueueService", func() { It("retries and succeeds on second attempt", func() { var callCount atomic.Int32 - service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, attempt int32) error { + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, attempt int32) (string, error) { callCount.Add(1) if attempt == 1 { - return fmt.Errorf("temporary error") + return "", fmt.Errorf("temporary error") } - return nil + return "success", nil } err := service.CreateQueue(ctx, "retry-succeed", host.QueueConfig{ @@ -424,8 +513,11 @@ var _ = Describe("TaskQueueService", func() { Expect(err).ToNot(HaveOccurred()) Eventually(func() string { - status, _ := service.GetTaskStatus(ctx, taskID) - return status + info, err := service.Get(ctx, taskID) + if err != nil || info == nil { + return "" + } + return info.Status }).WithTimeout(10 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed")) Expect(callCount.Load()).To(Equal(int32(2))) @@ -436,9 +528,9 @@ var _ = Describe("TaskQueueService", func() { It("caps backoff at maxRetentionMs to prevent overflow", func() { var callCount atomic.Int32 - service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error { + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { callCount.Add(1) - return fmt.Errorf("always fail") + return "", fmt.Errorf("always fail") } err := service.CreateQueue(ctx, "backoff-overflow", host.QueueConfig{ @@ -471,11 +563,11 @@ var _ = Describe("TaskQueueService", func() { var mu sync.Mutex var dispatchTimes []time.Time - service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error { + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { mu.Lock() dispatchTimes = append(dispatchTimes, time.Now()) mu.Unlock() - return nil + return "", nil } err := service.CreateQueue(ctx, "delay-concurrent", host.QueueConfig{ @@ -518,9 +610,9 @@ var _ = Describe("TaskQueueService", func() { Describe("Shutdown recovery", func() { It("resets stale running tasks on CreateQueue", func() { // Create a first service and queue, enqueue a task - service.invokeCallbackFn = func(ctx context.Context, _, _ string, _ []byte, _ int32) error { + service.invokeCallbackFn = func(ctx context.Context, _, _ string, _ []byte, _ int32) (string, error) { <-ctx.Done() - return ctx.Err() + return "", ctx.Err() } err := service.CreateQueue(ctx, "recovery-queue", host.QueueConfig{}) Expect(err).ToNot(HaveOccurred()) @@ -530,8 +622,11 @@ var _ = Describe("TaskQueueService", func() { // Wait for the task to start running Eventually(func() string { - status, _ := service.GetTaskStatus(ctx, taskID) - return status + info, err := service.Get(ctx, taskID) + if err != nil || info == nil { + return "" + } + return info.Status }).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("running")) // Close the service (simulates crash - tasks left in running state) @@ -549,8 +644,8 @@ var _ = Describe("TaskQueueService", func() { Expect(err).ToNot(HaveOccurred()) // Override callback to succeed - service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error { - return nil + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { + return "", nil } // Re-create the queue - the upsert handles the existing row from the old service @@ -559,8 +654,11 @@ var _ = Describe("TaskQueueService", func() { // The stale running task should now be reset to pending and eventually completed Eventually(func() string { - status, _ := service.GetTaskStatus(ctx, taskID) - return status + info, err := service.Get(ctx, taskID) + if err != nil || info == nil { + return "" + } + return info.Status }).WithTimeout(10 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed")) }) }) @@ -598,8 +696,8 @@ var _ = Describe("TaskQueueService", func() { Expect(err).ToNot(HaveOccurred()) // Both services should be able to create queues with the same name independently - service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error { return nil } - service2.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error { return nil } + service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { return "", nil } + service2.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) (string, error) { return "", nil } err = service.CreateQueue(ctx, "shared-name", host.QueueConfig{}) Expect(err).ToNot(HaveOccurred()) @@ -616,13 +714,19 @@ var _ = Describe("TaskQueueService", func() { // Both should complete Eventually(func() string { - status, _ := service.GetTaskStatus(ctx, taskID1) - return status + info, err := service.Get(ctx, taskID1) + if err != nil || info == nil { + return "" + } + return info.Status }).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed")) Eventually(func() string { - status, _ := service2.GetTaskStatus(ctx, taskID2) - return status + info, err := service2.Get(ctx, taskID2) + if err != nil || info == nil { + return "" + } + return info.Status }).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed")) }) }) @@ -702,9 +806,11 @@ var _ = Describe("TaskQueueService Integration", Ordered, func() { } type testTaskQueueOutput struct { - TaskID string `json:"taskId,omitempty"` - Status string `json:"status,omitempty"` - Error *string `json:"error,omitempty"` + TaskID string `json:"taskId,omitempty"` + Status string `json:"status,omitempty"` + Message string `json:"message,omitempty"` + Attempt int32 `json:"attempt,omitempty"` + Error *string `json:"error,omitempty"` } callTestTaskQueue := func(ctx context.Context, input testTaskQueueInput) (*testTaskQueueOutput, error) { diff --git a/plugins/manager_loader.go b/plugins/manager_loader.go index 516533896..2afcc25e2 100644 --- a/plugins/manager_loader.go +++ b/plugins/manager_loader.go @@ -129,7 +129,7 @@ var hostServices = []hostServiceEntry{ }, }, { - name: "TaskQueue", + name: "Task", hasPermission: func(p *Permissions) bool { return p != nil && p.Taskqueue != nil }, create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) { perm := ctx.permissions.Taskqueue @@ -139,10 +139,10 @@ var hostServices = []hostServiceEntry{ } service, err := newTaskQueueService(ctx.pluginName, ctx.manager, maxConcurrency) if err != nil { - log.Error("Failed to create TaskQueue service", "plugin", ctx.pluginName, err) + log.Error("Failed to create Task service", "plugin", ctx.pluginName, err) return nil, nil } - return host.RegisterTaskQueueHostFunctions(service), service + return host.RegisterTaskHostFunctions(service), service }, }, } diff --git a/plugins/manifest.go b/plugins/manifest.go index 830829c28..375e73e7f 100644 --- a/plugins/manifest.go +++ b/plugins/manifest.go @@ -72,7 +72,7 @@ func ValidateWithCapabilities(m *Manifest, capabilities []Capability) error { } } - // TaskQueue permission requires TaskWorker capability + // Task (taskqueue) permission requires TaskWorker capability if m.Permissions != nil && m.Permissions.Taskqueue != nil { if !hasCapability(capabilities, CapabilityTaskWorker) { return fmt.Errorf("'taskqueue' permission requires plugin to export '%s' function", FuncTaskWorkerCallback) diff --git a/plugins/pdk/go/host/doc.go b/plugins/pdk/go/host/doc.go index 0571006d4..5781a04c1 100644 --- a/plugins/pdk/go/host/doc.go +++ b/plugins/pdk/go/host/doc.go @@ -43,7 +43,7 @@ The following host services are available: - Library: provides access to music library metadata for plugins. - Scheduler: provides task scheduling capabilities for plugins. - SubsonicAPI: provides access to Navidrome's Subsonic API from plugins. - - TaskQueue: provides persistent task queues for plugins. + - Task: provides persistent task queues for plugins. - Users: provides access to user information for plugins. - WebSocket: provides WebSocket communication capabilities for plugins. diff --git a/plugins/pdk/go/host/nd_host_taskqueue.go b/plugins/pdk/go/host/nd_host_task.go similarity index 61% rename from plugins/pdk/go/host/nd_host_taskqueue.go rename to plugins/pdk/go/host/nd_host_task.go index b21fdac42..bdd03d690 100644 --- a/plugins/pdk/go/host/nd_host_taskqueue.go +++ b/plugins/pdk/go/host/nd_host_task.go @@ -1,6 +1,6 @@ // Code generated by ndpgen. DO NOT EDIT. // -// This file contains client wrappers for the TaskQueue host service. +// This file contains client wrappers for the Task host service. // It is intended for use in Navidrome plugins built with TinyGo. // //go:build wasip1 @@ -24,62 +24,70 @@ type QueueConfig struct { RetentionMs int64 `json:"retentionMs"` } -// taskqueue_createqueue is the host function provided by Navidrome. -// -//go:wasmimport extism:host/user taskqueue_createqueue -func taskqueue_createqueue(uint64) uint64 +// TaskInfo represents the TaskInfo data structure. +// TaskInfo holds the current state of a task. +type TaskInfo struct { + Status string `json:"status"` + Message string `json:"message"` + Attempt int32 `json:"attempt"` +} -// taskqueue_enqueue is the host function provided by Navidrome. +// task_createqueue is the host function provided by Navidrome. // -//go:wasmimport extism:host/user taskqueue_enqueue -func taskqueue_enqueue(uint64) uint64 +//go:wasmimport extism:host/user task_createqueue +func task_createqueue(uint64) uint64 -// taskqueue_gettaskstatus is the host function provided by Navidrome. +// task_enqueue is the host function provided by Navidrome. // -//go:wasmimport extism:host/user taskqueue_gettaskstatus -func taskqueue_gettaskstatus(uint64) uint64 +//go:wasmimport extism:host/user task_enqueue +func task_enqueue(uint64) uint64 -// taskqueue_canceltask is the host function provided by Navidrome. +// task_get is the host function provided by Navidrome. // -//go:wasmimport extism:host/user taskqueue_canceltask -func taskqueue_canceltask(uint64) uint64 +//go:wasmimport extism:host/user task_get +func task_get(uint64) uint64 -type taskQueueCreateQueueRequest struct { +// task_cancel is the host function provided by Navidrome. +// +//go:wasmimport extism:host/user task_cancel +func task_cancel(uint64) uint64 + +type taskCreateQueueRequest struct { Name string `json:"name"` Config QueueConfig `json:"config"` } -type taskQueueEnqueueRequest struct { +type taskEnqueueRequest struct { QueueName string `json:"queueName"` Payload []byte `json:"payload"` } -type taskQueueEnqueueResponse struct { +type taskEnqueueResponse struct { Result string `json:"result,omitempty"` Error string `json:"error,omitempty"` } -type taskQueueGetTaskStatusRequest struct { +type taskGetRequest struct { TaskID string `json:"taskId"` } -type taskQueueGetTaskStatusResponse struct { - Result string `json:"result,omitempty"` - Error string `json:"error,omitempty"` +type taskGetResponse struct { + Result *TaskInfo `json:"result,omitempty"` + Error string `json:"error,omitempty"` } -type taskQueueCancelTaskRequest struct { +type taskCancelRequest struct { TaskID string `json:"taskId"` } -// TaskQueueCreateQueue calls the taskqueue_createqueue host function. +// TaskCreateQueue calls the task_createqueue host function. // CreateQueue creates a named task queue with the given configuration. // Zero-value fields in config use sensible defaults. // If a queue with the same name already exists, returns an error. // On startup, this also recovers any stale "running" tasks from a previous crash. -func TaskQueueCreateQueue(name string, config QueueConfig) error { +func TaskCreateQueue(name string, config QueueConfig) error { // Marshal request to JSON - req := taskQueueCreateQueueRequest{ + req := taskCreateQueueRequest{ Name: name, Config: config, } @@ -91,7 +99,7 @@ func TaskQueueCreateQueue(name string, config QueueConfig) error { defer reqMem.Free() // Call the host function - responsePtr := taskqueue_createqueue(reqMem.Offset()) + responsePtr := task_createqueue(reqMem.Offset()) // Read the response from memory responseMem := pdk.FindMemory(responsePtr) @@ -110,12 +118,12 @@ func TaskQueueCreateQueue(name string, config QueueConfig) error { return nil } -// TaskQueueEnqueue calls the taskqueue_enqueue host function. +// TaskEnqueue calls the task_enqueue host function. // Enqueue adds a task to the named queue. Returns the task ID. // payload is opaque bytes passed back to the plugin on execution. -func TaskQueueEnqueue(queueName string, payload []byte) (string, error) { +func TaskEnqueue(queueName string, payload []byte) (string, error) { // Marshal request to JSON - req := taskQueueEnqueueRequest{ + req := taskEnqueueRequest{ QueueName: queueName, Payload: payload, } @@ -127,14 +135,14 @@ func TaskQueueEnqueue(queueName string, payload []byte) (string, error) { defer reqMem.Free() // Call the host function - responsePtr := taskqueue_enqueue(reqMem.Offset()) + responsePtr := task_enqueue(reqMem.Offset()) // Read the response from memory responseMem := pdk.FindMemory(responsePtr) responseBytes := responseMem.ReadBytes() // Parse the response - var response taskQueueEnqueueResponse + var response taskEnqueueResponse if err := json.Unmarshal(responseBytes, &response); err != nil { return "", err } @@ -147,48 +155,48 @@ func TaskQueueEnqueue(queueName string, payload []byte) (string, error) { return response.Result, nil } -// TaskQueueGetTaskStatus calls the taskqueue_gettaskstatus host function. -// GetTaskStatus returns the status of a task: "pending", "running", -// "completed", "failed", or "cancelled". -func TaskQueueGetTaskStatus(taskID string) (string, error) { +// TaskGet calls the task_get host function. +// Get returns the current state of a task including its status, +// message, and attempt count. +func TaskGet(taskID string) (*TaskInfo, error) { // Marshal request to JSON - req := taskQueueGetTaskStatusRequest{ + req := taskGetRequest{ TaskID: taskID, } reqBytes, err := json.Marshal(req) if err != nil { - return "", err + return nil, err } reqMem := pdk.AllocateBytes(reqBytes) defer reqMem.Free() // Call the host function - responsePtr := taskqueue_gettaskstatus(reqMem.Offset()) + responsePtr := task_get(reqMem.Offset()) // Read the response from memory responseMem := pdk.FindMemory(responsePtr) responseBytes := responseMem.ReadBytes() // Parse the response - var response taskQueueGetTaskStatusResponse + var response taskGetResponse if err := json.Unmarshal(responseBytes, &response); err != nil { - return "", err + return nil, err } // Convert Error field to Go error if response.Error != "" { - return "", errors.New(response.Error) + return nil, errors.New(response.Error) } return response.Result, nil } -// TaskQueueCancelTask calls the taskqueue_canceltask host function. -// CancelTask cancels a pending task. Returns error if already +// TaskCancel calls the task_cancel host function. +// Cancel cancels a pending task. Returns error if already // running, completed, or failed. -func TaskQueueCancelTask(taskID string) error { +func TaskCancel(taskID string) error { // Marshal request to JSON - req := taskQueueCancelTaskRequest{ + req := taskCancelRequest{ TaskID: taskID, } reqBytes, err := json.Marshal(req) @@ -199,7 +207,7 @@ func TaskQueueCancelTask(taskID string) error { defer reqMem.Free() // Call the host function - responsePtr := taskqueue_canceltask(reqMem.Offset()) + responsePtr := task_cancel(reqMem.Offset()) // Read the response from memory responseMem := pdk.FindMemory(responsePtr) diff --git a/plugins/pdk/go/host/nd_host_task_stub.go b/plugins/pdk/go/host/nd_host_task_stub.go new file mode 100644 index 000000000..cd53de6b1 --- /dev/null +++ b/plugins/pdk/go/host/nd_host_task_stub.go @@ -0,0 +1,92 @@ +// Code generated by ndpgen. DO NOT EDIT. +// +// This file contains mock implementations for non-WASM builds. +// These mocks allow IDE support, compilation, and unit testing on non-WASM platforms. +// Plugin authors can use the exported mock instances to set expectations in tests. +// +//go:build !wasip1 + +package host + +import "github.com/stretchr/testify/mock" + +// QueueConfig represents the QueueConfig data structure. +// QueueConfig holds configuration for a task queue. +type QueueConfig struct { + Concurrency int32 `json:"concurrency"` + MaxRetries int32 `json:"maxRetries"` + BackoffMs int64 `json:"backoffMs"` + DelayMs int64 `json:"delayMs"` + RetentionMs int64 `json:"retentionMs"` +} + +// TaskInfo represents the TaskInfo data structure. +// TaskInfo holds the current state of a task. +type TaskInfo struct { + Status string `json:"status"` + Message string `json:"message"` + Attempt int32 `json:"attempt"` +} + +// mockTaskService is the mock implementation for testing. +type mockTaskService struct { + mock.Mock +} + +// TaskMock is the auto-instantiated mock instance for testing. +// Use this to set expectations: host.TaskMock.On("MethodName", args...).Return(values...) +var TaskMock = &mockTaskService{} + +// CreateQueue is the mock method for TaskCreateQueue. +func (m *mockTaskService) CreateQueue(name string, config QueueConfig) error { + args := m.Called(name, config) + return args.Error(0) +} + +// TaskCreateQueue delegates to the mock instance. +// CreateQueue creates a named task queue with the given configuration. +// Zero-value fields in config use sensible defaults. +// If a queue with the same name already exists, returns an error. +// On startup, this also recovers any stale "running" tasks from a previous crash. +func TaskCreateQueue(name string, config QueueConfig) error { + return TaskMock.CreateQueue(name, config) +} + +// Enqueue is the mock method for TaskEnqueue. +func (m *mockTaskService) Enqueue(queueName string, payload []byte) (string, error) { + args := m.Called(queueName, payload) + return args.String(0), args.Error(1) +} + +// TaskEnqueue delegates to the mock instance. +// Enqueue adds a task to the named queue. Returns the task ID. +// payload is opaque bytes passed back to the plugin on execution. +func TaskEnqueue(queueName string, payload []byte) (string, error) { + return TaskMock.Enqueue(queueName, payload) +} + +// Get is the mock method for TaskGet. +func (m *mockTaskService) Get(taskID string) (*TaskInfo, error) { + args := m.Called(taskID) + return args.Get(0).(*TaskInfo), args.Error(1) +} + +// TaskGet delegates to the mock instance. +// Get returns the current state of a task including its status, +// message, and attempt count. +func TaskGet(taskID string) (*TaskInfo, error) { + return TaskMock.Get(taskID) +} + +// Cancel is the mock method for TaskCancel. +func (m *mockTaskService) Cancel(taskID string) error { + args := m.Called(taskID) + return args.Error(0) +} + +// TaskCancel delegates to the mock instance. +// Cancel cancels a pending task. Returns error if already +// running, completed, or failed. +func TaskCancel(taskID string) error { + return TaskMock.Cancel(taskID) +} diff --git a/plugins/pdk/go/host/nd_host_taskqueue_stub.go b/plugins/pdk/go/host/nd_host_taskqueue_stub.go deleted file mode 100644 index 4ebc36085..000000000 --- a/plugins/pdk/go/host/nd_host_taskqueue_stub.go +++ /dev/null @@ -1,84 +0,0 @@ -// Code generated by ndpgen. DO NOT EDIT. -// -// This file contains mock implementations for non-WASM builds. -// These mocks allow IDE support, compilation, and unit testing on non-WASM platforms. -// Plugin authors can use the exported mock instances to set expectations in tests. -// -//go:build !wasip1 - -package host - -import "github.com/stretchr/testify/mock" - -// QueueConfig represents the QueueConfig data structure. -// QueueConfig holds configuration for a task queue. -type QueueConfig struct { - Concurrency int32 `json:"concurrency"` - MaxRetries int32 `json:"maxRetries"` - BackoffMs int64 `json:"backoffMs"` - DelayMs int64 `json:"delayMs"` - RetentionMs int64 `json:"retentionMs"` -} - -// mockTaskQueueService is the mock implementation for testing. -type mockTaskQueueService struct { - mock.Mock -} - -// TaskQueueMock is the auto-instantiated mock instance for testing. -// Use this to set expectations: host.TaskQueueMock.On("MethodName", args...).Return(values...) -var TaskQueueMock = &mockTaskQueueService{} - -// CreateQueue is the mock method for TaskQueueCreateQueue. -func (m *mockTaskQueueService) CreateQueue(name string, config QueueConfig) error { - args := m.Called(name, config) - return args.Error(0) -} - -// TaskQueueCreateQueue delegates to the mock instance. -// CreateQueue creates a named task queue with the given configuration. -// Zero-value fields in config use sensible defaults. -// If a queue with the same name already exists, returns an error. -// On startup, this also recovers any stale "running" tasks from a previous crash. -func TaskQueueCreateQueue(name string, config QueueConfig) error { - return TaskQueueMock.CreateQueue(name, config) -} - -// Enqueue is the mock method for TaskQueueEnqueue. -func (m *mockTaskQueueService) Enqueue(queueName string, payload []byte) (string, error) { - args := m.Called(queueName, payload) - return args.String(0), args.Error(1) -} - -// TaskQueueEnqueue delegates to the mock instance. -// Enqueue adds a task to the named queue. Returns the task ID. -// payload is opaque bytes passed back to the plugin on execution. -func TaskQueueEnqueue(queueName string, payload []byte) (string, error) { - return TaskQueueMock.Enqueue(queueName, payload) -} - -// GetTaskStatus is the mock method for TaskQueueGetTaskStatus. -func (m *mockTaskQueueService) GetTaskStatus(taskID string) (string, error) { - args := m.Called(taskID) - return args.String(0), args.Error(1) -} - -// TaskQueueGetTaskStatus delegates to the mock instance. -// GetTaskStatus returns the status of a task: "pending", "running", -// "completed", "failed", or "cancelled". -func TaskQueueGetTaskStatus(taskID string) (string, error) { - return TaskQueueMock.GetTaskStatus(taskID) -} - -// CancelTask is the mock method for TaskQueueCancelTask. -func (m *mockTaskQueueService) CancelTask(taskID string) error { - args := m.Called(taskID) - return args.Error(0) -} - -// TaskQueueCancelTask delegates to the mock instance. -// CancelTask cancels a pending task. Returns error if already -// running, completed, or failed. -func TaskQueueCancelTask(taskID string) error { - return TaskQueueMock.CancelTask(taskID) -} diff --git a/plugins/pdk/go/taskworker/taskworker.go b/plugins/pdk/go/taskworker/taskworker.go index da7f76b62..5d09a3209 100644 --- a/plugins/pdk/go/taskworker/taskworker.go +++ b/plugins/pdk/go/taskworker/taskworker.go @@ -23,13 +23,6 @@ type TaskExecuteRequest struct { Attempt int32 `json:"attempt"` } -// TaskExecuteResponse is the response from task execution. -type TaskExecuteResponse struct { - // Error, if non-empty, indicates the task failed. The task will be retried - // if retries are configured and attempts remain. - Error string `json:"error,omitempty"` -} - // TaskWorker is the marker interface for taskworker plugins. // Implement one or more of the provider interfaces below. // TaskWorker provides task execution handling. @@ -40,10 +33,10 @@ type TaskWorker interface{} // TaskExecuteProvider provides the OnTaskExecute function. type TaskExecuteProvider interface { - OnTaskExecute(TaskExecuteRequest) (TaskExecuteResponse, error) + OnTaskExecute(TaskExecuteRequest) (string, error) } // Internal implementation holders var ( - taskExecuteImpl func(TaskExecuteRequest) (TaskExecuteResponse, error) + taskExecuteImpl func(TaskExecuteRequest) (string, error) ) // Register registers a taskworker implementation. diff --git a/plugins/pdk/go/taskworker/taskworker_stub.go b/plugins/pdk/go/taskworker/taskworker_stub.go index 9944f2d1f..e45054e8e 100644 --- a/plugins/pdk/go/taskworker/taskworker_stub.go +++ b/plugins/pdk/go/taskworker/taskworker_stub.go @@ -20,13 +20,6 @@ type TaskExecuteRequest struct { Attempt int32 `json:"attempt"` } -// TaskExecuteResponse is the response from task execution. -type TaskExecuteResponse struct { - // Error, if non-empty, indicates the task failed. The task will be retried - // if retries are configured and attempts remain. - Error string `json:"error,omitempty"` -} - // TaskWorker is the marker interface for taskworker plugins. // Implement one or more of the provider interfaces below. // TaskWorker provides task execution handling. @@ -37,7 +30,7 @@ type TaskWorker interface{} // TaskExecuteProvider provides the OnTaskExecute function. type TaskExecuteProvider interface { - OnTaskExecute(TaskExecuteRequest) (TaskExecuteResponse, error) + OnTaskExecute(TaskExecuteRequest) (string, error) } // NotImplementedCode is the standard return code for unimplemented functions. diff --git a/plugins/pdk/python/host/nd_host_taskqueue.py b/plugins/pdk/python/host/nd_host_task.py similarity index 72% rename from plugins/pdk/python/host/nd_host_taskqueue.py rename to plugins/pdk/python/host/nd_host_task.py index 5b7471b5d..b4b8a7245 100644 --- a/plugins/pdk/python/host/nd_host_taskqueue.py +++ b/plugins/pdk/python/host/nd_host_task.py @@ -1,6 +1,6 @@ # Code generated by ndpgen. DO NOT EDIT. # -# This file contains client wrappers for the TaskQueue host service. +# This file contains client wrappers for the Task host service. # It is intended for use in Navidrome plugins built with extism-py. # # IMPORTANT: Due to a limitation in extism-py, you cannot import this file directly. @@ -12,6 +12,7 @@ from typing import Any import extism import json +import base64 class HostFunctionError(Exception): @@ -19,31 +20,31 @@ class HostFunctionError(Exception): pass -@extism.import_fn("extism:host/user", "taskqueue_createqueue") -def _taskqueue_createqueue(offset: int) -> int: +@extism.import_fn("extism:host/user", "task_createqueue") +def _task_createqueue(offset: int) -> int: """Raw host function - do not call directly.""" ... -@extism.import_fn("extism:host/user", "taskqueue_enqueue") -def _taskqueue_enqueue(offset: int) -> int: +@extism.import_fn("extism:host/user", "task_enqueue") +def _task_enqueue(offset: int) -> int: """Raw host function - do not call directly.""" ... -@extism.import_fn("extism:host/user", "taskqueue_gettaskstatus") -def _taskqueue_gettaskstatus(offset: int) -> int: +@extism.import_fn("extism:host/user", "task_get") +def _task_get(offset: int) -> int: """Raw host function - do not call directly.""" ... -@extism.import_fn("extism:host/user", "taskqueue_canceltask") -def _taskqueue_canceltask(offset: int) -> int: +@extism.import_fn("extism:host/user", "task_cancel") +def _task_cancel(offset: int) -> int: """Raw host function - do not call directly.""" ... -def taskqueue_create_queue(name: str, config: Any) -> None: +def task_create_queue(name: str, config: Any) -> None: """CreateQueue creates a named task queue with the given configuration. Zero-value fields in config use sensible defaults. If a queue with the same name already exists, returns an error. @@ -62,7 +63,7 @@ On startup, this also recovers any stale "running" tasks from a previous crash. } request_bytes = json.dumps(request).encode("utf-8") request_mem = extism.memory.alloc(request_bytes) - response_offset = _taskqueue_createqueue(request_mem.offset) + response_offset = _task_createqueue(request_mem.offset) response_mem = extism.memory.find(response_offset) response = json.loads(extism.memory.string(response_mem)) @@ -71,7 +72,7 @@ On startup, this also recovers any stale "running" tasks from a previous crash. -def taskqueue_enqueue(queue_name: str, payload: bytes) -> str: +def task_enqueue(queue_name: str, payload: bytes) -> str: """Enqueue adds a task to the named queue. Returns the task ID. payload is opaque bytes passed back to the plugin on execution. @@ -87,11 +88,11 @@ payload is opaque bytes passed back to the plugin on execution. """ request = { "queueName": queue_name, - "payload": payload, + "payload": base64.b64encode(payload).decode("ascii"), } request_bytes = json.dumps(request).encode("utf-8") request_mem = extism.memory.alloc(request_bytes) - response_offset = _taskqueue_enqueue(request_mem.offset) + response_offset = _task_enqueue(request_mem.offset) response_mem = extism.memory.find(response_offset) response = json.loads(extism.memory.string(response_mem)) @@ -101,15 +102,15 @@ payload is opaque bytes passed back to the plugin on execution. return response.get("result", "") -def taskqueue_get_task_status(task_id: str) -> str: - """GetTaskStatus returns the status of a task: "pending", "running", -"completed", "failed", or "cancelled". +def task_get(task_id: str) -> Any: + """Get returns the current state of a task including its status, +message, and attempt count. Args: task_id: str parameter. Returns: - str: The result value. + Any: The result value. Raises: HostFunctionError: If the host function returns an error. @@ -119,18 +120,18 @@ def taskqueue_get_task_status(task_id: str) -> str: } request_bytes = json.dumps(request).encode("utf-8") request_mem = extism.memory.alloc(request_bytes) - response_offset = _taskqueue_gettaskstatus(request_mem.offset) + response_offset = _task_get(request_mem.offset) response_mem = extism.memory.find(response_offset) response = json.loads(extism.memory.string(response_mem)) if response.get("error"): raise HostFunctionError(response["error"]) - return response.get("result", "") + return response.get("result", None) -def taskqueue_cancel_task(task_id: str) -> None: - """CancelTask cancels a pending task. Returns error if already +def task_cancel(task_id: str) -> None: + """Cancel cancels a pending task. Returns error if already running, completed, or failed. Args: @@ -144,7 +145,7 @@ running, completed, or failed. } request_bytes = json.dumps(request).encode("utf-8") request_mem = extism.memory.alloc(request_bytes) - response_offset = _taskqueue_canceltask(request_mem.offset) + response_offset = _task_cancel(request_mem.offset) response_mem = extism.memory.find(response_offset) response = json.loads(extism.memory.string(response_mem)) diff --git a/plugins/pdk/rust/nd-pdk-capabilities/src/taskworker.rs b/plugins/pdk/rust/nd-pdk-capabilities/src/taskworker.rs index 961d44b85..e8aa106a2 100644 --- a/plugins/pdk/rust/nd-pdk-capabilities/src/taskworker.rs +++ b/plugins/pdk/rust/nd-pdk-capabilities/src/taskworker.rs @@ -4,6 +4,29 @@ // It is intended for use in Navidrome plugins built with extism-pdk. use serde::{Deserialize, Serialize}; +use base64::Engine as _; +use base64::engine::general_purpose::STANDARD as BASE64; + +mod base64_bytes { + use serde::{self, Deserialize, Deserializer, Serializer}; + use base64::Engine as _; + use base64::engine::general_purpose::STANDARD as BASE64; + + pub fn serialize(bytes: &Vec, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&BASE64.encode(bytes)) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + BASE64.decode(&s).map_err(serde::de::Error::custom) + } +} // Helper functions for skip_serializing_if with numeric types #[allow(dead_code)] @@ -30,20 +53,12 @@ pub struct TaskExecuteRequest { pub task_id: String, /// Payload is the opaque data provided when the task was enqueued. #[serde(default)] + #[serde(with = "base64_bytes")] pub payload: Vec, /// Attempt is the current attempt number (1-based: first attempt = 1). #[serde(default)] pub attempt: i32, } -/// TaskExecuteResponse is the response from task execution. -#[derive(Debug, Clone, Default, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct TaskExecuteResponse { - /// Error, if non-empty, indicates the task failed. The task will be retried - /// if retries are configured and attempts remain. - #[serde(default, skip_serializing_if = "String::is_empty")] - pub error: String, -} /// Error represents an error from a capability method. #[derive(Debug)] @@ -67,7 +82,7 @@ impl Error { /// TaskExecuteProvider provides the OnTaskExecute function. pub trait TaskExecuteProvider { - fn on_task_execute(&self, req: TaskExecuteRequest) -> Result; + fn on_task_execute(&self, req: TaskExecuteRequest) -> Result; } /// Register the on_task_execute export. @@ -78,7 +93,7 @@ macro_rules! register_taskworker_task_execute { #[extism_pdk::plugin_fn] pub fn nd_task_execute( req: extism_pdk::Json<$crate::taskworker::TaskExecuteRequest> - ) -> extism_pdk::FnResult> { + ) -> extism_pdk::FnResult> { let plugin = <$plugin_type>::default(); let result = $crate::taskworker::TaskExecuteProvider::on_task_execute(&plugin, req.into_inner())?; Ok(extism_pdk::Json(result)) diff --git a/plugins/pdk/rust/nd-pdk-host/src/lib.rs b/plugins/pdk/rust/nd-pdk-host/src/lib.rs index f4c3fb2e5..3a31bc489 100644 --- a/plugins/pdk/rust/nd-pdk-host/src/lib.rs +++ b/plugins/pdk/rust/nd-pdk-host/src/lib.rs @@ -40,7 +40,7 @@ //! - [`library`] - provides access to music library metadata for plugins. //! - [`scheduler`] - provides task scheduling capabilities for plugins. //! - [`subsonicapi`] - provides access to Navidrome's Subsonic API from plugins. -//! - [`taskqueue`] - provides persistent task queues for plugins. +//! - [`task`] - provides persistent task queues for plugins. //! - [`users`] - provides access to user information for plugins. //! - [`websocket`] - provides WebSocket communication capabilities for plugins. @@ -101,10 +101,10 @@ pub mod subsonicapi { } #[doc(hidden)] -mod nd_host_taskqueue; +mod nd_host_task; /// provides persistent task queues for plugins. -pub mod taskqueue { - pub use super::nd_host_taskqueue::*; +pub mod task { + pub use super::nd_host_task::*; } #[doc(hidden)] diff --git a/plugins/pdk/rust/nd-pdk-host/src/nd_host_taskqueue.rs b/plugins/pdk/rust/nd-pdk-host/src/nd_host_task.rs similarity index 63% rename from plugins/pdk/rust/nd-pdk-host/src/nd_host_taskqueue.rs rename to plugins/pdk/rust/nd-pdk-host/src/nd_host_task.rs index ea2194f2f..051a85dec 100644 --- a/plugins/pdk/rust/nd-pdk-host/src/nd_host_taskqueue.rs +++ b/plugins/pdk/rust/nd-pdk-host/src/nd_host_task.rs @@ -1,10 +1,33 @@ // Code generated by ndpgen. DO NOT EDIT. // -// This file contains client wrappers for the TaskQueue host service. +// This file contains client wrappers for the Task host service. // It is intended for use in Navidrome plugins built with extism-pdk. use extism_pdk::*; use serde::{Deserialize, Serialize}; +use base64::Engine as _; +use base64::engine::general_purpose::STANDARD as BASE64; + +mod base64_bytes { + use serde::{self, Deserialize, Deserializer, Serializer}; + use base64::Engine as _; + use base64::engine::general_purpose::STANDARD as BASE64; + + pub fn serialize(bytes: &Vec, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&BASE64.encode(bytes)) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + BASE64.decode(&s).map_err(serde::de::Error::custom) + } +} /// QueueConfig holds configuration for a task queue. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -17,30 +40,40 @@ pub struct QueueConfig { pub retention_ms: i64, } +/// TaskInfo holds the current state of a task. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TaskInfo { + pub status: String, + pub message: String, + pub attempt: i32, +} + #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "camelCase")] -struct TaskQueueCreateQueueRequest { +struct TaskCreateQueueRequest { name: String, config: QueueConfig, } #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "camelCase")] -struct TaskQueueCreateQueueResponse { +struct TaskCreateQueueResponse { #[serde(default)] error: Option, } #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "camelCase")] -struct TaskQueueEnqueueRequest { +struct TaskEnqueueRequest { queue_name: String, + #[serde(with = "base64_bytes")] payload: Vec, } #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "camelCase")] -struct TaskQueueEnqueueResponse { +struct TaskEnqueueResponse { #[serde(default)] result: String, #[serde(default)] @@ -49,38 +82,38 @@ struct TaskQueueEnqueueResponse { #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "camelCase")] -struct TaskQueueGetTaskStatusRequest { +struct TaskGetRequest { task_id: String, } #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "camelCase")] -struct TaskQueueGetTaskStatusResponse { +struct TaskGetResponse { #[serde(default)] - result: String, + result: Option, #[serde(default)] error: Option, } #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "camelCase")] -struct TaskQueueCancelTaskRequest { +struct TaskCancelRequest { task_id: String, } #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "camelCase")] -struct TaskQueueCancelTaskResponse { +struct TaskCancelResponse { #[serde(default)] error: Option, } #[host_fn] extern "ExtismHost" { - fn taskqueue_createqueue(input: Json) -> Json; - fn taskqueue_enqueue(input: Json) -> Json; - fn taskqueue_gettaskstatus(input: Json) -> Json; - fn taskqueue_canceltask(input: Json) -> Json; + fn task_createqueue(input: Json) -> Json; + fn task_enqueue(input: Json) -> Json; + fn task_get(input: Json) -> Json; + fn task_cancel(input: Json) -> Json; } /// CreateQueue creates a named task queue with the given configuration. @@ -96,7 +129,7 @@ extern "ExtismHost" { /// Returns an error if the host function call fails. pub fn create_queue(name: &str, config: QueueConfig) -> Result<(), Error> { let response = unsafe { - taskqueue_createqueue(Json(TaskQueueCreateQueueRequest { + task_createqueue(Json(TaskCreateQueueRequest { name: name.to_owned(), config: config, }))? @@ -123,7 +156,7 @@ pub fn create_queue(name: &str, config: QueueConfig) -> Result<(), Error> { /// Returns an error if the host function call fails. pub fn enqueue(queue_name: &str, payload: Vec) -> Result { let response = unsafe { - taskqueue_enqueue(Json(TaskQueueEnqueueRequest { + task_enqueue(Json(TaskEnqueueRequest { queue_name: queue_name.to_owned(), payload: payload, }))? @@ -136,8 +169,8 @@ pub fn enqueue(queue_name: &str, payload: Vec) -> Result { Ok(response.0.result) } -/// GetTaskStatus returns the status of a task: "pending", "running", -/// "completed", "failed", or "cancelled". +/// Get returns the current state of a task including its status, +/// message, and attempt count. /// /// # Arguments /// * `task_id` - String parameter. @@ -147,9 +180,9 @@ pub fn enqueue(queue_name: &str, payload: Vec) -> Result { /// /// # Errors /// Returns an error if the host function call fails. -pub fn get_task_status(task_id: &str) -> Result { +pub fn get(task_id: &str) -> Result, Error> { let response = unsafe { - taskqueue_gettaskstatus(Json(TaskQueueGetTaskStatusRequest { + task_get(Json(TaskGetRequest { task_id: task_id.to_owned(), }))? }; @@ -161,7 +194,7 @@ pub fn get_task_status(task_id: &str) -> Result { Ok(response.0.result) } -/// CancelTask cancels a pending task. Returns error if already +/// Cancel cancels a pending task. Returns error if already /// running, completed, or failed. /// /// # Arguments @@ -169,9 +202,9 @@ pub fn get_task_status(task_id: &str) -> Result { /// /// # Errors /// Returns an error if the host function call fails. -pub fn cancel_task(task_id: &str) -> Result<(), Error> { +pub fn cancel(task_id: &str) -> Result<(), Error> { let response = unsafe { - taskqueue_canceltask(Json(TaskQueueCancelTaskRequest { + task_cancel(Json(TaskCancelRequest { task_id: task_id.to_owned(), }))? }; diff --git a/plugins/testdata/test-taskqueue/main.go b/plugins/testdata/test-taskqueue/main.go index cfc4be645..fdd067c19 100644 --- a/plugins/testdata/test-taskqueue/main.go +++ b/plugins/testdata/test-taskqueue/main.go @@ -3,6 +3,8 @@ package main import ( + "fmt" + "github.com/navidrome/navidrome/plugins/pdk/go/host" "github.com/navidrome/navidrome/plugins/pdk/go/pdk" "github.com/navidrome/navidrome/plugins/pdk/go/taskworker" @@ -14,15 +16,15 @@ func init() { type handler struct{} -func (h *handler) OnTaskExecute(req taskworker.TaskExecuteRequest) (taskworker.TaskExecuteResponse, error) { +func (h *handler) OnTaskExecute(req taskworker.TaskExecuteRequest) (string, error) { payload := string(req.Payload) if payload == "fail" { - return taskworker.TaskExecuteResponse{Error: "task failed as instructed"}, nil + return "", fmt.Errorf("task failed as instructed") } if payload == "fail-then-succeed" && req.Attempt < 2 { - return taskworker.TaskExecuteResponse{Error: "transient failure"}, nil + return "", fmt.Errorf("transient failure") } - return taskworker.TaskExecuteResponse{}, nil + return "completed successfully", nil } // Test helper types @@ -35,9 +37,11 @@ type TestInput struct { } type TestOutput struct { - TaskID string `json:"taskId,omitempty"` - Status string `json:"status,omitempty"` - Error *string `json:"error,omitempty"` + TaskID string `json:"taskId,omitempty"` + Status string `json:"status,omitempty"` + Message string `json:"message,omitempty"` + Attempt int32 `json:"attempt,omitempty"` + Error *string `json:"error,omitempty"` } //go:wasmexport nd_test_taskqueue @@ -55,7 +59,7 @@ func ndTestTaskQueue() int32 { if input.Config != nil { config = *input.Config } - err := host.TaskQueueCreateQueue(input.QueueName, config) + err := host.TaskCreateQueue(input.QueueName, config) if err != nil { errStr := err.Error() pdk.OutputJSON(TestOutput{Error: &errStr}) @@ -64,7 +68,7 @@ func ndTestTaskQueue() int32 { pdk.OutputJSON(TestOutput{}) case "enqueue": - taskID, err := host.TaskQueueEnqueue(input.QueueName, input.Payload) + taskID, err := host.TaskEnqueue(input.QueueName, input.Payload) if err != nil { errStr := err.Error() pdk.OutputJSON(TestOutput{Error: &errStr}) @@ -73,16 +77,16 @@ func ndTestTaskQueue() int32 { pdk.OutputJSON(TestOutput{TaskID: taskID}) case "get_task_status": - status, err := host.TaskQueueGetTaskStatus(input.TaskID) + info, err := host.TaskGet(input.TaskID) if err != nil { errStr := err.Error() pdk.OutputJSON(TestOutput{Error: &errStr}) return 0 } - pdk.OutputJSON(TestOutput{Status: status}) + pdk.OutputJSON(TestOutput{Status: info.Status, Message: info.Message, Attempt: info.Attempt}) case "cancel_task": - err := host.TaskQueueCancelTask(input.TaskID) + err := host.TaskCancel(input.TaskID) if err != nil { errStr := err.Error() pdk.OutputJSON(TestOutput{Error: &errStr})