Compare commits

...

20 Commits

Author SHA1 Message Date
Deluan
c2e8b39392 feat(plugins): update TaskWorker interface to return status messages and refactor task queue service
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-27 20:25:52 -05:00
Deluan
1974d1276e refactor(plugins): simplify goroutine management in task queue service
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-27 19:32:06 -05:00
Deluan
d7ace6f95f feat(plugins): increase maxConcurrency for task queue and handle budget exhaustion
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-27 19:32:06 -05:00
Deluan
a196ec9a59 refactor(plugins): streamline task queue configuration and error handling
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-27 19:32:06 -05:00
Deluan
132928abb6 fix(plugins): use context-aware database execution in TaskQueue host service
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-27 19:32:06 -05:00
Deluan
173aa9b979 refactor(plugins): remove capability check for TaskWorker in TaskQueue host service
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-27 19:32:06 -05:00
Deluan
3b2133c134 fix(plugins): harden TaskQueue host service with validation and safety improvements
Add input validation (queue name length, payload size limits), extract
status string constants to eliminate raw SQL literals, make CreateQueue
idempotent via upsert for crash recovery, fix RetentionMs default check
for negative values, cap exponential backoff at 1 hour to prevent
overflow, and replace manual mutex-based delay enforcement with
rate.Limiter from golang.org/x/time/rate for correct concurrent worker
serialization.
2026-02-27 19:32:06 -05:00
Deluan
74bacf6879 docs: document TaskQueue module for persistent task queues
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-27 19:32:06 -05:00
Deluan
55ef58da83 feat(plugins): add integration tests for TaskQueue host service 2026-02-27 19:32:06 -05:00
Deluan
2bfbe6fde1 feat(plugins): add test-taskqueue plugin for integration testing 2026-02-27 19:32:06 -05:00
Deluan
03cce614fd feat(plugins): register TaskQueue host service in manager 2026-02-27 19:32:06 -05:00
Deluan
36a8cb37ca feat(plugins): require TaskWorker capability for taskqueue permission 2026-02-27 19:32:06 -05:00
Deluan
11d2b3b51c feat(plugins): implement TaskQueue service with SQLite persistence and workers
Per-plugin SQLite database with queues and tasks tables. Worker goroutines
dequeue tasks and invoke nd_task_execute callback. Exponential backoff
retries, rate limiting via delayMs, automatic cleanup of terminal tasks.
2026-02-27 19:32:06 -05:00
Deluan
b308c71f38 feat(plugins): add taskqueue permission to manifest schema
Add TaskQueuePermission with maxConcurrency option.
2026-02-27 19:32:06 -05:00
Deluan
591f3a333b feat(plugins): define TaskWorker capability for task execution callbacks 2026-02-27 19:32:06 -05:00
Deluan
36b58a9a10 feat(plugins): define TaskQueue host service interface
Add the TaskQueueService interface with CreateQueue, Enqueue,
GetTaskStatus, and CancelTask methods plus QueueConfig struct.
2026-02-27 19:32:06 -05:00
Deluan Quintão
bd8032b327 fix(plugins): add base64 handling for []byte and remove raw=true (#5121)
* fix(plugins): add base64 handling for []byte and remove raw=true

Go's json.Marshal automatically base64-encodes []byte fields, but Rust's
serde_json serializes Vec<u8> as a JSON array and Python's json.dumps
raises TypeError on bytes. This fixes both directions of plugin
communication by adding proper base64 encoding/decoding in generated
client code.

For Rust templates (client and capability): adds a base64_bytes serde
helper module with #[serde(with = "base64_bytes")] on all Vec<u8> fields,
and adds base64 as a dependency. For Python templates: wraps bytes params
with base64.b64encode() and responses with base64.b64decode().

Also removes the raw=true binary framing protocol from all templates,
the parser, and the Method type. The raw mechanism added complexity that
is no longer needed once []byte works properly over JSON.

* fix(plugins): update production code and tests for base64 migration

Remove raw=true annotation from SubsonicAPI.CallRaw, delete all raw
test fixtures, remove raw-related test cases from parser, generator, and
integration tests, and add new test cases validating base64 handling
for Rust and Python templates.

* fix(plugins): update golden files and regenerate production code

Update golden test fixtures for codec and comprehensive services to
include base64 handling for []byte fields. Regenerate all production
PDK code (Go, Rust, Python) and host wrappers to use standard JSON
with base64-encoded byte fields instead of binary framing protocol.

* refactor: remove base64 helper duplication from rust template

Signed-off-by: Deluan <deluan@navidrome.org>

* fix(plugins): add base64 dependency to capabilities' Cargo.toml

Signed-off-by: Deluan <deluan@navidrome.org>

---------

Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-27 19:00:19 -05:00
Deluan
582d1b3cd9 refactor(plugins): validate scheduler capability at load time
Move scheduler capability check from runtime (when callback fires) to
load-time validation in ValidateWithCapabilities. This ensures plugins
declaring the scheduler permission must export the nd_scheduler_callback
function, failing fast with a clear error instead of silently skipping
callbacks at runtime.
2026-02-26 16:30:50 -05:00
Deluan
cdd3432788 refactor(http): rename HTTP client files and update struct names for consistency
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-26 16:19:37 -05:00
Deluan Quintão
5bc2bbb70e feat(subsonic): append album version to names in Subsonic API (#5111)
* feat(subsonic): append album version to album names in Subsonic API responses

Add AppendAlbumVersion config option (default: true) that appends the
album version tag to album names in Subsonic API responses, similar to
how AppendSubtitle works for track titles. This affects album names in
childFromAlbum and buildAlbumID3 responses.

Signed-off-by: Deluan <deluan@navidrome.org>

* feat(subsonic): append album version to media file album names in Subsonic API

Add FullAlbumName() to MediaFile that appends the album version tag,
mirroring the Album.FullName() behavior. Use it in childFromMediaFile
and fakePath to ensure media file responses also show the album version.

Signed-off-by: Deluan <deluan@navidrome.org>

* fix(subsonic): use len() check for album version tag to prevent panic on empty slice

Use len(tags) > 0 instead of != nil to safely guard against empty
slices when accessing the first element of the album version tag.

Signed-off-by: Deluan <deluan@navidrome.org>

* fix(subsonic): use FullName in buildAlbumDirectory and deduplicate FullName calls

Apply album.FullName() in buildAlbumDirectory (getMusicDirectory) so
album names are consistent across all Subsonic endpoints. Also compute
al.FullName() once in childFromAlbum to avoid redundant calls.

Signed-off-by: Deluan <deluan@navidrome.org>

* fix: use len() check in MediaFile.FullTitle() to prevent panic on empty slice

Apply the same safety improvement as FullAlbumName() and Album.FullName()
for consistency.

Signed-off-by: Deluan <deluan@navidrome.org>

* test: add tests for Album.FullName, MediaFile.FullTitle, and MediaFile.FullAlbumName

Cover all cases: config enabled/disabled, tag present, tag absent, and
empty tag slice.

Signed-off-by: Deluan <deluan@navidrome.org>

---------

Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-26 10:50:12 -05:00
73 changed files with 3759 additions and 882 deletions

View File

@@ -155,6 +155,7 @@ type scannerOptions struct {
type subsonicOptions struct { type subsonicOptions struct {
AppendSubtitle bool AppendSubtitle bool
AppendAlbumVersion bool
ArtistParticipations bool ArtistParticipations bool
DefaultReportRealPath bool DefaultReportRealPath bool
EnableAverageRating bool EnableAverageRating bool
@@ -689,6 +690,7 @@ func setViperDefaults() {
viper.SetDefault("scanner.followsymlinks", true) viper.SetDefault("scanner.followsymlinks", true)
viper.SetDefault("scanner.purgemissing", consts.PurgeMissingNever) viper.SetDefault("scanner.purgemissing", consts.PurgeMissingNever)
viper.SetDefault("subsonic.appendsubtitle", true) viper.SetDefault("subsonic.appendsubtitle", true)
viper.SetDefault("subsonic.appendalbumversion", true)
viper.SetDefault("subsonic.artistparticipations", false) viper.SetDefault("subsonic.artistparticipations", false)
viper.SetDefault("subsonic.defaultreportrealpath", false) viper.SetDefault("subsonic.defaultreportrealpath", false)
viper.SetDefault("subsonic.enableaveragerating", true) viper.SetDefault("subsonic.enableaveragerating", true)

View File

@@ -1,11 +1,14 @@
package model package model
import ( import (
"fmt"
"iter" "iter"
"math" "math"
"sync" "sync"
"time" "time"
"github.com/navidrome/navidrome/conf"
"github.com/gohugoio/hashstructure" "github.com/gohugoio/hashstructure"
) )
@@ -70,6 +73,13 @@ func (a Album) CoverArtID() ArtworkID {
return artworkIDFromAlbum(a) return artworkIDFromAlbum(a)
} }
func (a Album) FullName() string {
if conf.Server.Subsonic.AppendAlbumVersion && len(a.Tags[TagAlbumVersion]) > 0 {
return fmt.Sprintf("%s (%s)", a.Name, a.Tags[TagAlbumVersion][0])
}
return a.Name
}
// Equals compares two Album structs, ignoring calculated fields // Equals compares two Album structs, ignoring calculated fields
func (a Album) Equals(other Album) bool { func (a Album) Equals(other Album) bool {
// Normalize float32 values to avoid false negatives // Normalize float32 values to avoid false negatives

View File

@@ -3,11 +3,30 @@ package model_test
import ( import (
"encoding/json" "encoding/json"
"github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/conf/configtest"
. "github.com/navidrome/navidrome/model" . "github.com/navidrome/navidrome/model"
. "github.com/onsi/ginkgo/v2" . "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
) )
var _ = Describe("Album", func() {
BeforeEach(func() {
DeferCleanup(configtest.SetupConfig())
})
DescribeTable("FullName",
func(enabled bool, tags Tags, expected string) {
conf.Server.Subsonic.AppendAlbumVersion = enabled
a := Album{Name: "Album", Tags: tags}
Expect(a.FullName()).To(Equal(expected))
},
Entry("appends version when enabled and tag is present", true, Tags{TagAlbumVersion: []string{"Remastered"}}, "Album (Remastered)"),
Entry("returns just name when disabled", false, Tags{TagAlbumVersion: []string{"Remastered"}}, "Album"),
Entry("returns just name when tag is absent", true, Tags{}, "Album"),
Entry("returns just name when tag is an empty slice", true, Tags{TagAlbumVersion: []string{}}, "Album"),
)
})
var _ = Describe("Albums", func() { var _ = Describe("Albums", func() {
var albums Albums var albums Albums

View File

@@ -95,12 +95,19 @@ type MediaFile struct {
} }
func (mf MediaFile) FullTitle() string { func (mf MediaFile) FullTitle() string {
if conf.Server.Subsonic.AppendSubtitle && mf.Tags[TagSubtitle] != nil { if conf.Server.Subsonic.AppendSubtitle && len(mf.Tags[TagSubtitle]) > 0 {
return fmt.Sprintf("%s (%s)", mf.Title, mf.Tags[TagSubtitle][0]) return fmt.Sprintf("%s (%s)", mf.Title, mf.Tags[TagSubtitle][0])
} }
return mf.Title return mf.Title
} }
func (mf MediaFile) FullAlbumName() string {
if conf.Server.Subsonic.AppendAlbumVersion && len(mf.Tags[TagAlbumVersion]) > 0 {
return fmt.Sprintf("%s (%s)", mf.Album, mf.Tags[TagAlbumVersion][0])
}
return mf.Album
}
func (mf MediaFile) ContentType() string { func (mf MediaFile) ContentType() string {
return mime.TypeByExtension("." + mf.Suffix) return mime.TypeByExtension("." + mf.Suffix)
} }

View File

@@ -475,7 +475,29 @@ var _ = Describe("MediaFile", func() {
DeferCleanup(configtest.SetupConfig()) DeferCleanup(configtest.SetupConfig())
conf.Server.EnableMediaFileCoverArt = true conf.Server.EnableMediaFileCoverArt = true
}) })
Describe(".CoverArtId()", func() { DescribeTable("FullTitle",
func(enabled bool, tags Tags, expected string) {
conf.Server.Subsonic.AppendSubtitle = enabled
mf := MediaFile{Title: "Song", Tags: tags}
Expect(mf.FullTitle()).To(Equal(expected))
},
Entry("appends subtitle when enabled and tag is present", true, Tags{TagSubtitle: []string{"Live"}}, "Song (Live)"),
Entry("returns just title when disabled", false, Tags{TagSubtitle: []string{"Live"}}, "Song"),
Entry("returns just title when tag is absent", true, Tags{}, "Song"),
Entry("returns just title when tag is an empty slice", true, Tags{TagSubtitle: []string{}}, "Song"),
)
DescribeTable("FullAlbumName",
func(enabled bool, tags Tags, expected string) {
conf.Server.Subsonic.AppendAlbumVersion = enabled
mf := MediaFile{Album: "Album", Tags: tags}
Expect(mf.FullAlbumName()).To(Equal(expected))
},
Entry("appends version when enabled and tag is present", true, Tags{TagAlbumVersion: []string{"Deluxe Edition"}}, "Album (Deluxe Edition)"),
Entry("returns just album name when disabled", false, Tags{TagAlbumVersion: []string{"Deluxe Edition"}}, "Album"),
Entry("returns just album name when tag is absent", true, Tags{}, "Album"),
Entry("returns just album name when tag is an empty slice", true, Tags{TagAlbumVersion: []string{}}, "Album"),
)
Describe("CoverArtId()", func() {
It("returns its own id if it HasCoverArt", func() { It("returns its own id if it HasCoverArt", func() {
mf := MediaFile{ID: "111", AlbumID: "1", HasCoverArt: true} mf := MediaFile{ID: "111", AlbumID: "1", HasCoverArt: true}
id := mf.CoverArtID() id := mf.CoverArtID()

View File

@@ -0,0 +1,27 @@
package capabilities
// TaskWorker provides task execution handling.
// This capability allows plugins to receive callbacks when their queued tasks
// are ready to execute. Plugins that use the taskqueue host service must
// implement this capability.
//
//nd:capability name=taskworker
type TaskWorker interface {
// OnTaskExecute is called when a queued task is ready to run.
// The returned string is a status/result message stored in the tasks table.
// Return an error to trigger retry (if retries are configured).
//nd:export name=nd_task_execute
OnTaskExecute(TaskExecuteRequest) (string, error)
}
// TaskExecuteRequest is the request provided when a task is ready to execute.
type TaskExecuteRequest struct {
// QueueName is the name of the queue this task belongs to.
QueueName string `json:"queueName"`
// TaskID is the unique identifier for this task.
TaskID string `json:"taskId"`
// Payload is the opaque data provided when the task was enqueued.
Payload []byte `json:"payload"`
// Attempt is the current attempt number (1-based: first attempt = 1).
Attempt int32 `json:"attempt"`
}

View File

@@ -0,0 +1,38 @@
version: v1-draft
exports:
nd_task_execute:
description: |-
OnTaskExecute is called when a queued task is ready to run.
The returned string is a status/result message stored in the tasks table.
Return an error to trigger retry (if retries are configured).
input:
$ref: '#/components/schemas/TaskExecuteRequest'
contentType: application/json
output:
type: string
contentType: application/json
components:
schemas:
TaskExecuteRequest:
description: TaskExecuteRequest is the request provided when a task is ready to execute.
properties:
queueName:
type: string
description: QueueName is the name of the queue this task belongs to.
taskId:
type: string
description: TaskID is the unique identifier for this task.
payload:
type: array
description: Payload is the opaque data provided when the task was enqueued.
items:
type: object
attempt:
type: integer
format: int32
description: 'Attempt is the current attempt number (1-based: first attempt = 1).'
required:
- queueName
- taskId
- payload
- attempt

View File

@@ -282,9 +282,6 @@ type ServiceB interface {
Entry("option pattern (value, exists bool)", Entry("option pattern (value, exists bool)",
"config_service.go.txt", "config_client_expected.go.txt", "config_client_expected.py", "config_client_expected.rs"), "config_service.go.txt", "config_client_expected.go.txt", "config_client_expected.py", "config_client_expected.rs"),
Entry("raw=true binary response",
"raw_service.go.txt", "raw_client_expected.go.txt", "raw_client_expected.py", "raw_client_expected.rs"),
) )
It("generates compilable client code for comprehensive service", func() { It("generates compilable client code for comprehensive service", func() {

View File

@@ -256,6 +256,15 @@ func GenerateClientRust(svc Service) ([]byte, error) {
return nil, fmt.Errorf("parsing template: %w", err) return nil, fmt.Errorf("parsing template: %w", err)
} }
partialContent, err := templatesFS.ReadFile("templates/base64_bytes.rs.tmpl")
if err != nil {
return nil, fmt.Errorf("reading base64_bytes partial: %w", err)
}
tmpl, err = tmpl.Parse(string(partialContent))
if err != nil {
return nil, fmt.Errorf("parsing base64_bytes partial: %w", err)
}
data := templateData{ data := templateData{
Service: svc, Service: svc,
} }
@@ -622,6 +631,15 @@ func GenerateCapabilityRust(cap Capability) ([]byte, error) {
return nil, fmt.Errorf("parsing template: %w", err) return nil, fmt.Errorf("parsing template: %w", err)
} }
partialContent, err := templatesFS.ReadFile("templates/base64_bytes.rs.tmpl")
if err != nil {
return nil, fmt.Errorf("reading base64_bytes partial: %w", err)
}
tmpl, err = tmpl.Parse(string(partialContent))
if err != nil {
return nil, fmt.Errorf("parsing base64_bytes partial: %w", err)
}
data := capabilityTemplateData{ data := capabilityTemplateData{
Package: cap.Name, Package: cap.Name,
Capability: cap, Capability: cap,

View File

@@ -264,96 +264,6 @@ var _ = Describe("Generator", func() {
Expect(codeStr).To(ContainSubstring(`extism "github.com/extism/go-sdk"`)) Expect(codeStr).To(ContainSubstring(`extism "github.com/extism/go-sdk"`))
}) })
It("should generate binary framing for raw=true methods", func() {
svc := Service{
Name: "Stream",
Permission: "stream",
Interface: "StreamService",
Methods: []Method{
{
Name: "GetStream",
HasError: true,
Raw: true,
Params: []Param{NewParam("uri", "string")},
Returns: []Param{
NewParam("contentType", "string"),
NewParam("data", "[]byte"),
},
},
},
}
code, err := GenerateHost(svc, "host")
Expect(err).NotTo(HaveOccurred())
_, err = format.Source(code)
Expect(err).NotTo(HaveOccurred())
codeStr := string(code)
// Should include encoding/binary import for raw methods
Expect(codeStr).To(ContainSubstring(`"encoding/binary"`))
// Should NOT generate a response type for raw methods
Expect(codeStr).NotTo(ContainSubstring("type StreamGetStreamResponse struct"))
// Should generate request type (request is still JSON)
Expect(codeStr).To(ContainSubstring("type StreamGetStreamRequest struct"))
// Should build binary frame [0x00][4-byte CT len][CT][data]
Expect(codeStr).To(ContainSubstring("frame[0] = 0x00"))
Expect(codeStr).To(ContainSubstring("binary.BigEndian.PutUint32"))
// Should have writeRawError helper
Expect(codeStr).To(ContainSubstring("streamWriteRawError"))
// Should use writeRawError instead of writeError for raw methods
Expect(codeStr).To(ContainSubstring("streamWriteRawError(p, stack"))
})
It("should generate both writeError and writeRawError for mixed services", func() {
svc := Service{
Name: "API",
Permission: "api",
Interface: "APIService",
Methods: []Method{
{
Name: "Call",
HasError: true,
Params: []Param{NewParam("uri", "string")},
Returns: []Param{NewParam("response", "string")},
},
{
Name: "CallRaw",
HasError: true,
Raw: true,
Params: []Param{NewParam("uri", "string")},
Returns: []Param{
NewParam("contentType", "string"),
NewParam("data", "[]byte"),
},
},
},
}
code, err := GenerateHost(svc, "host")
Expect(err).NotTo(HaveOccurred())
_, err = format.Source(code)
Expect(err).NotTo(HaveOccurred())
codeStr := string(code)
// Should have both helpers
Expect(codeStr).To(ContainSubstring("apiWriteResponse"))
Expect(codeStr).To(ContainSubstring("apiWriteError"))
Expect(codeStr).To(ContainSubstring("apiWriteRawError"))
// Should generate response type for non-raw method only
Expect(codeStr).To(ContainSubstring("type APICallResponse struct"))
Expect(codeStr).NotTo(ContainSubstring("type APICallRawResponse struct"))
})
It("should always include json import for JSON protocol", func() { It("should always include json import for JSON protocol", func() {
// All services use JSON protocol, so json import is always needed // All services use JSON protocol, so json import is always needed
svc := Service{ svc := Service{
@@ -717,49 +627,7 @@ var _ = Describe("Generator", func() {
Expect(codeStr).To(ContainSubstring(`response.get("boolVal", False)`)) Expect(codeStr).To(ContainSubstring(`response.get("boolVal", False)`))
}) })
It("should generate binary frame parsing for raw methods", func() { It("should not import base64 for non-byte services", func() {
svc := Service{
Name: "Stream",
Permission: "stream",
Interface: "StreamService",
Methods: []Method{
{
Name: "GetStream",
HasError: true,
Raw: true,
Params: []Param{NewParam("uri", "string")},
Returns: []Param{
NewParam("contentType", "string"),
NewParam("data", "[]byte"),
},
Doc: "GetStream returns raw binary stream data.",
},
},
}
code, err := GenerateClientPython(svc)
Expect(err).NotTo(HaveOccurred())
codeStr := string(code)
// Should import Tuple and struct for raw methods
Expect(codeStr).To(ContainSubstring("from typing import Any, Tuple"))
Expect(codeStr).To(ContainSubstring("import struct"))
// Should return Tuple[str, bytes]
Expect(codeStr).To(ContainSubstring("-> Tuple[str, bytes]:"))
// Should parse binary frame instead of JSON
Expect(codeStr).To(ContainSubstring("response_bytes = response_mem.bytes()"))
Expect(codeStr).To(ContainSubstring("response_bytes[0] == 0x01"))
Expect(codeStr).To(ContainSubstring("struct.unpack"))
Expect(codeStr).To(ContainSubstring("return content_type, data"))
// Should NOT use json.loads for response
Expect(codeStr).NotTo(ContainSubstring("json.loads(extism.memory.string(response_mem))"))
})
It("should not import Tuple or struct for non-raw services", func() {
svc := Service{ svc := Service{
Name: "Test", Name: "Test",
Permission: "test", Permission: "test",
@@ -779,8 +647,37 @@ var _ = Describe("Generator", func() {
codeStr := string(code) codeStr := string(code)
Expect(codeStr).NotTo(ContainSubstring("Tuple")) Expect(codeStr).NotTo(ContainSubstring("import base64"))
Expect(codeStr).NotTo(ContainSubstring("import struct")) })
It("should generate base64 encoding/decoding for byte fields", func() {
svc := Service{
Name: "Codec",
Permission: "codec",
Interface: "CodecService",
Methods: []Method{
{
Name: "Encode",
HasError: true,
Params: []Param{NewParam("data", "[]byte")},
Returns: []Param{NewParam("result", "[]byte")},
},
},
}
code, err := GenerateClientPython(svc)
Expect(err).NotTo(HaveOccurred())
codeStr := string(code)
// Should import base64
Expect(codeStr).To(ContainSubstring("import base64"))
// Should base64-encode byte params in request
Expect(codeStr).To(ContainSubstring(`base64.b64encode(data).decode("ascii")`))
// Should base64-decode byte returns in response
Expect(codeStr).To(ContainSubstring(`base64.b64decode(response.get("result", ""))`))
}) })
}) })
@@ -939,46 +836,6 @@ var _ = Describe("Generator", func() {
Expect(codeStr).To(ContainSubstring("github.com/navidrome/navidrome/plugins/pdk/go/pdk")) Expect(codeStr).To(ContainSubstring("github.com/navidrome/navidrome/plugins/pdk/go/pdk"))
}) })
It("should include encoding/binary import for raw methods", func() {
svc := Service{
Name: "Stream",
Permission: "stream",
Interface: "StreamService",
Methods: []Method{
{
Name: "GetStream",
HasError: true,
Raw: true,
Params: []Param{NewParam("uri", "string")},
Returns: []Param{
NewParam("contentType", "string"),
NewParam("data", "[]byte"),
},
},
},
}
code, err := GenerateClientGo(svc, "host")
Expect(err).NotTo(HaveOccurred())
codeStr := string(code)
// Should include encoding/binary for raw binary frame parsing
Expect(codeStr).To(ContainSubstring(`"encoding/binary"`))
// Should NOT generate response type struct for raw methods
Expect(codeStr).NotTo(ContainSubstring("streamGetStreamResponse struct"))
// Should still generate request type
Expect(codeStr).To(ContainSubstring("streamGetStreamRequest struct"))
// Should parse binary frame
Expect(codeStr).To(ContainSubstring("responseBytes[0] == 0x01"))
Expect(codeStr).To(ContainSubstring("binary.BigEndian.Uint32"))
// Should return (string, []byte, error)
Expect(codeStr).To(ContainSubstring("func StreamGetStream(uri string) (string, []byte, error)"))
})
}) })
Describe("GenerateClientGoStub", func() { Describe("GenerateClientGoStub", func() {
@@ -1748,22 +1605,17 @@ var _ = Describe("Rust Generation", func() {
Expect(codeStr).NotTo(ContainSubstring("Option<bool>")) Expect(codeStr).NotTo(ContainSubstring("Option<bool>"))
}) })
It("should generate raw extern C import and binary frame parsing for raw methods", func() { It("should generate base64 serde for Vec<u8> fields", func() {
svc := Service{ svc := Service{
Name: "Stream", Name: "Codec",
Permission: "stream", Permission: "codec",
Interface: "StreamService", Interface: "CodecService",
Methods: []Method{ Methods: []Method{
{ {
Name: "GetStream", Name: "Encode",
HasError: true, HasError: true,
Raw: true, Params: []Param{NewParam("data", "[]byte")},
Params: []Param{NewParam("uri", "string")}, Returns: []Param{NewParam("result", "[]byte")},
Returns: []Param{
NewParam("contentType", "string"),
NewParam("data", "[]byte"),
},
Doc: "GetStream returns raw binary stream data.",
}, },
}, },
} }
@@ -1773,24 +1625,36 @@ var _ = Describe("Rust Generation", func() {
codeStr := string(code) codeStr := string(code)
// Should use extern "C" with wasm_import_module for raw methods, not #[host_fn] extern "ExtismHost" // Should generate base64_bytes serde module
Expect(codeStr).To(ContainSubstring(`#[link(wasm_import_module = "extism:host/user")]`)) Expect(codeStr).To(ContainSubstring("mod base64_bytes"))
Expect(codeStr).To(ContainSubstring(`extern "C"`)) Expect(codeStr).To(ContainSubstring("use base64::Engine as _"))
Expect(codeStr).To(ContainSubstring("fn stream_getstream(offset: u64) -> u64"))
// Should NOT generate response type for raw methods // Should add serde(with = "base64_bytes") on Vec<u8> fields
Expect(codeStr).NotTo(ContainSubstring("StreamGetStreamResponse")) Expect(codeStr).To(ContainSubstring(`#[serde(with = "base64_bytes")]`))
})
// Should generate request type (request is still JSON) It("should not generate base64 module when no byte fields", func() {
Expect(codeStr).To(ContainSubstring("struct StreamGetStreamRequest")) svc := Service{
Name: "Test",
Permission: "test",
Interface: "TestService",
Methods: []Method{
{
Name: "Call",
HasError: true,
Params: []Param{NewParam("uri", "string")},
Returns: []Param{NewParam("response", "string")},
},
},
}
// Should return Result<(String, Vec<u8>), Error> code, err := GenerateClientRust(svc)
Expect(codeStr).To(ContainSubstring("Result<(String, Vec<u8>), Error>")) Expect(err).NotTo(HaveOccurred())
// Should parse binary frame codeStr := string(code)
Expect(codeStr).To(ContainSubstring("response_bytes[0] == 0x01"))
Expect(codeStr).To(ContainSubstring("u32::from_be_bytes")) Expect(codeStr).NotTo(ContainSubstring("mod base64_bytes"))
Expect(codeStr).To(ContainSubstring("String::from_utf8_lossy")) Expect(codeStr).NotTo(ContainSubstring("use base64"))
}) })
}) })
}) })

View File

@@ -761,7 +761,6 @@ func parseMethod(name string, funcType *ast.FuncType, annotation map[string]stri
m := Method{ m := Method{
Name: name, Name: name,
ExportName: annotation["name"], ExportName: annotation["name"],
Raw: annotation["raw"] == "true",
Doc: doc, Doc: doc,
} }
@@ -800,13 +799,6 @@ func parseMethod(name string, funcType *ast.FuncType, annotation map[string]stri
} }
} }
// Validate raw=true methods: must return exactly (string, []byte, error)
if m.Raw {
if !m.HasError || len(m.Returns) != 2 || m.Returns[0].Type != "string" || m.Returns[1].Type != "[]byte" {
return m, fmt.Errorf("raw=true method %s must return (string, []byte, error) — content-type, data, error", name)
}
}
return m, nil return m, nil
} }

View File

@@ -122,119 +122,6 @@ type TestService interface {
Expect(services[0].Methods[0].Name).To(Equal("Exported")) Expect(services[0].Methods[0].Name).To(Equal("Exported"))
}) })
It("should parse raw=true annotation", func() {
src := `package host
import "context"
//nd:hostservice name=Stream permission=stream
type StreamService interface {
//nd:hostfunc raw=true
GetStream(ctx context.Context, uri string) (contentType string, data []byte, err error)
}
`
err := os.WriteFile(filepath.Join(tmpDir, "stream.go"), []byte(src), 0600)
Expect(err).NotTo(HaveOccurred())
services, err := ParseDirectory(tmpDir)
Expect(err).NotTo(HaveOccurred())
Expect(services).To(HaveLen(1))
m := services[0].Methods[0]
Expect(m.Name).To(Equal("GetStream"))
Expect(m.Raw).To(BeTrue())
Expect(m.HasError).To(BeTrue())
Expect(m.Returns).To(HaveLen(2))
Expect(m.Returns[0].Name).To(Equal("contentType"))
Expect(m.Returns[0].Type).To(Equal("string"))
Expect(m.Returns[1].Name).To(Equal("data"))
Expect(m.Returns[1].Type).To(Equal("[]byte"))
})
It("should set Raw=false when raw annotation is absent", func() {
src := `package host
import "context"
//nd:hostservice name=Test permission=test
type TestService interface {
//nd:hostfunc
Call(ctx context.Context, uri string) (response string, err error)
}
`
err := os.WriteFile(filepath.Join(tmpDir, "test.go"), []byte(src), 0600)
Expect(err).NotTo(HaveOccurred())
services, err := ParseDirectory(tmpDir)
Expect(err).NotTo(HaveOccurred())
Expect(services[0].Methods[0].Raw).To(BeFalse())
})
It("should reject raw=true with invalid return signature", func() {
src := `package host
import "context"
//nd:hostservice name=Test permission=test
type TestService interface {
//nd:hostfunc raw=true
BadRaw(ctx context.Context, uri string) (result string, err error)
}
`
err := os.WriteFile(filepath.Join(tmpDir, "test.go"), []byte(src), 0600)
Expect(err).NotTo(HaveOccurred())
_, err = ParseDirectory(tmpDir)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("raw=true"))
Expect(err.Error()).To(ContainSubstring("must return (string, []byte, error)"))
})
It("should reject raw=true without error return", func() {
src := `package host
import "context"
//nd:hostservice name=Test permission=test
type TestService interface {
//nd:hostfunc raw=true
BadRaw(ctx context.Context, uri string) (contentType string, data []byte)
}
`
err := os.WriteFile(filepath.Join(tmpDir, "test.go"), []byte(src), 0600)
Expect(err).NotTo(HaveOccurred())
_, err = ParseDirectory(tmpDir)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("raw=true"))
})
It("should parse mixed raw and non-raw methods", func() {
src := `package host
import "context"
//nd:hostservice name=API permission=api
type APIService interface {
//nd:hostfunc
Call(ctx context.Context, uri string) (responseJSON string, err error)
//nd:hostfunc raw=true
CallRaw(ctx context.Context, uri string) (contentType string, data []byte, err error)
}
`
err := os.WriteFile(filepath.Join(tmpDir, "api.go"), []byte(src), 0600)
Expect(err).NotTo(HaveOccurred())
services, err := ParseDirectory(tmpDir)
Expect(err).NotTo(HaveOccurred())
Expect(services).To(HaveLen(1))
Expect(services[0].Methods).To(HaveLen(2))
Expect(services[0].Methods[0].Raw).To(BeFalse())
Expect(services[0].Methods[1].Raw).To(BeTrue())
Expect(services[0].HasRawMethods()).To(BeTrue())
})
It("should handle custom export name", func() { It("should handle custom export name", func() {
src := `package host src := `package host

View File

@@ -0,0 +1,25 @@
{{define "base64_bytes_module"}}
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
mod base64_bytes {
use serde::{self, Deserialize, Deserializer, Serializer};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&BASE64.encode(bytes))
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
BASE64.decode(&s).map_err(serde::de::Error::custom)
}
}
{{- end}}

View File

@@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize};
{{- if hasHashMap .Capability}} {{- if hasHashMap .Capability}}
use std::collections::HashMap; use std::collections::HashMap;
{{- end}} {{- end}}
{{- if .Capability.HasByteFields}}{{template "base64_bytes_module" .}}{{- end}}
// Helper functions for skip_serializing_if with numeric types // Helper functions for skip_serializing_if with numeric types
#[allow(dead_code)] #[allow(dead_code)]
@@ -70,6 +71,9 @@ pub struct {{.Name}} {
#[serde(default, skip_serializing_if = "{{skipSerializingFunc .Type}}")] #[serde(default, skip_serializing_if = "{{skipSerializingFunc .Type}}")]
{{- else}} {{- else}}
#[serde(default)] #[serde(default)]
{{- end}}
{{- if .IsByteSlice}}
#[serde(with = "base64_bytes")]
{{- end}} {{- end}}
pub {{rustFieldName .Name}}: {{fieldRustType .}}, pub {{rustFieldName .Name}}: {{fieldRustType .}},
{{- end}} {{- end}}

View File

@@ -8,9 +8,6 @@
package {{.Package}} package {{.Package}}
import ( import (
{{- if .Service.HasRawMethods}}
"encoding/binary"
{{- end}}
"encoding/json" "encoding/json"
{{- if .Service.HasErrors}} {{- if .Service.HasErrors}}
"errors" "errors"
@@ -52,7 +49,7 @@ type {{requestType .}} struct {
{{- end}} {{- end}}
} }
{{- end}} {{- end}}
{{- if and (not .IsErrorOnly) (not .Raw)}} {{- if not .IsErrorOnly}}
type {{responseType .}} struct { type {{responseType .}} struct {
{{- range .Returns}} {{- range .Returns}}
@@ -98,27 +95,7 @@ func {{$.Service.Name}}{{.Name}}({{range $i, $p := .Params}}{{if $i}}, {{end}}{{
// Read the response from memory // Read the response from memory
responseMem := pdk.FindMemory(responsePtr) responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes() responseBytes := responseMem.ReadBytes()
{{- if .Raw}} {{- if .IsErrorOnly}}
// Parse binary-framed response
if len(responseBytes) == 0 {
return "", nil, errors.New("empty response from host")
}
if responseBytes[0] == 0x01 { // error
return "", nil, errors.New(string(responseBytes[1:]))
}
if responseBytes[0] != 0x00 {
return "", nil, errors.New("unknown response status")
}
if len(responseBytes) < 5 {
return "", nil, errors.New("malformed raw response: incomplete header")
}
ctLen := binary.BigEndian.Uint32(responseBytes[1:5])
if uint32(len(responseBytes)) < 5+ctLen {
return "", nil, errors.New("malformed raw response: content-type overflow")
}
return string(responseBytes[5 : 5+ctLen]), responseBytes[5+ctLen:], nil
{{- else if .IsErrorOnly}}
// Parse error-only response // Parse error-only response
var response struct { var response struct {

View File

@@ -8,12 +8,12 @@
# main __init__.py file. Copy the needed functions from this file into your plugin. # main __init__.py file. Copy the needed functions from this file into your plugin.
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any{{- if .Service.HasRawMethods}}, Tuple{{end}} from typing import Any
import extism import extism
import json import json
{{- if .Service.HasRawMethods}} {{- if .Service.HasByteFields}}
import struct import base64
{{- end}} {{- end}}
@@ -32,7 +32,7 @@ def _{{exportName .}}(offset: int) -> int:
{{- end}} {{- end}}
{{- /* Generate dataclasses for multi-value returns */ -}} {{- /* Generate dataclasses for multi-value returns */ -}}
{{range .Service.Methods}} {{range .Service.Methods}}
{{- if and .NeedsResultClass (not .Raw)}} {{- if .NeedsResultClass}}
@dataclass @dataclass
@@ -47,7 +47,7 @@ class {{pythonResultType .}}:
{{range .Service.Methods}} {{range .Service.Methods}}
def {{pythonFunc .}}({{range $i, $p := .Params}}{{if $i}}, {{end}}{{$p.PythonName}}: {{$p.PythonType}}{{end}}){{if .Raw}} -> Tuple[str, bytes]{{else if .NeedsResultClass}} -> {{pythonResultType .}}{{else if .HasReturns}} -> {{(index .Returns 0).PythonType}}{{else}} -> None{{end}}: def {{pythonFunc .}}({{range $i, $p := .Params}}{{if $i}}, {{end}}{{$p.PythonName}}: {{$p.PythonType}}{{end}}){{if .NeedsResultClass}} -> {{pythonResultType .}}{{else if .HasReturns}} -> {{(index .Returns 0).PythonType}}{{else}} -> None{{end}}:
"""{{if .Doc}}{{.Doc}}{{else}}Call the {{exportName .}} host function.{{end}} """{{if .Doc}}{{.Doc}}{{else}}Call the {{exportName .}} host function.{{end}}
{{- if .HasParams}} {{- if .HasParams}}
@@ -56,11 +56,7 @@ def {{pythonFunc .}}({{range $i, $p := .Params}}{{if $i}}, {{end}}{{$p.PythonNam
{{.PythonName}}: {{.PythonType}} parameter. {{.PythonName}}: {{.PythonType}} parameter.
{{- end}} {{- end}}
{{- end}} {{- end}}
{{- if .Raw}} {{- if .HasReturns}}
Returns:
Tuple of (content_type, data) with the raw binary response.
{{- else if .HasReturns}}
Returns: Returns:
{{- if .NeedsResultClass}} {{- if .NeedsResultClass}}
@@ -76,7 +72,11 @@ def {{pythonFunc .}}({{range $i, $p := .Params}}{{if $i}}, {{end}}{{$p.PythonNam
{{- if .HasParams}} {{- if .HasParams}}
request = { request = {
{{- range .Params}} {{- range .Params}}
{{- if .IsByteSlice}}
"{{.JSONName}}": base64.b64encode({{.PythonName}}).decode("ascii"),
{{- else}}
"{{.JSONName}}": {{.PythonName}}, "{{.JSONName}}": {{.PythonName}},
{{- end}}
{{- end}} {{- end}}
} }
request_bytes = json.dumps(request).encode("utf-8") request_bytes = json.dumps(request).encode("utf-8")
@@ -86,24 +86,6 @@ def {{pythonFunc .}}({{range $i, $p := .Params}}{{if $i}}, {{end}}{{$p.PythonNam
request_mem = extism.memory.alloc(request_bytes) request_mem = extism.memory.alloc(request_bytes)
response_offset = _{{exportName .}}(request_mem.offset) response_offset = _{{exportName .}}(request_mem.offset)
response_mem = extism.memory.find(response_offset) response_mem = extism.memory.find(response_offset)
{{- if .Raw}}
response_bytes = response_mem.bytes()
if len(response_bytes) == 0:
raise HostFunctionError("empty response from host")
if response_bytes[0] == 0x01:
raise HostFunctionError(response_bytes[1:].decode("utf-8"))
if response_bytes[0] != 0x00:
raise HostFunctionError("unknown response status")
if len(response_bytes) < 5:
raise HostFunctionError("malformed raw response: incomplete header")
ct_len = struct.unpack(">I", response_bytes[1:5])[0]
if len(response_bytes) < 5 + ct_len:
raise HostFunctionError("malformed raw response: content-type overflow")
content_type = response_bytes[5:5 + ct_len].decode("utf-8")
data = response_bytes[5 + ct_len:]
return content_type, data
{{- else}}
response = json.loads(extism.memory.string(response_mem)) response = json.loads(extism.memory.string(response_mem))
{{if .HasError}} {{if .HasError}}
if response.get("error"): if response.get("error"):
@@ -112,10 +94,17 @@ def {{pythonFunc .}}({{range $i, $p := .Params}}{{if $i}}, {{end}}{{$p.PythonNam
{{- if .NeedsResultClass}} {{- if .NeedsResultClass}}
return {{pythonResultType .}}( return {{pythonResultType .}}(
{{- range .Returns}} {{- range .Returns}}
{{- if .IsByteSlice}}
{{.PythonName}}=base64.b64decode(response.get("{{.JSONName}}", "")),
{{- else}}
{{.PythonName}}=response.get("{{.JSONName}}"{{pythonDefault .}}), {{.PythonName}}=response.get("{{.JSONName}}"{{pythonDefault .}}),
{{- end}}
{{- end}} {{- end}}
) )
{{- else if .HasReturns}} {{- else if .HasReturns}}
{{- if (index .Returns 0).IsByteSlice}}
return base64.b64decode(response.get("{{(index .Returns 0).JSONName}}", ""))
{{- else}}
return response.get("{{(index .Returns 0).JSONName}}"{{pythonDefault (index .Returns 0)}}) return response.get("{{(index .Returns 0).JSONName}}"{{pythonDefault (index .Returns 0)}})
{{- end}} {{- end}}
{{- end}} {{- end}}

View File

@@ -5,6 +5,7 @@
use extism_pdk::*; use extism_pdk::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
{{- if .Service.HasByteFields}}{{template "base64_bytes_module" .}}{{- end}}
{{- /* Generate struct definitions */ -}} {{- /* Generate struct definitions */ -}}
{{- range .Service.Structs}} {{- range .Service.Structs}}
{{if .Doc}} {{if .Doc}}
@@ -16,6 +17,9 @@ pub struct {{.Name}} {
{{- range .Fields}} {{- range .Fields}}
{{- if .NeedsDefault}} {{- if .NeedsDefault}}
#[serde(default)] #[serde(default)]
{{- end}}
{{- if .IsByteSlice}}
#[serde(with = "base64_bytes")]
{{- end}} {{- end}}
pub {{.RustName}}: {{fieldRustType .}}, pub {{.RustName}}: {{fieldRustType .}},
{{- end}} {{- end}}
@@ -29,17 +33,22 @@ pub struct {{.Name}} {
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
struct {{requestType .}} { struct {{requestType .}} {
{{- range .Params}} {{- range .Params}}
{{- if .IsByteSlice}}
#[serde(with = "base64_bytes")]
{{- end}}
{{.RustName}}: {{rustType .}}, {{.RustName}}: {{rustType .}},
{{- end}} {{- end}}
} }
{{- end}} {{- end}}
{{- if not .Raw}}
#[derive(Debug, Clone, Deserialize)] #[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
struct {{responseType .}} { struct {{responseType .}} {
{{- range .Returns}} {{- range .Returns}}
#[serde(default)] #[serde(default)]
{{- if .IsByteSlice}}
#[serde(with = "base64_bytes")]
{{- end}}
{{.RustName}}: {{rustType .}}, {{.RustName}}: {{rustType .}},
{{- end}} {{- end}}
{{- if .HasError}} {{- if .HasError}}
@@ -48,92 +57,16 @@ struct {{responseType .}} {
{{- end}} {{- end}}
} }
{{- end}} {{- end}}
{{- end}}
#[host_fn] #[host_fn]
extern "ExtismHost" { extern "ExtismHost" {
{{- range .Service.Methods}} {{- range .Service.Methods}}
{{- if not .Raw}}
fn {{exportName .}}(input: Json<{{if .HasParams}}{{requestType .}}{{else}}serde_json::Value{{end}}>) -> Json<{{responseType .}}>; fn {{exportName .}}(input: Json<{{if .HasParams}}{{requestType .}}{{else}}serde_json::Value{{end}}>) -> Json<{{responseType .}}>;
{{- end}} {{- end}}
{{- end}}
} }
{{- /* Declare raw extern "C" imports for raw methods */ -}}
{{- range .Service.Methods}}
{{- if .Raw}}
#[link(wasm_import_module = "extism:host/user")]
extern "C" {
fn {{exportName .}}(offset: u64) -> u64;
}
{{- end}}
{{- end}}
{{- /* Generate wrapper functions */ -}} {{- /* Generate wrapper functions */ -}}
{{range .Service.Methods}} {{range .Service.Methods}}
{{- if .Raw}}
{{if .Doc}}{{rustDocComment .Doc}}{{else}}/// Calls the {{exportName .}} host function.{{end}}
{{- if .HasParams}}
///
/// # Arguments
{{- range .Params}}
/// * `{{.RustName}}` - {{rustType .}} parameter.
{{- end}}
{{- end}}
///
/// # Returns
/// A tuple of (content_type, data) with the raw binary response.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn {{rustFunc .}}({{range $i, $p := .Params}}{{if $i}}, {{end}}{{$p.RustName}}: {{rustParamType $p}}{{end}}) -> Result<(String, Vec<u8>), Error> {
{{- if .HasParams}}
let req = {{requestType .}} {
{{- range .Params}}
{{.RustName}}: {{.RustName}}{{if .NeedsToOwned}}.to_owned(){{end}},
{{- end}}
};
let input_bytes = serde_json::to_vec(&req).map_err(|e| Error::msg(e.to_string()))?;
{{- else}}
let input_bytes = b"{}".to_vec();
{{- end}}
let input_mem = Memory::from_bytes(&input_bytes).map_err(|e| Error::msg(e.to_string()))?;
let response_offset = unsafe { {{exportName .}}(input_mem.offset()) };
let response_mem = Memory::find(response_offset)
.ok_or_else(|| Error::msg("empty response from host"))?;
let response_bytes = response_mem.to_vec();
if response_bytes.is_empty() {
return Err(Error::msg("empty response from host"));
}
if response_bytes[0] == 0x01 {
let msg = String::from_utf8_lossy(&response_bytes[1..]).to_string();
return Err(Error::msg(msg));
}
if response_bytes[0] != 0x00 {
return Err(Error::msg("unknown response status"));
}
if response_bytes.len() < 5 {
return Err(Error::msg("malformed raw response: incomplete header"));
}
let ct_len = u32::from_be_bytes([
response_bytes[1],
response_bytes[2],
response_bytes[3],
response_bytes[4],
]) as usize;
if ct_len > response_bytes.len() - 5 {
return Err(Error::msg("malformed raw response: content-type overflow"));
}
let ct_end = 5 + ct_len;
let content_type = String::from_utf8_lossy(&response_bytes[5..ct_end]).to_string();
let data = response_bytes[ct_end..].to_vec();
Ok((content_type, data))
}
{{- else}}
{{if .Doc}}{{rustDocComment .Doc}}{{else}}/// Calls the {{exportName .}} host function.{{end}} {{if .Doc}}{{rustDocComment .Doc}}{{else}}/// Calls the {{exportName .}} host function.{{end}}
{{- if .HasParams}} {{- if .HasParams}}
@@ -209,4 +142,3 @@ pub fn {{rustFunc .}}({{range $i, $p := .Params}}{{if $i}}, {{end}}{{$p.RustName
} }
{{- end}} {{- end}}
{{- end}} {{- end}}
{{- end}}

View File

@@ -4,9 +4,6 @@ package {{.Package}}
import ( import (
"context" "context"
{{- if .Service.HasRawMethods}}
"encoding/binary"
{{- end}}
"encoding/json" "encoding/json"
extism "github.com/extism/go-sdk" extism "github.com/extism/go-sdk"
@@ -23,7 +20,6 @@ type {{requestType .}} struct {
{{- end}} {{- end}}
} }
{{- end}} {{- end}}
{{- if not .Raw}}
// {{responseType .}} is the response type for {{$.Service.Name}}.{{.Name}}. // {{responseType .}} is the response type for {{$.Service.Name}}.{{.Name}}.
type {{responseType .}} struct { type {{responseType .}} struct {
@@ -34,7 +30,6 @@ type {{responseType .}} struct {
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
{{- end}} {{- end}}
} }
{{- end}}
{{end}} {{end}}
// Register{{.Service.Name}}HostFunctions registers {{.Service.Name}} service host functions. // Register{{.Service.Name}}HostFunctions registers {{.Service.Name}} service host functions.
@@ -56,48 +51,18 @@ func new{{$.Service.Name}}{{.Name}}HostFunction(service {{$.Service.Interface}})
// Read JSON request from plugin memory // Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0]) reqBytes, err := p.ReadBytes(stack[0])
if err != nil { if err != nil {
{{- if .Raw}}
{{$.Service.Name | lower}}WriteRawError(p, stack, err)
{{- else}}
{{$.Service.Name | lower}}WriteError(p, stack, err) {{$.Service.Name | lower}}WriteError(p, stack, err)
{{- end}}
return return
} }
var req {{requestType .}} var req {{requestType .}}
if err := json.Unmarshal(reqBytes, &req); err != nil { if err := json.Unmarshal(reqBytes, &req); err != nil {
{{- if .Raw}}
{{$.Service.Name | lower}}WriteRawError(p, stack, err)
{{- else}}
{{$.Service.Name | lower}}WriteError(p, stack, err) {{$.Service.Name | lower}}WriteError(p, stack, err)
{{- end}}
return return
} }
{{- end}} {{- end}}
// Call the service method // Call the service method
{{- if .Raw}} {{- if .HasReturns}}
{{range $i, $r := .Returns}}{{if $i}}, {{end}}{{lower $r.Name}}{{end}}, svcErr := service.{{.Name}}(ctx{{range .Params}}, req.{{title .Name}}{{end}})
if svcErr != nil {
{{$.Service.Name | lower}}WriteRawError(p, stack, svcErr)
return
}
// Write binary-framed response to plugin memory:
// [0x00][4-byte content-type length (big-endian)][content-type string][raw data]
ctBytes := []byte({{lower (index .Returns 0).Name}})
frame := make([]byte, 1+4+len(ctBytes)+len({{lower (index .Returns 1).Name}}))
frame[0] = 0x00 // success
binary.BigEndian.PutUint32(frame[1:5], uint32(len(ctBytes)))
copy(frame[5:5+len(ctBytes)], ctBytes)
copy(frame[5+len(ctBytes):], {{lower (index .Returns 1).Name}})
respPtr, err := p.WriteBytes(frame)
if err != nil {
stack[0] = 0
return
}
stack[0] = respPtr
{{- else if .HasReturns}}
{{- if .HasError}} {{- if .HasError}}
{{range $i, $r := .Returns}}{{if $i}}, {{end}}{{lower $r.Name}}{{end}}, svcErr := service.{{.Name}}(ctx{{range .Params}}, req.{{title .Name}}{{end}}) {{range $i, $r := .Returns}}{{if $i}}, {{end}}{{lower $r.Name}}{{end}}, svcErr := service.{{.Name}}(ctx{{range .Params}}, req.{{title .Name}}{{end}})
if svcErr != nil { if svcErr != nil {
@@ -162,16 +127,3 @@ func {{.Service.Name | lower}}WriteError(p *extism.CurrentPlugin, stack []uint64
respPtr, _ := p.WriteBytes(respBytes) respPtr, _ := p.WriteBytes(respBytes)
stack[0] = respPtr stack[0] = respPtr
} }
{{- if .Service.HasRawMethods}}
// {{.Service.Name | lower}}WriteRawError writes a binary-framed error response to plugin memory.
// Format: [0x01][UTF-8 error message]
func {{.Service.Name | lower}}WriteRawError(p *extism.CurrentPlugin, stack []uint64, err error) {
errMsg := []byte(err.Error())
frame := make([]byte, 1+len(errMsg))
frame[0] = 0x01 // error
copy(frame[1:], errMsg)
respPtr, _ := p.WriteBytes(frame)
stack[0] = respPtr
}
{{- end}}

View File

@@ -173,16 +173,6 @@ func (s Service) HasErrors() bool {
return false return false
} }
// HasRawMethods returns true if any method in the service uses raw binary framing.
func (s Service) HasRawMethods() bool {
for _, m := range s.Methods {
if m.Raw {
return true
}
}
return false
}
// Method represents a host function method within a service. // Method represents a host function method within a service.
type Method struct { type Method struct {
Name string // Go method name (e.g., "Call") Name string // Go method name (e.g., "Call")
@@ -191,7 +181,6 @@ type Method struct {
Returns []Param // Return values (excluding error) Returns []Param // Return values (excluding error)
HasError bool // Whether the method returns an error HasError bool // Whether the method returns an error
Doc string // Documentation comment for the method Doc string // Documentation comment for the method
Raw bool // If true, response uses binary framing instead of JSON
} }
// FunctionName returns the Extism host function export name. // FunctionName returns the Extism host function export name.
@@ -343,6 +332,52 @@ type Param struct {
JSONName string // JSON field name (camelCase) JSONName string // JSON field name (camelCase)
} }
// IsByteSlice returns true if the parameter type is []byte.
func (p Param) IsByteSlice() bool {
return p.Type == "[]byte"
}
// IsByteSlice returns true if the field type is []byte.
func (f FieldDef) IsByteSlice() bool {
return f.Type == "[]byte"
}
// HasByteFields returns true if any method params, returns, or struct fields use []byte.
func (s Service) HasByteFields() bool {
for _, m := range s.Methods {
for _, p := range m.Params {
if p.IsByteSlice() {
return true
}
}
for _, r := range m.Returns {
if r.IsByteSlice() {
return true
}
}
}
for _, st := range s.Structs {
for _, f := range st.Fields {
if f.IsByteSlice() {
return true
}
}
}
return false
}
// HasByteFields returns true if any capability struct fields use []byte.
func (c Capability) HasByteFields() bool {
for _, st := range c.Structs {
for _, f := range st.Fields {
if f.IsByteSlice() {
return true
}
}
}
return false
}
// NewParam creates a Param with auto-generated JSON name. // NewParam creates a Param with auto-generated JSON name.
func NewParam(name, typ string) Param { func NewParam(name, typ string) Param {
return Param{ return Param{

View File

@@ -12,6 +12,7 @@ from typing import Any
import extism import extism
import json import json
import base64
class HostFunctionError(Exception): class HostFunctionError(Exception):
@@ -38,7 +39,7 @@ def codec_encode(data: bytes) -> bytes:
HostFunctionError: If the host function returns an error. HostFunctionError: If the host function returns an error.
""" """
request = { request = {
"data": data, "data": base64.b64encode(data).decode("ascii"),
} }
request_bytes = json.dumps(request).encode("utf-8") request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes) request_mem = extism.memory.alloc(request_bytes)
@@ -49,4 +50,4 @@ def codec_encode(data: bytes) -> bytes:
if response.get("error"): if response.get("error"):
raise HostFunctionError(response["error"]) raise HostFunctionError(response["error"])
return response.get("result", b"") return base64.b64decode(response.get("result", ""))

View File

@@ -5,10 +5,34 @@
use extism_pdk::*; use extism_pdk::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
mod base64_bytes {
use serde::{self, Deserialize, Deserializer, Serializer};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&BASE64.encode(bytes))
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
BASE64.decode(&s).map_err(serde::de::Error::custom)
}
}
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
struct CodecEncodeRequest { struct CodecEncodeRequest {
#[serde(with = "base64_bytes")]
data: Vec<u8>, data: Vec<u8>,
} }
@@ -16,6 +40,7 @@ struct CodecEncodeRequest {
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
struct CodecEncodeResponse { struct CodecEncodeResponse {
#[serde(default)] #[serde(default)]
#[serde(with = "base64_bytes")]
result: Vec<u8>, result: Vec<u8>,
#[serde(default)] #[serde(default)]
error: Option<String>, error: Option<String>,

View File

@@ -12,6 +12,7 @@ from typing import Any
import extism import extism
import json import json
import base64
class HostFunctionError(Exception): class HostFunctionError(Exception):
@@ -327,7 +328,7 @@ def comprehensive_byte_slice(data: bytes) -> bytes:
HostFunctionError: If the host function returns an error. HostFunctionError: If the host function returns an error.
""" """
request = { request = {
"data": data, "data": base64.b64encode(data).decode("ascii"),
} }
request_bytes = json.dumps(request).encode("utf-8") request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes) request_mem = extism.memory.alloc(request_bytes)
@@ -338,4 +339,4 @@ def comprehensive_byte_slice(data: bytes) -> bytes:
if response.get("error"): if response.get("error"):
raise HostFunctionError(response["error"]) raise HostFunctionError(response["error"])
return response.get("result", b"") return base64.b64decode(response.get("result", ""))

View File

@@ -5,6 +5,29 @@
use extism_pdk::*; use extism_pdk::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
mod base64_bytes {
use serde::{self, Deserialize, Deserializer, Serializer};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&BASE64.encode(bytes))
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
BASE64.decode(&s).map_err(serde::de::Error::custom)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
@@ -144,6 +167,7 @@ struct ComprehensiveMultipleReturnsResponse {
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
struct ComprehensiveByteSliceRequest { struct ComprehensiveByteSliceRequest {
#[serde(with = "base64_bytes")]
data: Vec<u8>, data: Vec<u8>,
} }
@@ -151,6 +175,7 @@ struct ComprehensiveByteSliceRequest {
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
struct ComprehensiveByteSliceResponse { struct ComprehensiveByteSliceResponse {
#[serde(default)] #[serde(default)]
#[serde(with = "base64_bytes")]
result: Vec<u8>, result: Vec<u8>,
#[serde(default)] #[serde(default)]
error: Option<String>, error: Option<String>,

View File

@@ -1,66 +0,0 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains client wrappers for the Stream host service.
// It is intended for use in Navidrome plugins built with TinyGo.
//
//go:build wasip1
package ndpdk
import (
"encoding/binary"
"encoding/json"
"errors"
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
)
// stream_getstream is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user stream_getstream
func stream_getstream(uint64) uint64
type streamGetStreamRequest struct {
Uri string `json:"uri"`
}
// StreamGetStream calls the stream_getstream host function.
// GetStream returns raw binary stream data with content type.
func StreamGetStream(uri string) (string, []byte, error) {
// Marshal request to JSON
req := streamGetStreamRequest{
Uri: uri,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return "", nil, err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := stream_getstream(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse binary-framed response
if len(responseBytes) == 0 {
return "", nil, errors.New("empty response from host")
}
if responseBytes[0] == 0x01 { // error
return "", nil, errors.New(string(responseBytes[1:]))
}
if responseBytes[0] != 0x00 {
return "", nil, errors.New("unknown response status")
}
if len(responseBytes) < 5 {
return "", nil, errors.New("malformed raw response: incomplete header")
}
ctLen := binary.BigEndian.Uint32(responseBytes[1:5])
if uint32(len(responseBytes)) < 5+ctLen {
return "", nil, errors.New("malformed raw response: content-type overflow")
}
return string(responseBytes[5 : 5+ctLen]), responseBytes[5+ctLen:], nil
}

View File

@@ -1,63 +0,0 @@
# Code generated by ndpgen. DO NOT EDIT.
#
# This file contains client wrappers for the Stream host service.
# It is intended for use in Navidrome plugins built with extism-py.
#
# IMPORTANT: Due to a limitation in extism-py, you cannot import this file directly.
# The @extism.import_fn decorators are only detected when defined in the plugin's
# main __init__.py file. Copy the needed functions from this file into your plugin.
from dataclasses import dataclass
from typing import Any, Tuple
import extism
import json
import struct
class HostFunctionError(Exception):
"""Raised when a host function returns an error."""
pass
@extism.import_fn("extism:host/user", "stream_getstream")
def _stream_getstream(offset: int) -> int:
"""Raw host function - do not call directly."""
...
def stream_get_stream(uri: str) -> Tuple[str, bytes]:
"""GetStream returns raw binary stream data with content type.
Args:
uri: str parameter.
Returns:
Tuple of (content_type, data) with the raw binary response.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"uri": uri,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _stream_getstream(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response_bytes = response_mem.bytes()
if len(response_bytes) == 0:
raise HostFunctionError("empty response from host")
if response_bytes[0] == 0x01:
raise HostFunctionError(response_bytes[1:].decode("utf-8"))
if response_bytes[0] != 0x00:
raise HostFunctionError("unknown response status")
if len(response_bytes) < 5:
raise HostFunctionError("malformed raw response: incomplete header")
ct_len = struct.unpack(">I", response_bytes[1:5])[0]
if len(response_bytes) < 5 + ct_len:
raise HostFunctionError("malformed raw response: content-type overflow")
content_type = response_bytes[5:5 + ct_len].decode("utf-8")
data = response_bytes[5 + ct_len:]
return content_type, data

View File

@@ -1,73 +0,0 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains client wrappers for the Stream host service.
// It is intended for use in Navidrome plugins built with extism-pdk.
use extism_pdk::*;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct StreamGetStreamRequest {
uri: String,
}
#[host_fn]
extern "ExtismHost" {
}
#[link(wasm_import_module = "extism:host/user")]
extern "C" {
fn stream_getstream(offset: u64) -> u64;
}
/// GetStream returns raw binary stream data with content type.
///
/// # Arguments
/// * `uri` - String parameter.
///
/// # Returns
/// A tuple of (content_type, data) with the raw binary response.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn get_stream(uri: &str) -> Result<(String, Vec<u8>), Error> {
let req = StreamGetStreamRequest {
uri: uri.to_owned(),
};
let input_bytes = serde_json::to_vec(&req).map_err(|e| Error::msg(e.to_string()))?;
let input_mem = Memory::from_bytes(&input_bytes).map_err(|e| Error::msg(e.to_string()))?;
let response_offset = unsafe { stream_getstream(input_mem.offset()) };
let response_mem = Memory::find(response_offset)
.ok_or_else(|| Error::msg("empty response from host"))?;
let response_bytes = response_mem.to_vec();
if response_bytes.is_empty() {
return Err(Error::msg("empty response from host"));
}
if response_bytes[0] == 0x01 {
let msg = String::from_utf8_lossy(&response_bytes[1..]).to_string();
return Err(Error::msg(msg));
}
if response_bytes[0] != 0x00 {
return Err(Error::msg("unknown response status"));
}
if response_bytes.len() < 5 {
return Err(Error::msg("malformed raw response: incomplete header"));
}
let ct_len = u32::from_be_bytes([
response_bytes[1],
response_bytes[2],
response_bytes[3],
response_bytes[4],
]) as usize;
if ct_len > response_bytes.len() - 5 {
return Err(Error::msg("malformed raw response: content-type overflow"));
}
let ct_end = 5 + ct_len;
let content_type = String::from_utf8_lossy(&response_bytes[5..ct_end]).to_string();
let data = response_bytes[ct_end..].to_vec();
Ok((content_type, data))
}

View File

@@ -1,10 +0,0 @@
package testpkg
import "context"
//nd:hostservice name=Stream permission=stream
type StreamService interface {
// GetStream returns raw binary stream data with content type.
//nd:hostfunc raw=true
GetStream(ctx context.Context, uri string) (contentType string, data []byte, err error)
}

View File

@@ -17,8 +17,8 @@ type SubsonicAPIService interface {
Call(ctx context.Context, uri string) (responseJSON string, err error) Call(ctx context.Context, uri string) (responseJSON string, err error)
// CallRaw executes a Subsonic API request and returns the raw binary response. // CallRaw executes a Subsonic API request and returns the raw binary response.
// Optimized for binary endpoints like getCoverArt and stream that return // Designed for binary endpoints like getCoverArt and stream that return
// non-JSON data. The response is returned as raw bytes without JSON encoding overhead. // non-JSON data. The data is base64-encoded over JSON on the wire.
//nd:hostfunc raw=true //nd:hostfunc
CallRaw(ctx context.Context, uri string) (contentType string, data []byte, err error) CallRaw(ctx context.Context, uri string) (contentType string, data []byte, err error)
} }

View File

@@ -4,7 +4,6 @@ package host
import ( import (
"context" "context"
"encoding/binary"
"encoding/json" "encoding/json"
extism "github.com/extism/go-sdk" extism "github.com/extism/go-sdk"
@@ -26,6 +25,13 @@ type SubsonicAPICallRawRequest struct {
Uri string `json:"uri"` Uri string `json:"uri"`
} }
// SubsonicAPICallRawResponse is the response type for SubsonicAPI.CallRaw.
type SubsonicAPICallRawResponse struct {
ContentType string `json:"contentType,omitempty"`
Data []byte `json:"data,omitempty"`
Error string `json:"error,omitempty"`
}
// RegisterSubsonicAPIHostFunctions registers SubsonicAPI service host functions. // RegisterSubsonicAPIHostFunctions registers SubsonicAPI service host functions.
// The returned host functions should be added to the plugin's configuration. // The returned host functions should be added to the plugin's configuration.
func RegisterSubsonicAPIHostFunctions(service SubsonicAPIService) []extism.HostFunction { func RegisterSubsonicAPIHostFunctions(service SubsonicAPIService) []extism.HostFunction {
@@ -76,37 +82,28 @@ func newSubsonicAPICallRawHostFunction(service SubsonicAPIService) extism.HostFu
// Read JSON request from plugin memory // Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0]) reqBytes, err := p.ReadBytes(stack[0])
if err != nil { if err != nil {
subsonicapiWriteRawError(p, stack, err) subsonicapiWriteError(p, stack, err)
return return
} }
var req SubsonicAPICallRawRequest var req SubsonicAPICallRawRequest
if err := json.Unmarshal(reqBytes, &req); err != nil { if err := json.Unmarshal(reqBytes, &req); err != nil {
subsonicapiWriteRawError(p, stack, err) subsonicapiWriteError(p, stack, err)
return return
} }
// Call the service method // Call the service method
contenttype, data, svcErr := service.CallRaw(ctx, req.Uri) contenttype, data, svcErr := service.CallRaw(ctx, req.Uri)
if svcErr != nil { if svcErr != nil {
subsonicapiWriteRawError(p, stack, svcErr) subsonicapiWriteError(p, stack, svcErr)
return return
} }
// Write binary-framed response to plugin memory: // Write JSON response to plugin memory
// [0x00][4-byte content-type length (big-endian)][content-type string][raw data] resp := SubsonicAPICallRawResponse{
ctBytes := []byte(contenttype) ContentType: contenttype,
frame := make([]byte, 1+4+len(ctBytes)+len(data)) Data: data,
frame[0] = 0x00 // success
binary.BigEndian.PutUint32(frame[1:5], uint32(len(ctBytes)))
copy(frame[5:5+len(ctBytes)], ctBytes)
copy(frame[5+len(ctBytes):], data)
respPtr, err := p.WriteBytes(frame)
if err != nil {
stack[0] = 0
return
} }
stack[0] = respPtr subsonicapiWriteResponse(p, stack, resp)
}, },
[]extism.ValueType{extism.ValueTypePTR}, []extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR}, []extism.ValueType{extism.ValueTypePTR},
@@ -137,14 +134,3 @@ func subsonicapiWriteError(p *extism.CurrentPlugin, stack []uint64, err error) {
respPtr, _ := p.WriteBytes(respBytes) respPtr, _ := p.WriteBytes(respBytes)
stack[0] = respPtr stack[0] = respPtr
} }
// subsonicapiWriteRawError writes a binary-framed error response to plugin memory.
// Format: [0x01][UTF-8 error message]
func subsonicapiWriteRawError(p *extism.CurrentPlugin, stack []uint64, err error) {
errMsg := []byte(err.Error())
frame := make([]byte, 1+len(errMsg))
frame[0] = 0x01 // error
copy(frame[1:], errMsg)
respPtr, _ := p.WriteBytes(frame)
stack[0] = respPtr
}

68
plugins/host/task.go Normal file
View File

@@ -0,0 +1,68 @@
package host
import "context"
// TaskInfo holds the current state of a task.
type TaskInfo struct {
// Status is the current task status: "pending", "running",
// "completed", "failed", or "cancelled".
Status string `json:"status"`
// Message is the status/result message returned by the plugin callback.
Message string `json:"message"`
// Attempt is the current or last attempt number (1-based).
Attempt int32 `json:"attempt"`
}
// QueueConfig holds configuration for a task queue.
type QueueConfig struct {
// Concurrency is the max number of parallel workers. Default: 1.
// Capped by the plugin's manifest maxConcurrency.
Concurrency int32 `json:"concurrency"`
// MaxRetries is the number of times to retry a failed task. Default: 0.
MaxRetries int32 `json:"maxRetries"`
// BackoffMs is the initial backoff between retries in milliseconds.
// Doubles each retry (exponential: backoffMs * 2^(attempt-1)). Default: 1000.
BackoffMs int64 `json:"backoffMs"`
// DelayMs is the minimum delay between starting consecutive tasks
// in milliseconds. Useful for rate limiting. Default: 0.
DelayMs int64 `json:"delayMs"`
// RetentionMs is how long completed/failed/cancelled tasks are kept
// in milliseconds. Default: 3600000 (1h). Min: 60000 (1m). Max: 604800000 (1w).
RetentionMs int64 `json:"retentionMs"`
}
// TaskService provides persistent task queues for plugins.
//
// This service allows plugins to create named queues with configurable concurrency,
// retry policies, and rate limiting. Tasks are persisted to SQLite and survive
// server restarts. When a task is ready to execute, the host calls the plugin's
// nd_task_execute callback function.
//
//nd:hostservice name=Task permission=taskqueue
type TaskService interface {
// CreateQueue creates a named task queue with the given configuration.
// Zero-value fields in config use sensible defaults.
// If a queue with the same name already exists, returns an error.
// On startup, this also recovers any stale "running" tasks from a previous crash.
//nd:hostfunc
CreateQueue(ctx context.Context, name string, config QueueConfig) error
// Enqueue adds a task to the named queue. Returns the task ID.
// payload is opaque bytes passed back to the plugin on execution.
//nd:hostfunc
Enqueue(ctx context.Context, queueName string, payload []byte) (string, error)
// Get returns the current state of a task including its status,
// message, and attempt count.
//nd:hostfunc
Get(ctx context.Context, taskID string) (*TaskInfo, error)
// Cancel cancels a pending task. Returns error if already
// running, completed, or failed.
//nd:hostfunc
Cancel(ctx context.Context, taskID string) error
}

220
plugins/host/task_gen.go Normal file
View File

@@ -0,0 +1,220 @@
// Code generated by ndpgen. DO NOT EDIT.
package host
import (
"context"
"encoding/json"
extism "github.com/extism/go-sdk"
)
// TaskCreateQueueRequest is the request type for Task.CreateQueue.
type TaskCreateQueueRequest struct {
Name string `json:"name"`
Config QueueConfig `json:"config"`
}
// TaskCreateQueueResponse is the response type for Task.CreateQueue.
type TaskCreateQueueResponse struct {
Error string `json:"error,omitempty"`
}
// TaskEnqueueRequest is the request type for Task.Enqueue.
type TaskEnqueueRequest struct {
QueueName string `json:"queueName"`
Payload []byte `json:"payload"`
}
// TaskEnqueueResponse is the response type for Task.Enqueue.
type TaskEnqueueResponse struct {
Result string `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
// TaskGetRequest is the request type for Task.Get.
type TaskGetRequest struct {
TaskID string `json:"taskId"`
}
// TaskGetResponse is the response type for Task.Get.
type TaskGetResponse struct {
Result *TaskInfo `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
// TaskCancelRequest is the request type for Task.Cancel.
type TaskCancelRequest struct {
TaskID string `json:"taskId"`
}
// TaskCancelResponse is the response type for Task.Cancel.
type TaskCancelResponse struct {
Error string `json:"error,omitempty"`
}
// RegisterTaskHostFunctions registers Task service host functions.
// The returned host functions should be added to the plugin's configuration.
func RegisterTaskHostFunctions(service TaskService) []extism.HostFunction {
return []extism.HostFunction{
newTaskCreateQueueHostFunction(service),
newTaskEnqueueHostFunction(service),
newTaskGetHostFunction(service),
newTaskCancelHostFunction(service),
}
}
func newTaskCreateQueueHostFunction(service TaskService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"task_createqueue",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskWriteError(p, stack, err)
return
}
var req TaskCreateQueueRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskWriteError(p, stack, err)
return
}
// Call the service method
if svcErr := service.CreateQueue(ctx, req.Name, req.Config); svcErr != nil {
taskWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskCreateQueueResponse{}
taskWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
func newTaskEnqueueHostFunction(service TaskService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"task_enqueue",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskWriteError(p, stack, err)
return
}
var req TaskEnqueueRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskWriteError(p, stack, err)
return
}
// Call the service method
result, svcErr := service.Enqueue(ctx, req.QueueName, req.Payload)
if svcErr != nil {
taskWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskEnqueueResponse{
Result: result,
}
taskWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
func newTaskGetHostFunction(service TaskService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"task_get",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskWriteError(p, stack, err)
return
}
var req TaskGetRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskWriteError(p, stack, err)
return
}
// Call the service method
result, svcErr := service.Get(ctx, req.TaskID)
if svcErr != nil {
taskWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskGetResponse{
Result: result,
}
taskWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
func newTaskCancelHostFunction(service TaskService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"task_cancel",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskWriteError(p, stack, err)
return
}
var req TaskCancelRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskWriteError(p, stack, err)
return
}
// Call the service method
if svcErr := service.Cancel(ctx, req.TaskID); svcErr != nil {
taskWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskCancelResponse{}
taskWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
// taskWriteResponse writes a JSON response to plugin memory.
func taskWriteResponse(p *extism.CurrentPlugin, stack []uint64, resp any) {
respBytes, err := json.Marshal(resp)
if err != nil {
taskWriteError(p, stack, err)
return
}
respPtr, err := p.WriteBytes(respBytes)
if err != nil {
stack[0] = 0
return
}
stack[0] = respPtr
}
// taskWriteError writes an error response to plugin memory.
func taskWriteError(p *extism.CurrentPlugin, stack []uint64, err error) {
errResp := struct {
Error string `json:"error"`
}{Error: err.Error()}
respBytes, _ := json.Marshal(errResp)
respPtr, _ := p.WriteBytes(respBytes)
stack[0] = respPtr
}

View File

@@ -188,12 +188,6 @@ func (s *schedulerServiceImpl) invokeCallback(ctx context.Context, scheduleID st
return return
} }
// Check if plugin has the scheduler capability
if !hasCapability(instance.capabilities, CapabilityScheduler) {
log.Warn(ctx, "Plugin does not have scheduler capability", "plugin", s.pluginName, "scheduleID", scheduleID)
return
}
// Prepare callback input // Prepare callback input
input := capabilities.SchedulerCallbackRequest{ input := capabilities.SchedulerCallbackRequest{
ScheduleID: scheduleID, ScheduleID: scheduleID,

566
plugins/host_taskqueue.go Normal file
View File

@@ -0,0 +1,566 @@
package plugins
import (
"context"
"database/sql"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"time"
_ "github.com/mattn/go-sqlite3"
"github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model/id"
"github.com/navidrome/navidrome/plugins/capabilities"
"github.com/navidrome/navidrome/plugins/host"
"golang.org/x/time/rate"
)
const (
defaultConcurrency int32 = 1
defaultBackoffMs int64 = 1000
defaultRetentionMs int64 = 3_600_000 // 1 hour
minRetentionMs int64 = 60_000 // 1 minute
maxRetentionMs int64 = 604_800_000 // 1 week
maxQueueNameLength = 128
maxPayloadSize = 1 * 1024 * 1024 // 1MB
maxBackoffMs int64 = 3_600_000 // 1 hour
cleanupInterval = 5 * time.Minute
pollInterval = 5 * time.Second
shutdownTimeout = 10 * time.Second
taskStatusPending = "pending"
taskStatusRunning = "running"
taskStatusCompleted = "completed"
taskStatusFailed = "failed"
taskStatusCancelled = "cancelled"
)
// CapabilityTaskWorker indicates the plugin can receive task execution callbacks.
const CapabilityTaskWorker Capability = "TaskWorker"
const FuncTaskWorkerCallback = "nd_task_execute"
func init() {
registerCapability(CapabilityTaskWorker, FuncTaskWorkerCallback)
}
type queueState struct {
config host.QueueConfig
signal chan struct{}
limiter *rate.Limiter
}
// notifyWorkers sends a non-blocking signal to wake up queue workers.
func (qs *queueState) notifyWorkers() {
select {
case qs.signal <- struct{}{}:
default:
}
}
// taskQueueServiceImpl implements host.TaskQueueService with SQLite persistence
// and background worker goroutines for task execution.
type taskQueueServiceImpl struct {
pluginName string
manager *Manager
maxConcurrency int32
db *sql.DB
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
mu sync.Mutex
queues map[string]*queueState
// For testing: override how callbacks are invoked
invokeCallbackFn func(ctx context.Context, queueName, taskID string, payload []byte, attempt int32) (string, error)
}
// newTaskQueueService creates a new taskQueueServiceImpl with its own SQLite database.
func newTaskQueueService(pluginName string, manager *Manager, maxConcurrency int32) (*taskQueueServiceImpl, error) {
dataDir := filepath.Join(conf.Server.DataFolder, "plugins", pluginName)
if err := os.MkdirAll(dataDir, 0700); err != nil {
return nil, fmt.Errorf("creating plugin data directory: %w", err)
}
dbPath := filepath.Join(dataDir, "taskqueue.db")
db, err := sql.Open("sqlite3", dbPath+"?_busy_timeout=5000&_journal_mode=WAL&_foreign_keys=off")
if err != nil {
return nil, fmt.Errorf("opening taskqueue database: %w", err)
}
db.SetMaxOpenConns(3)
db.SetMaxIdleConns(1)
if err := createTaskQueueSchema(db); err != nil {
db.Close()
return nil, fmt.Errorf("creating taskqueue schema: %w", err)
}
ctx, cancel := context.WithCancel(manager.ctx)
s := &taskQueueServiceImpl{
pluginName: pluginName,
manager: manager,
maxConcurrency: maxConcurrency,
db: db,
ctx: ctx,
cancel: cancel,
queues: make(map[string]*queueState),
}
s.invokeCallbackFn = s.defaultInvokeCallback
s.wg.Go(s.cleanupLoop)
log.Debug("Initialized plugin taskqueue", "plugin", pluginName, "path", dbPath, "maxConcurrency", maxConcurrency)
return s, nil
}
func createTaskQueueSchema(db *sql.DB) error {
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS queues (
name TEXT PRIMARY KEY,
concurrency INTEGER NOT NULL DEFAULT 1,
max_retries INTEGER NOT NULL DEFAULT 0,
backoff_ms INTEGER NOT NULL DEFAULT 1000,
delay_ms INTEGER NOT NULL DEFAULT 0,
retention_ms INTEGER NOT NULL DEFAULT 3600000
);
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
queue_name TEXT NOT NULL REFERENCES queues(name),
payload BLOB NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
attempt INTEGER NOT NULL DEFAULT 0,
max_retries INTEGER NOT NULL,
next_run_at INTEGER NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
message TEXT NOT NULL DEFAULT ''
);
CREATE INDEX IF NOT EXISTS idx_tasks_dequeue ON tasks(queue_name, status, next_run_at);
`)
return err
}
// applyConfigDefaults fills zero-value config fields with sensible defaults
// and clamps values to valid ranges, logging warnings for clamped values.
func (s *taskQueueServiceImpl) applyConfigDefaults(ctx context.Context, name string, config *host.QueueConfig) {
if config.Concurrency <= 0 {
config.Concurrency = defaultConcurrency
}
if config.BackoffMs <= 0 {
config.BackoffMs = defaultBackoffMs
}
if config.RetentionMs <= 0 {
config.RetentionMs = defaultRetentionMs
}
if config.RetentionMs < minRetentionMs {
log.Warn(ctx, "TaskQueue retention clamped to minimum", "plugin", s.pluginName, "queue", name,
"requested", config.RetentionMs, "min", minRetentionMs)
config.RetentionMs = minRetentionMs
}
if config.RetentionMs > maxRetentionMs {
log.Warn(ctx, "TaskQueue retention clamped to maximum", "plugin", s.pluginName, "queue", name,
"requested", config.RetentionMs, "max", maxRetentionMs)
config.RetentionMs = maxRetentionMs
}
}
// clampConcurrency reduces config.Concurrency if it exceeds the remaining budget.
// Returns an error when the concurrency budget is fully exhausted.
// Must be called with s.mu held.
func (s *taskQueueServiceImpl) clampConcurrency(ctx context.Context, name string, config *host.QueueConfig) error {
var allocated int32
for _, qs := range s.queues {
allocated += qs.config.Concurrency
}
available := s.maxConcurrency - allocated
if available <= 0 {
log.Warn(ctx, "TaskQueue concurrency budget exhausted", "plugin", s.pluginName, "queue", name,
"allocated", allocated, "maxConcurrency", s.maxConcurrency)
return fmt.Errorf("concurrency budget exhausted (%d/%d allocated)", allocated, s.maxConcurrency)
}
if config.Concurrency > available {
log.Warn(ctx, "TaskQueue concurrency clamped", "plugin", s.pluginName, "queue", name,
"requested", config.Concurrency, "available", available, "maxConcurrency", s.maxConcurrency)
config.Concurrency = available
}
return nil
}
func (s *taskQueueServiceImpl) CreateQueue(ctx context.Context, name string, config host.QueueConfig) error {
if len(name) == 0 {
return fmt.Errorf("queue name cannot be empty")
}
if len(name) > maxQueueNameLength {
return fmt.Errorf("queue name exceeds maximum length of %d bytes", maxQueueNameLength)
}
s.applyConfigDefaults(ctx, name, &config)
s.mu.Lock()
defer s.mu.Unlock()
if err := s.clampConcurrency(ctx, name, &config); err != nil {
return err
}
if _, exists := s.queues[name]; exists {
return fmt.Errorf("queue %q already exists", name)
}
// Upsert into queues table (idempotent across restarts)
_, err := s.db.ExecContext(ctx, `
INSERT INTO queues (name, concurrency, max_retries, backoff_ms, delay_ms, retention_ms)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(name) DO UPDATE SET
concurrency = excluded.concurrency,
max_retries = excluded.max_retries,
backoff_ms = excluded.backoff_ms,
delay_ms = excluded.delay_ms,
retention_ms = excluded.retention_ms
`, name, config.Concurrency, config.MaxRetries, config.BackoffMs, config.DelayMs, config.RetentionMs)
if err != nil {
return fmt.Errorf("creating queue: %w", err)
}
// Reset stale running tasks from previous crash
now := time.Now().UnixMilli()
_, err = s.db.ExecContext(ctx, `
UPDATE tasks SET status = ?, updated_at = ? WHERE queue_name = ? AND status = ?
`, taskStatusPending, now, name, taskStatusRunning)
if err != nil {
return fmt.Errorf("resetting stale tasks: %w", err)
}
qs := &queueState{
config: config,
signal: make(chan struct{}, 1),
}
if config.DelayMs > 0 {
// Rate limit dispatches to enforce delay between tasks.
// Burst of 1 allows one immediate dispatch, then enforces the delay interval.
qs.limiter = rate.NewLimiter(rate.Every(time.Duration(config.DelayMs)*time.Millisecond), 1)
}
s.queues[name] = qs
for i := int32(0); i < config.Concurrency; i++ {
s.wg.Go(func() { s.worker(name, qs) })
}
log.Debug(ctx, "Created task queue", "plugin", s.pluginName, "queue", name,
"concurrency", config.Concurrency, "maxRetries", config.MaxRetries,
"backoffMs", config.BackoffMs, "delayMs", config.DelayMs, "retentionMs", config.RetentionMs)
return nil
}
func (s *taskQueueServiceImpl) Enqueue(ctx context.Context, queueName string, payload []byte) (string, error) {
s.mu.Lock()
qs, exists := s.queues[queueName]
s.mu.Unlock()
if !exists {
return "", fmt.Errorf("queue %q does not exist", queueName)
}
if len(payload) > maxPayloadSize {
return "", fmt.Errorf("payload size %d exceeds maximum of %d bytes", len(payload), maxPayloadSize)
}
taskID := id.NewRandom()
now := time.Now().UnixMilli()
_, err := s.db.ExecContext(ctx, `
INSERT INTO tasks (id, queue_name, payload, status, attempt, max_retries, next_run_at, created_at, updated_at)
VALUES (?, ?, ?, ?, 0, ?, ?, ?, ?)
`, taskID, queueName, payload, taskStatusPending, qs.config.MaxRetries, now, now, now)
if err != nil {
return "", fmt.Errorf("enqueuing task: %w", err)
}
qs.notifyWorkers()
log.Trace(ctx, "Enqueued task", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
return taskID, nil
}
// Get returns the current state of a task.
func (s *taskQueueServiceImpl) Get(ctx context.Context, taskID string) (*host.TaskInfo, error) {
var info host.TaskInfo
err := s.db.QueryRowContext(ctx, `SELECT status, message, attempt FROM tasks WHERE id = ?`, taskID).
Scan(&info.Status, &info.Message, &info.Attempt)
if errors.Is(err, sql.ErrNoRows) {
return nil, fmt.Errorf("task %q not found", taskID)
}
if err != nil {
return nil, fmt.Errorf("getting task info: %w", err)
}
return &info, nil
}
// Cancel cancels a pending task.
func (s *taskQueueServiceImpl) Cancel(ctx context.Context, taskID string) error {
now := time.Now().UnixMilli()
result, err := s.db.ExecContext(ctx, `
UPDATE tasks SET status = ?, updated_at = ? WHERE id = ? AND status = ?
`, taskStatusCancelled, now, taskID, taskStatusPending)
if err != nil {
return fmt.Errorf("cancelling task: %w", err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("checking cancel result: %w", err)
}
if rowsAffected == 0 {
// Check if task exists at all
var status string
err := s.db.QueryRowContext(ctx, `SELECT status FROM tasks WHERE id = ?`, taskID).Scan(&status)
if errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("task %q not found", taskID)
}
if err != nil {
return fmt.Errorf("checking task existence: %w", err)
}
return fmt.Errorf("task %q cannot be cancelled (status: %s)", taskID, status)
}
log.Trace(ctx, "Cancelled task", "plugin", s.pluginName, "taskID", taskID)
return nil
}
// worker is the main loop for a single worker goroutine.
func (s *taskQueueServiceImpl) worker(queueName string, qs *queueState) {
// Process any existing pending tasks immediately on startup
s.drainQueue(queueName, qs)
ticker := time.NewTicker(pollInterval)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-qs.signal:
s.drainQueue(queueName, qs)
case <-ticker.C:
s.drainQueue(queueName, qs)
}
}
}
func (s *taskQueueServiceImpl) drainQueue(queueName string, qs *queueState) {
for s.ctx.Err() == nil && s.processTask(queueName, qs) {
}
}
// processTask dequeues and processes a single task. Returns true if a task was processed.
func (s *taskQueueServiceImpl) processTask(queueName string, qs *queueState) bool {
now := time.Now().UnixMilli()
// Atomically dequeue a task
var taskID string
var payload []byte
var attempt, maxRetries int32
err := s.db.QueryRowContext(s.ctx, `
UPDATE tasks SET status = ?, attempt = attempt + 1, updated_at = ?
WHERE id = (
SELECT id FROM tasks
WHERE queue_name = ? AND status = ? AND next_run_at <= ?
ORDER BY next_run_at, created_at LIMIT 1
)
RETURNING id, payload, attempt, max_retries
`, taskStatusRunning, now, queueName, taskStatusPending, now).Scan(&taskID, &payload, &attempt, &maxRetries)
if errors.Is(err, sql.ErrNoRows) {
return false
}
if err != nil {
log.Error(s.ctx, "Failed to dequeue task", "plugin", s.pluginName, "queue", queueName, err)
return false
}
// Enforce delay between task dispatches using a rate limiter.
// This is done after dequeue so that empty polls don't consume rate tokens.
if qs.limiter != nil {
if err := qs.limiter.Wait(s.ctx); err != nil {
// Context cancelled during wait — revert task to pending for recovery
s.revertTaskToPending(taskID)
return false
}
}
// Invoke callback
log.Debug(s.ctx, "Executing task", "plugin", s.pluginName, "queue", queueName, "taskID", taskID, "attempt", attempt)
message, callbackErr := s.invokeCallbackFn(s.ctx, queueName, taskID, payload, attempt)
// If context was cancelled (shutdown), revert task to pending for recovery
if s.ctx.Err() != nil {
s.revertTaskToPending(taskID)
return false
}
if callbackErr == nil {
s.completeTask(queueName, taskID, message)
} else {
s.handleTaskFailure(queueName, taskID, attempt, maxRetries, qs, callbackErr, message)
}
return true
}
func (s *taskQueueServiceImpl) completeTask(queueName, taskID, message string) {
now := time.Now().UnixMilli()
if _, err := s.db.ExecContext(s.ctx, `UPDATE tasks SET status = ?, message = ?, updated_at = ? WHERE id = ?`, taskStatusCompleted, message, now, taskID); err != nil {
log.Error(s.ctx, "Failed to mark task as completed", "plugin", s.pluginName, "taskID", taskID, err)
}
log.Debug(s.ctx, "Task completed", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
}
func (s *taskQueueServiceImpl) handleTaskFailure(queueName, taskID string, attempt, maxRetries int32, qs *queueState, callbackErr error, message string) {
log.Warn(s.ctx, "Task execution failed", "plugin", s.pluginName, "queue", queueName,
"taskID", taskID, "attempt", attempt, "maxRetries", maxRetries, "err", callbackErr)
// Use error message as fallback if no message was provided
if message == "" {
message = callbackErr.Error()
}
now := time.Now().UnixMilli()
if attempt > maxRetries {
if _, err := s.db.ExecContext(s.ctx, `UPDATE tasks SET status = ?, message = ?, updated_at = ? WHERE id = ?`, taskStatusFailed, message, now, taskID); err != nil {
log.Error(s.ctx, "Failed to mark task as failed", "plugin", s.pluginName, "taskID", taskID, err)
}
log.Warn(s.ctx, "Task failed after all retries", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
return
}
// Exponential backoff: backoffMs * 2^(attempt-1)
backoff := qs.config.BackoffMs << (attempt - 1)
if backoff <= 0 || backoff > maxBackoffMs {
backoff = maxBackoffMs
}
nextRunAt := now + backoff
if _, err := s.db.ExecContext(s.ctx, `
UPDATE tasks SET status = ?, next_run_at = ?, updated_at = ? WHERE id = ?
`, taskStatusPending, nextRunAt, now, taskID); err != nil {
log.Error(s.ctx, "Failed to reschedule task for retry", "plugin", s.pluginName, "taskID", taskID, err)
}
// Wake worker after backoff expires
time.AfterFunc(time.Duration(backoff)*time.Millisecond, func() {
qs.notifyWorkers()
})
}
// revertTaskToPending puts a running task back to pending status and decrements the attempt
// counter (used during shutdown to ensure the interrupted attempt doesn't count).
func (s *taskQueueServiceImpl) revertTaskToPending(taskID string) {
now := time.Now().UnixMilli()
_, err := s.db.Exec(`UPDATE tasks SET status = ?, attempt = MAX(attempt - 1, 0), updated_at = ? WHERE id = ? AND status = ?`, taskStatusPending, now, taskID, taskStatusRunning)
if err != nil {
log.Error("Failed to revert task to pending", "plugin", s.pluginName, "taskID", taskID, err)
}
}
// defaultInvokeCallback calls the plugin's nd_task_execute function.
func (s *taskQueueServiceImpl) defaultInvokeCallback(ctx context.Context, queueName, taskID string, payload []byte, attempt int32) (string, error) {
s.manager.mu.RLock()
p, ok := s.manager.plugins[s.pluginName]
s.manager.mu.RUnlock()
if !ok {
return "", fmt.Errorf("plugin %s not loaded", s.pluginName)
}
input := capabilities.TaskExecuteRequest{
QueueName: queueName,
TaskID: taskID,
Payload: payload,
Attempt: attempt,
}
message, err := callPluginFunction[capabilities.TaskExecuteRequest, string](ctx, p, FuncTaskWorkerCallback, input)
if err != nil {
return "", err
}
return message, nil
}
// cleanupLoop periodically removes terminal tasks past their retention period.
func (s *taskQueueServiceImpl) cleanupLoop() {
ticker := time.NewTicker(cleanupInterval)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-ticker.C:
s.runCleanup()
}
}
}
// runCleanup deletes terminal tasks past their retention period.
func (s *taskQueueServiceImpl) runCleanup() {
s.mu.Lock()
queues := make(map[string]*queueState, len(s.queues))
for k, v := range s.queues {
queues[k] = v
}
s.mu.Unlock()
now := time.Now().UnixMilli()
for name, qs := range queues {
result, err := s.db.ExecContext(s.ctx, `
DELETE FROM tasks WHERE queue_name = ? AND status IN (?, ?, ?) AND updated_at + ? < ?
`, name, taskStatusCompleted, taskStatusFailed, taskStatusCancelled, qs.config.RetentionMs, now)
if err != nil {
log.Error(s.ctx, "Failed to cleanup tasks", "plugin", s.pluginName, "queue", name, err)
continue
}
if deleted, _ := result.RowsAffected(); deleted > 0 {
log.Debug(s.ctx, "Cleaned up terminal tasks", "plugin", s.pluginName, "queue", name, "deleted", deleted)
}
}
}
// Close shuts down the task queue service, stopping all workers and closing the database.
func (s *taskQueueServiceImpl) Close() error {
// Cancel context to signal all goroutines
s.cancel()
// Wait for goroutines with timeout
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(shutdownTimeout):
log.Warn("TaskQueue shutdown timed out", "plugin", s.pluginName)
}
// Mark running tasks as pending for recovery on next startup
if s.db != nil {
now := time.Now().UnixMilli()
if _, err := s.db.Exec(`UPDATE tasks SET status = ?, updated_at = ? WHERE status = ?`, taskStatusPending, now, taskStatusRunning); err != nil {
log.Error("Failed to reset running tasks on shutdown", "plugin", s.pluginName, err)
}
log.Debug("Closing plugin taskqueue", "plugin", s.pluginName)
return s.db.Close()
}
return nil
}
// Compile-time verification
var _ host.TaskService = (*taskQueueServiceImpl)(nil)
var _ io.Closer = (*taskQueueServiceImpl)(nil)

View File

File diff suppressed because it is too large Load Diff

View File

@@ -128,6 +128,23 @@ var hostServices = []hostServiceEntry{
return host.RegisterHTTPHostFunctions(service), nil return host.RegisterHTTPHostFunctions(service), nil
}, },
}, },
{
name: "Task",
hasPermission: func(p *Permissions) bool { return p != nil && p.Taskqueue != nil },
create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) {
perm := ctx.permissions.Taskqueue
maxConcurrency := int32(1)
if perm.MaxConcurrency > 0 {
maxConcurrency = int32(perm.MaxConcurrency)
}
service, err := newTaskQueueService(ctx.pluginName, ctx.manager, maxConcurrency)
if err != nil {
log.Error("Failed to create Task service", "plugin", ctx.pluginName, err)
return nil, nil
}
return host.RegisterTaskHostFunctions(service), service
},
},
} }
// extractManifest reads manifest from an .ndp package and computes its SHA-256 hash. // extractManifest reads manifest from an .ndp package and computes its SHA-256 hash.

View File

@@ -110,6 +110,9 @@
}, },
"users": { "users": {
"$ref": "#/$defs/UsersPermission" "$ref": "#/$defs/UsersPermission"
},
"taskqueue": {
"$ref": "#/$defs/TaskQueuePermission"
} }
} }
}, },
@@ -224,6 +227,23 @@
} }
} }
}, },
"TaskQueuePermission": {
"type": "object",
"description": "Task queue permissions for background task processing",
"additionalProperties": false,
"properties": {
"reason": {
"type": "string",
"description": "Explanation for why task queue access is needed"
},
"maxConcurrency": {
"type": "integer",
"description": "Maximum total concurrent workers across all queues. Default: 1",
"minimum": 1,
"default": 1
}
}
},
"UsersPermission": { "UsersPermission": {
"type": "object", "type": "object",
"description": "Users service permissions for accessing user information", "description": "Users service permissions for accessing user information",

View File

@@ -64,6 +64,21 @@ func ValidateWithCapabilities(m *Manifest, capabilities []Capability) error {
return fmt.Errorf("scrobbler capability requires 'users' permission to be declared in manifest") return fmt.Errorf("scrobbler capability requires 'users' permission to be declared in manifest")
} }
} }
// Scheduler permission requires SchedulerCallback capability
if m.Permissions != nil && m.Permissions.Scheduler != nil {
if !hasCapability(capabilities, CapabilityScheduler) {
return fmt.Errorf("'scheduler' permission requires plugin to export '%s' function", FuncSchedulerCallback)
}
}
// Task (taskqueue) permission requires TaskWorker capability
if m.Permissions != nil && m.Permissions.Taskqueue != nil {
if !hasCapability(capabilities, CapabilityTaskWorker) {
return fmt.Errorf("'taskqueue' permission requires plugin to export '%s' function", FuncTaskWorkerCallback)
}
}
return nil return nil
} }

View File

@@ -181,6 +181,9 @@ type Permissions struct {
// Subsonicapi corresponds to the JSON schema field "subsonicapi". // Subsonicapi corresponds to the JSON schema field "subsonicapi".
Subsonicapi *SubsonicAPIPermission `json:"subsonicapi,omitempty" yaml:"subsonicapi,omitempty" mapstructure:"subsonicapi,omitempty"` Subsonicapi *SubsonicAPIPermission `json:"subsonicapi,omitempty" yaml:"subsonicapi,omitempty" mapstructure:"subsonicapi,omitempty"`
// Taskqueue corresponds to the JSON schema field "taskqueue".
Taskqueue *TaskQueuePermission `json:"taskqueue,omitempty" yaml:"taskqueue,omitempty" mapstructure:"taskqueue,omitempty"`
// Users corresponds to the JSON schema field "users". // Users corresponds to the JSON schema field "users".
Users *UsersPermission `json:"users,omitempty" yaml:"users,omitempty" mapstructure:"users,omitempty"` Users *UsersPermission `json:"users,omitempty" yaml:"users,omitempty" mapstructure:"users,omitempty"`
@@ -200,6 +203,36 @@ type SubsonicAPIPermission struct {
Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"` Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"`
} }
// Task queue permissions for background task processing
type TaskQueuePermission struct {
// Maximum total concurrent workers across all queues. Default: 1
MaxConcurrency int `json:"maxConcurrency,omitempty" yaml:"maxConcurrency,omitempty" mapstructure:"maxConcurrency,omitempty"`
// Explanation for why task queue access is needed
Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"`
}
// UnmarshalJSON implements json.Unmarshaler.
func (j *TaskQueuePermission) UnmarshalJSON(value []byte) error {
var raw map[string]interface{}
if err := json.Unmarshal(value, &raw); err != nil {
return err
}
type Plain TaskQueuePermission
var plain Plain
if err := json.Unmarshal(value, &plain); err != nil {
return err
}
if v, ok := raw["maxConcurrency"]; !ok || v == nil {
plain.MaxConcurrency = 1.0
}
if 1 > plain.MaxConcurrency {
return fmt.Errorf("field %s: must be >= %v", "maxConcurrency", 1)
}
*j = TaskQueuePermission(plain)
return nil
}
// Enable experimental WebAssembly threads support // Enable experimental WebAssembly threads support
type ThreadsFeature struct { type ThreadsFeature struct {
// Explanation for why threads support is needed // Explanation for why threads support is needed

View File

@@ -6,3 +6,10 @@ require (
github.com/extism/go-pdk v1.1.3 github.com/extism/go-pdk v1.1.3
github.com/stretchr/testify v1.11.1 github.com/stretchr/testify v1.11.1
) )
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

View File

@@ -43,6 +43,7 @@ The following host services are available:
- Library: provides access to music library metadata for plugins. - Library: provides access to music library metadata for plugins.
- Scheduler: provides task scheduling capabilities for plugins. - Scheduler: provides task scheduling capabilities for plugins.
- SubsonicAPI: provides access to Navidrome's Subsonic API from plugins. - SubsonicAPI: provides access to Navidrome's Subsonic API from plugins.
- Task: provides persistent task queues for plugins.
- Users: provides access to user information for plugins. - Users: provides access to user information for plugins.
- WebSocket: provides WebSocket communication capabilities for plugins. - WebSocket: provides WebSocket communication capabilities for plugins.

View File

@@ -14,6 +14,7 @@ import (
"github.com/navidrome/navidrome/plugins/pdk/go/pdk" "github.com/navidrome/navidrome/plugins/pdk/go/pdk"
) )
// HTTPRequest represents the HTTPRequest data structure.
// HTTPRequest represents an outbound HTTP request from a plugin. // HTTPRequest represents an outbound HTTP request from a plugin.
type HTTPRequest struct { type HTTPRequest struct {
Method string `json:"method"` Method string `json:"method"`
@@ -23,6 +24,7 @@ type HTTPRequest struct {
TimeoutMs int32 `json:"timeoutMs"` TimeoutMs int32 `json:"timeoutMs"`
} }
// HTTPResponse represents the HTTPResponse data structure.
// HTTPResponse represents the response from an outbound HTTP request. // HTTPResponse represents the response from an outbound HTTP request.
type HTTPResponse struct { type HTTPResponse struct {
StatusCode int32 `json:"statusCode"` StatusCode int32 `json:"statusCode"`
@@ -35,11 +37,11 @@ type HTTPResponse struct {
//go:wasmimport extism:host/user http_send //go:wasmimport extism:host/user http_send
func http_send(uint64) uint64 func http_send(uint64) uint64
type httpSendRequest struct { type hTTPSendRequest struct {
Request HTTPRequest `json:"request"` Request HTTPRequest `json:"request"`
} }
type httpSendResponse struct { type hTTPSendResponse struct {
Result *HTTPResponse `json:"result,omitempty"` Result *HTTPResponse `json:"result,omitempty"`
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
} }
@@ -55,7 +57,7 @@ type httpSendResponse struct {
// Successful HTTP calls (including 4xx/5xx status codes) return a non-nil response with nil error. // Successful HTTP calls (including 4xx/5xx status codes) return a non-nil response with nil error.
func HTTPSend(request HTTPRequest) (*HTTPResponse, error) { func HTTPSend(request HTTPRequest) (*HTTPResponse, error) {
// Marshal request to JSON // Marshal request to JSON
req := httpSendRequest{ req := hTTPSendRequest{
Request: request, Request: request,
} }
reqBytes, err := json.Marshal(req) reqBytes, err := json.Marshal(req)
@@ -73,7 +75,7 @@ func HTTPSend(request HTTPRequest) (*HTTPResponse, error) {
responseBytes := responseMem.ReadBytes() responseBytes := responseMem.ReadBytes()
// Parse the response // Parse the response
var response httpSendResponse var response hTTPSendResponse
if err := json.Unmarshal(responseBytes, &response); err != nil { if err := json.Unmarshal(responseBytes, &response); err != nil {
return nil, err return nil, err
} }

View File

@@ -10,6 +10,7 @@ package host
import "github.com/stretchr/testify/mock" import "github.com/stretchr/testify/mock"
// HTTPRequest represents the HTTPRequest data structure.
// HTTPRequest represents an outbound HTTP request from a plugin. // HTTPRequest represents an outbound HTTP request from a plugin.
type HTTPRequest struct { type HTTPRequest struct {
Method string `json:"method"` Method string `json:"method"`
@@ -19,6 +20,7 @@ type HTTPRequest struct {
TimeoutMs int32 `json:"timeoutMs"` TimeoutMs int32 `json:"timeoutMs"`
} }
// HTTPResponse represents the HTTPResponse data structure.
// HTTPResponse represents the response from an outbound HTTP request. // HTTPResponse represents the response from an outbound HTTP request.
type HTTPResponse struct { type HTTPResponse struct {
StatusCode int32 `json:"statusCode"` StatusCode int32 `json:"statusCode"`

View File

@@ -8,7 +8,6 @@
package host package host
import ( import (
"encoding/binary"
"encoding/json" "encoding/json"
"errors" "errors"
@@ -38,6 +37,12 @@ type subsonicAPICallRawRequest struct {
Uri string `json:"uri"` Uri string `json:"uri"`
} }
type subsonicAPICallRawResponse struct {
ContentType string `json:"contentType,omitempty"`
Data []byte `json:"data,omitempty"`
Error string `json:"error,omitempty"`
}
// SubsonicAPICall calls the subsonicapi_call host function. // SubsonicAPICall calls the subsonicapi_call host function.
// Call executes a Subsonic API request and returns the JSON response. // Call executes a Subsonic API request and returns the JSON response.
// //
@@ -78,8 +83,8 @@ func SubsonicAPICall(uri string) (string, error) {
// SubsonicAPICallRaw calls the subsonicapi_callraw host function. // SubsonicAPICallRaw calls the subsonicapi_callraw host function.
// CallRaw executes a Subsonic API request and returns the raw binary response. // CallRaw executes a Subsonic API request and returns the raw binary response.
// Optimized for binary endpoints like getCoverArt and stream that return // Designed for binary endpoints like getCoverArt and stream that return
// non-JSON data. The response is returned as raw bytes without JSON encoding overhead. // non-JSON data. The data is base64-encoded over JSON on the wire.
func SubsonicAPICallRaw(uri string) (string, []byte, error) { func SubsonicAPICallRaw(uri string) (string, []byte, error) {
// Marshal request to JSON // Marshal request to JSON
req := subsonicAPICallRawRequest{ req := subsonicAPICallRawRequest{
@@ -99,22 +104,16 @@ func SubsonicAPICallRaw(uri string) (string, []byte, error) {
responseMem := pdk.FindMemory(responsePtr) responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes() responseBytes := responseMem.ReadBytes()
// Parse binary-framed response // Parse the response
if len(responseBytes) == 0 { var response subsonicAPICallRawResponse
return "", nil, errors.New("empty response from host") if err := json.Unmarshal(responseBytes, &response); err != nil {
return "", nil, err
} }
if responseBytes[0] == 0x01 { // error
return "", nil, errors.New(string(responseBytes[1:])) // Convert Error field to Go error
if response.Error != "" {
return "", nil, errors.New(response.Error)
} }
if responseBytes[0] != 0x00 {
return "", nil, errors.New("unknown response status") return response.ContentType, response.Data, nil
}
if len(responseBytes) < 5 {
return "", nil, errors.New("malformed raw response: incomplete header")
}
ctLen := binary.BigEndian.Uint32(responseBytes[1:5])
if uint32(len(responseBytes)) < 5+ctLen {
return "", nil, errors.New("malformed raw response: content-type overflow")
}
return string(responseBytes[5 : 5+ctLen]), responseBytes[5+ctLen:], nil
} }

View File

@@ -42,8 +42,8 @@ func (m *mockSubsonicAPIService) CallRaw(uri string) (string, []byte, error) {
// SubsonicAPICallRaw delegates to the mock instance. // SubsonicAPICallRaw delegates to the mock instance.
// CallRaw executes a Subsonic API request and returns the raw binary response. // CallRaw executes a Subsonic API request and returns the raw binary response.
// Optimized for binary endpoints like getCoverArt and stream that return // Designed for binary endpoints like getCoverArt and stream that return
// non-JSON data. The response is returned as raw bytes without JSON encoding overhead. // non-JSON data. The data is base64-encoded over JSON on the wire.
func SubsonicAPICallRaw(uri string) (string, []byte, error) { func SubsonicAPICallRaw(uri string) (string, []byte, error) {
return SubsonicAPIMock.CallRaw(uri) return SubsonicAPIMock.CallRaw(uri)
} }

View File

@@ -0,0 +1,227 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains client wrappers for the Task host service.
// It is intended for use in Navidrome plugins built with TinyGo.
//
//go:build wasip1
package host
import (
"encoding/json"
"errors"
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
)
// QueueConfig represents the QueueConfig data structure.
// QueueConfig holds configuration for a task queue.
type QueueConfig struct {
Concurrency int32 `json:"concurrency"`
MaxRetries int32 `json:"maxRetries"`
BackoffMs int64 `json:"backoffMs"`
DelayMs int64 `json:"delayMs"`
RetentionMs int64 `json:"retentionMs"`
}
// TaskInfo represents the TaskInfo data structure.
// TaskInfo holds the current state of a task.
type TaskInfo struct {
Status string `json:"status"`
Message string `json:"message"`
Attempt int32 `json:"attempt"`
}
// task_createqueue is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user task_createqueue
func task_createqueue(uint64) uint64
// task_enqueue is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user task_enqueue
func task_enqueue(uint64) uint64
// task_get is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user task_get
func task_get(uint64) uint64
// task_cancel is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user task_cancel
func task_cancel(uint64) uint64
type taskCreateQueueRequest struct {
Name string `json:"name"`
Config QueueConfig `json:"config"`
}
type taskEnqueueRequest struct {
QueueName string `json:"queueName"`
Payload []byte `json:"payload"`
}
type taskEnqueueResponse struct {
Result string `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
type taskGetRequest struct {
TaskID string `json:"taskId"`
}
type taskGetResponse struct {
Result *TaskInfo `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
type taskCancelRequest struct {
TaskID string `json:"taskId"`
}
// TaskCreateQueue calls the task_createqueue host function.
// CreateQueue creates a named task queue with the given configuration.
// Zero-value fields in config use sensible defaults.
// If a queue with the same name already exists, returns an error.
// On startup, this also recovers any stale "running" tasks from a previous crash.
func TaskCreateQueue(name string, config QueueConfig) error {
// Marshal request to JSON
req := taskCreateQueueRequest{
Name: name,
Config: config,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := task_createqueue(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse error-only response
var response struct {
Error string `json:"error,omitempty"`
}
if err := json.Unmarshal(responseBytes, &response); err != nil {
return err
}
if response.Error != "" {
return errors.New(response.Error)
}
return nil
}
// TaskEnqueue calls the task_enqueue host function.
// Enqueue adds a task to the named queue. Returns the task ID.
// payload is opaque bytes passed back to the plugin on execution.
func TaskEnqueue(queueName string, payload []byte) (string, error) {
// Marshal request to JSON
req := taskEnqueueRequest{
QueueName: queueName,
Payload: payload,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return "", err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := task_enqueue(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse the response
var response taskEnqueueResponse
if err := json.Unmarshal(responseBytes, &response); err != nil {
return "", err
}
// Convert Error field to Go error
if response.Error != "" {
return "", errors.New(response.Error)
}
return response.Result, nil
}
// TaskGet calls the task_get host function.
// Get returns the current state of a task including its status,
// message, and attempt count.
func TaskGet(taskID string) (*TaskInfo, error) {
// Marshal request to JSON
req := taskGetRequest{
TaskID: taskID,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return nil, err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := task_get(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse the response
var response taskGetResponse
if err := json.Unmarshal(responseBytes, &response); err != nil {
return nil, err
}
// Convert Error field to Go error
if response.Error != "" {
return nil, errors.New(response.Error)
}
return response.Result, nil
}
// TaskCancel calls the task_cancel host function.
// Cancel cancels a pending task. Returns error if already
// running, completed, or failed.
func TaskCancel(taskID string) error {
// Marshal request to JSON
req := taskCancelRequest{
TaskID: taskID,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := task_cancel(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse error-only response
var response struct {
Error string `json:"error,omitempty"`
}
if err := json.Unmarshal(responseBytes, &response); err != nil {
return err
}
if response.Error != "" {
return errors.New(response.Error)
}
return nil
}

View File

@@ -0,0 +1,92 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains mock implementations for non-WASM builds.
// These mocks allow IDE support, compilation, and unit testing on non-WASM platforms.
// Plugin authors can use the exported mock instances to set expectations in tests.
//
//go:build !wasip1
package host
import "github.com/stretchr/testify/mock"
// QueueConfig represents the QueueConfig data structure.
// QueueConfig holds configuration for a task queue.
type QueueConfig struct {
Concurrency int32 `json:"concurrency"`
MaxRetries int32 `json:"maxRetries"`
BackoffMs int64 `json:"backoffMs"`
DelayMs int64 `json:"delayMs"`
RetentionMs int64 `json:"retentionMs"`
}
// TaskInfo represents the TaskInfo data structure.
// TaskInfo holds the current state of a task.
type TaskInfo struct {
Status string `json:"status"`
Message string `json:"message"`
Attempt int32 `json:"attempt"`
}
// mockTaskService is the mock implementation for testing.
type mockTaskService struct {
mock.Mock
}
// TaskMock is the auto-instantiated mock instance for testing.
// Use this to set expectations: host.TaskMock.On("MethodName", args...).Return(values...)
var TaskMock = &mockTaskService{}
// CreateQueue is the mock method for TaskCreateQueue.
func (m *mockTaskService) CreateQueue(name string, config QueueConfig) error {
args := m.Called(name, config)
return args.Error(0)
}
// TaskCreateQueue delegates to the mock instance.
// CreateQueue creates a named task queue with the given configuration.
// Zero-value fields in config use sensible defaults.
// If a queue with the same name already exists, returns an error.
// On startup, this also recovers any stale "running" tasks from a previous crash.
func TaskCreateQueue(name string, config QueueConfig) error {
return TaskMock.CreateQueue(name, config)
}
// Enqueue is the mock method for TaskEnqueue.
func (m *mockTaskService) Enqueue(queueName string, payload []byte) (string, error) {
args := m.Called(queueName, payload)
return args.String(0), args.Error(1)
}
// TaskEnqueue delegates to the mock instance.
// Enqueue adds a task to the named queue. Returns the task ID.
// payload is opaque bytes passed back to the plugin on execution.
func TaskEnqueue(queueName string, payload []byte) (string, error) {
return TaskMock.Enqueue(queueName, payload)
}
// Get is the mock method for TaskGet.
func (m *mockTaskService) Get(taskID string) (*TaskInfo, error) {
args := m.Called(taskID)
return args.Get(0).(*TaskInfo), args.Error(1)
}
// TaskGet delegates to the mock instance.
// Get returns the current state of a task including its status,
// message, and attempt count.
func TaskGet(taskID string) (*TaskInfo, error) {
return TaskMock.Get(taskID)
}
// Cancel is the mock method for TaskCancel.
func (m *mockTaskService) Cancel(taskID string) error {
args := m.Called(taskID)
return args.Error(0)
}
// TaskCancel delegates to the mock instance.
// Cancel cancels a pending task. Returns error if already
// running, completed, or failed.
func TaskCancel(taskID string) error {
return TaskMock.Cancel(taskID)
}

View File

@@ -0,0 +1,79 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains export wrappers for the TaskWorker capability.
// It is intended for use in Navidrome plugins built with TinyGo.
//
//go:build wasip1
package taskworker
import (
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
)
// TaskExecuteRequest is the request provided when a task is ready to execute.
type TaskExecuteRequest struct {
// QueueName is the name of the queue this task belongs to.
QueueName string `json:"queueName"`
// TaskID is the unique identifier for this task.
TaskID string `json:"taskId"`
// Payload is the opaque data provided when the task was enqueued.
Payload []byte `json:"payload"`
// Attempt is the current attempt number (1-based: first attempt = 1).
Attempt int32 `json:"attempt"`
}
// TaskWorker is the marker interface for taskworker plugins.
// Implement one or more of the provider interfaces below.
// TaskWorker provides task execution handling.
// This capability allows plugins to receive callbacks when their queued tasks
// are ready to execute. Plugins that use the taskqueue host service must
// implement this capability.
type TaskWorker interface{}
// TaskExecuteProvider provides the OnTaskExecute function.
type TaskExecuteProvider interface {
OnTaskExecute(TaskExecuteRequest) (string, error)
} // Internal implementation holders
var (
taskExecuteImpl func(TaskExecuteRequest) (string, error)
)
// Register registers a taskworker implementation.
// The implementation is checked for optional provider interfaces.
func Register(impl TaskWorker) {
if p, ok := impl.(TaskExecuteProvider); ok {
taskExecuteImpl = p.OnTaskExecute
}
}
// NotImplementedCode is the standard return code for unimplemented functions.
// The host recognizes this and skips the plugin gracefully.
const NotImplementedCode int32 = -2
//go:wasmexport nd_task_execute
func _NdTaskExecute() int32 {
if taskExecuteImpl == nil {
// Return standard code - host will skip this plugin gracefully
return NotImplementedCode
}
var input TaskExecuteRequest
if err := pdk.InputJSON(&input); err != nil {
pdk.SetError(err)
return -1
}
output, err := taskExecuteImpl(input)
if err != nil {
pdk.SetError(err)
return -1
}
if err := pdk.OutputJSON(output); err != nil {
pdk.SetError(err)
return -1
}
return 0
}

View File

@@ -0,0 +1,41 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file provides stub implementations for non-WASM platforms.
// It allows Go plugins to compile and run tests outside of WASM,
// but the actual functionality is only available in WASM builds.
//
//go:build !wasip1
package taskworker
// TaskExecuteRequest is the request provided when a task is ready to execute.
type TaskExecuteRequest struct {
// QueueName is the name of the queue this task belongs to.
QueueName string `json:"queueName"`
// TaskID is the unique identifier for this task.
TaskID string `json:"taskId"`
// Payload is the opaque data provided when the task was enqueued.
Payload []byte `json:"payload"`
// Attempt is the current attempt number (1-based: first attempt = 1).
Attempt int32 `json:"attempt"`
}
// TaskWorker is the marker interface for taskworker plugins.
// Implement one or more of the provider interfaces below.
// TaskWorker provides task execution handling.
// This capability allows plugins to receive callbacks when their queued tasks
// are ready to execute. Plugins that use the taskqueue host service must
// implement this capability.
type TaskWorker interface{}
// TaskExecuteProvider provides the OnTaskExecute function.
type TaskExecuteProvider interface {
OnTaskExecute(TaskExecuteRequest) (string, error)
}
// NotImplementedCode is the standard return code for unimplemented functions.
const NotImplementedCode int32 = -2
// Register is a no-op on non-WASM platforms.
// This stub allows code to compile outside of WASM.
func Register(_ TaskWorker) {}

View File

@@ -12,6 +12,7 @@ from typing import Any
import extism import extism
import json import json
import base64
class HostFunctionError(Exception): class HostFunctionError(Exception):
@@ -337,7 +338,7 @@ Returns an error if the operation fails.
""" """
request = { request = {
"key": key, "key": key,
"value": value, "value": base64.b64encode(value).decode("ascii"),
"ttlSeconds": ttl_seconds, "ttlSeconds": ttl_seconds,
} }
request_bytes = json.dumps(request).encode("utf-8") request_bytes = json.dumps(request).encode("utf-8")
@@ -382,7 +383,7 @@ or the stored value is not a byte slice, exists will be false.
raise HostFunctionError(response["error"]) raise HostFunctionError(response["error"])
return CacheGetBytesResult( return CacheGetBytesResult(
value=response.get("value", b""), value=base64.b64decode(response.get("value", "")),
exists=response.get("exists", False), exists=response.get("exists", False),
) )

View File

@@ -0,0 +1,60 @@
# Code generated by ndpgen. DO NOT EDIT.
#
# This file contains client wrappers for the HTTP host service.
# It is intended for use in Navidrome plugins built with extism-py.
#
# IMPORTANT: Due to a limitation in extism-py, you cannot import this file directly.
# The @extism.import_fn decorators are only detected when defined in the plugin's
# main __init__.py file. Copy the needed functions from this file into your plugin.
from dataclasses import dataclass
from typing import Any
import extism
import json
import base64
class HostFunctionError(Exception):
"""Raised when a host function returns an error."""
pass
@extism.import_fn("extism:host/user", "http_send")
def _http_send(offset: int) -> int:
"""Raw host function - do not call directly."""
...
def http_send(request: Any) -> Any:
"""Send executes an HTTP request and returns the response.
Parameters:
- request: The HTTP request to execute, including method, URL, headers, body, and timeout
Returns the HTTP response with status code, headers, and body.
Network errors, timeouts, and permission failures are returned as Go errors.
Successful HTTP calls (including 4xx/5xx status codes) return a non-nil response with nil error.
Args:
request: Any parameter.
Returns:
Any: The result value.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"request": request,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _http_send(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
return response.get("result", None)

View File

@@ -12,6 +12,7 @@ from typing import Any
import extism import extism
import json import json
import base64
class HostFunctionError(Exception): class HostFunctionError(Exception):
@@ -80,7 +81,7 @@ Returns an error if the storage limit would be exceeded or the operation fails.
""" """
request = { request = {
"key": key, "key": key,
"value": value, "value": base64.b64encode(value).decode("ascii"),
} }
request_bytes = json.dumps(request).encode("utf-8") request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes) request_mem = extism.memory.alloc(request_bytes)
@@ -123,7 +124,7 @@ Returns the value and whether the key exists.
raise HostFunctionError(response["error"]) raise HostFunctionError(response["error"])
return KVStoreGetResult( return KVStoreGetResult(
value=response.get("value", b""), value=base64.b64decode(response.get("value", "")),
exists=response.get("exists", False), exists=response.get("exists", False),
) )

View File

@@ -8,11 +8,11 @@
# main __init__.py file. Copy the needed functions from this file into your plugin. # main __init__.py file. Copy the needed functions from this file into your plugin.
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, Tuple from typing import Any
import extism import extism
import json import json
import struct import base64
class HostFunctionError(Exception): class HostFunctionError(Exception):
@@ -32,6 +32,13 @@ def _subsonicapi_callraw(offset: int) -> int:
... ...
@dataclass
class SubsonicAPICallRawResult:
"""Result type for subsonicapi_call_raw."""
content_type: str
data: bytes
def subsonicapi_call(uri: str) -> str: def subsonicapi_call(uri: str) -> str:
"""Call executes a Subsonic API request and returns the JSON response. """Call executes a Subsonic API request and returns the JSON response.
@@ -62,16 +69,16 @@ e.g., "getAlbumList2?type=random&size=10". The response is returned as raw JSON.
return response.get("responseJson", "") return response.get("responseJson", "")
def subsonicapi_call_raw(uri: str) -> Tuple[str, bytes]: def subsonicapi_call_raw(uri: str) -> SubsonicAPICallRawResult:
"""CallRaw executes a Subsonic API request and returns the raw binary response. """CallRaw executes a Subsonic API request and returns the raw binary response.
Optimized for binary endpoints like getCoverArt and stream that return Designed for binary endpoints like getCoverArt and stream that return
non-JSON data. The response is returned as raw bytes without JSON encoding overhead. non-JSON data. The data is base64-encoded over JSON on the wire.
Args: Args:
uri: str parameter. uri: str parameter.
Returns: Returns:
Tuple of (content_type, data) with the raw binary response. SubsonicAPICallRawResult containing content_type, data,.
Raises: Raises:
HostFunctionError: If the host function returns an error. HostFunctionError: If the host function returns an error.
@@ -83,19 +90,12 @@ non-JSON data. The response is returned as raw bytes without JSON encoding overh
request_mem = extism.memory.alloc(request_bytes) request_mem = extism.memory.alloc(request_bytes)
response_offset = _subsonicapi_callraw(request_mem.offset) response_offset = _subsonicapi_callraw(request_mem.offset)
response_mem = extism.memory.find(response_offset) response_mem = extism.memory.find(response_offset)
response_bytes = response_mem.bytes() response = json.loads(extism.memory.string(response_mem))
if len(response_bytes) == 0: if response.get("error"):
raise HostFunctionError("empty response from host") raise HostFunctionError(response["error"])
if response_bytes[0] == 0x01:
raise HostFunctionError(response_bytes[1:].decode("utf-8")) return SubsonicAPICallRawResult(
if response_bytes[0] != 0x00: content_type=response.get("contentType", ""),
raise HostFunctionError("unknown response status") data=base64.b64decode(response.get("data", "")),
if len(response_bytes) < 5: )
raise HostFunctionError("malformed raw response: incomplete header")
ct_len = struct.unpack(">I", response_bytes[1:5])[0]
if len(response_bytes) < 5 + ct_len:
raise HostFunctionError("malformed raw response: content-type overflow")
content_type = response_bytes[5:5 + ct_len].decode("utf-8")
data = response_bytes[5 + ct_len:]
return content_type, data

View File

@@ -0,0 +1,154 @@
# Code generated by ndpgen. DO NOT EDIT.
#
# This file contains client wrappers for the Task host service.
# It is intended for use in Navidrome plugins built with extism-py.
#
# IMPORTANT: Due to a limitation in extism-py, you cannot import this file directly.
# The @extism.import_fn decorators are only detected when defined in the plugin's
# main __init__.py file. Copy the needed functions from this file into your plugin.
from dataclasses import dataclass
from typing import Any
import extism
import json
import base64
class HostFunctionError(Exception):
"""Raised when a host function returns an error."""
pass
@extism.import_fn("extism:host/user", "task_createqueue")
def _task_createqueue(offset: int) -> int:
"""Raw host function - do not call directly."""
...
@extism.import_fn("extism:host/user", "task_enqueue")
def _task_enqueue(offset: int) -> int:
"""Raw host function - do not call directly."""
...
@extism.import_fn("extism:host/user", "task_get")
def _task_get(offset: int) -> int:
"""Raw host function - do not call directly."""
...
@extism.import_fn("extism:host/user", "task_cancel")
def _task_cancel(offset: int) -> int:
"""Raw host function - do not call directly."""
...
def task_create_queue(name: str, config: Any) -> None:
"""CreateQueue creates a named task queue with the given configuration.
Zero-value fields in config use sensible defaults.
If a queue with the same name already exists, returns an error.
On startup, this also recovers any stale "running" tasks from a previous crash.
Args:
name: str parameter.
config: Any parameter.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"name": name,
"config": config,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _task_createqueue(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
def task_enqueue(queue_name: str, payload: bytes) -> str:
"""Enqueue adds a task to the named queue. Returns the task ID.
payload is opaque bytes passed back to the plugin on execution.
Args:
queue_name: str parameter.
payload: bytes parameter.
Returns:
str: The result value.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"queueName": queue_name,
"payload": base64.b64encode(payload).decode("ascii"),
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _task_enqueue(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
return response.get("result", "")
def task_get(task_id: str) -> Any:
"""Get returns the current state of a task including its status,
message, and attempt count.
Args:
task_id: str parameter.
Returns:
Any: The result value.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"taskId": task_id,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _task_get(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
return response.get("result", None)
def task_cancel(task_id: str) -> None:
"""Cancel cancels a pending task. Returns error if already
running, completed, or failed.
Args:
task_id: str parameter.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"taskId": task_id,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _task_cancel(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])

View File

@@ -12,6 +12,7 @@ from typing import Any
import extism import extism
import json import json
import base64
class HostFunctionError(Exception): class HostFunctionError(Exception):
@@ -134,7 +135,7 @@ Returns an error if the connection is not found or if sending fails.
""" """
request = { request = {
"connectionId": connection_id, "connectionId": connection_id,
"data": data, "data": base64.b64encode(data).decode("ascii"),
} }
request_bytes = json.dumps(request).encode("utf-8") request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes) request_mem = extism.memory.alloc(request_bytes)

View File

@@ -11,6 +11,7 @@ path = "src/lib.rs"
crate-type = ["rlib"] crate-type = ["rlib"]
[dependencies] [dependencies]
base64 = "0.22"
extism-pdk = "1.2" extism-pdk = "1.2"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"

View File

@@ -9,4 +9,5 @@ pub mod lifecycle;
pub mod metadata; pub mod metadata;
pub mod scheduler; pub mod scheduler;
pub mod scrobbler; pub mod scrobbler;
pub mod taskworker;
pub mod websocket; pub mod websocket;

View File

@@ -0,0 +1,102 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains export wrappers for the TaskWorker capability.
// It is intended for use in Navidrome plugins built with extism-pdk.
use serde::{Deserialize, Serialize};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
mod base64_bytes {
use serde::{self, Deserialize, Deserializer, Serializer};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&BASE64.encode(bytes))
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
BASE64.decode(&s).map_err(serde::de::Error::custom)
}
}
// Helper functions for skip_serializing_if with numeric types
#[allow(dead_code)]
fn is_zero_i32(value: &i32) -> bool { *value == 0 }
#[allow(dead_code)]
fn is_zero_u32(value: &u32) -> bool { *value == 0 }
#[allow(dead_code)]
fn is_zero_i64(value: &i64) -> bool { *value == 0 }
#[allow(dead_code)]
fn is_zero_u64(value: &u64) -> bool { *value == 0 }
#[allow(dead_code)]
fn is_zero_f32(value: &f32) -> bool { *value == 0.0 }
#[allow(dead_code)]
fn is_zero_f64(value: &f64) -> bool { *value == 0.0 }
/// TaskExecuteRequest is the request provided when a task is ready to execute.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskExecuteRequest {
/// QueueName is the name of the queue this task belongs to.
#[serde(default)]
pub queue_name: String,
/// TaskID is the unique identifier for this task.
#[serde(default)]
pub task_id: String,
/// Payload is the opaque data provided when the task was enqueued.
#[serde(default)]
#[serde(with = "base64_bytes")]
pub payload: Vec<u8>,
/// Attempt is the current attempt number (1-based: first attempt = 1).
#[serde(default)]
pub attempt: i32,
}
/// Error represents an error from a capability method.
#[derive(Debug)]
pub struct Error {
pub message: String,
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.message)
}
}
impl std::error::Error for Error {}
impl Error {
pub fn new(message: impl Into<String>) -> Self {
Self { message: message.into() }
}
}
/// TaskExecuteProvider provides the OnTaskExecute function.
pub trait TaskExecuteProvider {
fn on_task_execute(&self, req: TaskExecuteRequest) -> Result<String, Error>;
}
/// Register the on_task_execute export.
/// This macro generates the WASM export function for this method.
#[macro_export]
macro_rules! register_taskworker_task_execute {
($plugin_type:ty) => {
#[extism_pdk::plugin_fn]
pub fn nd_task_execute(
req: extism_pdk::Json<$crate::taskworker::TaskExecuteRequest>
) -> extism_pdk::FnResult<extism_pdk::Json<String>> {
let plugin = <$plugin_type>::default();
let result = $crate::taskworker::TaskExecuteProvider::on_task_execute(&plugin, req.into_inner())?;
Ok(extism_pdk::Json(result))
}
};
}

View File

@@ -11,6 +11,7 @@ readme = "README.md"
crate-type = ["rlib"] crate-type = ["rlib"]
[dependencies] [dependencies]
base64 = "0.22"
extism-pdk = "1.2" extism-pdk = "1.2"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"

View File

@@ -40,6 +40,7 @@
//! - [`library`] - provides access to music library metadata for plugins. //! - [`library`] - provides access to music library metadata for plugins.
//! - [`scheduler`] - provides task scheduling capabilities for plugins. //! - [`scheduler`] - provides task scheduling capabilities for plugins.
//! - [`subsonicapi`] - provides access to Navidrome's Subsonic API from plugins. //! - [`subsonicapi`] - provides access to Navidrome's Subsonic API from plugins.
//! - [`task`] - provides persistent task queues for plugins.
//! - [`users`] - provides access to user information for plugins. //! - [`users`] - provides access to user information for plugins.
//! - [`websocket`] - provides WebSocket communication capabilities for plugins. //! - [`websocket`] - provides WebSocket communication capabilities for plugins.
@@ -99,6 +100,13 @@ pub mod subsonicapi {
pub use super::nd_host_subsonicapi::*; pub use super::nd_host_subsonicapi::*;
} }
#[doc(hidden)]
mod nd_host_task;
/// provides persistent task queues for plugins.
pub mod task {
pub use super::nd_host_task::*;
}
#[doc(hidden)] #[doc(hidden)]
mod nd_host_users; mod nd_host_users;
/// provides access to user information for plugins. /// provides access to user information for plugins.

View File

@@ -5,6 +5,29 @@
use extism_pdk::*; use extism_pdk::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
mod base64_bytes {
use serde::{self, Deserialize, Deserializer, Serializer};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&BASE64.encode(bytes))
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
BASE64.decode(&s).map_err(serde::de::Error::custom)
}
}
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
@@ -106,6 +129,7 @@ struct CacheGetFloatResponse {
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
struct CacheSetBytesRequest { struct CacheSetBytesRequest {
key: String, key: String,
#[serde(with = "base64_bytes")]
value: Vec<u8>, value: Vec<u8>,
ttl_seconds: i64, ttl_seconds: i64,
} }
@@ -127,6 +151,7 @@ struct CacheGetBytesRequest {
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
struct CacheGetBytesResponse { struct CacheGetBytesResponse {
#[serde(default)] #[serde(default)]
#[serde(with = "base64_bytes")]
value: Vec<u8>, value: Vec<u8>,
#[serde(default)] #[serde(default)]
exists: bool, exists: bool,

View File

@@ -5,16 +5,40 @@
use extism_pdk::*; use extism_pdk::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
mod base64_bytes {
use serde::{self, Deserialize, Deserializer, Serializer};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&BASE64.encode(bytes))
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
BASE64.decode(&s).map_err(serde::de::Error::custom)
}
}
/// HTTPRequest represents an outbound HTTP request from a plugin. /// HTTPRequest represents an outbound HTTP request from a plugin.
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct HttpRequest { pub struct HTTPRequest {
pub method: String, pub method: String,
pub url: String, pub url: String,
#[serde(default)] #[serde(default)]
pub headers: std::collections::HashMap<String, String>, pub headers: std::collections::HashMap<String, String>,
#[serde(default)] #[serde(default)]
#[serde(with = "base64_bytes")]
pub body: Vec<u8>, pub body: Vec<u8>,
#[serde(default)] #[serde(default)]
pub timeout_ms: i32, pub timeout_ms: i32,
@@ -23,25 +47,26 @@ pub struct HttpRequest {
/// HTTPResponse represents the response from an outbound HTTP request. /// HTTPResponse represents the response from an outbound HTTP request.
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct HttpResponse { pub struct HTTPResponse {
pub status_code: i32, pub status_code: i32,
#[serde(default)] #[serde(default)]
pub headers: std::collections::HashMap<String, String>, pub headers: std::collections::HashMap<String, String>,
#[serde(default)] #[serde(default)]
#[serde(with = "base64_bytes")]
pub body: Vec<u8>, pub body: Vec<u8>,
} }
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
struct HTTPSendRequest { struct HTTPSendRequest {
request: HttpRequest, request: HTTPRequest,
} }
#[derive(Debug, Clone, Deserialize)] #[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
struct HTTPSendResponse { struct HTTPSendResponse {
#[serde(default)] #[serde(default)]
result: Option<HttpResponse>, result: Option<HTTPResponse>,
#[serde(default)] #[serde(default)]
error: Option<String>, error: Option<String>,
} }
@@ -57,18 +82,18 @@ extern "ExtismHost" {
/// - request: The HTTP request to execute, including method, URL, headers, body, and timeout /// - request: The HTTP request to execute, including method, URL, headers, body, and timeout
/// ///
/// Returns the HTTP response with status code, headers, and body. /// Returns the HTTP response with status code, headers, and body.
/// Network errors, timeouts, and permission failures are returned as errors. /// Network errors, timeouts, and permission failures are returned as Go errors.
/// Successful HTTP calls (including 4xx/5xx status codes) return a non-nil response with nil error. /// Successful HTTP calls (including 4xx/5xx status codes) return a non-nil response with nil error.
/// ///
/// # Arguments /// # Arguments
/// * `request` - HttpRequest parameter. /// * `request` - HTTPRequest parameter.
/// ///
/// # Returns /// # Returns
/// The result value. /// The result value.
/// ///
/// # Errors /// # Errors
/// Returns an error if the host function call fails. /// Returns an error if the host function call fails.
pub fn send(request: HttpRequest) -> Result<Option<HttpResponse>, Error> { pub fn send(request: HTTPRequest) -> Result<Option<HTTPResponse>, Error> {
let response = unsafe { let response = unsafe {
http_send(Json(HTTPSendRequest { http_send(Json(HTTPSendRequest {
request: request, request: request,

View File

@@ -5,11 +5,35 @@
use extism_pdk::*; use extism_pdk::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
mod base64_bytes {
use serde::{self, Deserialize, Deserializer, Serializer};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&BASE64.encode(bytes))
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
BASE64.decode(&s).map_err(serde::de::Error::custom)
}
}
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
struct KVStoreSetRequest { struct KVStoreSetRequest {
key: String, key: String,
#[serde(with = "base64_bytes")]
value: Vec<u8>, value: Vec<u8>,
} }
@@ -30,6 +54,7 @@ struct KVStoreGetRequest {
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
struct KVStoreGetResponse { struct KVStoreGetResponse {
#[serde(default)] #[serde(default)]
#[serde(with = "base64_bytes")]
value: Vec<u8>, value: Vec<u8>,
#[serde(default)] #[serde(default)]
exists: bool, exists: bool,

View File

@@ -5,6 +5,29 @@
use extism_pdk::*; use extism_pdk::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
mod base64_bytes {
use serde::{self, Deserialize, Deserializer, Serializer};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&BASE64.encode(bytes))
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
BASE64.decode(&s).map_err(serde::de::Error::custom)
}
}
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
@@ -27,14 +50,22 @@ struct SubsonicAPICallRawRequest {
uri: String, uri: String,
} }
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct SubsonicAPICallRawResponse {
#[serde(default)]
content_type: String,
#[serde(default)]
#[serde(with = "base64_bytes")]
data: Vec<u8>,
#[serde(default)]
error: Option<String>,
}
#[host_fn] #[host_fn]
extern "ExtismHost" { extern "ExtismHost" {
fn subsonicapi_call(input: Json<SubsonicAPICallRequest>) -> Json<SubsonicAPICallResponse>; fn subsonicapi_call(input: Json<SubsonicAPICallRequest>) -> Json<SubsonicAPICallResponse>;
} fn subsonicapi_callraw(input: Json<SubsonicAPICallRawRequest>) -> Json<SubsonicAPICallRawResponse>;
#[link(wasm_import_module = "extism:host/user")]
extern "C" {
fn subsonicapi_callraw(offset: u64) -> u64;
} }
/// Call executes a Subsonic API request and returns the JSON response. /// Call executes a Subsonic API request and returns the JSON response.
@@ -65,54 +96,27 @@ pub fn call(uri: &str) -> Result<String, Error> {
} }
/// CallRaw executes a Subsonic API request and returns the raw binary response. /// CallRaw executes a Subsonic API request and returns the raw binary response.
/// Optimized for binary endpoints like getCoverArt and stream that return /// Designed for binary endpoints like getCoverArt and stream that return
/// non-JSON data. The response is returned as raw bytes without JSON encoding overhead. /// non-JSON data. The data is base64-encoded over JSON on the wire.
/// ///
/// # Arguments /// # Arguments
/// * `uri` - String parameter. /// * `uri` - String parameter.
/// ///
/// # Returns /// # Returns
/// A tuple of (content_type, data) with the raw binary response. /// A tuple of (content_type, data).
/// ///
/// # Errors /// # Errors
/// Returns an error if the host function call fails. /// Returns an error if the host function call fails.
pub fn call_raw(uri: &str) -> Result<(String, Vec<u8>), Error> { pub fn call_raw(uri: &str) -> Result<(String, Vec<u8>), Error> {
let req = SubsonicAPICallRawRequest { let response = unsafe {
uri: uri.to_owned(), subsonicapi_callraw(Json(SubsonicAPICallRawRequest {
uri: uri.to_owned(),
}))?
}; };
let input_bytes = serde_json::to_vec(&req).map_err(|e| Error::msg(e.to_string()))?;
let input_mem = Memory::from_bytes(&input_bytes).map_err(|e| Error::msg(e.to_string()))?;
let response_offset = unsafe { subsonicapi_callraw(input_mem.offset()) }; if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
let response_mem = Memory::find(response_offset) Ok((response.0.content_type, response.0.data))
.ok_or_else(|| Error::msg("empty response from host"))?;
let response_bytes = response_mem.to_vec();
if response_bytes.is_empty() {
return Err(Error::msg("empty response from host"));
}
if response_bytes[0] == 0x01 {
let msg = String::from_utf8_lossy(&response_bytes[1..]).to_string();
return Err(Error::msg(msg));
}
if response_bytes[0] != 0x00 {
return Err(Error::msg("unknown response status"));
}
if response_bytes.len() < 5 {
return Err(Error::msg("malformed raw response: incomplete header"));
}
let ct_len = u32::from_be_bytes([
response_bytes[1],
response_bytes[2],
response_bytes[3],
response_bytes[4],
]) as usize;
if ct_len > response_bytes.len() - 5 {
return Err(Error::msg("malformed raw response: content-type overflow"));
}
let ct_end = 5 + ct_len;
let content_type = String::from_utf8_lossy(&response_bytes[5..ct_end]).to_string();
let data = response_bytes[ct_end..].to_vec();
Ok((content_type, data))
} }

View File

@@ -0,0 +1,217 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains client wrappers for the Task host service.
// It is intended for use in Navidrome plugins built with extism-pdk.
use extism_pdk::*;
use serde::{Deserialize, Serialize};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
mod base64_bytes {
use serde::{self, Deserialize, Deserializer, Serializer};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&BASE64.encode(bytes))
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
BASE64.decode(&s).map_err(serde::de::Error::custom)
}
}
/// QueueConfig holds configuration for a task queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct QueueConfig {
pub concurrency: i32,
pub max_retries: i32,
pub backoff_ms: i64,
pub delay_ms: i64,
pub retention_ms: i64,
}
/// TaskInfo holds the current state of a task.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskInfo {
pub status: String,
pub message: String,
pub attempt: i32,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskCreateQueueRequest {
name: String,
config: QueueConfig,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskCreateQueueResponse {
#[serde(default)]
error: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskEnqueueRequest {
queue_name: String,
#[serde(with = "base64_bytes")]
payload: Vec<u8>,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskEnqueueResponse {
#[serde(default)]
result: String,
#[serde(default)]
error: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskGetRequest {
task_id: String,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskGetResponse {
#[serde(default)]
result: Option<TaskInfo>,
#[serde(default)]
error: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskCancelRequest {
task_id: String,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskCancelResponse {
#[serde(default)]
error: Option<String>,
}
#[host_fn]
extern "ExtismHost" {
fn task_createqueue(input: Json<TaskCreateQueueRequest>) -> Json<TaskCreateQueueResponse>;
fn task_enqueue(input: Json<TaskEnqueueRequest>) -> Json<TaskEnqueueResponse>;
fn task_get(input: Json<TaskGetRequest>) -> Json<TaskGetResponse>;
fn task_cancel(input: Json<TaskCancelRequest>) -> Json<TaskCancelResponse>;
}
/// CreateQueue creates a named task queue with the given configuration.
/// Zero-value fields in config use sensible defaults.
/// If a queue with the same name already exists, returns an error.
/// On startup, this also recovers any stale "running" tasks from a previous crash.
///
/// # Arguments
/// * `name` - String parameter.
/// * `config` - QueueConfig parameter.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn create_queue(name: &str, config: QueueConfig) -> Result<(), Error> {
let response = unsafe {
task_createqueue(Json(TaskCreateQueueRequest {
name: name.to_owned(),
config: config,
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(())
}
/// Enqueue adds a task to the named queue. Returns the task ID.
/// payload is opaque bytes passed back to the plugin on execution.
///
/// # Arguments
/// * `queue_name` - String parameter.
/// * `payload` - Vec<u8> parameter.
///
/// # Returns
/// The result value.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn enqueue(queue_name: &str, payload: Vec<u8>) -> Result<String, Error> {
let response = unsafe {
task_enqueue(Json(TaskEnqueueRequest {
queue_name: queue_name.to_owned(),
payload: payload,
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(response.0.result)
}
/// Get returns the current state of a task including its status,
/// message, and attempt count.
///
/// # Arguments
/// * `task_id` - String parameter.
///
/// # Returns
/// The result value.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn get(task_id: &str) -> Result<Option<TaskInfo>, Error> {
let response = unsafe {
task_get(Json(TaskGetRequest {
task_id: task_id.to_owned(),
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(response.0.result)
}
/// Cancel cancels a pending task. Returns error if already
/// running, completed, or failed.
///
/// # Arguments
/// * `task_id` - String parameter.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn cancel(task_id: &str) -> Result<(), Error> {
let response = unsafe {
task_cancel(Json(TaskCancelRequest {
task_id: task_id.to_owned(),
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(())
}

View File

@@ -5,6 +5,29 @@
use extism_pdk::*; use extism_pdk::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
mod base64_bytes {
use serde::{self, Deserialize, Deserializer, Serializer};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&BASE64.encode(bytes))
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
BASE64.decode(&s).map_err(serde::de::Error::custom)
}
}
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
@@ -41,6 +64,7 @@ struct WebSocketSendTextResponse {
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
struct WebSocketSendBinaryRequest { struct WebSocketSendBinaryRequest {
connection_id: String, connection_id: String,
#[serde(with = "base64_bytes")]
data: Vec<u8>, data: Vec<u8>,
} }

16
plugins/testdata/test-taskqueue/go.mod vendored Normal file
View File

@@ -0,0 +1,16 @@
module test-taskqueue
go 1.25
require github.com/navidrome/navidrome/plugins/pdk/go v0.0.0
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/extism/go-pdk v1.1.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/stretchr/testify v1.11.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
replace github.com/navidrome/navidrome/plugins/pdk/go => ../../pdk/go

14
plugins/testdata/test-taskqueue/go.sum vendored Normal file
View File

@@ -0,0 +1,14 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/extism/go-pdk v1.1.3 h1:hfViMPWrqjN6u67cIYRALZTZLk/enSPpNKa+rZ9X2SQ=
github.com/extism/go-pdk v1.1.3/go.mod h1:Gz+LIU/YCKnKXhgge8yo5Yu1F/lbv7KtKFkiCSzW/P4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

104
plugins/testdata/test-taskqueue/main.go vendored Normal file
View File

@@ -0,0 +1,104 @@
// Test TaskQueue plugin for Navidrome plugin system integration tests.
// Build with: tinygo build -o ../test-taskqueue.wasm -target wasip1 -buildmode=c-shared .
package main
import (
"fmt"
"github.com/navidrome/navidrome/plugins/pdk/go/host"
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
"github.com/navidrome/navidrome/plugins/pdk/go/taskworker"
)
func init() {
taskworker.Register(&handler{})
}
type handler struct{}
func (h *handler) OnTaskExecute(req taskworker.TaskExecuteRequest) (string, error) {
payload := string(req.Payload)
if payload == "fail" {
return "", fmt.Errorf("task failed as instructed")
}
if payload == "fail-then-succeed" && req.Attempt < 2 {
return "", fmt.Errorf("transient failure")
}
return "completed successfully", nil
}
// Test helper types
type TestInput struct {
Operation string `json:"operation"`
QueueName string `json:"queueName,omitempty"`
Config *host.QueueConfig `json:"config,omitempty"`
Payload []byte `json:"payload,omitempty"`
TaskID string `json:"taskId,omitempty"`
}
type TestOutput struct {
TaskID string `json:"taskId,omitempty"`
Status string `json:"status,omitempty"`
Message string `json:"message,omitempty"`
Attempt int32 `json:"attempt,omitempty"`
Error *string `json:"error,omitempty"`
}
//go:wasmexport nd_test_taskqueue
func ndTestTaskQueue() int32 {
var input TestInput
if err := pdk.InputJSON(&input); err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
switch input.Operation {
case "create_queue":
config := host.QueueConfig{}
if input.Config != nil {
config = *input.Config
}
err := host.TaskCreateQueue(input.QueueName, config)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestOutput{})
case "enqueue":
taskID, err := host.TaskEnqueue(input.QueueName, input.Payload)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestOutput{TaskID: taskID})
case "get_task_status":
info, err := host.TaskGet(input.TaskID)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestOutput{Status: info.Status, Message: info.Message, Attempt: info.Attempt})
case "cancel_task":
err := host.TaskCancel(input.TaskID)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestOutput{})
default:
errStr := "unknown operation: " + input.Operation
pdk.OutputJSON(TestOutput{Error: &errStr})
}
return 0
}
func main() {}

View File

@@ -0,0 +1,12 @@
{
"name": "Test TaskQueue Plugin",
"author": "Navidrome Test",
"version": "1.0.0",
"description": "A test plugin for TaskQueue integration testing",
"permissions": {
"taskqueue": {
"reason": "For testing task queue operations",
"maxConcurrency": 10
}
}
}

View File

@@ -443,7 +443,7 @@ func (api *Router) buildArtist(r *http.Request, artist *model.Artist) (*response
func (api *Router) buildAlbumDirectory(ctx context.Context, album *model.Album) (*responses.Directory, error) { func (api *Router) buildAlbumDirectory(ctx context.Context, album *model.Album) (*responses.Directory, error) {
dir := &responses.Directory{} dir := &responses.Directory{}
dir.Id = album.ID dir.Id = album.ID
dir.Name = album.Name dir.Name = album.FullName()
dir.Parent = album.AlbumArtistID dir.Parent = album.AlbumArtistID
dir.PlayCount = album.PlayCount dir.PlayCount = album.PlayCount
if album.PlayCount > 0 { if album.PlayCount > 0 {

View File

@@ -197,7 +197,7 @@ func childFromMediaFile(ctx context.Context, mf model.MediaFile) responses.Child
} }
child.Parent = mf.AlbumID child.Parent = mf.AlbumID
child.Album = mf.Album child.Album = mf.FullAlbumName()
child.Year = int32(mf.Year) child.Year = int32(mf.Year)
child.Artist = mf.Artist child.Artist = mf.Artist
child.Genre = mf.Genre child.Genre = mf.Genre
@@ -302,7 +302,7 @@ func artistRefs(participants model.ParticipantList) []responses.ArtistID3Ref {
func fakePath(mf model.MediaFile) string { func fakePath(mf model.MediaFile) string {
builder := strings.Builder{} builder := strings.Builder{}
builder.WriteString(fmt.Sprintf("%s/%s/", sanitizeSlashes(mf.AlbumArtist), sanitizeSlashes(mf.Album))) builder.WriteString(fmt.Sprintf("%s/%s/", sanitizeSlashes(mf.AlbumArtist), sanitizeSlashes(mf.FullAlbumName())))
if mf.DiscNumber != 0 { if mf.DiscNumber != 0 {
builder.WriteString(fmt.Sprintf("%02d-", mf.DiscNumber)) builder.WriteString(fmt.Sprintf("%02d-", mf.DiscNumber))
} }
@@ -321,9 +321,10 @@ func childFromAlbum(ctx context.Context, al model.Album) responses.Child {
child := responses.Child{} child := responses.Child{}
child.Id = al.ID child.Id = al.ID
child.IsDir = true child.IsDir = true
child.Title = al.Name fullName := al.FullName()
child.Name = al.Name child.Title = fullName
child.Album = al.Name child.Name = fullName
child.Album = fullName
child.Artist = al.AlbumArtist child.Artist = al.AlbumArtist
child.Year = int32(cmp.Or(al.MaxOriginalYear, al.MaxYear)) child.Year = int32(cmp.Or(al.MaxOriginalYear, al.MaxYear))
child.Genre = al.Genre child.Genre = al.Genre
@@ -405,7 +406,7 @@ func buildDiscSubtitles(a model.Album) []responses.DiscTitle {
func buildAlbumID3(ctx context.Context, album model.Album) responses.AlbumID3 { func buildAlbumID3(ctx context.Context, album model.Album) responses.AlbumID3 {
dir := responses.AlbumID3{} dir := responses.AlbumID3{}
dir.Id = album.ID dir.Id = album.ID
dir.Name = album.Name dir.Name = album.FullName()
dir.Artist = album.AlbumArtist dir.Artist = album.AlbumArtist
dir.ArtistId = album.AlbumArtistID dir.ArtistId = album.AlbumArtistID
dir.CoverArt = album.CoverArtID().String() dir.CoverArt = album.CoverArtID().String()