mirror of
https://github.com/navidrome/navidrome.git
synced 2026-02-27 20:36:20 -05:00
Compare commits
20 Commits
feat/spell
...
feat/plugi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c2e8b39392 | ||
|
|
1974d1276e | ||
|
|
d7ace6f95f | ||
|
|
a196ec9a59 | ||
|
|
132928abb6 | ||
|
|
173aa9b979 | ||
|
|
3b2133c134 | ||
|
|
74bacf6879 | ||
|
|
55ef58da83 | ||
|
|
2bfbe6fde1 | ||
|
|
03cce614fd | ||
|
|
36a8cb37ca | ||
|
|
11d2b3b51c | ||
|
|
b308c71f38 | ||
|
|
591f3a333b | ||
|
|
36b58a9a10 | ||
|
|
bd8032b327 | ||
|
|
582d1b3cd9 | ||
|
|
cdd3432788 | ||
|
|
5bc2bbb70e |
@@ -155,6 +155,7 @@ type scannerOptions struct {
|
|||||||
|
|
||||||
type subsonicOptions struct {
|
type subsonicOptions struct {
|
||||||
AppendSubtitle bool
|
AppendSubtitle bool
|
||||||
|
AppendAlbumVersion bool
|
||||||
ArtistParticipations bool
|
ArtistParticipations bool
|
||||||
DefaultReportRealPath bool
|
DefaultReportRealPath bool
|
||||||
EnableAverageRating bool
|
EnableAverageRating bool
|
||||||
@@ -689,6 +690,7 @@ func setViperDefaults() {
|
|||||||
viper.SetDefault("scanner.followsymlinks", true)
|
viper.SetDefault("scanner.followsymlinks", true)
|
||||||
viper.SetDefault("scanner.purgemissing", consts.PurgeMissingNever)
|
viper.SetDefault("scanner.purgemissing", consts.PurgeMissingNever)
|
||||||
viper.SetDefault("subsonic.appendsubtitle", true)
|
viper.SetDefault("subsonic.appendsubtitle", true)
|
||||||
|
viper.SetDefault("subsonic.appendalbumversion", true)
|
||||||
viper.SetDefault("subsonic.artistparticipations", false)
|
viper.SetDefault("subsonic.artistparticipations", false)
|
||||||
viper.SetDefault("subsonic.defaultreportrealpath", false)
|
viper.SetDefault("subsonic.defaultreportrealpath", false)
|
||||||
viper.SetDefault("subsonic.enableaveragerating", true)
|
viper.SetDefault("subsonic.enableaveragerating", true)
|
||||||
|
|||||||
@@ -1,11 +1,14 @@
|
|||||||
package model
|
package model
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"iter"
|
"iter"
|
||||||
"math"
|
"math"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/navidrome/navidrome/conf"
|
||||||
|
|
||||||
"github.com/gohugoio/hashstructure"
|
"github.com/gohugoio/hashstructure"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -70,6 +73,13 @@ func (a Album) CoverArtID() ArtworkID {
|
|||||||
return artworkIDFromAlbum(a)
|
return artworkIDFromAlbum(a)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a Album) FullName() string {
|
||||||
|
if conf.Server.Subsonic.AppendAlbumVersion && len(a.Tags[TagAlbumVersion]) > 0 {
|
||||||
|
return fmt.Sprintf("%s (%s)", a.Name, a.Tags[TagAlbumVersion][0])
|
||||||
|
}
|
||||||
|
return a.Name
|
||||||
|
}
|
||||||
|
|
||||||
// Equals compares two Album structs, ignoring calculated fields
|
// Equals compares two Album structs, ignoring calculated fields
|
||||||
func (a Album) Equals(other Album) bool {
|
func (a Album) Equals(other Album) bool {
|
||||||
// Normalize float32 values to avoid false negatives
|
// Normalize float32 values to avoid false negatives
|
||||||
|
|||||||
@@ -3,11 +3,30 @@ package model_test
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
|
||||||
|
"github.com/navidrome/navidrome/conf"
|
||||||
|
"github.com/navidrome/navidrome/conf/configtest"
|
||||||
. "github.com/navidrome/navidrome/model"
|
. "github.com/navidrome/navidrome/model"
|
||||||
. "github.com/onsi/ginkgo/v2"
|
. "github.com/onsi/ginkgo/v2"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var _ = Describe("Album", func() {
|
||||||
|
BeforeEach(func() {
|
||||||
|
DeferCleanup(configtest.SetupConfig())
|
||||||
|
})
|
||||||
|
DescribeTable("FullName",
|
||||||
|
func(enabled bool, tags Tags, expected string) {
|
||||||
|
conf.Server.Subsonic.AppendAlbumVersion = enabled
|
||||||
|
a := Album{Name: "Album", Tags: tags}
|
||||||
|
Expect(a.FullName()).To(Equal(expected))
|
||||||
|
},
|
||||||
|
Entry("appends version when enabled and tag is present", true, Tags{TagAlbumVersion: []string{"Remastered"}}, "Album (Remastered)"),
|
||||||
|
Entry("returns just name when disabled", false, Tags{TagAlbumVersion: []string{"Remastered"}}, "Album"),
|
||||||
|
Entry("returns just name when tag is absent", true, Tags{}, "Album"),
|
||||||
|
Entry("returns just name when tag is an empty slice", true, Tags{TagAlbumVersion: []string{}}, "Album"),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
var _ = Describe("Albums", func() {
|
var _ = Describe("Albums", func() {
|
||||||
var albums Albums
|
var albums Albums
|
||||||
|
|
||||||
|
|||||||
@@ -95,12 +95,19 @@ type MediaFile struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mf MediaFile) FullTitle() string {
|
func (mf MediaFile) FullTitle() string {
|
||||||
if conf.Server.Subsonic.AppendSubtitle && mf.Tags[TagSubtitle] != nil {
|
if conf.Server.Subsonic.AppendSubtitle && len(mf.Tags[TagSubtitle]) > 0 {
|
||||||
return fmt.Sprintf("%s (%s)", mf.Title, mf.Tags[TagSubtitle][0])
|
return fmt.Sprintf("%s (%s)", mf.Title, mf.Tags[TagSubtitle][0])
|
||||||
}
|
}
|
||||||
return mf.Title
|
return mf.Title
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mf MediaFile) FullAlbumName() string {
|
||||||
|
if conf.Server.Subsonic.AppendAlbumVersion && len(mf.Tags[TagAlbumVersion]) > 0 {
|
||||||
|
return fmt.Sprintf("%s (%s)", mf.Album, mf.Tags[TagAlbumVersion][0])
|
||||||
|
}
|
||||||
|
return mf.Album
|
||||||
|
}
|
||||||
|
|
||||||
func (mf MediaFile) ContentType() string {
|
func (mf MediaFile) ContentType() string {
|
||||||
return mime.TypeByExtension("." + mf.Suffix)
|
return mime.TypeByExtension("." + mf.Suffix)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -475,7 +475,29 @@ var _ = Describe("MediaFile", func() {
|
|||||||
DeferCleanup(configtest.SetupConfig())
|
DeferCleanup(configtest.SetupConfig())
|
||||||
conf.Server.EnableMediaFileCoverArt = true
|
conf.Server.EnableMediaFileCoverArt = true
|
||||||
})
|
})
|
||||||
Describe(".CoverArtId()", func() {
|
DescribeTable("FullTitle",
|
||||||
|
func(enabled bool, tags Tags, expected string) {
|
||||||
|
conf.Server.Subsonic.AppendSubtitle = enabled
|
||||||
|
mf := MediaFile{Title: "Song", Tags: tags}
|
||||||
|
Expect(mf.FullTitle()).To(Equal(expected))
|
||||||
|
},
|
||||||
|
Entry("appends subtitle when enabled and tag is present", true, Tags{TagSubtitle: []string{"Live"}}, "Song (Live)"),
|
||||||
|
Entry("returns just title when disabled", false, Tags{TagSubtitle: []string{"Live"}}, "Song"),
|
||||||
|
Entry("returns just title when tag is absent", true, Tags{}, "Song"),
|
||||||
|
Entry("returns just title when tag is an empty slice", true, Tags{TagSubtitle: []string{}}, "Song"),
|
||||||
|
)
|
||||||
|
DescribeTable("FullAlbumName",
|
||||||
|
func(enabled bool, tags Tags, expected string) {
|
||||||
|
conf.Server.Subsonic.AppendAlbumVersion = enabled
|
||||||
|
mf := MediaFile{Album: "Album", Tags: tags}
|
||||||
|
Expect(mf.FullAlbumName()).To(Equal(expected))
|
||||||
|
},
|
||||||
|
Entry("appends version when enabled and tag is present", true, Tags{TagAlbumVersion: []string{"Deluxe Edition"}}, "Album (Deluxe Edition)"),
|
||||||
|
Entry("returns just album name when disabled", false, Tags{TagAlbumVersion: []string{"Deluxe Edition"}}, "Album"),
|
||||||
|
Entry("returns just album name when tag is absent", true, Tags{}, "Album"),
|
||||||
|
Entry("returns just album name when tag is an empty slice", true, Tags{TagAlbumVersion: []string{}}, "Album"),
|
||||||
|
)
|
||||||
|
Describe("CoverArtId()", func() {
|
||||||
It("returns its own id if it HasCoverArt", func() {
|
It("returns its own id if it HasCoverArt", func() {
|
||||||
mf := MediaFile{ID: "111", AlbumID: "1", HasCoverArt: true}
|
mf := MediaFile{ID: "111", AlbumID: "1", HasCoverArt: true}
|
||||||
id := mf.CoverArtID()
|
id := mf.CoverArtID()
|
||||||
|
|||||||
27
plugins/capabilities/taskworker.go
Normal file
27
plugins/capabilities/taskworker.go
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
package capabilities
|
||||||
|
|
||||||
|
// TaskWorker provides task execution handling.
|
||||||
|
// This capability allows plugins to receive callbacks when their queued tasks
|
||||||
|
// are ready to execute. Plugins that use the taskqueue host service must
|
||||||
|
// implement this capability.
|
||||||
|
//
|
||||||
|
//nd:capability name=taskworker
|
||||||
|
type TaskWorker interface {
|
||||||
|
// OnTaskExecute is called when a queued task is ready to run.
|
||||||
|
// The returned string is a status/result message stored in the tasks table.
|
||||||
|
// Return an error to trigger retry (if retries are configured).
|
||||||
|
//nd:export name=nd_task_execute
|
||||||
|
OnTaskExecute(TaskExecuteRequest) (string, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskExecuteRequest is the request provided when a task is ready to execute.
|
||||||
|
type TaskExecuteRequest struct {
|
||||||
|
// QueueName is the name of the queue this task belongs to.
|
||||||
|
QueueName string `json:"queueName"`
|
||||||
|
// TaskID is the unique identifier for this task.
|
||||||
|
TaskID string `json:"taskId"`
|
||||||
|
// Payload is the opaque data provided when the task was enqueued.
|
||||||
|
Payload []byte `json:"payload"`
|
||||||
|
// Attempt is the current attempt number (1-based: first attempt = 1).
|
||||||
|
Attempt int32 `json:"attempt"`
|
||||||
|
}
|
||||||
38
plugins/capabilities/taskworker.yaml
Normal file
38
plugins/capabilities/taskworker.yaml
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
version: v1-draft
|
||||||
|
exports:
|
||||||
|
nd_task_execute:
|
||||||
|
description: |-
|
||||||
|
OnTaskExecute is called when a queued task is ready to run.
|
||||||
|
The returned string is a status/result message stored in the tasks table.
|
||||||
|
Return an error to trigger retry (if retries are configured).
|
||||||
|
input:
|
||||||
|
$ref: '#/components/schemas/TaskExecuteRequest'
|
||||||
|
contentType: application/json
|
||||||
|
output:
|
||||||
|
type: string
|
||||||
|
contentType: application/json
|
||||||
|
components:
|
||||||
|
schemas:
|
||||||
|
TaskExecuteRequest:
|
||||||
|
description: TaskExecuteRequest is the request provided when a task is ready to execute.
|
||||||
|
properties:
|
||||||
|
queueName:
|
||||||
|
type: string
|
||||||
|
description: QueueName is the name of the queue this task belongs to.
|
||||||
|
taskId:
|
||||||
|
type: string
|
||||||
|
description: TaskID is the unique identifier for this task.
|
||||||
|
payload:
|
||||||
|
type: array
|
||||||
|
description: Payload is the opaque data provided when the task was enqueued.
|
||||||
|
items:
|
||||||
|
type: object
|
||||||
|
attempt:
|
||||||
|
type: integer
|
||||||
|
format: int32
|
||||||
|
description: 'Attempt is the current attempt number (1-based: first attempt = 1).'
|
||||||
|
required:
|
||||||
|
- queueName
|
||||||
|
- taskId
|
||||||
|
- payload
|
||||||
|
- attempt
|
||||||
@@ -282,9 +282,6 @@ type ServiceB interface {
|
|||||||
|
|
||||||
Entry("option pattern (value, exists bool)",
|
Entry("option pattern (value, exists bool)",
|
||||||
"config_service.go.txt", "config_client_expected.go.txt", "config_client_expected.py", "config_client_expected.rs"),
|
"config_service.go.txt", "config_client_expected.go.txt", "config_client_expected.py", "config_client_expected.rs"),
|
||||||
|
|
||||||
Entry("raw=true binary response",
|
|
||||||
"raw_service.go.txt", "raw_client_expected.go.txt", "raw_client_expected.py", "raw_client_expected.rs"),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
It("generates compilable client code for comprehensive service", func() {
|
It("generates compilable client code for comprehensive service", func() {
|
||||||
|
|||||||
@@ -256,6 +256,15 @@ func GenerateClientRust(svc Service) ([]byte, error) {
|
|||||||
return nil, fmt.Errorf("parsing template: %w", err)
|
return nil, fmt.Errorf("parsing template: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
partialContent, err := templatesFS.ReadFile("templates/base64_bytes.rs.tmpl")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("reading base64_bytes partial: %w", err)
|
||||||
|
}
|
||||||
|
tmpl, err = tmpl.Parse(string(partialContent))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("parsing base64_bytes partial: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
data := templateData{
|
data := templateData{
|
||||||
Service: svc,
|
Service: svc,
|
||||||
}
|
}
|
||||||
@@ -622,6 +631,15 @@ func GenerateCapabilityRust(cap Capability) ([]byte, error) {
|
|||||||
return nil, fmt.Errorf("parsing template: %w", err)
|
return nil, fmt.Errorf("parsing template: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
partialContent, err := templatesFS.ReadFile("templates/base64_bytes.rs.tmpl")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("reading base64_bytes partial: %w", err)
|
||||||
|
}
|
||||||
|
tmpl, err = tmpl.Parse(string(partialContent))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("parsing base64_bytes partial: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
data := capabilityTemplateData{
|
data := capabilityTemplateData{
|
||||||
Package: cap.Name,
|
Package: cap.Name,
|
||||||
Capability: cap,
|
Capability: cap,
|
||||||
|
|||||||
@@ -264,96 +264,6 @@ var _ = Describe("Generator", func() {
|
|||||||
Expect(codeStr).To(ContainSubstring(`extism "github.com/extism/go-sdk"`))
|
Expect(codeStr).To(ContainSubstring(`extism "github.com/extism/go-sdk"`))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should generate binary framing for raw=true methods", func() {
|
|
||||||
svc := Service{
|
|
||||||
Name: "Stream",
|
|
||||||
Permission: "stream",
|
|
||||||
Interface: "StreamService",
|
|
||||||
Methods: []Method{
|
|
||||||
{
|
|
||||||
Name: "GetStream",
|
|
||||||
HasError: true,
|
|
||||||
Raw: true,
|
|
||||||
Params: []Param{NewParam("uri", "string")},
|
|
||||||
Returns: []Param{
|
|
||||||
NewParam("contentType", "string"),
|
|
||||||
NewParam("data", "[]byte"),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
code, err := GenerateHost(svc, "host")
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
_, err = format.Source(code)
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
codeStr := string(code)
|
|
||||||
|
|
||||||
// Should include encoding/binary import for raw methods
|
|
||||||
Expect(codeStr).To(ContainSubstring(`"encoding/binary"`))
|
|
||||||
|
|
||||||
// Should NOT generate a response type for raw methods
|
|
||||||
Expect(codeStr).NotTo(ContainSubstring("type StreamGetStreamResponse struct"))
|
|
||||||
|
|
||||||
// Should generate request type (request is still JSON)
|
|
||||||
Expect(codeStr).To(ContainSubstring("type StreamGetStreamRequest struct"))
|
|
||||||
|
|
||||||
// Should build binary frame [0x00][4-byte CT len][CT][data]
|
|
||||||
Expect(codeStr).To(ContainSubstring("frame[0] = 0x00"))
|
|
||||||
Expect(codeStr).To(ContainSubstring("binary.BigEndian.PutUint32"))
|
|
||||||
|
|
||||||
// Should have writeRawError helper
|
|
||||||
Expect(codeStr).To(ContainSubstring("streamWriteRawError"))
|
|
||||||
|
|
||||||
// Should use writeRawError instead of writeError for raw methods
|
|
||||||
Expect(codeStr).To(ContainSubstring("streamWriteRawError(p, stack"))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should generate both writeError and writeRawError for mixed services", func() {
|
|
||||||
svc := Service{
|
|
||||||
Name: "API",
|
|
||||||
Permission: "api",
|
|
||||||
Interface: "APIService",
|
|
||||||
Methods: []Method{
|
|
||||||
{
|
|
||||||
Name: "Call",
|
|
||||||
HasError: true,
|
|
||||||
Params: []Param{NewParam("uri", "string")},
|
|
||||||
Returns: []Param{NewParam("response", "string")},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Name: "CallRaw",
|
|
||||||
HasError: true,
|
|
||||||
Raw: true,
|
|
||||||
Params: []Param{NewParam("uri", "string")},
|
|
||||||
Returns: []Param{
|
|
||||||
NewParam("contentType", "string"),
|
|
||||||
NewParam("data", "[]byte"),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
code, err := GenerateHost(svc, "host")
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
_, err = format.Source(code)
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
codeStr := string(code)
|
|
||||||
|
|
||||||
// Should have both helpers
|
|
||||||
Expect(codeStr).To(ContainSubstring("apiWriteResponse"))
|
|
||||||
Expect(codeStr).To(ContainSubstring("apiWriteError"))
|
|
||||||
Expect(codeStr).To(ContainSubstring("apiWriteRawError"))
|
|
||||||
|
|
||||||
// Should generate response type for non-raw method only
|
|
||||||
Expect(codeStr).To(ContainSubstring("type APICallResponse struct"))
|
|
||||||
Expect(codeStr).NotTo(ContainSubstring("type APICallRawResponse struct"))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should always include json import for JSON protocol", func() {
|
It("should always include json import for JSON protocol", func() {
|
||||||
// All services use JSON protocol, so json import is always needed
|
// All services use JSON protocol, so json import is always needed
|
||||||
svc := Service{
|
svc := Service{
|
||||||
@@ -717,49 +627,7 @@ var _ = Describe("Generator", func() {
|
|||||||
Expect(codeStr).To(ContainSubstring(`response.get("boolVal", False)`))
|
Expect(codeStr).To(ContainSubstring(`response.get("boolVal", False)`))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should generate binary frame parsing for raw methods", func() {
|
It("should not import base64 for non-byte services", func() {
|
||||||
svc := Service{
|
|
||||||
Name: "Stream",
|
|
||||||
Permission: "stream",
|
|
||||||
Interface: "StreamService",
|
|
||||||
Methods: []Method{
|
|
||||||
{
|
|
||||||
Name: "GetStream",
|
|
||||||
HasError: true,
|
|
||||||
Raw: true,
|
|
||||||
Params: []Param{NewParam("uri", "string")},
|
|
||||||
Returns: []Param{
|
|
||||||
NewParam("contentType", "string"),
|
|
||||||
NewParam("data", "[]byte"),
|
|
||||||
},
|
|
||||||
Doc: "GetStream returns raw binary stream data.",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
code, err := GenerateClientPython(svc)
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
codeStr := string(code)
|
|
||||||
|
|
||||||
// Should import Tuple and struct for raw methods
|
|
||||||
Expect(codeStr).To(ContainSubstring("from typing import Any, Tuple"))
|
|
||||||
Expect(codeStr).To(ContainSubstring("import struct"))
|
|
||||||
|
|
||||||
// Should return Tuple[str, bytes]
|
|
||||||
Expect(codeStr).To(ContainSubstring("-> Tuple[str, bytes]:"))
|
|
||||||
|
|
||||||
// Should parse binary frame instead of JSON
|
|
||||||
Expect(codeStr).To(ContainSubstring("response_bytes = response_mem.bytes()"))
|
|
||||||
Expect(codeStr).To(ContainSubstring("response_bytes[0] == 0x01"))
|
|
||||||
Expect(codeStr).To(ContainSubstring("struct.unpack"))
|
|
||||||
Expect(codeStr).To(ContainSubstring("return content_type, data"))
|
|
||||||
|
|
||||||
// Should NOT use json.loads for response
|
|
||||||
Expect(codeStr).NotTo(ContainSubstring("json.loads(extism.memory.string(response_mem))"))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should not import Tuple or struct for non-raw services", func() {
|
|
||||||
svc := Service{
|
svc := Service{
|
||||||
Name: "Test",
|
Name: "Test",
|
||||||
Permission: "test",
|
Permission: "test",
|
||||||
@@ -779,8 +647,37 @@ var _ = Describe("Generator", func() {
|
|||||||
|
|
||||||
codeStr := string(code)
|
codeStr := string(code)
|
||||||
|
|
||||||
Expect(codeStr).NotTo(ContainSubstring("Tuple"))
|
Expect(codeStr).NotTo(ContainSubstring("import base64"))
|
||||||
Expect(codeStr).NotTo(ContainSubstring("import struct"))
|
})
|
||||||
|
|
||||||
|
It("should generate base64 encoding/decoding for byte fields", func() {
|
||||||
|
svc := Service{
|
||||||
|
Name: "Codec",
|
||||||
|
Permission: "codec",
|
||||||
|
Interface: "CodecService",
|
||||||
|
Methods: []Method{
|
||||||
|
{
|
||||||
|
Name: "Encode",
|
||||||
|
HasError: true,
|
||||||
|
Params: []Param{NewParam("data", "[]byte")},
|
||||||
|
Returns: []Param{NewParam("result", "[]byte")},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
code, err := GenerateClientPython(svc)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
codeStr := string(code)
|
||||||
|
|
||||||
|
// Should import base64
|
||||||
|
Expect(codeStr).To(ContainSubstring("import base64"))
|
||||||
|
|
||||||
|
// Should base64-encode byte params in request
|
||||||
|
Expect(codeStr).To(ContainSubstring(`base64.b64encode(data).decode("ascii")`))
|
||||||
|
|
||||||
|
// Should base64-decode byte returns in response
|
||||||
|
Expect(codeStr).To(ContainSubstring(`base64.b64decode(response.get("result", ""))`))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -939,46 +836,6 @@ var _ = Describe("Generator", func() {
|
|||||||
Expect(codeStr).To(ContainSubstring("github.com/navidrome/navidrome/plugins/pdk/go/pdk"))
|
Expect(codeStr).To(ContainSubstring("github.com/navidrome/navidrome/plugins/pdk/go/pdk"))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should include encoding/binary import for raw methods", func() {
|
|
||||||
svc := Service{
|
|
||||||
Name: "Stream",
|
|
||||||
Permission: "stream",
|
|
||||||
Interface: "StreamService",
|
|
||||||
Methods: []Method{
|
|
||||||
{
|
|
||||||
Name: "GetStream",
|
|
||||||
HasError: true,
|
|
||||||
Raw: true,
|
|
||||||
Params: []Param{NewParam("uri", "string")},
|
|
||||||
Returns: []Param{
|
|
||||||
NewParam("contentType", "string"),
|
|
||||||
NewParam("data", "[]byte"),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
code, err := GenerateClientGo(svc, "host")
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
codeStr := string(code)
|
|
||||||
|
|
||||||
// Should include encoding/binary for raw binary frame parsing
|
|
||||||
Expect(codeStr).To(ContainSubstring(`"encoding/binary"`))
|
|
||||||
|
|
||||||
// Should NOT generate response type struct for raw methods
|
|
||||||
Expect(codeStr).NotTo(ContainSubstring("streamGetStreamResponse struct"))
|
|
||||||
|
|
||||||
// Should still generate request type
|
|
||||||
Expect(codeStr).To(ContainSubstring("streamGetStreamRequest struct"))
|
|
||||||
|
|
||||||
// Should parse binary frame
|
|
||||||
Expect(codeStr).To(ContainSubstring("responseBytes[0] == 0x01"))
|
|
||||||
Expect(codeStr).To(ContainSubstring("binary.BigEndian.Uint32"))
|
|
||||||
|
|
||||||
// Should return (string, []byte, error)
|
|
||||||
Expect(codeStr).To(ContainSubstring("func StreamGetStream(uri string) (string, []byte, error)"))
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
|
|
||||||
Describe("GenerateClientGoStub", func() {
|
Describe("GenerateClientGoStub", func() {
|
||||||
@@ -1748,22 +1605,17 @@ var _ = Describe("Rust Generation", func() {
|
|||||||
Expect(codeStr).NotTo(ContainSubstring("Option<bool>"))
|
Expect(codeStr).NotTo(ContainSubstring("Option<bool>"))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should generate raw extern C import and binary frame parsing for raw methods", func() {
|
It("should generate base64 serde for Vec<u8> fields", func() {
|
||||||
svc := Service{
|
svc := Service{
|
||||||
Name: "Stream",
|
Name: "Codec",
|
||||||
Permission: "stream",
|
Permission: "codec",
|
||||||
Interface: "StreamService",
|
Interface: "CodecService",
|
||||||
Methods: []Method{
|
Methods: []Method{
|
||||||
{
|
{
|
||||||
Name: "GetStream",
|
Name: "Encode",
|
||||||
HasError: true,
|
HasError: true,
|
||||||
Raw: true,
|
Params: []Param{NewParam("data", "[]byte")},
|
||||||
Params: []Param{NewParam("uri", "string")},
|
Returns: []Param{NewParam("result", "[]byte")},
|
||||||
Returns: []Param{
|
|
||||||
NewParam("contentType", "string"),
|
|
||||||
NewParam("data", "[]byte"),
|
|
||||||
},
|
|
||||||
Doc: "GetStream returns raw binary stream data.",
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@@ -1773,24 +1625,36 @@ var _ = Describe("Rust Generation", func() {
|
|||||||
|
|
||||||
codeStr := string(code)
|
codeStr := string(code)
|
||||||
|
|
||||||
// Should use extern "C" with wasm_import_module for raw methods, not #[host_fn] extern "ExtismHost"
|
// Should generate base64_bytes serde module
|
||||||
Expect(codeStr).To(ContainSubstring(`#[link(wasm_import_module = "extism:host/user")]`))
|
Expect(codeStr).To(ContainSubstring("mod base64_bytes"))
|
||||||
Expect(codeStr).To(ContainSubstring(`extern "C"`))
|
Expect(codeStr).To(ContainSubstring("use base64::Engine as _"))
|
||||||
Expect(codeStr).To(ContainSubstring("fn stream_getstream(offset: u64) -> u64"))
|
|
||||||
|
|
||||||
// Should NOT generate response type for raw methods
|
// Should add serde(with = "base64_bytes") on Vec<u8> fields
|
||||||
Expect(codeStr).NotTo(ContainSubstring("StreamGetStreamResponse"))
|
Expect(codeStr).To(ContainSubstring(`#[serde(with = "base64_bytes")]`))
|
||||||
|
})
|
||||||
|
|
||||||
// Should generate request type (request is still JSON)
|
It("should not generate base64 module when no byte fields", func() {
|
||||||
Expect(codeStr).To(ContainSubstring("struct StreamGetStreamRequest"))
|
svc := Service{
|
||||||
|
Name: "Test",
|
||||||
|
Permission: "test",
|
||||||
|
Interface: "TestService",
|
||||||
|
Methods: []Method{
|
||||||
|
{
|
||||||
|
Name: "Call",
|
||||||
|
HasError: true,
|
||||||
|
Params: []Param{NewParam("uri", "string")},
|
||||||
|
Returns: []Param{NewParam("response", "string")},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
// Should return Result<(String, Vec<u8>), Error>
|
code, err := GenerateClientRust(svc)
|
||||||
Expect(codeStr).To(ContainSubstring("Result<(String, Vec<u8>), Error>"))
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
// Should parse binary frame
|
codeStr := string(code)
|
||||||
Expect(codeStr).To(ContainSubstring("response_bytes[0] == 0x01"))
|
|
||||||
Expect(codeStr).To(ContainSubstring("u32::from_be_bytes"))
|
Expect(codeStr).NotTo(ContainSubstring("mod base64_bytes"))
|
||||||
Expect(codeStr).To(ContainSubstring("String::from_utf8_lossy"))
|
Expect(codeStr).NotTo(ContainSubstring("use base64"))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -761,7 +761,6 @@ func parseMethod(name string, funcType *ast.FuncType, annotation map[string]stri
|
|||||||
m := Method{
|
m := Method{
|
||||||
Name: name,
|
Name: name,
|
||||||
ExportName: annotation["name"],
|
ExportName: annotation["name"],
|
||||||
Raw: annotation["raw"] == "true",
|
|
||||||
Doc: doc,
|
Doc: doc,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -800,13 +799,6 @@ func parseMethod(name string, funcType *ast.FuncType, annotation map[string]stri
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate raw=true methods: must return exactly (string, []byte, error)
|
|
||||||
if m.Raw {
|
|
||||||
if !m.HasError || len(m.Returns) != 2 || m.Returns[0].Type != "string" || m.Returns[1].Type != "[]byte" {
|
|
||||||
return m, fmt.Errorf("raw=true method %s must return (string, []byte, error) — content-type, data, error", name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return m, nil
|
return m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -122,119 +122,6 @@ type TestService interface {
|
|||||||
Expect(services[0].Methods[0].Name).To(Equal("Exported"))
|
Expect(services[0].Methods[0].Name).To(Equal("Exported"))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should parse raw=true annotation", func() {
|
|
||||||
src := `package host
|
|
||||||
|
|
||||||
import "context"
|
|
||||||
|
|
||||||
//nd:hostservice name=Stream permission=stream
|
|
||||||
type StreamService interface {
|
|
||||||
//nd:hostfunc raw=true
|
|
||||||
GetStream(ctx context.Context, uri string) (contentType string, data []byte, err error)
|
|
||||||
}
|
|
||||||
`
|
|
||||||
err := os.WriteFile(filepath.Join(tmpDir, "stream.go"), []byte(src), 0600)
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
services, err := ParseDirectory(tmpDir)
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(services).To(HaveLen(1))
|
|
||||||
|
|
||||||
m := services[0].Methods[0]
|
|
||||||
Expect(m.Name).To(Equal("GetStream"))
|
|
||||||
Expect(m.Raw).To(BeTrue())
|
|
||||||
Expect(m.HasError).To(BeTrue())
|
|
||||||
Expect(m.Returns).To(HaveLen(2))
|
|
||||||
Expect(m.Returns[0].Name).To(Equal("contentType"))
|
|
||||||
Expect(m.Returns[0].Type).To(Equal("string"))
|
|
||||||
Expect(m.Returns[1].Name).To(Equal("data"))
|
|
||||||
Expect(m.Returns[1].Type).To(Equal("[]byte"))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should set Raw=false when raw annotation is absent", func() {
|
|
||||||
src := `package host
|
|
||||||
|
|
||||||
import "context"
|
|
||||||
|
|
||||||
//nd:hostservice name=Test permission=test
|
|
||||||
type TestService interface {
|
|
||||||
//nd:hostfunc
|
|
||||||
Call(ctx context.Context, uri string) (response string, err error)
|
|
||||||
}
|
|
||||||
`
|
|
||||||
err := os.WriteFile(filepath.Join(tmpDir, "test.go"), []byte(src), 0600)
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
services, err := ParseDirectory(tmpDir)
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(services[0].Methods[0].Raw).To(BeFalse())
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should reject raw=true with invalid return signature", func() {
|
|
||||||
src := `package host
|
|
||||||
|
|
||||||
import "context"
|
|
||||||
|
|
||||||
//nd:hostservice name=Test permission=test
|
|
||||||
type TestService interface {
|
|
||||||
//nd:hostfunc raw=true
|
|
||||||
BadRaw(ctx context.Context, uri string) (result string, err error)
|
|
||||||
}
|
|
||||||
`
|
|
||||||
err := os.WriteFile(filepath.Join(tmpDir, "test.go"), []byte(src), 0600)
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
_, err = ParseDirectory(tmpDir)
|
|
||||||
Expect(err).To(HaveOccurred())
|
|
||||||
Expect(err.Error()).To(ContainSubstring("raw=true"))
|
|
||||||
Expect(err.Error()).To(ContainSubstring("must return (string, []byte, error)"))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should reject raw=true without error return", func() {
|
|
||||||
src := `package host
|
|
||||||
|
|
||||||
import "context"
|
|
||||||
|
|
||||||
//nd:hostservice name=Test permission=test
|
|
||||||
type TestService interface {
|
|
||||||
//nd:hostfunc raw=true
|
|
||||||
BadRaw(ctx context.Context, uri string) (contentType string, data []byte)
|
|
||||||
}
|
|
||||||
`
|
|
||||||
err := os.WriteFile(filepath.Join(tmpDir, "test.go"), []byte(src), 0600)
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
_, err = ParseDirectory(tmpDir)
|
|
||||||
Expect(err).To(HaveOccurred())
|
|
||||||
Expect(err.Error()).To(ContainSubstring("raw=true"))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should parse mixed raw and non-raw methods", func() {
|
|
||||||
src := `package host
|
|
||||||
|
|
||||||
import "context"
|
|
||||||
|
|
||||||
//nd:hostservice name=API permission=api
|
|
||||||
type APIService interface {
|
|
||||||
//nd:hostfunc
|
|
||||||
Call(ctx context.Context, uri string) (responseJSON string, err error)
|
|
||||||
|
|
||||||
//nd:hostfunc raw=true
|
|
||||||
CallRaw(ctx context.Context, uri string) (contentType string, data []byte, err error)
|
|
||||||
}
|
|
||||||
`
|
|
||||||
err := os.WriteFile(filepath.Join(tmpDir, "api.go"), []byte(src), 0600)
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
services, err := ParseDirectory(tmpDir)
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(services).To(HaveLen(1))
|
|
||||||
Expect(services[0].Methods).To(HaveLen(2))
|
|
||||||
Expect(services[0].Methods[0].Raw).To(BeFalse())
|
|
||||||
Expect(services[0].Methods[1].Raw).To(BeTrue())
|
|
||||||
Expect(services[0].HasRawMethods()).To(BeTrue())
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should handle custom export name", func() {
|
It("should handle custom export name", func() {
|
||||||
src := `package host
|
src := `package host
|
||||||
|
|
||||||
|
|||||||
25
plugins/cmd/ndpgen/internal/templates/base64_bytes.rs.tmpl
Normal file
25
plugins/cmd/ndpgen/internal/templates/base64_bytes.rs.tmpl
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
{{define "base64_bytes_module"}}
|
||||||
|
use base64::Engine as _;
|
||||||
|
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||||
|
|
||||||
|
mod base64_bytes {
|
||||||
|
use serde::{self, Deserialize, Deserializer, Serializer};
|
||||||
|
use base64::Engine as _;
|
||||||
|
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||||
|
|
||||||
|
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
serializer.serialize_str(&BASE64.encode(bytes))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
|
||||||
|
where
|
||||||
|
D: Deserializer<'de>,
|
||||||
|
{
|
||||||
|
let s = String::deserialize(deserializer)?;
|
||||||
|
BASE64.decode(&s).map_err(serde::de::Error::custom)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
{{- end}}
|
||||||
@@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize};
|
|||||||
{{- if hasHashMap .Capability}}
|
{{- if hasHashMap .Capability}}
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
{{- end}}
|
{{- end}}
|
||||||
|
{{- if .Capability.HasByteFields}}{{template "base64_bytes_module" .}}{{- end}}
|
||||||
|
|
||||||
// Helper functions for skip_serializing_if with numeric types
|
// Helper functions for skip_serializing_if with numeric types
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
@@ -70,6 +71,9 @@ pub struct {{.Name}} {
|
|||||||
#[serde(default, skip_serializing_if = "{{skipSerializingFunc .Type}}")]
|
#[serde(default, skip_serializing_if = "{{skipSerializingFunc .Type}}")]
|
||||||
{{- else}}
|
{{- else}}
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
|
{{- end}}
|
||||||
|
{{- if .IsByteSlice}}
|
||||||
|
#[serde(with = "base64_bytes")]
|
||||||
{{- end}}
|
{{- end}}
|
||||||
pub {{rustFieldName .Name}}: {{fieldRustType .}},
|
pub {{rustFieldName .Name}}: {{fieldRustType .}},
|
||||||
{{- end}}
|
{{- end}}
|
||||||
|
|||||||
@@ -8,9 +8,6 @@
|
|||||||
package {{.Package}}
|
package {{.Package}}
|
||||||
|
|
||||||
import (
|
import (
|
||||||
{{- if .Service.HasRawMethods}}
|
|
||||||
"encoding/binary"
|
|
||||||
{{- end}}
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
{{- if .Service.HasErrors}}
|
{{- if .Service.HasErrors}}
|
||||||
"errors"
|
"errors"
|
||||||
@@ -52,7 +49,7 @@ type {{requestType .}} struct {
|
|||||||
{{- end}}
|
{{- end}}
|
||||||
}
|
}
|
||||||
{{- end}}
|
{{- end}}
|
||||||
{{- if and (not .IsErrorOnly) (not .Raw)}}
|
{{- if not .IsErrorOnly}}
|
||||||
|
|
||||||
type {{responseType .}} struct {
|
type {{responseType .}} struct {
|
||||||
{{- range .Returns}}
|
{{- range .Returns}}
|
||||||
@@ -98,27 +95,7 @@ func {{$.Service.Name}}{{.Name}}({{range $i, $p := .Params}}{{if $i}}, {{end}}{{
|
|||||||
// Read the response from memory
|
// Read the response from memory
|
||||||
responseMem := pdk.FindMemory(responsePtr)
|
responseMem := pdk.FindMemory(responsePtr)
|
||||||
responseBytes := responseMem.ReadBytes()
|
responseBytes := responseMem.ReadBytes()
|
||||||
{{- if .Raw}}
|
{{- if .IsErrorOnly}}
|
||||||
|
|
||||||
// Parse binary-framed response
|
|
||||||
if len(responseBytes) == 0 {
|
|
||||||
return "", nil, errors.New("empty response from host")
|
|
||||||
}
|
|
||||||
if responseBytes[0] == 0x01 { // error
|
|
||||||
return "", nil, errors.New(string(responseBytes[1:]))
|
|
||||||
}
|
|
||||||
if responseBytes[0] != 0x00 {
|
|
||||||
return "", nil, errors.New("unknown response status")
|
|
||||||
}
|
|
||||||
if len(responseBytes) < 5 {
|
|
||||||
return "", nil, errors.New("malformed raw response: incomplete header")
|
|
||||||
}
|
|
||||||
ctLen := binary.BigEndian.Uint32(responseBytes[1:5])
|
|
||||||
if uint32(len(responseBytes)) < 5+ctLen {
|
|
||||||
return "", nil, errors.New("malformed raw response: content-type overflow")
|
|
||||||
}
|
|
||||||
return string(responseBytes[5 : 5+ctLen]), responseBytes[5+ctLen:], nil
|
|
||||||
{{- else if .IsErrorOnly}}
|
|
||||||
|
|
||||||
// Parse error-only response
|
// Parse error-only response
|
||||||
var response struct {
|
var response struct {
|
||||||
|
|||||||
@@ -8,12 +8,12 @@
|
|||||||
# main __init__.py file. Copy the needed functions from this file into your plugin.
|
# main __init__.py file. Copy the needed functions from this file into your plugin.
|
||||||
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import Any{{- if .Service.HasRawMethods}}, Tuple{{end}}
|
from typing import Any
|
||||||
|
|
||||||
import extism
|
import extism
|
||||||
import json
|
import json
|
||||||
{{- if .Service.HasRawMethods}}
|
{{- if .Service.HasByteFields}}
|
||||||
import struct
|
import base64
|
||||||
{{- end}}
|
{{- end}}
|
||||||
|
|
||||||
|
|
||||||
@@ -32,7 +32,7 @@ def _{{exportName .}}(offset: int) -> int:
|
|||||||
{{- end}}
|
{{- end}}
|
||||||
{{- /* Generate dataclasses for multi-value returns */ -}}
|
{{- /* Generate dataclasses for multi-value returns */ -}}
|
||||||
{{range .Service.Methods}}
|
{{range .Service.Methods}}
|
||||||
{{- if and .NeedsResultClass (not .Raw)}}
|
{{- if .NeedsResultClass}}
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@@ -47,7 +47,7 @@ class {{pythonResultType .}}:
|
|||||||
{{range .Service.Methods}}
|
{{range .Service.Methods}}
|
||||||
|
|
||||||
|
|
||||||
def {{pythonFunc .}}({{range $i, $p := .Params}}{{if $i}}, {{end}}{{$p.PythonName}}: {{$p.PythonType}}{{end}}){{if .Raw}} -> Tuple[str, bytes]{{else if .NeedsResultClass}} -> {{pythonResultType .}}{{else if .HasReturns}} -> {{(index .Returns 0).PythonType}}{{else}} -> None{{end}}:
|
def {{pythonFunc .}}({{range $i, $p := .Params}}{{if $i}}, {{end}}{{$p.PythonName}}: {{$p.PythonType}}{{end}}){{if .NeedsResultClass}} -> {{pythonResultType .}}{{else if .HasReturns}} -> {{(index .Returns 0).PythonType}}{{else}} -> None{{end}}:
|
||||||
"""{{if .Doc}}{{.Doc}}{{else}}Call the {{exportName .}} host function.{{end}}
|
"""{{if .Doc}}{{.Doc}}{{else}}Call the {{exportName .}} host function.{{end}}
|
||||||
{{- if .HasParams}}
|
{{- if .HasParams}}
|
||||||
|
|
||||||
@@ -56,11 +56,7 @@ def {{pythonFunc .}}({{range $i, $p := .Params}}{{if $i}}, {{end}}{{$p.PythonNam
|
|||||||
{{.PythonName}}: {{.PythonType}} parameter.
|
{{.PythonName}}: {{.PythonType}} parameter.
|
||||||
{{- end}}
|
{{- end}}
|
||||||
{{- end}}
|
{{- end}}
|
||||||
{{- if .Raw}}
|
{{- if .HasReturns}}
|
||||||
|
|
||||||
Returns:
|
|
||||||
Tuple of (content_type, data) with the raw binary response.
|
|
||||||
{{- else if .HasReturns}}
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
{{- if .NeedsResultClass}}
|
{{- if .NeedsResultClass}}
|
||||||
@@ -76,7 +72,11 @@ def {{pythonFunc .}}({{range $i, $p := .Params}}{{if $i}}, {{end}}{{$p.PythonNam
|
|||||||
{{- if .HasParams}}
|
{{- if .HasParams}}
|
||||||
request = {
|
request = {
|
||||||
{{- range .Params}}
|
{{- range .Params}}
|
||||||
|
{{- if .IsByteSlice}}
|
||||||
|
"{{.JSONName}}": base64.b64encode({{.PythonName}}).decode("ascii"),
|
||||||
|
{{- else}}
|
||||||
"{{.JSONName}}": {{.PythonName}},
|
"{{.JSONName}}": {{.PythonName}},
|
||||||
|
{{- end}}
|
||||||
{{- end}}
|
{{- end}}
|
||||||
}
|
}
|
||||||
request_bytes = json.dumps(request).encode("utf-8")
|
request_bytes = json.dumps(request).encode("utf-8")
|
||||||
@@ -86,24 +86,6 @@ def {{pythonFunc .}}({{range $i, $p := .Params}}{{if $i}}, {{end}}{{$p.PythonNam
|
|||||||
request_mem = extism.memory.alloc(request_bytes)
|
request_mem = extism.memory.alloc(request_bytes)
|
||||||
response_offset = _{{exportName .}}(request_mem.offset)
|
response_offset = _{{exportName .}}(request_mem.offset)
|
||||||
response_mem = extism.memory.find(response_offset)
|
response_mem = extism.memory.find(response_offset)
|
||||||
{{- if .Raw}}
|
|
||||||
response_bytes = response_mem.bytes()
|
|
||||||
|
|
||||||
if len(response_bytes) == 0:
|
|
||||||
raise HostFunctionError("empty response from host")
|
|
||||||
if response_bytes[0] == 0x01:
|
|
||||||
raise HostFunctionError(response_bytes[1:].decode("utf-8"))
|
|
||||||
if response_bytes[0] != 0x00:
|
|
||||||
raise HostFunctionError("unknown response status")
|
|
||||||
if len(response_bytes) < 5:
|
|
||||||
raise HostFunctionError("malformed raw response: incomplete header")
|
|
||||||
ct_len = struct.unpack(">I", response_bytes[1:5])[0]
|
|
||||||
if len(response_bytes) < 5 + ct_len:
|
|
||||||
raise HostFunctionError("malformed raw response: content-type overflow")
|
|
||||||
content_type = response_bytes[5:5 + ct_len].decode("utf-8")
|
|
||||||
data = response_bytes[5 + ct_len:]
|
|
||||||
return content_type, data
|
|
||||||
{{- else}}
|
|
||||||
response = json.loads(extism.memory.string(response_mem))
|
response = json.loads(extism.memory.string(response_mem))
|
||||||
{{if .HasError}}
|
{{if .HasError}}
|
||||||
if response.get("error"):
|
if response.get("error"):
|
||||||
@@ -112,10 +94,17 @@ def {{pythonFunc .}}({{range $i, $p := .Params}}{{if $i}}, {{end}}{{$p.PythonNam
|
|||||||
{{- if .NeedsResultClass}}
|
{{- if .NeedsResultClass}}
|
||||||
return {{pythonResultType .}}(
|
return {{pythonResultType .}}(
|
||||||
{{- range .Returns}}
|
{{- range .Returns}}
|
||||||
|
{{- if .IsByteSlice}}
|
||||||
|
{{.PythonName}}=base64.b64decode(response.get("{{.JSONName}}", "")),
|
||||||
|
{{- else}}
|
||||||
{{.PythonName}}=response.get("{{.JSONName}}"{{pythonDefault .}}),
|
{{.PythonName}}=response.get("{{.JSONName}}"{{pythonDefault .}}),
|
||||||
|
{{- end}}
|
||||||
{{- end}}
|
{{- end}}
|
||||||
)
|
)
|
||||||
{{- else if .HasReturns}}
|
{{- else if .HasReturns}}
|
||||||
|
{{- if (index .Returns 0).IsByteSlice}}
|
||||||
|
return base64.b64decode(response.get("{{(index .Returns 0).JSONName}}", ""))
|
||||||
|
{{- else}}
|
||||||
return response.get("{{(index .Returns 0).JSONName}}"{{pythonDefault (index .Returns 0)}})
|
return response.get("{{(index .Returns 0).JSONName}}"{{pythonDefault (index .Returns 0)}})
|
||||||
{{- end}}
|
{{- end}}
|
||||||
{{- end}}
|
{{- end}}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@
|
|||||||
|
|
||||||
use extism_pdk::*;
|
use extism_pdk::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
{{- if .Service.HasByteFields}}{{template "base64_bytes_module" .}}{{- end}}
|
||||||
{{- /* Generate struct definitions */ -}}
|
{{- /* Generate struct definitions */ -}}
|
||||||
{{- range .Service.Structs}}
|
{{- range .Service.Structs}}
|
||||||
{{if .Doc}}
|
{{if .Doc}}
|
||||||
@@ -16,6 +17,9 @@ pub struct {{.Name}} {
|
|||||||
{{- range .Fields}}
|
{{- range .Fields}}
|
||||||
{{- if .NeedsDefault}}
|
{{- if .NeedsDefault}}
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
|
{{- end}}
|
||||||
|
{{- if .IsByteSlice}}
|
||||||
|
#[serde(with = "base64_bytes")]
|
||||||
{{- end}}
|
{{- end}}
|
||||||
pub {{.RustName}}: {{fieldRustType .}},
|
pub {{.RustName}}: {{fieldRustType .}},
|
||||||
{{- end}}
|
{{- end}}
|
||||||
@@ -29,17 +33,22 @@ pub struct {{.Name}} {
|
|||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
struct {{requestType .}} {
|
struct {{requestType .}} {
|
||||||
{{- range .Params}}
|
{{- range .Params}}
|
||||||
|
{{- if .IsByteSlice}}
|
||||||
|
#[serde(with = "base64_bytes")]
|
||||||
|
{{- end}}
|
||||||
{{.RustName}}: {{rustType .}},
|
{{.RustName}}: {{rustType .}},
|
||||||
{{- end}}
|
{{- end}}
|
||||||
}
|
}
|
||||||
{{- end}}
|
{{- end}}
|
||||||
{{- if not .Raw}}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
struct {{responseType .}} {
|
struct {{responseType .}} {
|
||||||
{{- range .Returns}}
|
{{- range .Returns}}
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
|
{{- if .IsByteSlice}}
|
||||||
|
#[serde(with = "base64_bytes")]
|
||||||
|
{{- end}}
|
||||||
{{.RustName}}: {{rustType .}},
|
{{.RustName}}: {{rustType .}},
|
||||||
{{- end}}
|
{{- end}}
|
||||||
{{- if .HasError}}
|
{{- if .HasError}}
|
||||||
@@ -48,92 +57,16 @@ struct {{responseType .}} {
|
|||||||
{{- end}}
|
{{- end}}
|
||||||
}
|
}
|
||||||
{{- end}}
|
{{- end}}
|
||||||
{{- end}}
|
|
||||||
|
|
||||||
#[host_fn]
|
#[host_fn]
|
||||||
extern "ExtismHost" {
|
extern "ExtismHost" {
|
||||||
{{- range .Service.Methods}}
|
{{- range .Service.Methods}}
|
||||||
{{- if not .Raw}}
|
|
||||||
fn {{exportName .}}(input: Json<{{if .HasParams}}{{requestType .}}{{else}}serde_json::Value{{end}}>) -> Json<{{responseType .}}>;
|
fn {{exportName .}}(input: Json<{{if .HasParams}}{{requestType .}}{{else}}serde_json::Value{{end}}>) -> Json<{{responseType .}}>;
|
||||||
{{- end}}
|
{{- end}}
|
||||||
{{- end}}
|
|
||||||
}
|
}
|
||||||
{{- /* Declare raw extern "C" imports for raw methods */ -}}
|
|
||||||
{{- range .Service.Methods}}
|
|
||||||
{{- if .Raw}}
|
|
||||||
|
|
||||||
#[link(wasm_import_module = "extism:host/user")]
|
|
||||||
extern "C" {
|
|
||||||
fn {{exportName .}}(offset: u64) -> u64;
|
|
||||||
}
|
|
||||||
{{- end}}
|
|
||||||
{{- end}}
|
|
||||||
|
|
||||||
{{- /* Generate wrapper functions */ -}}
|
{{- /* Generate wrapper functions */ -}}
|
||||||
{{range .Service.Methods}}
|
{{range .Service.Methods}}
|
||||||
{{- if .Raw}}
|
|
||||||
|
|
||||||
{{if .Doc}}{{rustDocComment .Doc}}{{else}}/// Calls the {{exportName .}} host function.{{end}}
|
|
||||||
{{- if .HasParams}}
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
{{- range .Params}}
|
|
||||||
/// * `{{.RustName}}` - {{rustType .}} parameter.
|
|
||||||
{{- end}}
|
|
||||||
{{- end}}
|
|
||||||
///
|
|
||||||
/// # Returns
|
|
||||||
/// A tuple of (content_type, data) with the raw binary response.
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
/// Returns an error if the host function call fails.
|
|
||||||
pub fn {{rustFunc .}}({{range $i, $p := .Params}}{{if $i}}, {{end}}{{$p.RustName}}: {{rustParamType $p}}{{end}}) -> Result<(String, Vec<u8>), Error> {
|
|
||||||
{{- if .HasParams}}
|
|
||||||
let req = {{requestType .}} {
|
|
||||||
{{- range .Params}}
|
|
||||||
{{.RustName}}: {{.RustName}}{{if .NeedsToOwned}}.to_owned(){{end}},
|
|
||||||
{{- end}}
|
|
||||||
};
|
|
||||||
let input_bytes = serde_json::to_vec(&req).map_err(|e| Error::msg(e.to_string()))?;
|
|
||||||
{{- else}}
|
|
||||||
let input_bytes = b"{}".to_vec();
|
|
||||||
{{- end}}
|
|
||||||
let input_mem = Memory::from_bytes(&input_bytes).map_err(|e| Error::msg(e.to_string()))?;
|
|
||||||
|
|
||||||
let response_offset = unsafe { {{exportName .}}(input_mem.offset()) };
|
|
||||||
|
|
||||||
let response_mem = Memory::find(response_offset)
|
|
||||||
.ok_or_else(|| Error::msg("empty response from host"))?;
|
|
||||||
let response_bytes = response_mem.to_vec();
|
|
||||||
|
|
||||||
if response_bytes.is_empty() {
|
|
||||||
return Err(Error::msg("empty response from host"));
|
|
||||||
}
|
|
||||||
if response_bytes[0] == 0x01 {
|
|
||||||
let msg = String::from_utf8_lossy(&response_bytes[1..]).to_string();
|
|
||||||
return Err(Error::msg(msg));
|
|
||||||
}
|
|
||||||
if response_bytes[0] != 0x00 {
|
|
||||||
return Err(Error::msg("unknown response status"));
|
|
||||||
}
|
|
||||||
if response_bytes.len() < 5 {
|
|
||||||
return Err(Error::msg("malformed raw response: incomplete header"));
|
|
||||||
}
|
|
||||||
let ct_len = u32::from_be_bytes([
|
|
||||||
response_bytes[1],
|
|
||||||
response_bytes[2],
|
|
||||||
response_bytes[3],
|
|
||||||
response_bytes[4],
|
|
||||||
]) as usize;
|
|
||||||
if ct_len > response_bytes.len() - 5 {
|
|
||||||
return Err(Error::msg("malformed raw response: content-type overflow"));
|
|
||||||
}
|
|
||||||
let ct_end = 5 + ct_len;
|
|
||||||
let content_type = String::from_utf8_lossy(&response_bytes[5..ct_end]).to_string();
|
|
||||||
let data = response_bytes[ct_end..].to_vec();
|
|
||||||
Ok((content_type, data))
|
|
||||||
}
|
|
||||||
{{- else}}
|
|
||||||
|
|
||||||
{{if .Doc}}{{rustDocComment .Doc}}{{else}}/// Calls the {{exportName .}} host function.{{end}}
|
{{if .Doc}}{{rustDocComment .Doc}}{{else}}/// Calls the {{exportName .}} host function.{{end}}
|
||||||
{{- if .HasParams}}
|
{{- if .HasParams}}
|
||||||
@@ -209,4 +142,3 @@ pub fn {{rustFunc .}}({{range $i, $p := .Params}}{{if $i}}, {{end}}{{$p.RustName
|
|||||||
}
|
}
|
||||||
{{- end}}
|
{{- end}}
|
||||||
{{- end}}
|
{{- end}}
|
||||||
{{- end}}
|
|
||||||
|
|||||||
@@ -4,9 +4,6 @@ package {{.Package}}
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
{{- if .Service.HasRawMethods}}
|
|
||||||
"encoding/binary"
|
|
||||||
{{- end}}
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
|
||||||
extism "github.com/extism/go-sdk"
|
extism "github.com/extism/go-sdk"
|
||||||
@@ -23,7 +20,6 @@ type {{requestType .}} struct {
|
|||||||
{{- end}}
|
{{- end}}
|
||||||
}
|
}
|
||||||
{{- end}}
|
{{- end}}
|
||||||
{{- if not .Raw}}
|
|
||||||
|
|
||||||
// {{responseType .}} is the response type for {{$.Service.Name}}.{{.Name}}.
|
// {{responseType .}} is the response type for {{$.Service.Name}}.{{.Name}}.
|
||||||
type {{responseType .}} struct {
|
type {{responseType .}} struct {
|
||||||
@@ -34,7 +30,6 @@ type {{responseType .}} struct {
|
|||||||
Error string `json:"error,omitempty"`
|
Error string `json:"error,omitempty"`
|
||||||
{{- end}}
|
{{- end}}
|
||||||
}
|
}
|
||||||
{{- end}}
|
|
||||||
{{end}}
|
{{end}}
|
||||||
|
|
||||||
// Register{{.Service.Name}}HostFunctions registers {{.Service.Name}} service host functions.
|
// Register{{.Service.Name}}HostFunctions registers {{.Service.Name}} service host functions.
|
||||||
@@ -56,48 +51,18 @@ func new{{$.Service.Name}}{{.Name}}HostFunction(service {{$.Service.Interface}})
|
|||||||
// Read JSON request from plugin memory
|
// Read JSON request from plugin memory
|
||||||
reqBytes, err := p.ReadBytes(stack[0])
|
reqBytes, err := p.ReadBytes(stack[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
{{- if .Raw}}
|
|
||||||
{{$.Service.Name | lower}}WriteRawError(p, stack, err)
|
|
||||||
{{- else}}
|
|
||||||
{{$.Service.Name | lower}}WriteError(p, stack, err)
|
{{$.Service.Name | lower}}WriteError(p, stack, err)
|
||||||
{{- end}}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
var req {{requestType .}}
|
var req {{requestType .}}
|
||||||
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
||||||
{{- if .Raw}}
|
|
||||||
{{$.Service.Name | lower}}WriteRawError(p, stack, err)
|
|
||||||
{{- else}}
|
|
||||||
{{$.Service.Name | lower}}WriteError(p, stack, err)
|
{{$.Service.Name | lower}}WriteError(p, stack, err)
|
||||||
{{- end}}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
{{- end}}
|
{{- end}}
|
||||||
|
|
||||||
// Call the service method
|
// Call the service method
|
||||||
{{- if .Raw}}
|
{{- if .HasReturns}}
|
||||||
{{range $i, $r := .Returns}}{{if $i}}, {{end}}{{lower $r.Name}}{{end}}, svcErr := service.{{.Name}}(ctx{{range .Params}}, req.{{title .Name}}{{end}})
|
|
||||||
if svcErr != nil {
|
|
||||||
{{$.Service.Name | lower}}WriteRawError(p, stack, svcErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write binary-framed response to plugin memory:
|
|
||||||
// [0x00][4-byte content-type length (big-endian)][content-type string][raw data]
|
|
||||||
ctBytes := []byte({{lower (index .Returns 0).Name}})
|
|
||||||
frame := make([]byte, 1+4+len(ctBytes)+len({{lower (index .Returns 1).Name}}))
|
|
||||||
frame[0] = 0x00 // success
|
|
||||||
binary.BigEndian.PutUint32(frame[1:5], uint32(len(ctBytes)))
|
|
||||||
copy(frame[5:5+len(ctBytes)], ctBytes)
|
|
||||||
copy(frame[5+len(ctBytes):], {{lower (index .Returns 1).Name}})
|
|
||||||
|
|
||||||
respPtr, err := p.WriteBytes(frame)
|
|
||||||
if err != nil {
|
|
||||||
stack[0] = 0
|
|
||||||
return
|
|
||||||
}
|
|
||||||
stack[0] = respPtr
|
|
||||||
{{- else if .HasReturns}}
|
|
||||||
{{- if .HasError}}
|
{{- if .HasError}}
|
||||||
{{range $i, $r := .Returns}}{{if $i}}, {{end}}{{lower $r.Name}}{{end}}, svcErr := service.{{.Name}}(ctx{{range .Params}}, req.{{title .Name}}{{end}})
|
{{range $i, $r := .Returns}}{{if $i}}, {{end}}{{lower $r.Name}}{{end}}, svcErr := service.{{.Name}}(ctx{{range .Params}}, req.{{title .Name}}{{end}})
|
||||||
if svcErr != nil {
|
if svcErr != nil {
|
||||||
@@ -162,16 +127,3 @@ func {{.Service.Name | lower}}WriteError(p *extism.CurrentPlugin, stack []uint64
|
|||||||
respPtr, _ := p.WriteBytes(respBytes)
|
respPtr, _ := p.WriteBytes(respBytes)
|
||||||
stack[0] = respPtr
|
stack[0] = respPtr
|
||||||
}
|
}
|
||||||
{{- if .Service.HasRawMethods}}
|
|
||||||
|
|
||||||
// {{.Service.Name | lower}}WriteRawError writes a binary-framed error response to plugin memory.
|
|
||||||
// Format: [0x01][UTF-8 error message]
|
|
||||||
func {{.Service.Name | lower}}WriteRawError(p *extism.CurrentPlugin, stack []uint64, err error) {
|
|
||||||
errMsg := []byte(err.Error())
|
|
||||||
frame := make([]byte, 1+len(errMsg))
|
|
||||||
frame[0] = 0x01 // error
|
|
||||||
copy(frame[1:], errMsg)
|
|
||||||
respPtr, _ := p.WriteBytes(frame)
|
|
||||||
stack[0] = respPtr
|
|
||||||
}
|
|
||||||
{{- end}}
|
|
||||||
|
|||||||
@@ -173,16 +173,6 @@ func (s Service) HasErrors() bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasRawMethods returns true if any method in the service uses raw binary framing.
|
|
||||||
func (s Service) HasRawMethods() bool {
|
|
||||||
for _, m := range s.Methods {
|
|
||||||
if m.Raw {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Method represents a host function method within a service.
|
// Method represents a host function method within a service.
|
||||||
type Method struct {
|
type Method struct {
|
||||||
Name string // Go method name (e.g., "Call")
|
Name string // Go method name (e.g., "Call")
|
||||||
@@ -191,7 +181,6 @@ type Method struct {
|
|||||||
Returns []Param // Return values (excluding error)
|
Returns []Param // Return values (excluding error)
|
||||||
HasError bool // Whether the method returns an error
|
HasError bool // Whether the method returns an error
|
||||||
Doc string // Documentation comment for the method
|
Doc string // Documentation comment for the method
|
||||||
Raw bool // If true, response uses binary framing instead of JSON
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// FunctionName returns the Extism host function export name.
|
// FunctionName returns the Extism host function export name.
|
||||||
@@ -343,6 +332,52 @@ type Param struct {
|
|||||||
JSONName string // JSON field name (camelCase)
|
JSONName string // JSON field name (camelCase)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsByteSlice returns true if the parameter type is []byte.
|
||||||
|
func (p Param) IsByteSlice() bool {
|
||||||
|
return p.Type == "[]byte"
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsByteSlice returns true if the field type is []byte.
|
||||||
|
func (f FieldDef) IsByteSlice() bool {
|
||||||
|
return f.Type == "[]byte"
|
||||||
|
}
|
||||||
|
|
||||||
|
// HasByteFields returns true if any method params, returns, or struct fields use []byte.
|
||||||
|
func (s Service) HasByteFields() bool {
|
||||||
|
for _, m := range s.Methods {
|
||||||
|
for _, p := range m.Params {
|
||||||
|
if p.IsByteSlice() {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, r := range m.Returns {
|
||||||
|
if r.IsByteSlice() {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, st := range s.Structs {
|
||||||
|
for _, f := range st.Fields {
|
||||||
|
if f.IsByteSlice() {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// HasByteFields returns true if any capability struct fields use []byte.
|
||||||
|
func (c Capability) HasByteFields() bool {
|
||||||
|
for _, st := range c.Structs {
|
||||||
|
for _, f := range st.Fields {
|
||||||
|
if f.IsByteSlice() {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// NewParam creates a Param with auto-generated JSON name.
|
// NewParam creates a Param with auto-generated JSON name.
|
||||||
func NewParam(name, typ string) Param {
|
func NewParam(name, typ string) Param {
|
||||||
return Param{
|
return Param{
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ from typing import Any
|
|||||||
|
|
||||||
import extism
|
import extism
|
||||||
import json
|
import json
|
||||||
|
import base64
|
||||||
|
|
||||||
|
|
||||||
class HostFunctionError(Exception):
|
class HostFunctionError(Exception):
|
||||||
@@ -38,7 +39,7 @@ def codec_encode(data: bytes) -> bytes:
|
|||||||
HostFunctionError: If the host function returns an error.
|
HostFunctionError: If the host function returns an error.
|
||||||
"""
|
"""
|
||||||
request = {
|
request = {
|
||||||
"data": data,
|
"data": base64.b64encode(data).decode("ascii"),
|
||||||
}
|
}
|
||||||
request_bytes = json.dumps(request).encode("utf-8")
|
request_bytes = json.dumps(request).encode("utf-8")
|
||||||
request_mem = extism.memory.alloc(request_bytes)
|
request_mem = extism.memory.alloc(request_bytes)
|
||||||
@@ -49,4 +50,4 @@ def codec_encode(data: bytes) -> bytes:
|
|||||||
if response.get("error"):
|
if response.get("error"):
|
||||||
raise HostFunctionError(response["error"])
|
raise HostFunctionError(response["error"])
|
||||||
|
|
||||||
return response.get("result", b"")
|
return base64.b64decode(response.get("result", ""))
|
||||||
|
|||||||
@@ -5,10 +5,34 @@
|
|||||||
|
|
||||||
use extism_pdk::*;
|
use extism_pdk::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use base64::Engine as _;
|
||||||
|
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||||
|
|
||||||
|
mod base64_bytes {
|
||||||
|
use serde::{self, Deserialize, Deserializer, Serializer};
|
||||||
|
use base64::Engine as _;
|
||||||
|
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||||
|
|
||||||
|
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
serializer.serialize_str(&BASE64.encode(bytes))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
|
||||||
|
where
|
||||||
|
D: Deserializer<'de>,
|
||||||
|
{
|
||||||
|
let s = String::deserialize(deserializer)?;
|
||||||
|
BASE64.decode(&s).map_err(serde::de::Error::custom)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
struct CodecEncodeRequest {
|
struct CodecEncodeRequest {
|
||||||
|
#[serde(with = "base64_bytes")]
|
||||||
data: Vec<u8>,
|
data: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -16,6 +40,7 @@ struct CodecEncodeRequest {
|
|||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
struct CodecEncodeResponse {
|
struct CodecEncodeResponse {
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
|
#[serde(with = "base64_bytes")]
|
||||||
result: Vec<u8>,
|
result: Vec<u8>,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
error: Option<String>,
|
error: Option<String>,
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ from typing import Any
|
|||||||
|
|
||||||
import extism
|
import extism
|
||||||
import json
|
import json
|
||||||
|
import base64
|
||||||
|
|
||||||
|
|
||||||
class HostFunctionError(Exception):
|
class HostFunctionError(Exception):
|
||||||
@@ -327,7 +328,7 @@ def comprehensive_byte_slice(data: bytes) -> bytes:
|
|||||||
HostFunctionError: If the host function returns an error.
|
HostFunctionError: If the host function returns an error.
|
||||||
"""
|
"""
|
||||||
request = {
|
request = {
|
||||||
"data": data,
|
"data": base64.b64encode(data).decode("ascii"),
|
||||||
}
|
}
|
||||||
request_bytes = json.dumps(request).encode("utf-8")
|
request_bytes = json.dumps(request).encode("utf-8")
|
||||||
request_mem = extism.memory.alloc(request_bytes)
|
request_mem = extism.memory.alloc(request_bytes)
|
||||||
@@ -338,4 +339,4 @@ def comprehensive_byte_slice(data: bytes) -> bytes:
|
|||||||
if response.get("error"):
|
if response.get("error"):
|
||||||
raise HostFunctionError(response["error"])
|
raise HostFunctionError(response["error"])
|
||||||
|
|
||||||
return response.get("result", b"")
|
return base64.b64decode(response.get("result", ""))
|
||||||
|
|||||||
@@ -5,6 +5,29 @@
|
|||||||
|
|
||||||
use extism_pdk::*;
|
use extism_pdk::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use base64::Engine as _;
|
||||||
|
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||||
|
|
||||||
|
mod base64_bytes {
|
||||||
|
use serde::{self, Deserialize, Deserializer, Serializer};
|
||||||
|
use base64::Engine as _;
|
||||||
|
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||||
|
|
||||||
|
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
serializer.serialize_str(&BASE64.encode(bytes))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
|
||||||
|
where
|
||||||
|
D: Deserializer<'de>,
|
||||||
|
{
|
||||||
|
let s = String::deserialize(deserializer)?;
|
||||||
|
BASE64.decode(&s).map_err(serde::de::Error::custom)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
@@ -144,6 +167,7 @@ struct ComprehensiveMultipleReturnsResponse {
|
|||||||
#[derive(Debug, Clone, Serialize)]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
struct ComprehensiveByteSliceRequest {
|
struct ComprehensiveByteSliceRequest {
|
||||||
|
#[serde(with = "base64_bytes")]
|
||||||
data: Vec<u8>,
|
data: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -151,6 +175,7 @@ struct ComprehensiveByteSliceRequest {
|
|||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
struct ComprehensiveByteSliceResponse {
|
struct ComprehensiveByteSliceResponse {
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
|
#[serde(with = "base64_bytes")]
|
||||||
result: Vec<u8>,
|
result: Vec<u8>,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
error: Option<String>,
|
error: Option<String>,
|
||||||
|
|||||||
@@ -1,66 +0,0 @@
|
|||||||
// Code generated by ndpgen. DO NOT EDIT.
|
|
||||||
//
|
|
||||||
// This file contains client wrappers for the Stream host service.
|
|
||||||
// It is intended for use in Navidrome plugins built with TinyGo.
|
|
||||||
//
|
|
||||||
//go:build wasip1
|
|
||||||
|
|
||||||
package ndpdk
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/binary"
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
|
|
||||||
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
|
|
||||||
)
|
|
||||||
|
|
||||||
// stream_getstream is the host function provided by Navidrome.
|
|
||||||
//
|
|
||||||
//go:wasmimport extism:host/user stream_getstream
|
|
||||||
func stream_getstream(uint64) uint64
|
|
||||||
|
|
||||||
type streamGetStreamRequest struct {
|
|
||||||
Uri string `json:"uri"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// StreamGetStream calls the stream_getstream host function.
|
|
||||||
// GetStream returns raw binary stream data with content type.
|
|
||||||
func StreamGetStream(uri string) (string, []byte, error) {
|
|
||||||
// Marshal request to JSON
|
|
||||||
req := streamGetStreamRequest{
|
|
||||||
Uri: uri,
|
|
||||||
}
|
|
||||||
reqBytes, err := json.Marshal(req)
|
|
||||||
if err != nil {
|
|
||||||
return "", nil, err
|
|
||||||
}
|
|
||||||
reqMem := pdk.AllocateBytes(reqBytes)
|
|
||||||
defer reqMem.Free()
|
|
||||||
|
|
||||||
// Call the host function
|
|
||||||
responsePtr := stream_getstream(reqMem.Offset())
|
|
||||||
|
|
||||||
// Read the response from memory
|
|
||||||
responseMem := pdk.FindMemory(responsePtr)
|
|
||||||
responseBytes := responseMem.ReadBytes()
|
|
||||||
|
|
||||||
// Parse binary-framed response
|
|
||||||
if len(responseBytes) == 0 {
|
|
||||||
return "", nil, errors.New("empty response from host")
|
|
||||||
}
|
|
||||||
if responseBytes[0] == 0x01 { // error
|
|
||||||
return "", nil, errors.New(string(responseBytes[1:]))
|
|
||||||
}
|
|
||||||
if responseBytes[0] != 0x00 {
|
|
||||||
return "", nil, errors.New("unknown response status")
|
|
||||||
}
|
|
||||||
if len(responseBytes) < 5 {
|
|
||||||
return "", nil, errors.New("malformed raw response: incomplete header")
|
|
||||||
}
|
|
||||||
ctLen := binary.BigEndian.Uint32(responseBytes[1:5])
|
|
||||||
if uint32(len(responseBytes)) < 5+ctLen {
|
|
||||||
return "", nil, errors.New("malformed raw response: content-type overflow")
|
|
||||||
}
|
|
||||||
return string(responseBytes[5 : 5+ctLen]), responseBytes[5+ctLen:], nil
|
|
||||||
}
|
|
||||||
@@ -1,63 +0,0 @@
|
|||||||
# Code generated by ndpgen. DO NOT EDIT.
|
|
||||||
#
|
|
||||||
# This file contains client wrappers for the Stream host service.
|
|
||||||
# It is intended for use in Navidrome plugins built with extism-py.
|
|
||||||
#
|
|
||||||
# IMPORTANT: Due to a limitation in extism-py, you cannot import this file directly.
|
|
||||||
# The @extism.import_fn decorators are only detected when defined in the plugin's
|
|
||||||
# main __init__.py file. Copy the needed functions from this file into your plugin.
|
|
||||||
|
|
||||||
from dataclasses import dataclass
|
|
||||||
from typing import Any, Tuple
|
|
||||||
|
|
||||||
import extism
|
|
||||||
import json
|
|
||||||
import struct
|
|
||||||
|
|
||||||
|
|
||||||
class HostFunctionError(Exception):
|
|
||||||
"""Raised when a host function returns an error."""
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
@extism.import_fn("extism:host/user", "stream_getstream")
|
|
||||||
def _stream_getstream(offset: int) -> int:
|
|
||||||
"""Raw host function - do not call directly."""
|
|
||||||
...
|
|
||||||
|
|
||||||
|
|
||||||
def stream_get_stream(uri: str) -> Tuple[str, bytes]:
|
|
||||||
"""GetStream returns raw binary stream data with content type.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
uri: str parameter.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Tuple of (content_type, data) with the raw binary response.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
HostFunctionError: If the host function returns an error.
|
|
||||||
"""
|
|
||||||
request = {
|
|
||||||
"uri": uri,
|
|
||||||
}
|
|
||||||
request_bytes = json.dumps(request).encode("utf-8")
|
|
||||||
request_mem = extism.memory.alloc(request_bytes)
|
|
||||||
response_offset = _stream_getstream(request_mem.offset)
|
|
||||||
response_mem = extism.memory.find(response_offset)
|
|
||||||
response_bytes = response_mem.bytes()
|
|
||||||
|
|
||||||
if len(response_bytes) == 0:
|
|
||||||
raise HostFunctionError("empty response from host")
|
|
||||||
if response_bytes[0] == 0x01:
|
|
||||||
raise HostFunctionError(response_bytes[1:].decode("utf-8"))
|
|
||||||
if response_bytes[0] != 0x00:
|
|
||||||
raise HostFunctionError("unknown response status")
|
|
||||||
if len(response_bytes) < 5:
|
|
||||||
raise HostFunctionError("malformed raw response: incomplete header")
|
|
||||||
ct_len = struct.unpack(">I", response_bytes[1:5])[0]
|
|
||||||
if len(response_bytes) < 5 + ct_len:
|
|
||||||
raise HostFunctionError("malformed raw response: content-type overflow")
|
|
||||||
content_type = response_bytes[5:5 + ct_len].decode("utf-8")
|
|
||||||
data = response_bytes[5 + ct_len:]
|
|
||||||
return content_type, data
|
|
||||||
@@ -1,73 +0,0 @@
|
|||||||
// Code generated by ndpgen. DO NOT EDIT.
|
|
||||||
//
|
|
||||||
// This file contains client wrappers for the Stream host service.
|
|
||||||
// It is intended for use in Navidrome plugins built with extism-pdk.
|
|
||||||
|
|
||||||
use extism_pdk::*;
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
struct StreamGetStreamRequest {
|
|
||||||
uri: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[host_fn]
|
|
||||||
extern "ExtismHost" {
|
|
||||||
}
|
|
||||||
|
|
||||||
#[link(wasm_import_module = "extism:host/user")]
|
|
||||||
extern "C" {
|
|
||||||
fn stream_getstream(offset: u64) -> u64;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// GetStream returns raw binary stream data with content type.
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
/// * `uri` - String parameter.
|
|
||||||
///
|
|
||||||
/// # Returns
|
|
||||||
/// A tuple of (content_type, data) with the raw binary response.
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
/// Returns an error if the host function call fails.
|
|
||||||
pub fn get_stream(uri: &str) -> Result<(String, Vec<u8>), Error> {
|
|
||||||
let req = StreamGetStreamRequest {
|
|
||||||
uri: uri.to_owned(),
|
|
||||||
};
|
|
||||||
let input_bytes = serde_json::to_vec(&req).map_err(|e| Error::msg(e.to_string()))?;
|
|
||||||
let input_mem = Memory::from_bytes(&input_bytes).map_err(|e| Error::msg(e.to_string()))?;
|
|
||||||
|
|
||||||
let response_offset = unsafe { stream_getstream(input_mem.offset()) };
|
|
||||||
|
|
||||||
let response_mem = Memory::find(response_offset)
|
|
||||||
.ok_or_else(|| Error::msg("empty response from host"))?;
|
|
||||||
let response_bytes = response_mem.to_vec();
|
|
||||||
|
|
||||||
if response_bytes.is_empty() {
|
|
||||||
return Err(Error::msg("empty response from host"));
|
|
||||||
}
|
|
||||||
if response_bytes[0] == 0x01 {
|
|
||||||
let msg = String::from_utf8_lossy(&response_bytes[1..]).to_string();
|
|
||||||
return Err(Error::msg(msg));
|
|
||||||
}
|
|
||||||
if response_bytes[0] != 0x00 {
|
|
||||||
return Err(Error::msg("unknown response status"));
|
|
||||||
}
|
|
||||||
if response_bytes.len() < 5 {
|
|
||||||
return Err(Error::msg("malformed raw response: incomplete header"));
|
|
||||||
}
|
|
||||||
let ct_len = u32::from_be_bytes([
|
|
||||||
response_bytes[1],
|
|
||||||
response_bytes[2],
|
|
||||||
response_bytes[3],
|
|
||||||
response_bytes[4],
|
|
||||||
]) as usize;
|
|
||||||
if ct_len > response_bytes.len() - 5 {
|
|
||||||
return Err(Error::msg("malformed raw response: content-type overflow"));
|
|
||||||
}
|
|
||||||
let ct_end = 5 + ct_len;
|
|
||||||
let content_type = String::from_utf8_lossy(&response_bytes[5..ct_end]).to_string();
|
|
||||||
let data = response_bytes[ct_end..].to_vec();
|
|
||||||
Ok((content_type, data))
|
|
||||||
}
|
|
||||||
10
plugins/cmd/ndpgen/testdata/raw_service.go.txt
vendored
10
plugins/cmd/ndpgen/testdata/raw_service.go.txt
vendored
@@ -1,10 +0,0 @@
|
|||||||
package testpkg
|
|
||||||
|
|
||||||
import "context"
|
|
||||||
|
|
||||||
//nd:hostservice name=Stream permission=stream
|
|
||||||
type StreamService interface {
|
|
||||||
// GetStream returns raw binary stream data with content type.
|
|
||||||
//nd:hostfunc raw=true
|
|
||||||
GetStream(ctx context.Context, uri string) (contentType string, data []byte, err error)
|
|
||||||
}
|
|
||||||
@@ -17,8 +17,8 @@ type SubsonicAPIService interface {
|
|||||||
Call(ctx context.Context, uri string) (responseJSON string, err error)
|
Call(ctx context.Context, uri string) (responseJSON string, err error)
|
||||||
|
|
||||||
// CallRaw executes a Subsonic API request and returns the raw binary response.
|
// CallRaw executes a Subsonic API request and returns the raw binary response.
|
||||||
// Optimized for binary endpoints like getCoverArt and stream that return
|
// Designed for binary endpoints like getCoverArt and stream that return
|
||||||
// non-JSON data. The response is returned as raw bytes without JSON encoding overhead.
|
// non-JSON data. The data is base64-encoded over JSON on the wire.
|
||||||
//nd:hostfunc raw=true
|
//nd:hostfunc
|
||||||
CallRaw(ctx context.Context, uri string) (contentType string, data []byte, err error)
|
CallRaw(ctx context.Context, uri string) (contentType string, data []byte, err error)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ package host
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/binary"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
|
||||||
extism "github.com/extism/go-sdk"
|
extism "github.com/extism/go-sdk"
|
||||||
@@ -26,6 +25,13 @@ type SubsonicAPICallRawRequest struct {
|
|||||||
Uri string `json:"uri"`
|
Uri string `json:"uri"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SubsonicAPICallRawResponse is the response type for SubsonicAPI.CallRaw.
|
||||||
|
type SubsonicAPICallRawResponse struct {
|
||||||
|
ContentType string `json:"contentType,omitempty"`
|
||||||
|
Data []byte `json:"data,omitempty"`
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
// RegisterSubsonicAPIHostFunctions registers SubsonicAPI service host functions.
|
// RegisterSubsonicAPIHostFunctions registers SubsonicAPI service host functions.
|
||||||
// The returned host functions should be added to the plugin's configuration.
|
// The returned host functions should be added to the plugin's configuration.
|
||||||
func RegisterSubsonicAPIHostFunctions(service SubsonicAPIService) []extism.HostFunction {
|
func RegisterSubsonicAPIHostFunctions(service SubsonicAPIService) []extism.HostFunction {
|
||||||
@@ -76,37 +82,28 @@ func newSubsonicAPICallRawHostFunction(service SubsonicAPIService) extism.HostFu
|
|||||||
// Read JSON request from plugin memory
|
// Read JSON request from plugin memory
|
||||||
reqBytes, err := p.ReadBytes(stack[0])
|
reqBytes, err := p.ReadBytes(stack[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
subsonicapiWriteRawError(p, stack, err)
|
subsonicapiWriteError(p, stack, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
var req SubsonicAPICallRawRequest
|
var req SubsonicAPICallRawRequest
|
||||||
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
||||||
subsonicapiWriteRawError(p, stack, err)
|
subsonicapiWriteError(p, stack, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call the service method
|
// Call the service method
|
||||||
contenttype, data, svcErr := service.CallRaw(ctx, req.Uri)
|
contenttype, data, svcErr := service.CallRaw(ctx, req.Uri)
|
||||||
if svcErr != nil {
|
if svcErr != nil {
|
||||||
subsonicapiWriteRawError(p, stack, svcErr)
|
subsonicapiWriteError(p, stack, svcErr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write binary-framed response to plugin memory:
|
// Write JSON response to plugin memory
|
||||||
// [0x00][4-byte content-type length (big-endian)][content-type string][raw data]
|
resp := SubsonicAPICallRawResponse{
|
||||||
ctBytes := []byte(contenttype)
|
ContentType: contenttype,
|
||||||
frame := make([]byte, 1+4+len(ctBytes)+len(data))
|
Data: data,
|
||||||
frame[0] = 0x00 // success
|
|
||||||
binary.BigEndian.PutUint32(frame[1:5], uint32(len(ctBytes)))
|
|
||||||
copy(frame[5:5+len(ctBytes)], ctBytes)
|
|
||||||
copy(frame[5+len(ctBytes):], data)
|
|
||||||
|
|
||||||
respPtr, err := p.WriteBytes(frame)
|
|
||||||
if err != nil {
|
|
||||||
stack[0] = 0
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
stack[0] = respPtr
|
subsonicapiWriteResponse(p, stack, resp)
|
||||||
},
|
},
|
||||||
[]extism.ValueType{extism.ValueTypePTR},
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
[]extism.ValueType{extism.ValueTypePTR},
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
@@ -137,14 +134,3 @@ func subsonicapiWriteError(p *extism.CurrentPlugin, stack []uint64, err error) {
|
|||||||
respPtr, _ := p.WriteBytes(respBytes)
|
respPtr, _ := p.WriteBytes(respBytes)
|
||||||
stack[0] = respPtr
|
stack[0] = respPtr
|
||||||
}
|
}
|
||||||
|
|
||||||
// subsonicapiWriteRawError writes a binary-framed error response to plugin memory.
|
|
||||||
// Format: [0x01][UTF-8 error message]
|
|
||||||
func subsonicapiWriteRawError(p *extism.CurrentPlugin, stack []uint64, err error) {
|
|
||||||
errMsg := []byte(err.Error())
|
|
||||||
frame := make([]byte, 1+len(errMsg))
|
|
||||||
frame[0] = 0x01 // error
|
|
||||||
copy(frame[1:], errMsg)
|
|
||||||
respPtr, _ := p.WriteBytes(frame)
|
|
||||||
stack[0] = respPtr
|
|
||||||
}
|
|
||||||
|
|||||||
68
plugins/host/task.go
Normal file
68
plugins/host/task.go
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
package host
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
|
// TaskInfo holds the current state of a task.
|
||||||
|
type TaskInfo struct {
|
||||||
|
// Status is the current task status: "pending", "running",
|
||||||
|
// "completed", "failed", or "cancelled".
|
||||||
|
Status string `json:"status"`
|
||||||
|
// Message is the status/result message returned by the plugin callback.
|
||||||
|
Message string `json:"message"`
|
||||||
|
// Attempt is the current or last attempt number (1-based).
|
||||||
|
Attempt int32 `json:"attempt"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueueConfig holds configuration for a task queue.
|
||||||
|
type QueueConfig struct {
|
||||||
|
// Concurrency is the max number of parallel workers. Default: 1.
|
||||||
|
// Capped by the plugin's manifest maxConcurrency.
|
||||||
|
Concurrency int32 `json:"concurrency"`
|
||||||
|
|
||||||
|
// MaxRetries is the number of times to retry a failed task. Default: 0.
|
||||||
|
MaxRetries int32 `json:"maxRetries"`
|
||||||
|
|
||||||
|
// BackoffMs is the initial backoff between retries in milliseconds.
|
||||||
|
// Doubles each retry (exponential: backoffMs * 2^(attempt-1)). Default: 1000.
|
||||||
|
BackoffMs int64 `json:"backoffMs"`
|
||||||
|
|
||||||
|
// DelayMs is the minimum delay between starting consecutive tasks
|
||||||
|
// in milliseconds. Useful for rate limiting. Default: 0.
|
||||||
|
DelayMs int64 `json:"delayMs"`
|
||||||
|
|
||||||
|
// RetentionMs is how long completed/failed/cancelled tasks are kept
|
||||||
|
// in milliseconds. Default: 3600000 (1h). Min: 60000 (1m). Max: 604800000 (1w).
|
||||||
|
RetentionMs int64 `json:"retentionMs"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskService provides persistent task queues for plugins.
|
||||||
|
//
|
||||||
|
// This service allows plugins to create named queues with configurable concurrency,
|
||||||
|
// retry policies, and rate limiting. Tasks are persisted to SQLite and survive
|
||||||
|
// server restarts. When a task is ready to execute, the host calls the plugin's
|
||||||
|
// nd_task_execute callback function.
|
||||||
|
//
|
||||||
|
//nd:hostservice name=Task permission=taskqueue
|
||||||
|
type TaskService interface {
|
||||||
|
// CreateQueue creates a named task queue with the given configuration.
|
||||||
|
// Zero-value fields in config use sensible defaults.
|
||||||
|
// If a queue with the same name already exists, returns an error.
|
||||||
|
// On startup, this also recovers any stale "running" tasks from a previous crash.
|
||||||
|
//nd:hostfunc
|
||||||
|
CreateQueue(ctx context.Context, name string, config QueueConfig) error
|
||||||
|
|
||||||
|
// Enqueue adds a task to the named queue. Returns the task ID.
|
||||||
|
// payload is opaque bytes passed back to the plugin on execution.
|
||||||
|
//nd:hostfunc
|
||||||
|
Enqueue(ctx context.Context, queueName string, payload []byte) (string, error)
|
||||||
|
|
||||||
|
// Get returns the current state of a task including its status,
|
||||||
|
// message, and attempt count.
|
||||||
|
//nd:hostfunc
|
||||||
|
Get(ctx context.Context, taskID string) (*TaskInfo, error)
|
||||||
|
|
||||||
|
// Cancel cancels a pending task. Returns error if already
|
||||||
|
// running, completed, or failed.
|
||||||
|
//nd:hostfunc
|
||||||
|
Cancel(ctx context.Context, taskID string) error
|
||||||
|
}
|
||||||
220
plugins/host/task_gen.go
Normal file
220
plugins/host/task_gen.go
Normal file
@@ -0,0 +1,220 @@
|
|||||||
|
// Code generated by ndpgen. DO NOT EDIT.
|
||||||
|
|
||||||
|
package host
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
|
extism "github.com/extism/go-sdk"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TaskCreateQueueRequest is the request type for Task.CreateQueue.
|
||||||
|
type TaskCreateQueueRequest struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Config QueueConfig `json:"config"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskCreateQueueResponse is the response type for Task.CreateQueue.
|
||||||
|
type TaskCreateQueueResponse struct {
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskEnqueueRequest is the request type for Task.Enqueue.
|
||||||
|
type TaskEnqueueRequest struct {
|
||||||
|
QueueName string `json:"queueName"`
|
||||||
|
Payload []byte `json:"payload"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskEnqueueResponse is the response type for Task.Enqueue.
|
||||||
|
type TaskEnqueueResponse struct {
|
||||||
|
Result string `json:"result,omitempty"`
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskGetRequest is the request type for Task.Get.
|
||||||
|
type TaskGetRequest struct {
|
||||||
|
TaskID string `json:"taskId"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskGetResponse is the response type for Task.Get.
|
||||||
|
type TaskGetResponse struct {
|
||||||
|
Result *TaskInfo `json:"result,omitempty"`
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskCancelRequest is the request type for Task.Cancel.
|
||||||
|
type TaskCancelRequest struct {
|
||||||
|
TaskID string `json:"taskId"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskCancelResponse is the response type for Task.Cancel.
|
||||||
|
type TaskCancelResponse struct {
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterTaskHostFunctions registers Task service host functions.
|
||||||
|
// The returned host functions should be added to the plugin's configuration.
|
||||||
|
func RegisterTaskHostFunctions(service TaskService) []extism.HostFunction {
|
||||||
|
return []extism.HostFunction{
|
||||||
|
newTaskCreateQueueHostFunction(service),
|
||||||
|
newTaskEnqueueHostFunction(service),
|
||||||
|
newTaskGetHostFunction(service),
|
||||||
|
newTaskCancelHostFunction(service),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTaskCreateQueueHostFunction(service TaskService) extism.HostFunction {
|
||||||
|
return extism.NewHostFunctionWithStack(
|
||||||
|
"task_createqueue",
|
||||||
|
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
||||||
|
// Read JSON request from plugin memory
|
||||||
|
reqBytes, err := p.ReadBytes(stack[0])
|
||||||
|
if err != nil {
|
||||||
|
taskWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var req TaskCreateQueueRequest
|
||||||
|
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
||||||
|
taskWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call the service method
|
||||||
|
if svcErr := service.CreateQueue(ctx, req.Name, req.Config); svcErr != nil {
|
||||||
|
taskWriteError(p, stack, svcErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write JSON response to plugin memory
|
||||||
|
resp := TaskCreateQueueResponse{}
|
||||||
|
taskWriteResponse(p, stack, resp)
|
||||||
|
},
|
||||||
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTaskEnqueueHostFunction(service TaskService) extism.HostFunction {
|
||||||
|
return extism.NewHostFunctionWithStack(
|
||||||
|
"task_enqueue",
|
||||||
|
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
||||||
|
// Read JSON request from plugin memory
|
||||||
|
reqBytes, err := p.ReadBytes(stack[0])
|
||||||
|
if err != nil {
|
||||||
|
taskWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var req TaskEnqueueRequest
|
||||||
|
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
||||||
|
taskWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call the service method
|
||||||
|
result, svcErr := service.Enqueue(ctx, req.QueueName, req.Payload)
|
||||||
|
if svcErr != nil {
|
||||||
|
taskWriteError(p, stack, svcErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write JSON response to plugin memory
|
||||||
|
resp := TaskEnqueueResponse{
|
||||||
|
Result: result,
|
||||||
|
}
|
||||||
|
taskWriteResponse(p, stack, resp)
|
||||||
|
},
|
||||||
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTaskGetHostFunction(service TaskService) extism.HostFunction {
|
||||||
|
return extism.NewHostFunctionWithStack(
|
||||||
|
"task_get",
|
||||||
|
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
||||||
|
// Read JSON request from plugin memory
|
||||||
|
reqBytes, err := p.ReadBytes(stack[0])
|
||||||
|
if err != nil {
|
||||||
|
taskWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var req TaskGetRequest
|
||||||
|
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
||||||
|
taskWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call the service method
|
||||||
|
result, svcErr := service.Get(ctx, req.TaskID)
|
||||||
|
if svcErr != nil {
|
||||||
|
taskWriteError(p, stack, svcErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write JSON response to plugin memory
|
||||||
|
resp := TaskGetResponse{
|
||||||
|
Result: result,
|
||||||
|
}
|
||||||
|
taskWriteResponse(p, stack, resp)
|
||||||
|
},
|
||||||
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTaskCancelHostFunction(service TaskService) extism.HostFunction {
|
||||||
|
return extism.NewHostFunctionWithStack(
|
||||||
|
"task_cancel",
|
||||||
|
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
||||||
|
// Read JSON request from plugin memory
|
||||||
|
reqBytes, err := p.ReadBytes(stack[0])
|
||||||
|
if err != nil {
|
||||||
|
taskWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var req TaskCancelRequest
|
||||||
|
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
||||||
|
taskWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call the service method
|
||||||
|
if svcErr := service.Cancel(ctx, req.TaskID); svcErr != nil {
|
||||||
|
taskWriteError(p, stack, svcErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write JSON response to plugin memory
|
||||||
|
resp := TaskCancelResponse{}
|
||||||
|
taskWriteResponse(p, stack, resp)
|
||||||
|
},
|
||||||
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
|
[]extism.ValueType{extism.ValueTypePTR},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// taskWriteResponse writes a JSON response to plugin memory.
|
||||||
|
func taskWriteResponse(p *extism.CurrentPlugin, stack []uint64, resp any) {
|
||||||
|
respBytes, err := json.Marshal(resp)
|
||||||
|
if err != nil {
|
||||||
|
taskWriteError(p, stack, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
respPtr, err := p.WriteBytes(respBytes)
|
||||||
|
if err != nil {
|
||||||
|
stack[0] = 0
|
||||||
|
return
|
||||||
|
}
|
||||||
|
stack[0] = respPtr
|
||||||
|
}
|
||||||
|
|
||||||
|
// taskWriteError writes an error response to plugin memory.
|
||||||
|
func taskWriteError(p *extism.CurrentPlugin, stack []uint64, err error) {
|
||||||
|
errResp := struct {
|
||||||
|
Error string `json:"error"`
|
||||||
|
}{Error: err.Error()}
|
||||||
|
respBytes, _ := json.Marshal(errResp)
|
||||||
|
respPtr, _ := p.WriteBytes(respBytes)
|
||||||
|
stack[0] = respPtr
|
||||||
|
}
|
||||||
@@ -188,12 +188,6 @@ func (s *schedulerServiceImpl) invokeCallback(ctx context.Context, scheduleID st
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if plugin has the scheduler capability
|
|
||||||
if !hasCapability(instance.capabilities, CapabilityScheduler) {
|
|
||||||
log.Warn(ctx, "Plugin does not have scheduler capability", "plugin", s.pluginName, "scheduleID", scheduleID)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Prepare callback input
|
// Prepare callback input
|
||||||
input := capabilities.SchedulerCallbackRequest{
|
input := capabilities.SchedulerCallbackRequest{
|
||||||
ScheduleID: scheduleID,
|
ScheduleID: scheduleID,
|
||||||
|
|||||||
566
plugins/host_taskqueue.go
Normal file
566
plugins/host_taskqueue.go
Normal file
@@ -0,0 +1,566 @@
|
|||||||
|
package plugins
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
_ "github.com/mattn/go-sqlite3"
|
||||||
|
"github.com/navidrome/navidrome/conf"
|
||||||
|
"github.com/navidrome/navidrome/log"
|
||||||
|
"github.com/navidrome/navidrome/model/id"
|
||||||
|
"github.com/navidrome/navidrome/plugins/capabilities"
|
||||||
|
"github.com/navidrome/navidrome/plugins/host"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultConcurrency int32 = 1
|
||||||
|
defaultBackoffMs int64 = 1000
|
||||||
|
defaultRetentionMs int64 = 3_600_000 // 1 hour
|
||||||
|
minRetentionMs int64 = 60_000 // 1 minute
|
||||||
|
maxRetentionMs int64 = 604_800_000 // 1 week
|
||||||
|
maxQueueNameLength = 128
|
||||||
|
maxPayloadSize = 1 * 1024 * 1024 // 1MB
|
||||||
|
maxBackoffMs int64 = 3_600_000 // 1 hour
|
||||||
|
cleanupInterval = 5 * time.Minute
|
||||||
|
pollInterval = 5 * time.Second
|
||||||
|
shutdownTimeout = 10 * time.Second
|
||||||
|
|
||||||
|
taskStatusPending = "pending"
|
||||||
|
taskStatusRunning = "running"
|
||||||
|
taskStatusCompleted = "completed"
|
||||||
|
taskStatusFailed = "failed"
|
||||||
|
taskStatusCancelled = "cancelled"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CapabilityTaskWorker indicates the plugin can receive task execution callbacks.
|
||||||
|
const CapabilityTaskWorker Capability = "TaskWorker"
|
||||||
|
|
||||||
|
const FuncTaskWorkerCallback = "nd_task_execute"
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
registerCapability(CapabilityTaskWorker, FuncTaskWorkerCallback)
|
||||||
|
}
|
||||||
|
|
||||||
|
type queueState struct {
|
||||||
|
config host.QueueConfig
|
||||||
|
signal chan struct{}
|
||||||
|
limiter *rate.Limiter
|
||||||
|
}
|
||||||
|
|
||||||
|
// notifyWorkers sends a non-blocking signal to wake up queue workers.
|
||||||
|
func (qs *queueState) notifyWorkers() {
|
||||||
|
select {
|
||||||
|
case qs.signal <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// taskQueueServiceImpl implements host.TaskQueueService with SQLite persistence
|
||||||
|
// and background worker goroutines for task execution.
|
||||||
|
type taskQueueServiceImpl struct {
|
||||||
|
pluginName string
|
||||||
|
manager *Manager
|
||||||
|
maxConcurrency int32
|
||||||
|
db *sql.DB
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
wg sync.WaitGroup
|
||||||
|
mu sync.Mutex
|
||||||
|
queues map[string]*queueState
|
||||||
|
|
||||||
|
// For testing: override how callbacks are invoked
|
||||||
|
invokeCallbackFn func(ctx context.Context, queueName, taskID string, payload []byte, attempt int32) (string, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// newTaskQueueService creates a new taskQueueServiceImpl with its own SQLite database.
|
||||||
|
func newTaskQueueService(pluginName string, manager *Manager, maxConcurrency int32) (*taskQueueServiceImpl, error) {
|
||||||
|
dataDir := filepath.Join(conf.Server.DataFolder, "plugins", pluginName)
|
||||||
|
if err := os.MkdirAll(dataDir, 0700); err != nil {
|
||||||
|
return nil, fmt.Errorf("creating plugin data directory: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
dbPath := filepath.Join(dataDir, "taskqueue.db")
|
||||||
|
db, err := sql.Open("sqlite3", dbPath+"?_busy_timeout=5000&_journal_mode=WAL&_foreign_keys=off")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("opening taskqueue database: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
db.SetMaxOpenConns(3)
|
||||||
|
db.SetMaxIdleConns(1)
|
||||||
|
|
||||||
|
if err := createTaskQueueSchema(db); err != nil {
|
||||||
|
db.Close()
|
||||||
|
return nil, fmt.Errorf("creating taskqueue schema: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(manager.ctx)
|
||||||
|
|
||||||
|
s := &taskQueueServiceImpl{
|
||||||
|
pluginName: pluginName,
|
||||||
|
manager: manager,
|
||||||
|
maxConcurrency: maxConcurrency,
|
||||||
|
db: db,
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
|
queues: make(map[string]*queueState),
|
||||||
|
}
|
||||||
|
s.invokeCallbackFn = s.defaultInvokeCallback
|
||||||
|
|
||||||
|
s.wg.Go(s.cleanupLoop)
|
||||||
|
|
||||||
|
log.Debug("Initialized plugin taskqueue", "plugin", pluginName, "path", dbPath, "maxConcurrency", maxConcurrency)
|
||||||
|
return s, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func createTaskQueueSchema(db *sql.DB) error {
|
||||||
|
_, err := db.Exec(`
|
||||||
|
CREATE TABLE IF NOT EXISTS queues (
|
||||||
|
name TEXT PRIMARY KEY,
|
||||||
|
concurrency INTEGER NOT NULL DEFAULT 1,
|
||||||
|
max_retries INTEGER NOT NULL DEFAULT 0,
|
||||||
|
backoff_ms INTEGER NOT NULL DEFAULT 1000,
|
||||||
|
delay_ms INTEGER NOT NULL DEFAULT 0,
|
||||||
|
retention_ms INTEGER NOT NULL DEFAULT 3600000
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS tasks (
|
||||||
|
id TEXT PRIMARY KEY,
|
||||||
|
queue_name TEXT NOT NULL REFERENCES queues(name),
|
||||||
|
payload BLOB NOT NULL,
|
||||||
|
status TEXT NOT NULL DEFAULT 'pending',
|
||||||
|
attempt INTEGER NOT NULL DEFAULT 0,
|
||||||
|
max_retries INTEGER NOT NULL,
|
||||||
|
next_run_at INTEGER NOT NULL,
|
||||||
|
created_at INTEGER NOT NULL,
|
||||||
|
updated_at INTEGER NOT NULL,
|
||||||
|
message TEXT NOT NULL DEFAULT ''
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_tasks_dequeue ON tasks(queue_name, status, next_run_at);
|
||||||
|
`)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// applyConfigDefaults fills zero-value config fields with sensible defaults
|
||||||
|
// and clamps values to valid ranges, logging warnings for clamped values.
|
||||||
|
func (s *taskQueueServiceImpl) applyConfigDefaults(ctx context.Context, name string, config *host.QueueConfig) {
|
||||||
|
if config.Concurrency <= 0 {
|
||||||
|
config.Concurrency = defaultConcurrency
|
||||||
|
}
|
||||||
|
if config.BackoffMs <= 0 {
|
||||||
|
config.BackoffMs = defaultBackoffMs
|
||||||
|
}
|
||||||
|
if config.RetentionMs <= 0 {
|
||||||
|
config.RetentionMs = defaultRetentionMs
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.RetentionMs < minRetentionMs {
|
||||||
|
log.Warn(ctx, "TaskQueue retention clamped to minimum", "plugin", s.pluginName, "queue", name,
|
||||||
|
"requested", config.RetentionMs, "min", minRetentionMs)
|
||||||
|
config.RetentionMs = minRetentionMs
|
||||||
|
}
|
||||||
|
if config.RetentionMs > maxRetentionMs {
|
||||||
|
log.Warn(ctx, "TaskQueue retention clamped to maximum", "plugin", s.pluginName, "queue", name,
|
||||||
|
"requested", config.RetentionMs, "max", maxRetentionMs)
|
||||||
|
config.RetentionMs = maxRetentionMs
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// clampConcurrency reduces config.Concurrency if it exceeds the remaining budget.
|
||||||
|
// Returns an error when the concurrency budget is fully exhausted.
|
||||||
|
// Must be called with s.mu held.
|
||||||
|
func (s *taskQueueServiceImpl) clampConcurrency(ctx context.Context, name string, config *host.QueueConfig) error {
|
||||||
|
var allocated int32
|
||||||
|
for _, qs := range s.queues {
|
||||||
|
allocated += qs.config.Concurrency
|
||||||
|
}
|
||||||
|
available := s.maxConcurrency - allocated
|
||||||
|
if available <= 0 {
|
||||||
|
log.Warn(ctx, "TaskQueue concurrency budget exhausted", "plugin", s.pluginName, "queue", name,
|
||||||
|
"allocated", allocated, "maxConcurrency", s.maxConcurrency)
|
||||||
|
return fmt.Errorf("concurrency budget exhausted (%d/%d allocated)", allocated, s.maxConcurrency)
|
||||||
|
}
|
||||||
|
if config.Concurrency > available {
|
||||||
|
log.Warn(ctx, "TaskQueue concurrency clamped", "plugin", s.pluginName, "queue", name,
|
||||||
|
"requested", config.Concurrency, "available", available, "maxConcurrency", s.maxConcurrency)
|
||||||
|
config.Concurrency = available
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *taskQueueServiceImpl) CreateQueue(ctx context.Context, name string, config host.QueueConfig) error {
|
||||||
|
if len(name) == 0 {
|
||||||
|
return fmt.Errorf("queue name cannot be empty")
|
||||||
|
}
|
||||||
|
if len(name) > maxQueueNameLength {
|
||||||
|
return fmt.Errorf("queue name exceeds maximum length of %d bytes", maxQueueNameLength)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.applyConfigDefaults(ctx, name, &config)
|
||||||
|
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
if err := s.clampConcurrency(ctx, name, &config); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, exists := s.queues[name]; exists {
|
||||||
|
return fmt.Errorf("queue %q already exists", name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Upsert into queues table (idempotent across restarts)
|
||||||
|
_, err := s.db.ExecContext(ctx, `
|
||||||
|
INSERT INTO queues (name, concurrency, max_retries, backoff_ms, delay_ms, retention_ms)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?)
|
||||||
|
ON CONFLICT(name) DO UPDATE SET
|
||||||
|
concurrency = excluded.concurrency,
|
||||||
|
max_retries = excluded.max_retries,
|
||||||
|
backoff_ms = excluded.backoff_ms,
|
||||||
|
delay_ms = excluded.delay_ms,
|
||||||
|
retention_ms = excluded.retention_ms
|
||||||
|
`, name, config.Concurrency, config.MaxRetries, config.BackoffMs, config.DelayMs, config.RetentionMs)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating queue: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset stale running tasks from previous crash
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
|
_, err = s.db.ExecContext(ctx, `
|
||||||
|
UPDATE tasks SET status = ?, updated_at = ? WHERE queue_name = ? AND status = ?
|
||||||
|
`, taskStatusPending, now, name, taskStatusRunning)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("resetting stale tasks: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
qs := &queueState{
|
||||||
|
config: config,
|
||||||
|
signal: make(chan struct{}, 1),
|
||||||
|
}
|
||||||
|
if config.DelayMs > 0 {
|
||||||
|
// Rate limit dispatches to enforce delay between tasks.
|
||||||
|
// Burst of 1 allows one immediate dispatch, then enforces the delay interval.
|
||||||
|
qs.limiter = rate.NewLimiter(rate.Every(time.Duration(config.DelayMs)*time.Millisecond), 1)
|
||||||
|
}
|
||||||
|
s.queues[name] = qs
|
||||||
|
|
||||||
|
for i := int32(0); i < config.Concurrency; i++ {
|
||||||
|
s.wg.Go(func() { s.worker(name, qs) })
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debug(ctx, "Created task queue", "plugin", s.pluginName, "queue", name,
|
||||||
|
"concurrency", config.Concurrency, "maxRetries", config.MaxRetries,
|
||||||
|
"backoffMs", config.BackoffMs, "delayMs", config.DelayMs, "retentionMs", config.RetentionMs)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *taskQueueServiceImpl) Enqueue(ctx context.Context, queueName string, payload []byte) (string, error) {
|
||||||
|
s.mu.Lock()
|
||||||
|
qs, exists := s.queues[queueName]
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
return "", fmt.Errorf("queue %q does not exist", queueName)
|
||||||
|
}
|
||||||
|
if len(payload) > maxPayloadSize {
|
||||||
|
return "", fmt.Errorf("payload size %d exceeds maximum of %d bytes", len(payload), maxPayloadSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
taskID := id.NewRandom()
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
|
|
||||||
|
_, err := s.db.ExecContext(ctx, `
|
||||||
|
INSERT INTO tasks (id, queue_name, payload, status, attempt, max_retries, next_run_at, created_at, updated_at)
|
||||||
|
VALUES (?, ?, ?, ?, 0, ?, ?, ?, ?)
|
||||||
|
`, taskID, queueName, payload, taskStatusPending, qs.config.MaxRetries, now, now, now)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("enqueuing task: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
qs.notifyWorkers()
|
||||||
|
log.Trace(ctx, "Enqueued task", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
|
||||||
|
return taskID, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get returns the current state of a task.
|
||||||
|
func (s *taskQueueServiceImpl) Get(ctx context.Context, taskID string) (*host.TaskInfo, error) {
|
||||||
|
var info host.TaskInfo
|
||||||
|
err := s.db.QueryRowContext(ctx, `SELECT status, message, attempt FROM tasks WHERE id = ?`, taskID).
|
||||||
|
Scan(&info.Status, &info.Message, &info.Attempt)
|
||||||
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
|
return nil, fmt.Errorf("task %q not found", taskID)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("getting task info: %w", err)
|
||||||
|
}
|
||||||
|
return &info, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cancel cancels a pending task.
|
||||||
|
func (s *taskQueueServiceImpl) Cancel(ctx context.Context, taskID string) error {
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
|
result, err := s.db.ExecContext(ctx, `
|
||||||
|
UPDATE tasks SET status = ?, updated_at = ? WHERE id = ? AND status = ?
|
||||||
|
`, taskStatusCancelled, now, taskID, taskStatusPending)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cancelling task: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rowsAffected, err := result.RowsAffected()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("checking cancel result: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if rowsAffected == 0 {
|
||||||
|
// Check if task exists at all
|
||||||
|
var status string
|
||||||
|
err := s.db.QueryRowContext(ctx, `SELECT status FROM tasks WHERE id = ?`, taskID).Scan(&status)
|
||||||
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
|
return fmt.Errorf("task %q not found", taskID)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("checking task existence: %w", err)
|
||||||
|
}
|
||||||
|
return fmt.Errorf("task %q cannot be cancelled (status: %s)", taskID, status)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Trace(ctx, "Cancelled task", "plugin", s.pluginName, "taskID", taskID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// worker is the main loop for a single worker goroutine.
|
||||||
|
func (s *taskQueueServiceImpl) worker(queueName string, qs *queueState) {
|
||||||
|
// Process any existing pending tasks immediately on startup
|
||||||
|
s.drainQueue(queueName, qs)
|
||||||
|
|
||||||
|
ticker := time.NewTicker(pollInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
return
|
||||||
|
case <-qs.signal:
|
||||||
|
s.drainQueue(queueName, qs)
|
||||||
|
case <-ticker.C:
|
||||||
|
s.drainQueue(queueName, qs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *taskQueueServiceImpl) drainQueue(queueName string, qs *queueState) {
|
||||||
|
for s.ctx.Err() == nil && s.processTask(queueName, qs) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// processTask dequeues and processes a single task. Returns true if a task was processed.
|
||||||
|
func (s *taskQueueServiceImpl) processTask(queueName string, qs *queueState) bool {
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
|
|
||||||
|
// Atomically dequeue a task
|
||||||
|
var taskID string
|
||||||
|
var payload []byte
|
||||||
|
var attempt, maxRetries int32
|
||||||
|
err := s.db.QueryRowContext(s.ctx, `
|
||||||
|
UPDATE tasks SET status = ?, attempt = attempt + 1, updated_at = ?
|
||||||
|
WHERE id = (
|
||||||
|
SELECT id FROM tasks
|
||||||
|
WHERE queue_name = ? AND status = ? AND next_run_at <= ?
|
||||||
|
ORDER BY next_run_at, created_at LIMIT 1
|
||||||
|
)
|
||||||
|
RETURNING id, payload, attempt, max_retries
|
||||||
|
`, taskStatusRunning, now, queueName, taskStatusPending, now).Scan(&taskID, &payload, &attempt, &maxRetries)
|
||||||
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
log.Error(s.ctx, "Failed to dequeue task", "plugin", s.pluginName, "queue", queueName, err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enforce delay between task dispatches using a rate limiter.
|
||||||
|
// This is done after dequeue so that empty polls don't consume rate tokens.
|
||||||
|
if qs.limiter != nil {
|
||||||
|
if err := qs.limiter.Wait(s.ctx); err != nil {
|
||||||
|
// Context cancelled during wait — revert task to pending for recovery
|
||||||
|
s.revertTaskToPending(taskID)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Invoke callback
|
||||||
|
log.Debug(s.ctx, "Executing task", "plugin", s.pluginName, "queue", queueName, "taskID", taskID, "attempt", attempt)
|
||||||
|
message, callbackErr := s.invokeCallbackFn(s.ctx, queueName, taskID, payload, attempt)
|
||||||
|
|
||||||
|
// If context was cancelled (shutdown), revert task to pending for recovery
|
||||||
|
if s.ctx.Err() != nil {
|
||||||
|
s.revertTaskToPending(taskID)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if callbackErr == nil {
|
||||||
|
s.completeTask(queueName, taskID, message)
|
||||||
|
} else {
|
||||||
|
s.handleTaskFailure(queueName, taskID, attempt, maxRetries, qs, callbackErr, message)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *taskQueueServiceImpl) completeTask(queueName, taskID, message string) {
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
|
if _, err := s.db.ExecContext(s.ctx, `UPDATE tasks SET status = ?, message = ?, updated_at = ? WHERE id = ?`, taskStatusCompleted, message, now, taskID); err != nil {
|
||||||
|
log.Error(s.ctx, "Failed to mark task as completed", "plugin", s.pluginName, "taskID", taskID, err)
|
||||||
|
}
|
||||||
|
log.Debug(s.ctx, "Task completed", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *taskQueueServiceImpl) handleTaskFailure(queueName, taskID string, attempt, maxRetries int32, qs *queueState, callbackErr error, message string) {
|
||||||
|
log.Warn(s.ctx, "Task execution failed", "plugin", s.pluginName, "queue", queueName,
|
||||||
|
"taskID", taskID, "attempt", attempt, "maxRetries", maxRetries, "err", callbackErr)
|
||||||
|
|
||||||
|
// Use error message as fallback if no message was provided
|
||||||
|
if message == "" {
|
||||||
|
message = callbackErr.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
|
if attempt > maxRetries {
|
||||||
|
if _, err := s.db.ExecContext(s.ctx, `UPDATE tasks SET status = ?, message = ?, updated_at = ? WHERE id = ?`, taskStatusFailed, message, now, taskID); err != nil {
|
||||||
|
log.Error(s.ctx, "Failed to mark task as failed", "plugin", s.pluginName, "taskID", taskID, err)
|
||||||
|
}
|
||||||
|
log.Warn(s.ctx, "Task failed after all retries", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exponential backoff: backoffMs * 2^(attempt-1)
|
||||||
|
backoff := qs.config.BackoffMs << (attempt - 1)
|
||||||
|
if backoff <= 0 || backoff > maxBackoffMs {
|
||||||
|
backoff = maxBackoffMs
|
||||||
|
}
|
||||||
|
nextRunAt := now + backoff
|
||||||
|
if _, err := s.db.ExecContext(s.ctx, `
|
||||||
|
UPDATE tasks SET status = ?, next_run_at = ?, updated_at = ? WHERE id = ?
|
||||||
|
`, taskStatusPending, nextRunAt, now, taskID); err != nil {
|
||||||
|
log.Error(s.ctx, "Failed to reschedule task for retry", "plugin", s.pluginName, "taskID", taskID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wake worker after backoff expires
|
||||||
|
time.AfterFunc(time.Duration(backoff)*time.Millisecond, func() {
|
||||||
|
qs.notifyWorkers()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// revertTaskToPending puts a running task back to pending status and decrements the attempt
|
||||||
|
// counter (used during shutdown to ensure the interrupted attempt doesn't count).
|
||||||
|
func (s *taskQueueServiceImpl) revertTaskToPending(taskID string) {
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
|
_, err := s.db.Exec(`UPDATE tasks SET status = ?, attempt = MAX(attempt - 1, 0), updated_at = ? WHERE id = ? AND status = ?`, taskStatusPending, now, taskID, taskStatusRunning)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Failed to revert task to pending", "plugin", s.pluginName, "taskID", taskID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// defaultInvokeCallback calls the plugin's nd_task_execute function.
|
||||||
|
func (s *taskQueueServiceImpl) defaultInvokeCallback(ctx context.Context, queueName, taskID string, payload []byte, attempt int32) (string, error) {
|
||||||
|
s.manager.mu.RLock()
|
||||||
|
p, ok := s.manager.plugins[s.pluginName]
|
||||||
|
s.manager.mu.RUnlock()
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return "", fmt.Errorf("plugin %s not loaded", s.pluginName)
|
||||||
|
}
|
||||||
|
|
||||||
|
input := capabilities.TaskExecuteRequest{
|
||||||
|
QueueName: queueName,
|
||||||
|
TaskID: taskID,
|
||||||
|
Payload: payload,
|
||||||
|
Attempt: attempt,
|
||||||
|
}
|
||||||
|
|
||||||
|
message, err := callPluginFunction[capabilities.TaskExecuteRequest, string](ctx, p, FuncTaskWorkerCallback, input)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return message, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// cleanupLoop periodically removes terminal tasks past their retention period.
|
||||||
|
func (s *taskQueueServiceImpl) cleanupLoop() {
|
||||||
|
ticker := time.NewTicker(cleanupInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
s.runCleanup()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// runCleanup deletes terminal tasks past their retention period.
|
||||||
|
func (s *taskQueueServiceImpl) runCleanup() {
|
||||||
|
s.mu.Lock()
|
||||||
|
queues := make(map[string]*queueState, len(s.queues))
|
||||||
|
for k, v := range s.queues {
|
||||||
|
queues[k] = v
|
||||||
|
}
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
|
for name, qs := range queues {
|
||||||
|
result, err := s.db.ExecContext(s.ctx, `
|
||||||
|
DELETE FROM tasks WHERE queue_name = ? AND status IN (?, ?, ?) AND updated_at + ? < ?
|
||||||
|
`, name, taskStatusCompleted, taskStatusFailed, taskStatusCancelled, qs.config.RetentionMs, now)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(s.ctx, "Failed to cleanup tasks", "plugin", s.pluginName, "queue", name, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if deleted, _ := result.RowsAffected(); deleted > 0 {
|
||||||
|
log.Debug(s.ctx, "Cleaned up terminal tasks", "plugin", s.pluginName, "queue", name, "deleted", deleted)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close shuts down the task queue service, stopping all workers and closing the database.
|
||||||
|
func (s *taskQueueServiceImpl) Close() error {
|
||||||
|
// Cancel context to signal all goroutines
|
||||||
|
s.cancel()
|
||||||
|
|
||||||
|
// Wait for goroutines with timeout
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
s.wg.Wait()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(shutdownTimeout):
|
||||||
|
log.Warn("TaskQueue shutdown timed out", "plugin", s.pluginName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mark running tasks as pending for recovery on next startup
|
||||||
|
if s.db != nil {
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
|
if _, err := s.db.Exec(`UPDATE tasks SET status = ?, updated_at = ? WHERE status = ?`, taskStatusPending, now, taskStatusRunning); err != nil {
|
||||||
|
log.Error("Failed to reset running tasks on shutdown", "plugin", s.pluginName, err)
|
||||||
|
}
|
||||||
|
log.Debug("Closing plugin taskqueue", "plugin", s.pluginName)
|
||||||
|
return s.db.Close()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compile-time verification
|
||||||
|
var _ host.TaskService = (*taskQueueServiceImpl)(nil)
|
||||||
|
var _ io.Closer = (*taskQueueServiceImpl)(nil)
|
||||||
1074
plugins/host_taskqueue_test.go
Normal file
1074
plugins/host_taskqueue_test.go
Normal file
File diff suppressed because it is too large
Load Diff
@@ -128,6 +128,23 @@ var hostServices = []hostServiceEntry{
|
|||||||
return host.RegisterHTTPHostFunctions(service), nil
|
return host.RegisterHTTPHostFunctions(service), nil
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "Task",
|
||||||
|
hasPermission: func(p *Permissions) bool { return p != nil && p.Taskqueue != nil },
|
||||||
|
create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) {
|
||||||
|
perm := ctx.permissions.Taskqueue
|
||||||
|
maxConcurrency := int32(1)
|
||||||
|
if perm.MaxConcurrency > 0 {
|
||||||
|
maxConcurrency = int32(perm.MaxConcurrency)
|
||||||
|
}
|
||||||
|
service, err := newTaskQueueService(ctx.pluginName, ctx.manager, maxConcurrency)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Failed to create Task service", "plugin", ctx.pluginName, err)
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return host.RegisterTaskHostFunctions(service), service
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// extractManifest reads manifest from an .ndp package and computes its SHA-256 hash.
|
// extractManifest reads manifest from an .ndp package and computes its SHA-256 hash.
|
||||||
|
|||||||
@@ -110,6 +110,9 @@
|
|||||||
},
|
},
|
||||||
"users": {
|
"users": {
|
||||||
"$ref": "#/$defs/UsersPermission"
|
"$ref": "#/$defs/UsersPermission"
|
||||||
|
},
|
||||||
|
"taskqueue": {
|
||||||
|
"$ref": "#/$defs/TaskQueuePermission"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@@ -224,6 +227,23 @@
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"TaskQueuePermission": {
|
||||||
|
"type": "object",
|
||||||
|
"description": "Task queue permissions for background task processing",
|
||||||
|
"additionalProperties": false,
|
||||||
|
"properties": {
|
||||||
|
"reason": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "Explanation for why task queue access is needed"
|
||||||
|
},
|
||||||
|
"maxConcurrency": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "Maximum total concurrent workers across all queues. Default: 1",
|
||||||
|
"minimum": 1,
|
||||||
|
"default": 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
"UsersPermission": {
|
"UsersPermission": {
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"description": "Users service permissions for accessing user information",
|
"description": "Users service permissions for accessing user information",
|
||||||
|
|||||||
@@ -64,6 +64,21 @@ func ValidateWithCapabilities(m *Manifest, capabilities []Capability) error {
|
|||||||
return fmt.Errorf("scrobbler capability requires 'users' permission to be declared in manifest")
|
return fmt.Errorf("scrobbler capability requires 'users' permission to be declared in manifest")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Scheduler permission requires SchedulerCallback capability
|
||||||
|
if m.Permissions != nil && m.Permissions.Scheduler != nil {
|
||||||
|
if !hasCapability(capabilities, CapabilityScheduler) {
|
||||||
|
return fmt.Errorf("'scheduler' permission requires plugin to export '%s' function", FuncSchedulerCallback)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Task (taskqueue) permission requires TaskWorker capability
|
||||||
|
if m.Permissions != nil && m.Permissions.Taskqueue != nil {
|
||||||
|
if !hasCapability(capabilities, CapabilityTaskWorker) {
|
||||||
|
return fmt.Errorf("'taskqueue' permission requires plugin to export '%s' function", FuncTaskWorkerCallback)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -181,6 +181,9 @@ type Permissions struct {
|
|||||||
// Subsonicapi corresponds to the JSON schema field "subsonicapi".
|
// Subsonicapi corresponds to the JSON schema field "subsonicapi".
|
||||||
Subsonicapi *SubsonicAPIPermission `json:"subsonicapi,omitempty" yaml:"subsonicapi,omitempty" mapstructure:"subsonicapi,omitempty"`
|
Subsonicapi *SubsonicAPIPermission `json:"subsonicapi,omitempty" yaml:"subsonicapi,omitempty" mapstructure:"subsonicapi,omitempty"`
|
||||||
|
|
||||||
|
// Taskqueue corresponds to the JSON schema field "taskqueue".
|
||||||
|
Taskqueue *TaskQueuePermission `json:"taskqueue,omitempty" yaml:"taskqueue,omitempty" mapstructure:"taskqueue,omitempty"`
|
||||||
|
|
||||||
// Users corresponds to the JSON schema field "users".
|
// Users corresponds to the JSON schema field "users".
|
||||||
Users *UsersPermission `json:"users,omitempty" yaml:"users,omitempty" mapstructure:"users,omitempty"`
|
Users *UsersPermission `json:"users,omitempty" yaml:"users,omitempty" mapstructure:"users,omitempty"`
|
||||||
|
|
||||||
@@ -200,6 +203,36 @@ type SubsonicAPIPermission struct {
|
|||||||
Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"`
|
Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Task queue permissions for background task processing
|
||||||
|
type TaskQueuePermission struct {
|
||||||
|
// Maximum total concurrent workers across all queues. Default: 1
|
||||||
|
MaxConcurrency int `json:"maxConcurrency,omitempty" yaml:"maxConcurrency,omitempty" mapstructure:"maxConcurrency,omitempty"`
|
||||||
|
|
||||||
|
// Explanation for why task queue access is needed
|
||||||
|
Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalJSON implements json.Unmarshaler.
|
||||||
|
func (j *TaskQueuePermission) UnmarshalJSON(value []byte) error {
|
||||||
|
var raw map[string]interface{}
|
||||||
|
if err := json.Unmarshal(value, &raw); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
type Plain TaskQueuePermission
|
||||||
|
var plain Plain
|
||||||
|
if err := json.Unmarshal(value, &plain); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if v, ok := raw["maxConcurrency"]; !ok || v == nil {
|
||||||
|
plain.MaxConcurrency = 1.0
|
||||||
|
}
|
||||||
|
if 1 > plain.MaxConcurrency {
|
||||||
|
return fmt.Errorf("field %s: must be >= %v", "maxConcurrency", 1)
|
||||||
|
}
|
||||||
|
*j = TaskQueuePermission(plain)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Enable experimental WebAssembly threads support
|
// Enable experimental WebAssembly threads support
|
||||||
type ThreadsFeature struct {
|
type ThreadsFeature struct {
|
||||||
// Explanation for why threads support is needed
|
// Explanation for why threads support is needed
|
||||||
|
|||||||
@@ -6,3 +6,10 @@ require (
|
|||||||
github.com/extism/go-pdk v1.1.3
|
github.com/extism/go-pdk v1.1.3
|
||||||
github.com/stretchr/testify v1.11.1
|
github.com/stretchr/testify v1.11.1
|
||||||
)
|
)
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||||
|
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||||
|
github.com/stretchr/objx v0.5.2 // indirect
|
||||||
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
|
)
|
||||||
|
|||||||
@@ -43,6 +43,7 @@ The following host services are available:
|
|||||||
- Library: provides access to music library metadata for plugins.
|
- Library: provides access to music library metadata for plugins.
|
||||||
- Scheduler: provides task scheduling capabilities for plugins.
|
- Scheduler: provides task scheduling capabilities for plugins.
|
||||||
- SubsonicAPI: provides access to Navidrome's Subsonic API from plugins.
|
- SubsonicAPI: provides access to Navidrome's Subsonic API from plugins.
|
||||||
|
- Task: provides persistent task queues for plugins.
|
||||||
- Users: provides access to user information for plugins.
|
- Users: provides access to user information for plugins.
|
||||||
- WebSocket: provides WebSocket communication capabilities for plugins.
|
- WebSocket: provides WebSocket communication capabilities for plugins.
|
||||||
|
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ import (
|
|||||||
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
|
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// HTTPRequest represents the HTTPRequest data structure.
|
||||||
// HTTPRequest represents an outbound HTTP request from a plugin.
|
// HTTPRequest represents an outbound HTTP request from a plugin.
|
||||||
type HTTPRequest struct {
|
type HTTPRequest struct {
|
||||||
Method string `json:"method"`
|
Method string `json:"method"`
|
||||||
@@ -23,6 +24,7 @@ type HTTPRequest struct {
|
|||||||
TimeoutMs int32 `json:"timeoutMs"`
|
TimeoutMs int32 `json:"timeoutMs"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HTTPResponse represents the HTTPResponse data structure.
|
||||||
// HTTPResponse represents the response from an outbound HTTP request.
|
// HTTPResponse represents the response from an outbound HTTP request.
|
||||||
type HTTPResponse struct {
|
type HTTPResponse struct {
|
||||||
StatusCode int32 `json:"statusCode"`
|
StatusCode int32 `json:"statusCode"`
|
||||||
@@ -35,11 +37,11 @@ type HTTPResponse struct {
|
|||||||
//go:wasmimport extism:host/user http_send
|
//go:wasmimport extism:host/user http_send
|
||||||
func http_send(uint64) uint64
|
func http_send(uint64) uint64
|
||||||
|
|
||||||
type httpSendRequest struct {
|
type hTTPSendRequest struct {
|
||||||
Request HTTPRequest `json:"request"`
|
Request HTTPRequest `json:"request"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type httpSendResponse struct {
|
type hTTPSendResponse struct {
|
||||||
Result *HTTPResponse `json:"result,omitempty"`
|
Result *HTTPResponse `json:"result,omitempty"`
|
||||||
Error string `json:"error,omitempty"`
|
Error string `json:"error,omitempty"`
|
||||||
}
|
}
|
||||||
@@ -55,7 +57,7 @@ type httpSendResponse struct {
|
|||||||
// Successful HTTP calls (including 4xx/5xx status codes) return a non-nil response with nil error.
|
// Successful HTTP calls (including 4xx/5xx status codes) return a non-nil response with nil error.
|
||||||
func HTTPSend(request HTTPRequest) (*HTTPResponse, error) {
|
func HTTPSend(request HTTPRequest) (*HTTPResponse, error) {
|
||||||
// Marshal request to JSON
|
// Marshal request to JSON
|
||||||
req := httpSendRequest{
|
req := hTTPSendRequest{
|
||||||
Request: request,
|
Request: request,
|
||||||
}
|
}
|
||||||
reqBytes, err := json.Marshal(req)
|
reqBytes, err := json.Marshal(req)
|
||||||
@@ -73,7 +75,7 @@ func HTTPSend(request HTTPRequest) (*HTTPResponse, error) {
|
|||||||
responseBytes := responseMem.ReadBytes()
|
responseBytes := responseMem.ReadBytes()
|
||||||
|
|
||||||
// Parse the response
|
// Parse the response
|
||||||
var response httpSendResponse
|
var response hTTPSendResponse
|
||||||
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -10,6 +10,7 @@ package host
|
|||||||
|
|
||||||
import "github.com/stretchr/testify/mock"
|
import "github.com/stretchr/testify/mock"
|
||||||
|
|
||||||
|
// HTTPRequest represents the HTTPRequest data structure.
|
||||||
// HTTPRequest represents an outbound HTTP request from a plugin.
|
// HTTPRequest represents an outbound HTTP request from a plugin.
|
||||||
type HTTPRequest struct {
|
type HTTPRequest struct {
|
||||||
Method string `json:"method"`
|
Method string `json:"method"`
|
||||||
@@ -19,6 +20,7 @@ type HTTPRequest struct {
|
|||||||
TimeoutMs int32 `json:"timeoutMs"`
|
TimeoutMs int32 `json:"timeoutMs"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HTTPResponse represents the HTTPResponse data structure.
|
||||||
// HTTPResponse represents the response from an outbound HTTP request.
|
// HTTPResponse represents the response from an outbound HTTP request.
|
||||||
type HTTPResponse struct {
|
type HTTPResponse struct {
|
||||||
StatusCode int32 `json:"statusCode"`
|
StatusCode int32 `json:"statusCode"`
|
||||||
@@ -8,7 +8,6 @@
|
|||||||
package host
|
package host
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
@@ -38,6 +37,12 @@ type subsonicAPICallRawRequest struct {
|
|||||||
Uri string `json:"uri"`
|
Uri string `json:"uri"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type subsonicAPICallRawResponse struct {
|
||||||
|
ContentType string `json:"contentType,omitempty"`
|
||||||
|
Data []byte `json:"data,omitempty"`
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
// SubsonicAPICall calls the subsonicapi_call host function.
|
// SubsonicAPICall calls the subsonicapi_call host function.
|
||||||
// Call executes a Subsonic API request and returns the JSON response.
|
// Call executes a Subsonic API request and returns the JSON response.
|
||||||
//
|
//
|
||||||
@@ -78,8 +83,8 @@ func SubsonicAPICall(uri string) (string, error) {
|
|||||||
|
|
||||||
// SubsonicAPICallRaw calls the subsonicapi_callraw host function.
|
// SubsonicAPICallRaw calls the subsonicapi_callraw host function.
|
||||||
// CallRaw executes a Subsonic API request and returns the raw binary response.
|
// CallRaw executes a Subsonic API request and returns the raw binary response.
|
||||||
// Optimized for binary endpoints like getCoverArt and stream that return
|
// Designed for binary endpoints like getCoverArt and stream that return
|
||||||
// non-JSON data. The response is returned as raw bytes without JSON encoding overhead.
|
// non-JSON data. The data is base64-encoded over JSON on the wire.
|
||||||
func SubsonicAPICallRaw(uri string) (string, []byte, error) {
|
func SubsonicAPICallRaw(uri string) (string, []byte, error) {
|
||||||
// Marshal request to JSON
|
// Marshal request to JSON
|
||||||
req := subsonicAPICallRawRequest{
|
req := subsonicAPICallRawRequest{
|
||||||
@@ -99,22 +104,16 @@ func SubsonicAPICallRaw(uri string) (string, []byte, error) {
|
|||||||
responseMem := pdk.FindMemory(responsePtr)
|
responseMem := pdk.FindMemory(responsePtr)
|
||||||
responseBytes := responseMem.ReadBytes()
|
responseBytes := responseMem.ReadBytes()
|
||||||
|
|
||||||
// Parse binary-framed response
|
// Parse the response
|
||||||
if len(responseBytes) == 0 {
|
var response subsonicAPICallRawResponse
|
||||||
return "", nil, errors.New("empty response from host")
|
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
||||||
|
return "", nil, err
|
||||||
}
|
}
|
||||||
if responseBytes[0] == 0x01 { // error
|
|
||||||
return "", nil, errors.New(string(responseBytes[1:]))
|
// Convert Error field to Go error
|
||||||
|
if response.Error != "" {
|
||||||
|
return "", nil, errors.New(response.Error)
|
||||||
}
|
}
|
||||||
if responseBytes[0] != 0x00 {
|
|
||||||
return "", nil, errors.New("unknown response status")
|
return response.ContentType, response.Data, nil
|
||||||
}
|
|
||||||
if len(responseBytes) < 5 {
|
|
||||||
return "", nil, errors.New("malformed raw response: incomplete header")
|
|
||||||
}
|
|
||||||
ctLen := binary.BigEndian.Uint32(responseBytes[1:5])
|
|
||||||
if uint32(len(responseBytes)) < 5+ctLen {
|
|
||||||
return "", nil, errors.New("malformed raw response: content-type overflow")
|
|
||||||
}
|
|
||||||
return string(responseBytes[5 : 5+ctLen]), responseBytes[5+ctLen:], nil
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -42,8 +42,8 @@ func (m *mockSubsonicAPIService) CallRaw(uri string) (string, []byte, error) {
|
|||||||
|
|
||||||
// SubsonicAPICallRaw delegates to the mock instance.
|
// SubsonicAPICallRaw delegates to the mock instance.
|
||||||
// CallRaw executes a Subsonic API request and returns the raw binary response.
|
// CallRaw executes a Subsonic API request and returns the raw binary response.
|
||||||
// Optimized for binary endpoints like getCoverArt and stream that return
|
// Designed for binary endpoints like getCoverArt and stream that return
|
||||||
// non-JSON data. The response is returned as raw bytes without JSON encoding overhead.
|
// non-JSON data. The data is base64-encoded over JSON on the wire.
|
||||||
func SubsonicAPICallRaw(uri string) (string, []byte, error) {
|
func SubsonicAPICallRaw(uri string) (string, []byte, error) {
|
||||||
return SubsonicAPIMock.CallRaw(uri)
|
return SubsonicAPIMock.CallRaw(uri)
|
||||||
}
|
}
|
||||||
|
|||||||
227
plugins/pdk/go/host/nd_host_task.go
Normal file
227
plugins/pdk/go/host/nd_host_task.go
Normal file
@@ -0,0 +1,227 @@
|
|||||||
|
// Code generated by ndpgen. DO NOT EDIT.
|
||||||
|
//
|
||||||
|
// This file contains client wrappers for the Task host service.
|
||||||
|
// It is intended for use in Navidrome plugins built with TinyGo.
|
||||||
|
//
|
||||||
|
//go:build wasip1
|
||||||
|
|
||||||
|
package host
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
|
||||||
|
)
|
||||||
|
|
||||||
|
// QueueConfig represents the QueueConfig data structure.
|
||||||
|
// QueueConfig holds configuration for a task queue.
|
||||||
|
type QueueConfig struct {
|
||||||
|
Concurrency int32 `json:"concurrency"`
|
||||||
|
MaxRetries int32 `json:"maxRetries"`
|
||||||
|
BackoffMs int64 `json:"backoffMs"`
|
||||||
|
DelayMs int64 `json:"delayMs"`
|
||||||
|
RetentionMs int64 `json:"retentionMs"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskInfo represents the TaskInfo data structure.
|
||||||
|
// TaskInfo holds the current state of a task.
|
||||||
|
type TaskInfo struct {
|
||||||
|
Status string `json:"status"`
|
||||||
|
Message string `json:"message"`
|
||||||
|
Attempt int32 `json:"attempt"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// task_createqueue is the host function provided by Navidrome.
|
||||||
|
//
|
||||||
|
//go:wasmimport extism:host/user task_createqueue
|
||||||
|
func task_createqueue(uint64) uint64
|
||||||
|
|
||||||
|
// task_enqueue is the host function provided by Navidrome.
|
||||||
|
//
|
||||||
|
//go:wasmimport extism:host/user task_enqueue
|
||||||
|
func task_enqueue(uint64) uint64
|
||||||
|
|
||||||
|
// task_get is the host function provided by Navidrome.
|
||||||
|
//
|
||||||
|
//go:wasmimport extism:host/user task_get
|
||||||
|
func task_get(uint64) uint64
|
||||||
|
|
||||||
|
// task_cancel is the host function provided by Navidrome.
|
||||||
|
//
|
||||||
|
//go:wasmimport extism:host/user task_cancel
|
||||||
|
func task_cancel(uint64) uint64
|
||||||
|
|
||||||
|
type taskCreateQueueRequest struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Config QueueConfig `json:"config"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type taskEnqueueRequest struct {
|
||||||
|
QueueName string `json:"queueName"`
|
||||||
|
Payload []byte `json:"payload"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type taskEnqueueResponse struct {
|
||||||
|
Result string `json:"result,omitempty"`
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type taskGetRequest struct {
|
||||||
|
TaskID string `json:"taskId"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type taskGetResponse struct {
|
||||||
|
Result *TaskInfo `json:"result,omitempty"`
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type taskCancelRequest struct {
|
||||||
|
TaskID string `json:"taskId"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskCreateQueue calls the task_createqueue host function.
|
||||||
|
// CreateQueue creates a named task queue with the given configuration.
|
||||||
|
// Zero-value fields in config use sensible defaults.
|
||||||
|
// If a queue with the same name already exists, returns an error.
|
||||||
|
// On startup, this also recovers any stale "running" tasks from a previous crash.
|
||||||
|
func TaskCreateQueue(name string, config QueueConfig) error {
|
||||||
|
// Marshal request to JSON
|
||||||
|
req := taskCreateQueueRequest{
|
||||||
|
Name: name,
|
||||||
|
Config: config,
|
||||||
|
}
|
||||||
|
reqBytes, err := json.Marshal(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
reqMem := pdk.AllocateBytes(reqBytes)
|
||||||
|
defer reqMem.Free()
|
||||||
|
|
||||||
|
// Call the host function
|
||||||
|
responsePtr := task_createqueue(reqMem.Offset())
|
||||||
|
|
||||||
|
// Read the response from memory
|
||||||
|
responseMem := pdk.FindMemory(responsePtr)
|
||||||
|
responseBytes := responseMem.ReadBytes()
|
||||||
|
|
||||||
|
// Parse error-only response
|
||||||
|
var response struct {
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if response.Error != "" {
|
||||||
|
return errors.New(response.Error)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskEnqueue calls the task_enqueue host function.
|
||||||
|
// Enqueue adds a task to the named queue. Returns the task ID.
|
||||||
|
// payload is opaque bytes passed back to the plugin on execution.
|
||||||
|
func TaskEnqueue(queueName string, payload []byte) (string, error) {
|
||||||
|
// Marshal request to JSON
|
||||||
|
req := taskEnqueueRequest{
|
||||||
|
QueueName: queueName,
|
||||||
|
Payload: payload,
|
||||||
|
}
|
||||||
|
reqBytes, err := json.Marshal(req)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
reqMem := pdk.AllocateBytes(reqBytes)
|
||||||
|
defer reqMem.Free()
|
||||||
|
|
||||||
|
// Call the host function
|
||||||
|
responsePtr := task_enqueue(reqMem.Offset())
|
||||||
|
|
||||||
|
// Read the response from memory
|
||||||
|
responseMem := pdk.FindMemory(responsePtr)
|
||||||
|
responseBytes := responseMem.ReadBytes()
|
||||||
|
|
||||||
|
// Parse the response
|
||||||
|
var response taskEnqueueResponse
|
||||||
|
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert Error field to Go error
|
||||||
|
if response.Error != "" {
|
||||||
|
return "", errors.New(response.Error)
|
||||||
|
}
|
||||||
|
|
||||||
|
return response.Result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskGet calls the task_get host function.
|
||||||
|
// Get returns the current state of a task including its status,
|
||||||
|
// message, and attempt count.
|
||||||
|
func TaskGet(taskID string) (*TaskInfo, error) {
|
||||||
|
// Marshal request to JSON
|
||||||
|
req := taskGetRequest{
|
||||||
|
TaskID: taskID,
|
||||||
|
}
|
||||||
|
reqBytes, err := json.Marshal(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
reqMem := pdk.AllocateBytes(reqBytes)
|
||||||
|
defer reqMem.Free()
|
||||||
|
|
||||||
|
// Call the host function
|
||||||
|
responsePtr := task_get(reqMem.Offset())
|
||||||
|
|
||||||
|
// Read the response from memory
|
||||||
|
responseMem := pdk.FindMemory(responsePtr)
|
||||||
|
responseBytes := responseMem.ReadBytes()
|
||||||
|
|
||||||
|
// Parse the response
|
||||||
|
var response taskGetResponse
|
||||||
|
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert Error field to Go error
|
||||||
|
if response.Error != "" {
|
||||||
|
return nil, errors.New(response.Error)
|
||||||
|
}
|
||||||
|
|
||||||
|
return response.Result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskCancel calls the task_cancel host function.
|
||||||
|
// Cancel cancels a pending task. Returns error if already
|
||||||
|
// running, completed, or failed.
|
||||||
|
func TaskCancel(taskID string) error {
|
||||||
|
// Marshal request to JSON
|
||||||
|
req := taskCancelRequest{
|
||||||
|
TaskID: taskID,
|
||||||
|
}
|
||||||
|
reqBytes, err := json.Marshal(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
reqMem := pdk.AllocateBytes(reqBytes)
|
||||||
|
defer reqMem.Free()
|
||||||
|
|
||||||
|
// Call the host function
|
||||||
|
responsePtr := task_cancel(reqMem.Offset())
|
||||||
|
|
||||||
|
// Read the response from memory
|
||||||
|
responseMem := pdk.FindMemory(responsePtr)
|
||||||
|
responseBytes := responseMem.ReadBytes()
|
||||||
|
|
||||||
|
// Parse error-only response
|
||||||
|
var response struct {
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if response.Error != "" {
|
||||||
|
return errors.New(response.Error)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
92
plugins/pdk/go/host/nd_host_task_stub.go
Normal file
92
plugins/pdk/go/host/nd_host_task_stub.go
Normal file
@@ -0,0 +1,92 @@
|
|||||||
|
// Code generated by ndpgen. DO NOT EDIT.
|
||||||
|
//
|
||||||
|
// This file contains mock implementations for non-WASM builds.
|
||||||
|
// These mocks allow IDE support, compilation, and unit testing on non-WASM platforms.
|
||||||
|
// Plugin authors can use the exported mock instances to set expectations in tests.
|
||||||
|
//
|
||||||
|
//go:build !wasip1
|
||||||
|
|
||||||
|
package host
|
||||||
|
|
||||||
|
import "github.com/stretchr/testify/mock"
|
||||||
|
|
||||||
|
// QueueConfig represents the QueueConfig data structure.
|
||||||
|
// QueueConfig holds configuration for a task queue.
|
||||||
|
type QueueConfig struct {
|
||||||
|
Concurrency int32 `json:"concurrency"`
|
||||||
|
MaxRetries int32 `json:"maxRetries"`
|
||||||
|
BackoffMs int64 `json:"backoffMs"`
|
||||||
|
DelayMs int64 `json:"delayMs"`
|
||||||
|
RetentionMs int64 `json:"retentionMs"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskInfo represents the TaskInfo data structure.
|
||||||
|
// TaskInfo holds the current state of a task.
|
||||||
|
type TaskInfo struct {
|
||||||
|
Status string `json:"status"`
|
||||||
|
Message string `json:"message"`
|
||||||
|
Attempt int32 `json:"attempt"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// mockTaskService is the mock implementation for testing.
|
||||||
|
type mockTaskService struct {
|
||||||
|
mock.Mock
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskMock is the auto-instantiated mock instance for testing.
|
||||||
|
// Use this to set expectations: host.TaskMock.On("MethodName", args...).Return(values...)
|
||||||
|
var TaskMock = &mockTaskService{}
|
||||||
|
|
||||||
|
// CreateQueue is the mock method for TaskCreateQueue.
|
||||||
|
func (m *mockTaskService) CreateQueue(name string, config QueueConfig) error {
|
||||||
|
args := m.Called(name, config)
|
||||||
|
return args.Error(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskCreateQueue delegates to the mock instance.
|
||||||
|
// CreateQueue creates a named task queue with the given configuration.
|
||||||
|
// Zero-value fields in config use sensible defaults.
|
||||||
|
// If a queue with the same name already exists, returns an error.
|
||||||
|
// On startup, this also recovers any stale "running" tasks from a previous crash.
|
||||||
|
func TaskCreateQueue(name string, config QueueConfig) error {
|
||||||
|
return TaskMock.CreateQueue(name, config)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enqueue is the mock method for TaskEnqueue.
|
||||||
|
func (m *mockTaskService) Enqueue(queueName string, payload []byte) (string, error) {
|
||||||
|
args := m.Called(queueName, payload)
|
||||||
|
return args.String(0), args.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskEnqueue delegates to the mock instance.
|
||||||
|
// Enqueue adds a task to the named queue. Returns the task ID.
|
||||||
|
// payload is opaque bytes passed back to the plugin on execution.
|
||||||
|
func TaskEnqueue(queueName string, payload []byte) (string, error) {
|
||||||
|
return TaskMock.Enqueue(queueName, payload)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get is the mock method for TaskGet.
|
||||||
|
func (m *mockTaskService) Get(taskID string) (*TaskInfo, error) {
|
||||||
|
args := m.Called(taskID)
|
||||||
|
return args.Get(0).(*TaskInfo), args.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskGet delegates to the mock instance.
|
||||||
|
// Get returns the current state of a task including its status,
|
||||||
|
// message, and attempt count.
|
||||||
|
func TaskGet(taskID string) (*TaskInfo, error) {
|
||||||
|
return TaskMock.Get(taskID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cancel is the mock method for TaskCancel.
|
||||||
|
func (m *mockTaskService) Cancel(taskID string) error {
|
||||||
|
args := m.Called(taskID)
|
||||||
|
return args.Error(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskCancel delegates to the mock instance.
|
||||||
|
// Cancel cancels a pending task. Returns error if already
|
||||||
|
// running, completed, or failed.
|
||||||
|
func TaskCancel(taskID string) error {
|
||||||
|
return TaskMock.Cancel(taskID)
|
||||||
|
}
|
||||||
79
plugins/pdk/go/taskworker/taskworker.go
Normal file
79
plugins/pdk/go/taskworker/taskworker.go
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
// Code generated by ndpgen. DO NOT EDIT.
|
||||||
|
//
|
||||||
|
// This file contains export wrappers for the TaskWorker capability.
|
||||||
|
// It is intended for use in Navidrome plugins built with TinyGo.
|
||||||
|
//
|
||||||
|
//go:build wasip1
|
||||||
|
|
||||||
|
package taskworker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TaskExecuteRequest is the request provided when a task is ready to execute.
|
||||||
|
type TaskExecuteRequest struct {
|
||||||
|
// QueueName is the name of the queue this task belongs to.
|
||||||
|
QueueName string `json:"queueName"`
|
||||||
|
// TaskID is the unique identifier for this task.
|
||||||
|
TaskID string `json:"taskId"`
|
||||||
|
// Payload is the opaque data provided when the task was enqueued.
|
||||||
|
Payload []byte `json:"payload"`
|
||||||
|
// Attempt is the current attempt number (1-based: first attempt = 1).
|
||||||
|
Attempt int32 `json:"attempt"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskWorker is the marker interface for taskworker plugins.
|
||||||
|
// Implement one or more of the provider interfaces below.
|
||||||
|
// TaskWorker provides task execution handling.
|
||||||
|
// This capability allows plugins to receive callbacks when their queued tasks
|
||||||
|
// are ready to execute. Plugins that use the taskqueue host service must
|
||||||
|
// implement this capability.
|
||||||
|
type TaskWorker interface{}
|
||||||
|
|
||||||
|
// TaskExecuteProvider provides the OnTaskExecute function.
|
||||||
|
type TaskExecuteProvider interface {
|
||||||
|
OnTaskExecute(TaskExecuteRequest) (string, error)
|
||||||
|
} // Internal implementation holders
|
||||||
|
var (
|
||||||
|
taskExecuteImpl func(TaskExecuteRequest) (string, error)
|
||||||
|
)
|
||||||
|
|
||||||
|
// Register registers a taskworker implementation.
|
||||||
|
// The implementation is checked for optional provider interfaces.
|
||||||
|
func Register(impl TaskWorker) {
|
||||||
|
if p, ok := impl.(TaskExecuteProvider); ok {
|
||||||
|
taskExecuteImpl = p.OnTaskExecute
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotImplementedCode is the standard return code for unimplemented functions.
|
||||||
|
// The host recognizes this and skips the plugin gracefully.
|
||||||
|
const NotImplementedCode int32 = -2
|
||||||
|
|
||||||
|
//go:wasmexport nd_task_execute
|
||||||
|
func _NdTaskExecute() int32 {
|
||||||
|
if taskExecuteImpl == nil {
|
||||||
|
// Return standard code - host will skip this plugin gracefully
|
||||||
|
return NotImplementedCode
|
||||||
|
}
|
||||||
|
|
||||||
|
var input TaskExecuteRequest
|
||||||
|
if err := pdk.InputJSON(&input); err != nil {
|
||||||
|
pdk.SetError(err)
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
|
||||||
|
output, err := taskExecuteImpl(input)
|
||||||
|
if err != nil {
|
||||||
|
pdk.SetError(err)
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := pdk.OutputJSON(output); err != nil {
|
||||||
|
pdk.SetError(err)
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0
|
||||||
|
}
|
||||||
41
plugins/pdk/go/taskworker/taskworker_stub.go
Normal file
41
plugins/pdk/go/taskworker/taskworker_stub.go
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
// Code generated by ndpgen. DO NOT EDIT.
|
||||||
|
//
|
||||||
|
// This file provides stub implementations for non-WASM platforms.
|
||||||
|
// It allows Go plugins to compile and run tests outside of WASM,
|
||||||
|
// but the actual functionality is only available in WASM builds.
|
||||||
|
//
|
||||||
|
//go:build !wasip1
|
||||||
|
|
||||||
|
package taskworker
|
||||||
|
|
||||||
|
// TaskExecuteRequest is the request provided when a task is ready to execute.
|
||||||
|
type TaskExecuteRequest struct {
|
||||||
|
// QueueName is the name of the queue this task belongs to.
|
||||||
|
QueueName string `json:"queueName"`
|
||||||
|
// TaskID is the unique identifier for this task.
|
||||||
|
TaskID string `json:"taskId"`
|
||||||
|
// Payload is the opaque data provided when the task was enqueued.
|
||||||
|
Payload []byte `json:"payload"`
|
||||||
|
// Attempt is the current attempt number (1-based: first attempt = 1).
|
||||||
|
Attempt int32 `json:"attempt"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskWorker is the marker interface for taskworker plugins.
|
||||||
|
// Implement one or more of the provider interfaces below.
|
||||||
|
// TaskWorker provides task execution handling.
|
||||||
|
// This capability allows plugins to receive callbacks when their queued tasks
|
||||||
|
// are ready to execute. Plugins that use the taskqueue host service must
|
||||||
|
// implement this capability.
|
||||||
|
type TaskWorker interface{}
|
||||||
|
|
||||||
|
// TaskExecuteProvider provides the OnTaskExecute function.
|
||||||
|
type TaskExecuteProvider interface {
|
||||||
|
OnTaskExecute(TaskExecuteRequest) (string, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotImplementedCode is the standard return code for unimplemented functions.
|
||||||
|
const NotImplementedCode int32 = -2
|
||||||
|
|
||||||
|
// Register is a no-op on non-WASM platforms.
|
||||||
|
// This stub allows code to compile outside of WASM.
|
||||||
|
func Register(_ TaskWorker) {}
|
||||||
@@ -12,6 +12,7 @@ from typing import Any
|
|||||||
|
|
||||||
import extism
|
import extism
|
||||||
import json
|
import json
|
||||||
|
import base64
|
||||||
|
|
||||||
|
|
||||||
class HostFunctionError(Exception):
|
class HostFunctionError(Exception):
|
||||||
@@ -337,7 +338,7 @@ Returns an error if the operation fails.
|
|||||||
"""
|
"""
|
||||||
request = {
|
request = {
|
||||||
"key": key,
|
"key": key,
|
||||||
"value": value,
|
"value": base64.b64encode(value).decode("ascii"),
|
||||||
"ttlSeconds": ttl_seconds,
|
"ttlSeconds": ttl_seconds,
|
||||||
}
|
}
|
||||||
request_bytes = json.dumps(request).encode("utf-8")
|
request_bytes = json.dumps(request).encode("utf-8")
|
||||||
@@ -382,7 +383,7 @@ or the stored value is not a byte slice, exists will be false.
|
|||||||
raise HostFunctionError(response["error"])
|
raise HostFunctionError(response["error"])
|
||||||
|
|
||||||
return CacheGetBytesResult(
|
return CacheGetBytesResult(
|
||||||
value=response.get("value", b""),
|
value=base64.b64decode(response.get("value", "")),
|
||||||
exists=response.get("exists", False),
|
exists=response.get("exists", False),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
60
plugins/pdk/python/host/nd_host_http.py
Normal file
60
plugins/pdk/python/host/nd_host_http.py
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
# Code generated by ndpgen. DO NOT EDIT.
|
||||||
|
#
|
||||||
|
# This file contains client wrappers for the HTTP host service.
|
||||||
|
# It is intended for use in Navidrome plugins built with extism-py.
|
||||||
|
#
|
||||||
|
# IMPORTANT: Due to a limitation in extism-py, you cannot import this file directly.
|
||||||
|
# The @extism.import_fn decorators are only detected when defined in the plugin's
|
||||||
|
# main __init__.py file. Copy the needed functions from this file into your plugin.
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import extism
|
||||||
|
import json
|
||||||
|
import base64
|
||||||
|
|
||||||
|
|
||||||
|
class HostFunctionError(Exception):
|
||||||
|
"""Raised when a host function returns an error."""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@extism.import_fn("extism:host/user", "http_send")
|
||||||
|
def _http_send(offset: int) -> int:
|
||||||
|
"""Raw host function - do not call directly."""
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
def http_send(request: Any) -> Any:
|
||||||
|
"""Send executes an HTTP request and returns the response.
|
||||||
|
|
||||||
|
Parameters:
|
||||||
|
- request: The HTTP request to execute, including method, URL, headers, body, and timeout
|
||||||
|
|
||||||
|
Returns the HTTP response with status code, headers, and body.
|
||||||
|
Network errors, timeouts, and permission failures are returned as Go errors.
|
||||||
|
Successful HTTP calls (including 4xx/5xx status codes) return a non-nil response with nil error.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
request: Any parameter.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Any: The result value.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
HostFunctionError: If the host function returns an error.
|
||||||
|
"""
|
||||||
|
request = {
|
||||||
|
"request": request,
|
||||||
|
}
|
||||||
|
request_bytes = json.dumps(request).encode("utf-8")
|
||||||
|
request_mem = extism.memory.alloc(request_bytes)
|
||||||
|
response_offset = _http_send(request_mem.offset)
|
||||||
|
response_mem = extism.memory.find(response_offset)
|
||||||
|
response = json.loads(extism.memory.string(response_mem))
|
||||||
|
|
||||||
|
if response.get("error"):
|
||||||
|
raise HostFunctionError(response["error"])
|
||||||
|
|
||||||
|
return response.get("result", None)
|
||||||
@@ -12,6 +12,7 @@ from typing import Any
|
|||||||
|
|
||||||
import extism
|
import extism
|
||||||
import json
|
import json
|
||||||
|
import base64
|
||||||
|
|
||||||
|
|
||||||
class HostFunctionError(Exception):
|
class HostFunctionError(Exception):
|
||||||
@@ -80,7 +81,7 @@ Returns an error if the storage limit would be exceeded or the operation fails.
|
|||||||
"""
|
"""
|
||||||
request = {
|
request = {
|
||||||
"key": key,
|
"key": key,
|
||||||
"value": value,
|
"value": base64.b64encode(value).decode("ascii"),
|
||||||
}
|
}
|
||||||
request_bytes = json.dumps(request).encode("utf-8")
|
request_bytes = json.dumps(request).encode("utf-8")
|
||||||
request_mem = extism.memory.alloc(request_bytes)
|
request_mem = extism.memory.alloc(request_bytes)
|
||||||
@@ -123,7 +124,7 @@ Returns the value and whether the key exists.
|
|||||||
raise HostFunctionError(response["error"])
|
raise HostFunctionError(response["error"])
|
||||||
|
|
||||||
return KVStoreGetResult(
|
return KVStoreGetResult(
|
||||||
value=response.get("value", b""),
|
value=base64.b64decode(response.get("value", "")),
|
||||||
exists=response.get("exists", False),
|
exists=response.get("exists", False),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -8,11 +8,11 @@
|
|||||||
# main __init__.py file. Copy the needed functions from this file into your plugin.
|
# main __init__.py file. Copy the needed functions from this file into your plugin.
|
||||||
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import Any, Tuple
|
from typing import Any
|
||||||
|
|
||||||
import extism
|
import extism
|
||||||
import json
|
import json
|
||||||
import struct
|
import base64
|
||||||
|
|
||||||
|
|
||||||
class HostFunctionError(Exception):
|
class HostFunctionError(Exception):
|
||||||
@@ -32,6 +32,13 @@ def _subsonicapi_callraw(offset: int) -> int:
|
|||||||
...
|
...
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class SubsonicAPICallRawResult:
|
||||||
|
"""Result type for subsonicapi_call_raw."""
|
||||||
|
content_type: str
|
||||||
|
data: bytes
|
||||||
|
|
||||||
|
|
||||||
def subsonicapi_call(uri: str) -> str:
|
def subsonicapi_call(uri: str) -> str:
|
||||||
"""Call executes a Subsonic API request and returns the JSON response.
|
"""Call executes a Subsonic API request and returns the JSON response.
|
||||||
|
|
||||||
@@ -62,16 +69,16 @@ e.g., "getAlbumList2?type=random&size=10". The response is returned as raw JSON.
|
|||||||
return response.get("responseJson", "")
|
return response.get("responseJson", "")
|
||||||
|
|
||||||
|
|
||||||
def subsonicapi_call_raw(uri: str) -> Tuple[str, bytes]:
|
def subsonicapi_call_raw(uri: str) -> SubsonicAPICallRawResult:
|
||||||
"""CallRaw executes a Subsonic API request and returns the raw binary response.
|
"""CallRaw executes a Subsonic API request and returns the raw binary response.
|
||||||
Optimized for binary endpoints like getCoverArt and stream that return
|
Designed for binary endpoints like getCoverArt and stream that return
|
||||||
non-JSON data. The response is returned as raw bytes without JSON encoding overhead.
|
non-JSON data. The data is base64-encoded over JSON on the wire.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
uri: str parameter.
|
uri: str parameter.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Tuple of (content_type, data) with the raw binary response.
|
SubsonicAPICallRawResult containing content_type, data,.
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
HostFunctionError: If the host function returns an error.
|
HostFunctionError: If the host function returns an error.
|
||||||
@@ -83,19 +90,12 @@ non-JSON data. The response is returned as raw bytes without JSON encoding overh
|
|||||||
request_mem = extism.memory.alloc(request_bytes)
|
request_mem = extism.memory.alloc(request_bytes)
|
||||||
response_offset = _subsonicapi_callraw(request_mem.offset)
|
response_offset = _subsonicapi_callraw(request_mem.offset)
|
||||||
response_mem = extism.memory.find(response_offset)
|
response_mem = extism.memory.find(response_offset)
|
||||||
response_bytes = response_mem.bytes()
|
response = json.loads(extism.memory.string(response_mem))
|
||||||
|
|
||||||
if len(response_bytes) == 0:
|
if response.get("error"):
|
||||||
raise HostFunctionError("empty response from host")
|
raise HostFunctionError(response["error"])
|
||||||
if response_bytes[0] == 0x01:
|
|
||||||
raise HostFunctionError(response_bytes[1:].decode("utf-8"))
|
return SubsonicAPICallRawResult(
|
||||||
if response_bytes[0] != 0x00:
|
content_type=response.get("contentType", ""),
|
||||||
raise HostFunctionError("unknown response status")
|
data=base64.b64decode(response.get("data", "")),
|
||||||
if len(response_bytes) < 5:
|
)
|
||||||
raise HostFunctionError("malformed raw response: incomplete header")
|
|
||||||
ct_len = struct.unpack(">I", response_bytes[1:5])[0]
|
|
||||||
if len(response_bytes) < 5 + ct_len:
|
|
||||||
raise HostFunctionError("malformed raw response: content-type overflow")
|
|
||||||
content_type = response_bytes[5:5 + ct_len].decode("utf-8")
|
|
||||||
data = response_bytes[5 + ct_len:]
|
|
||||||
return content_type, data
|
|
||||||
|
|||||||
154
plugins/pdk/python/host/nd_host_task.py
Normal file
154
plugins/pdk/python/host/nd_host_task.py
Normal file
@@ -0,0 +1,154 @@
|
|||||||
|
# Code generated by ndpgen. DO NOT EDIT.
|
||||||
|
#
|
||||||
|
# This file contains client wrappers for the Task host service.
|
||||||
|
# It is intended for use in Navidrome plugins built with extism-py.
|
||||||
|
#
|
||||||
|
# IMPORTANT: Due to a limitation in extism-py, you cannot import this file directly.
|
||||||
|
# The @extism.import_fn decorators are only detected when defined in the plugin's
|
||||||
|
# main __init__.py file. Copy the needed functions from this file into your plugin.
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import extism
|
||||||
|
import json
|
||||||
|
import base64
|
||||||
|
|
||||||
|
|
||||||
|
class HostFunctionError(Exception):
|
||||||
|
"""Raised when a host function returns an error."""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@extism.import_fn("extism:host/user", "task_createqueue")
|
||||||
|
def _task_createqueue(offset: int) -> int:
|
||||||
|
"""Raw host function - do not call directly."""
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
@extism.import_fn("extism:host/user", "task_enqueue")
|
||||||
|
def _task_enqueue(offset: int) -> int:
|
||||||
|
"""Raw host function - do not call directly."""
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
@extism.import_fn("extism:host/user", "task_get")
|
||||||
|
def _task_get(offset: int) -> int:
|
||||||
|
"""Raw host function - do not call directly."""
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
@extism.import_fn("extism:host/user", "task_cancel")
|
||||||
|
def _task_cancel(offset: int) -> int:
|
||||||
|
"""Raw host function - do not call directly."""
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
def task_create_queue(name: str, config: Any) -> None:
|
||||||
|
"""CreateQueue creates a named task queue with the given configuration.
|
||||||
|
Zero-value fields in config use sensible defaults.
|
||||||
|
If a queue with the same name already exists, returns an error.
|
||||||
|
On startup, this also recovers any stale "running" tasks from a previous crash.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name: str parameter.
|
||||||
|
config: Any parameter.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
HostFunctionError: If the host function returns an error.
|
||||||
|
"""
|
||||||
|
request = {
|
||||||
|
"name": name,
|
||||||
|
"config": config,
|
||||||
|
}
|
||||||
|
request_bytes = json.dumps(request).encode("utf-8")
|
||||||
|
request_mem = extism.memory.alloc(request_bytes)
|
||||||
|
response_offset = _task_createqueue(request_mem.offset)
|
||||||
|
response_mem = extism.memory.find(response_offset)
|
||||||
|
response = json.loads(extism.memory.string(response_mem))
|
||||||
|
|
||||||
|
if response.get("error"):
|
||||||
|
raise HostFunctionError(response["error"])
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def task_enqueue(queue_name: str, payload: bytes) -> str:
|
||||||
|
"""Enqueue adds a task to the named queue. Returns the task ID.
|
||||||
|
payload is opaque bytes passed back to the plugin on execution.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
queue_name: str parameter.
|
||||||
|
payload: bytes parameter.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: The result value.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
HostFunctionError: If the host function returns an error.
|
||||||
|
"""
|
||||||
|
request = {
|
||||||
|
"queueName": queue_name,
|
||||||
|
"payload": base64.b64encode(payload).decode("ascii"),
|
||||||
|
}
|
||||||
|
request_bytes = json.dumps(request).encode("utf-8")
|
||||||
|
request_mem = extism.memory.alloc(request_bytes)
|
||||||
|
response_offset = _task_enqueue(request_mem.offset)
|
||||||
|
response_mem = extism.memory.find(response_offset)
|
||||||
|
response = json.loads(extism.memory.string(response_mem))
|
||||||
|
|
||||||
|
if response.get("error"):
|
||||||
|
raise HostFunctionError(response["error"])
|
||||||
|
|
||||||
|
return response.get("result", "")
|
||||||
|
|
||||||
|
|
||||||
|
def task_get(task_id: str) -> Any:
|
||||||
|
"""Get returns the current state of a task including its status,
|
||||||
|
message, and attempt count.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
task_id: str parameter.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Any: The result value.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
HostFunctionError: If the host function returns an error.
|
||||||
|
"""
|
||||||
|
request = {
|
||||||
|
"taskId": task_id,
|
||||||
|
}
|
||||||
|
request_bytes = json.dumps(request).encode("utf-8")
|
||||||
|
request_mem = extism.memory.alloc(request_bytes)
|
||||||
|
response_offset = _task_get(request_mem.offset)
|
||||||
|
response_mem = extism.memory.find(response_offset)
|
||||||
|
response = json.loads(extism.memory.string(response_mem))
|
||||||
|
|
||||||
|
if response.get("error"):
|
||||||
|
raise HostFunctionError(response["error"])
|
||||||
|
|
||||||
|
return response.get("result", None)
|
||||||
|
|
||||||
|
|
||||||
|
def task_cancel(task_id: str) -> None:
|
||||||
|
"""Cancel cancels a pending task. Returns error if already
|
||||||
|
running, completed, or failed.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
task_id: str parameter.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
HostFunctionError: If the host function returns an error.
|
||||||
|
"""
|
||||||
|
request = {
|
||||||
|
"taskId": task_id,
|
||||||
|
}
|
||||||
|
request_bytes = json.dumps(request).encode("utf-8")
|
||||||
|
request_mem = extism.memory.alloc(request_bytes)
|
||||||
|
response_offset = _task_cancel(request_mem.offset)
|
||||||
|
response_mem = extism.memory.find(response_offset)
|
||||||
|
response = json.loads(extism.memory.string(response_mem))
|
||||||
|
|
||||||
|
if response.get("error"):
|
||||||
|
raise HostFunctionError(response["error"])
|
||||||
|
|
||||||
@@ -12,6 +12,7 @@ from typing import Any
|
|||||||
|
|
||||||
import extism
|
import extism
|
||||||
import json
|
import json
|
||||||
|
import base64
|
||||||
|
|
||||||
|
|
||||||
class HostFunctionError(Exception):
|
class HostFunctionError(Exception):
|
||||||
@@ -134,7 +135,7 @@ Returns an error if the connection is not found or if sending fails.
|
|||||||
"""
|
"""
|
||||||
request = {
|
request = {
|
||||||
"connectionId": connection_id,
|
"connectionId": connection_id,
|
||||||
"data": data,
|
"data": base64.b64encode(data).decode("ascii"),
|
||||||
}
|
}
|
||||||
request_bytes = json.dumps(request).encode("utf-8")
|
request_bytes = json.dumps(request).encode("utf-8")
|
||||||
request_mem = extism.memory.alloc(request_bytes)
|
request_mem = extism.memory.alloc(request_bytes)
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ path = "src/lib.rs"
|
|||||||
crate-type = ["rlib"]
|
crate-type = ["rlib"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
base64 = "0.22"
|
||||||
extism-pdk = "1.2"
|
extism-pdk = "1.2"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
|
|||||||
@@ -9,4 +9,5 @@ pub mod lifecycle;
|
|||||||
pub mod metadata;
|
pub mod metadata;
|
||||||
pub mod scheduler;
|
pub mod scheduler;
|
||||||
pub mod scrobbler;
|
pub mod scrobbler;
|
||||||
|
pub mod taskworker;
|
||||||
pub mod websocket;
|
pub mod websocket;
|
||||||
|
|||||||
102
plugins/pdk/rust/nd-pdk-capabilities/src/taskworker.rs
Normal file
102
plugins/pdk/rust/nd-pdk-capabilities/src/taskworker.rs
Normal file
@@ -0,0 +1,102 @@
|
|||||||
|
// Code generated by ndpgen. DO NOT EDIT.
|
||||||
|
//
|
||||||
|
// This file contains export wrappers for the TaskWorker capability.
|
||||||
|
// It is intended for use in Navidrome plugins built with extism-pdk.
|
||||||
|
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use base64::Engine as _;
|
||||||
|
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||||
|
|
||||||
|
mod base64_bytes {
|
||||||
|
use serde::{self, Deserialize, Deserializer, Serializer};
|
||||||
|
use base64::Engine as _;
|
||||||
|
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||||
|
|
||||||
|
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
serializer.serialize_str(&BASE64.encode(bytes))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
|
||||||
|
where
|
||||||
|
D: Deserializer<'de>,
|
||||||
|
{
|
||||||
|
let s = String::deserialize(deserializer)?;
|
||||||
|
BASE64.decode(&s).map_err(serde::de::Error::custom)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper functions for skip_serializing_if with numeric types
|
||||||
|
#[allow(dead_code)]
|
||||||
|
fn is_zero_i32(value: &i32) -> bool { *value == 0 }
|
||||||
|
#[allow(dead_code)]
|
||||||
|
fn is_zero_u32(value: &u32) -> bool { *value == 0 }
|
||||||
|
#[allow(dead_code)]
|
||||||
|
fn is_zero_i64(value: &i64) -> bool { *value == 0 }
|
||||||
|
#[allow(dead_code)]
|
||||||
|
fn is_zero_u64(value: &u64) -> bool { *value == 0 }
|
||||||
|
#[allow(dead_code)]
|
||||||
|
fn is_zero_f32(value: &f32) -> bool { *value == 0.0 }
|
||||||
|
#[allow(dead_code)]
|
||||||
|
fn is_zero_f64(value: &f64) -> bool { *value == 0.0 }
|
||||||
|
/// TaskExecuteRequest is the request provided when a task is ready to execute.
|
||||||
|
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct TaskExecuteRequest {
|
||||||
|
/// QueueName is the name of the queue this task belongs to.
|
||||||
|
#[serde(default)]
|
||||||
|
pub queue_name: String,
|
||||||
|
/// TaskID is the unique identifier for this task.
|
||||||
|
#[serde(default)]
|
||||||
|
pub task_id: String,
|
||||||
|
/// Payload is the opaque data provided when the task was enqueued.
|
||||||
|
#[serde(default)]
|
||||||
|
#[serde(with = "base64_bytes")]
|
||||||
|
pub payload: Vec<u8>,
|
||||||
|
/// Attempt is the current attempt number (1-based: first attempt = 1).
|
||||||
|
#[serde(default)]
|
||||||
|
pub attempt: i32,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Error represents an error from a capability method.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Error {
|
||||||
|
pub message: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for Error {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
write!(f, "{}", self.message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::error::Error for Error {}
|
||||||
|
|
||||||
|
impl Error {
|
||||||
|
pub fn new(message: impl Into<String>) -> Self {
|
||||||
|
Self { message: message.into() }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// TaskExecuteProvider provides the OnTaskExecute function.
|
||||||
|
pub trait TaskExecuteProvider {
|
||||||
|
fn on_task_execute(&self, req: TaskExecuteRequest) -> Result<String, Error>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Register the on_task_execute export.
|
||||||
|
/// This macro generates the WASM export function for this method.
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! register_taskworker_task_execute {
|
||||||
|
($plugin_type:ty) => {
|
||||||
|
#[extism_pdk::plugin_fn]
|
||||||
|
pub fn nd_task_execute(
|
||||||
|
req: extism_pdk::Json<$crate::taskworker::TaskExecuteRequest>
|
||||||
|
) -> extism_pdk::FnResult<extism_pdk::Json<String>> {
|
||||||
|
let plugin = <$plugin_type>::default();
|
||||||
|
let result = $crate::taskworker::TaskExecuteProvider::on_task_execute(&plugin, req.into_inner())?;
|
||||||
|
Ok(extism_pdk::Json(result))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
@@ -11,6 +11,7 @@ readme = "README.md"
|
|||||||
crate-type = ["rlib"]
|
crate-type = ["rlib"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
base64 = "0.22"
|
||||||
extism-pdk = "1.2"
|
extism-pdk = "1.2"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
|
|||||||
@@ -40,6 +40,7 @@
|
|||||||
//! - [`library`] - provides access to music library metadata for plugins.
|
//! - [`library`] - provides access to music library metadata for plugins.
|
||||||
//! - [`scheduler`] - provides task scheduling capabilities for plugins.
|
//! - [`scheduler`] - provides task scheduling capabilities for plugins.
|
||||||
//! - [`subsonicapi`] - provides access to Navidrome's Subsonic API from plugins.
|
//! - [`subsonicapi`] - provides access to Navidrome's Subsonic API from plugins.
|
||||||
|
//! - [`task`] - provides persistent task queues for plugins.
|
||||||
//! - [`users`] - provides access to user information for plugins.
|
//! - [`users`] - provides access to user information for plugins.
|
||||||
//! - [`websocket`] - provides WebSocket communication capabilities for plugins.
|
//! - [`websocket`] - provides WebSocket communication capabilities for plugins.
|
||||||
|
|
||||||
@@ -99,6 +100,13 @@ pub mod subsonicapi {
|
|||||||
pub use super::nd_host_subsonicapi::*;
|
pub use super::nd_host_subsonicapi::*;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[doc(hidden)]
|
||||||
|
mod nd_host_task;
|
||||||
|
/// provides persistent task queues for plugins.
|
||||||
|
pub mod task {
|
||||||
|
pub use super::nd_host_task::*;
|
||||||
|
}
|
||||||
|
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
mod nd_host_users;
|
mod nd_host_users;
|
||||||
/// provides access to user information for plugins.
|
/// provides access to user information for plugins.
|
||||||
|
|||||||
@@ -5,6 +5,29 @@
|
|||||||
|
|
||||||
use extism_pdk::*;
|
use extism_pdk::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use base64::Engine as _;
|
||||||
|
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||||
|
|
||||||
|
mod base64_bytes {
|
||||||
|
use serde::{self, Deserialize, Deserializer, Serializer};
|
||||||
|
use base64::Engine as _;
|
||||||
|
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||||
|
|
||||||
|
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
serializer.serialize_str(&BASE64.encode(bytes))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
|
||||||
|
where
|
||||||
|
D: Deserializer<'de>,
|
||||||
|
{
|
||||||
|
let s = String::deserialize(deserializer)?;
|
||||||
|
BASE64.decode(&s).map_err(serde::de::Error::custom)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
@@ -106,6 +129,7 @@ struct CacheGetFloatResponse {
|
|||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
struct CacheSetBytesRequest {
|
struct CacheSetBytesRequest {
|
||||||
key: String,
|
key: String,
|
||||||
|
#[serde(with = "base64_bytes")]
|
||||||
value: Vec<u8>,
|
value: Vec<u8>,
|
||||||
ttl_seconds: i64,
|
ttl_seconds: i64,
|
||||||
}
|
}
|
||||||
@@ -127,6 +151,7 @@ struct CacheGetBytesRequest {
|
|||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
struct CacheGetBytesResponse {
|
struct CacheGetBytesResponse {
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
|
#[serde(with = "base64_bytes")]
|
||||||
value: Vec<u8>,
|
value: Vec<u8>,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
exists: bool,
|
exists: bool,
|
||||||
|
|||||||
@@ -5,16 +5,40 @@
|
|||||||
|
|
||||||
use extism_pdk::*;
|
use extism_pdk::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use base64::Engine as _;
|
||||||
|
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||||
|
|
||||||
|
mod base64_bytes {
|
||||||
|
use serde::{self, Deserialize, Deserializer, Serializer};
|
||||||
|
use base64::Engine as _;
|
||||||
|
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||||
|
|
||||||
|
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
serializer.serialize_str(&BASE64.encode(bytes))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
|
||||||
|
where
|
||||||
|
D: Deserializer<'de>,
|
||||||
|
{
|
||||||
|
let s = String::deserialize(deserializer)?;
|
||||||
|
BASE64.decode(&s).map_err(serde::de::Error::custom)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// HTTPRequest represents an outbound HTTP request from a plugin.
|
/// HTTPRequest represents an outbound HTTP request from a plugin.
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct HttpRequest {
|
pub struct HTTPRequest {
|
||||||
pub method: String,
|
pub method: String,
|
||||||
pub url: String,
|
pub url: String,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub headers: std::collections::HashMap<String, String>,
|
pub headers: std::collections::HashMap<String, String>,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
|
#[serde(with = "base64_bytes")]
|
||||||
pub body: Vec<u8>,
|
pub body: Vec<u8>,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub timeout_ms: i32,
|
pub timeout_ms: i32,
|
||||||
@@ -23,25 +47,26 @@ pub struct HttpRequest {
|
|||||||
/// HTTPResponse represents the response from an outbound HTTP request.
|
/// HTTPResponse represents the response from an outbound HTTP request.
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct HttpResponse {
|
pub struct HTTPResponse {
|
||||||
pub status_code: i32,
|
pub status_code: i32,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub headers: std::collections::HashMap<String, String>,
|
pub headers: std::collections::HashMap<String, String>,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
|
#[serde(with = "base64_bytes")]
|
||||||
pub body: Vec<u8>,
|
pub body: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
struct HTTPSendRequest {
|
struct HTTPSendRequest {
|
||||||
request: HttpRequest,
|
request: HTTPRequest,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
struct HTTPSendResponse {
|
struct HTTPSendResponse {
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
result: Option<HttpResponse>,
|
result: Option<HTTPResponse>,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
error: Option<String>,
|
error: Option<String>,
|
||||||
}
|
}
|
||||||
@@ -57,18 +82,18 @@ extern "ExtismHost" {
|
|||||||
/// - request: The HTTP request to execute, including method, URL, headers, body, and timeout
|
/// - request: The HTTP request to execute, including method, URL, headers, body, and timeout
|
||||||
///
|
///
|
||||||
/// Returns the HTTP response with status code, headers, and body.
|
/// Returns the HTTP response with status code, headers, and body.
|
||||||
/// Network errors, timeouts, and permission failures are returned as errors.
|
/// Network errors, timeouts, and permission failures are returned as Go errors.
|
||||||
/// Successful HTTP calls (including 4xx/5xx status codes) return a non-nil response with nil error.
|
/// Successful HTTP calls (including 4xx/5xx status codes) return a non-nil response with nil error.
|
||||||
///
|
///
|
||||||
/// # Arguments
|
/// # Arguments
|
||||||
/// * `request` - HttpRequest parameter.
|
/// * `request` - HTTPRequest parameter.
|
||||||
///
|
///
|
||||||
/// # Returns
|
/// # Returns
|
||||||
/// The result value.
|
/// The result value.
|
||||||
///
|
///
|
||||||
/// # Errors
|
/// # Errors
|
||||||
/// Returns an error if the host function call fails.
|
/// Returns an error if the host function call fails.
|
||||||
pub fn send(request: HttpRequest) -> Result<Option<HttpResponse>, Error> {
|
pub fn send(request: HTTPRequest) -> Result<Option<HTTPResponse>, Error> {
|
||||||
let response = unsafe {
|
let response = unsafe {
|
||||||
http_send(Json(HTTPSendRequest {
|
http_send(Json(HTTPSendRequest {
|
||||||
request: request,
|
request: request,
|
||||||
|
|||||||
@@ -5,11 +5,35 @@
|
|||||||
|
|
||||||
use extism_pdk::*;
|
use extism_pdk::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use base64::Engine as _;
|
||||||
|
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||||
|
|
||||||
|
mod base64_bytes {
|
||||||
|
use serde::{self, Deserialize, Deserializer, Serializer};
|
||||||
|
use base64::Engine as _;
|
||||||
|
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||||
|
|
||||||
|
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
serializer.serialize_str(&BASE64.encode(bytes))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
|
||||||
|
where
|
||||||
|
D: Deserializer<'de>,
|
||||||
|
{
|
||||||
|
let s = String::deserialize(deserializer)?;
|
||||||
|
BASE64.decode(&s).map_err(serde::de::Error::custom)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
struct KVStoreSetRequest {
|
struct KVStoreSetRequest {
|
||||||
key: String,
|
key: String,
|
||||||
|
#[serde(with = "base64_bytes")]
|
||||||
value: Vec<u8>,
|
value: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -30,6 +54,7 @@ struct KVStoreGetRequest {
|
|||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
struct KVStoreGetResponse {
|
struct KVStoreGetResponse {
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
|
#[serde(with = "base64_bytes")]
|
||||||
value: Vec<u8>,
|
value: Vec<u8>,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
exists: bool,
|
exists: bool,
|
||||||
|
|||||||
@@ -5,6 +5,29 @@
|
|||||||
|
|
||||||
use extism_pdk::*;
|
use extism_pdk::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use base64::Engine as _;
|
||||||
|
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||||
|
|
||||||
|
mod base64_bytes {
|
||||||
|
use serde::{self, Deserialize, Deserializer, Serializer};
|
||||||
|
use base64::Engine as _;
|
||||||
|
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||||
|
|
||||||
|
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
serializer.serialize_str(&BASE64.encode(bytes))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
|
||||||
|
where
|
||||||
|
D: Deserializer<'de>,
|
||||||
|
{
|
||||||
|
let s = String::deserialize(deserializer)?;
|
||||||
|
BASE64.decode(&s).map_err(serde::de::Error::custom)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
@@ -27,14 +50,22 @@ struct SubsonicAPICallRawRequest {
|
|||||||
uri: String,
|
uri: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct SubsonicAPICallRawResponse {
|
||||||
|
#[serde(default)]
|
||||||
|
content_type: String,
|
||||||
|
#[serde(default)]
|
||||||
|
#[serde(with = "base64_bytes")]
|
||||||
|
data: Vec<u8>,
|
||||||
|
#[serde(default)]
|
||||||
|
error: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
#[host_fn]
|
#[host_fn]
|
||||||
extern "ExtismHost" {
|
extern "ExtismHost" {
|
||||||
fn subsonicapi_call(input: Json<SubsonicAPICallRequest>) -> Json<SubsonicAPICallResponse>;
|
fn subsonicapi_call(input: Json<SubsonicAPICallRequest>) -> Json<SubsonicAPICallResponse>;
|
||||||
}
|
fn subsonicapi_callraw(input: Json<SubsonicAPICallRawRequest>) -> Json<SubsonicAPICallRawResponse>;
|
||||||
|
|
||||||
#[link(wasm_import_module = "extism:host/user")]
|
|
||||||
extern "C" {
|
|
||||||
fn subsonicapi_callraw(offset: u64) -> u64;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Call executes a Subsonic API request and returns the JSON response.
|
/// Call executes a Subsonic API request and returns the JSON response.
|
||||||
@@ -65,54 +96,27 @@ pub fn call(uri: &str) -> Result<String, Error> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// CallRaw executes a Subsonic API request and returns the raw binary response.
|
/// CallRaw executes a Subsonic API request and returns the raw binary response.
|
||||||
/// Optimized for binary endpoints like getCoverArt and stream that return
|
/// Designed for binary endpoints like getCoverArt and stream that return
|
||||||
/// non-JSON data. The response is returned as raw bytes without JSON encoding overhead.
|
/// non-JSON data. The data is base64-encoded over JSON on the wire.
|
||||||
///
|
///
|
||||||
/// # Arguments
|
/// # Arguments
|
||||||
/// * `uri` - String parameter.
|
/// * `uri` - String parameter.
|
||||||
///
|
///
|
||||||
/// # Returns
|
/// # Returns
|
||||||
/// A tuple of (content_type, data) with the raw binary response.
|
/// A tuple of (content_type, data).
|
||||||
///
|
///
|
||||||
/// # Errors
|
/// # Errors
|
||||||
/// Returns an error if the host function call fails.
|
/// Returns an error if the host function call fails.
|
||||||
pub fn call_raw(uri: &str) -> Result<(String, Vec<u8>), Error> {
|
pub fn call_raw(uri: &str) -> Result<(String, Vec<u8>), Error> {
|
||||||
let req = SubsonicAPICallRawRequest {
|
let response = unsafe {
|
||||||
uri: uri.to_owned(),
|
subsonicapi_callraw(Json(SubsonicAPICallRawRequest {
|
||||||
|
uri: uri.to_owned(),
|
||||||
|
}))?
|
||||||
};
|
};
|
||||||
let input_bytes = serde_json::to_vec(&req).map_err(|e| Error::msg(e.to_string()))?;
|
|
||||||
let input_mem = Memory::from_bytes(&input_bytes).map_err(|e| Error::msg(e.to_string()))?;
|
|
||||||
|
|
||||||
let response_offset = unsafe { subsonicapi_callraw(input_mem.offset()) };
|
if let Some(err) = response.0.error {
|
||||||
|
return Err(Error::msg(err));
|
||||||
|
}
|
||||||
|
|
||||||
let response_mem = Memory::find(response_offset)
|
Ok((response.0.content_type, response.0.data))
|
||||||
.ok_or_else(|| Error::msg("empty response from host"))?;
|
|
||||||
let response_bytes = response_mem.to_vec();
|
|
||||||
|
|
||||||
if response_bytes.is_empty() {
|
|
||||||
return Err(Error::msg("empty response from host"));
|
|
||||||
}
|
|
||||||
if response_bytes[0] == 0x01 {
|
|
||||||
let msg = String::from_utf8_lossy(&response_bytes[1..]).to_string();
|
|
||||||
return Err(Error::msg(msg));
|
|
||||||
}
|
|
||||||
if response_bytes[0] != 0x00 {
|
|
||||||
return Err(Error::msg("unknown response status"));
|
|
||||||
}
|
|
||||||
if response_bytes.len() < 5 {
|
|
||||||
return Err(Error::msg("malformed raw response: incomplete header"));
|
|
||||||
}
|
|
||||||
let ct_len = u32::from_be_bytes([
|
|
||||||
response_bytes[1],
|
|
||||||
response_bytes[2],
|
|
||||||
response_bytes[3],
|
|
||||||
response_bytes[4],
|
|
||||||
]) as usize;
|
|
||||||
if ct_len > response_bytes.len() - 5 {
|
|
||||||
return Err(Error::msg("malformed raw response: content-type overflow"));
|
|
||||||
}
|
|
||||||
let ct_end = 5 + ct_len;
|
|
||||||
let content_type = String::from_utf8_lossy(&response_bytes[5..ct_end]).to_string();
|
|
||||||
let data = response_bytes[ct_end..].to_vec();
|
|
||||||
Ok((content_type, data))
|
|
||||||
}
|
}
|
||||||
|
|||||||
217
plugins/pdk/rust/nd-pdk-host/src/nd_host_task.rs
Normal file
217
plugins/pdk/rust/nd-pdk-host/src/nd_host_task.rs
Normal file
@@ -0,0 +1,217 @@
|
|||||||
|
// Code generated by ndpgen. DO NOT EDIT.
|
||||||
|
//
|
||||||
|
// This file contains client wrappers for the Task host service.
|
||||||
|
// It is intended for use in Navidrome plugins built with extism-pdk.
|
||||||
|
|
||||||
|
use extism_pdk::*;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use base64::Engine as _;
|
||||||
|
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||||
|
|
||||||
|
mod base64_bytes {
|
||||||
|
use serde::{self, Deserialize, Deserializer, Serializer};
|
||||||
|
use base64::Engine as _;
|
||||||
|
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||||
|
|
||||||
|
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
serializer.serialize_str(&BASE64.encode(bytes))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
|
||||||
|
where
|
||||||
|
D: Deserializer<'de>,
|
||||||
|
{
|
||||||
|
let s = String::deserialize(deserializer)?;
|
||||||
|
BASE64.decode(&s).map_err(serde::de::Error::custom)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// QueueConfig holds configuration for a task queue.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct QueueConfig {
|
||||||
|
pub concurrency: i32,
|
||||||
|
pub max_retries: i32,
|
||||||
|
pub backoff_ms: i64,
|
||||||
|
pub delay_ms: i64,
|
||||||
|
pub retention_ms: i64,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// TaskInfo holds the current state of a task.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct TaskInfo {
|
||||||
|
pub status: String,
|
||||||
|
pub message: String,
|
||||||
|
pub attempt: i32,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct TaskCreateQueueRequest {
|
||||||
|
name: String,
|
||||||
|
config: QueueConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct TaskCreateQueueResponse {
|
||||||
|
#[serde(default)]
|
||||||
|
error: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct TaskEnqueueRequest {
|
||||||
|
queue_name: String,
|
||||||
|
#[serde(with = "base64_bytes")]
|
||||||
|
payload: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct TaskEnqueueResponse {
|
||||||
|
#[serde(default)]
|
||||||
|
result: String,
|
||||||
|
#[serde(default)]
|
||||||
|
error: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct TaskGetRequest {
|
||||||
|
task_id: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct TaskGetResponse {
|
||||||
|
#[serde(default)]
|
||||||
|
result: Option<TaskInfo>,
|
||||||
|
#[serde(default)]
|
||||||
|
error: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct TaskCancelRequest {
|
||||||
|
task_id: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct TaskCancelResponse {
|
||||||
|
#[serde(default)]
|
||||||
|
error: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[host_fn]
|
||||||
|
extern "ExtismHost" {
|
||||||
|
fn task_createqueue(input: Json<TaskCreateQueueRequest>) -> Json<TaskCreateQueueResponse>;
|
||||||
|
fn task_enqueue(input: Json<TaskEnqueueRequest>) -> Json<TaskEnqueueResponse>;
|
||||||
|
fn task_get(input: Json<TaskGetRequest>) -> Json<TaskGetResponse>;
|
||||||
|
fn task_cancel(input: Json<TaskCancelRequest>) -> Json<TaskCancelResponse>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// CreateQueue creates a named task queue with the given configuration.
|
||||||
|
/// Zero-value fields in config use sensible defaults.
|
||||||
|
/// If a queue with the same name already exists, returns an error.
|
||||||
|
/// On startup, this also recovers any stale "running" tasks from a previous crash.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
/// * `name` - String parameter.
|
||||||
|
/// * `config` - QueueConfig parameter.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns an error if the host function call fails.
|
||||||
|
pub fn create_queue(name: &str, config: QueueConfig) -> Result<(), Error> {
|
||||||
|
let response = unsafe {
|
||||||
|
task_createqueue(Json(TaskCreateQueueRequest {
|
||||||
|
name: name.to_owned(),
|
||||||
|
config: config,
|
||||||
|
}))?
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(err) = response.0.error {
|
||||||
|
return Err(Error::msg(err));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Enqueue adds a task to the named queue. Returns the task ID.
|
||||||
|
/// payload is opaque bytes passed back to the plugin on execution.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
/// * `queue_name` - String parameter.
|
||||||
|
/// * `payload` - Vec<u8> parameter.
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
/// The result value.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns an error if the host function call fails.
|
||||||
|
pub fn enqueue(queue_name: &str, payload: Vec<u8>) -> Result<String, Error> {
|
||||||
|
let response = unsafe {
|
||||||
|
task_enqueue(Json(TaskEnqueueRequest {
|
||||||
|
queue_name: queue_name.to_owned(),
|
||||||
|
payload: payload,
|
||||||
|
}))?
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(err) = response.0.error {
|
||||||
|
return Err(Error::msg(err));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(response.0.result)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get returns the current state of a task including its status,
|
||||||
|
/// message, and attempt count.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
/// * `task_id` - String parameter.
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
/// The result value.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns an error if the host function call fails.
|
||||||
|
pub fn get(task_id: &str) -> Result<Option<TaskInfo>, Error> {
|
||||||
|
let response = unsafe {
|
||||||
|
task_get(Json(TaskGetRequest {
|
||||||
|
task_id: task_id.to_owned(),
|
||||||
|
}))?
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(err) = response.0.error {
|
||||||
|
return Err(Error::msg(err));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(response.0.result)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Cancel cancels a pending task. Returns error if already
|
||||||
|
/// running, completed, or failed.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
/// * `task_id` - String parameter.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns an error if the host function call fails.
|
||||||
|
pub fn cancel(task_id: &str) -> Result<(), Error> {
|
||||||
|
let response = unsafe {
|
||||||
|
task_cancel(Json(TaskCancelRequest {
|
||||||
|
task_id: task_id.to_owned(),
|
||||||
|
}))?
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(err) = response.0.error {
|
||||||
|
return Err(Error::msg(err));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
@@ -5,6 +5,29 @@
|
|||||||
|
|
||||||
use extism_pdk::*;
|
use extism_pdk::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use base64::Engine as _;
|
||||||
|
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||||
|
|
||||||
|
mod base64_bytes {
|
||||||
|
use serde::{self, Deserialize, Deserializer, Serializer};
|
||||||
|
use base64::Engine as _;
|
||||||
|
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||||
|
|
||||||
|
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
serializer.serialize_str(&BASE64.encode(bytes))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
|
||||||
|
where
|
||||||
|
D: Deserializer<'de>,
|
||||||
|
{
|
||||||
|
let s = String::deserialize(deserializer)?;
|
||||||
|
BASE64.decode(&s).map_err(serde::de::Error::custom)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
@@ -41,6 +64,7 @@ struct WebSocketSendTextResponse {
|
|||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
struct WebSocketSendBinaryRequest {
|
struct WebSocketSendBinaryRequest {
|
||||||
connection_id: String,
|
connection_id: String,
|
||||||
|
#[serde(with = "base64_bytes")]
|
||||||
data: Vec<u8>,
|
data: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
16
plugins/testdata/test-taskqueue/go.mod
vendored
Normal file
16
plugins/testdata/test-taskqueue/go.mod
vendored
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
module test-taskqueue
|
||||||
|
|
||||||
|
go 1.25
|
||||||
|
|
||||||
|
require github.com/navidrome/navidrome/plugins/pdk/go v0.0.0
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||||
|
github.com/extism/go-pdk v1.1.3 // indirect
|
||||||
|
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||||
|
github.com/stretchr/objx v0.5.2 // indirect
|
||||||
|
github.com/stretchr/testify v1.11.1 // indirect
|
||||||
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
|
)
|
||||||
|
|
||||||
|
replace github.com/navidrome/navidrome/plugins/pdk/go => ../../pdk/go
|
||||||
14
plugins/testdata/test-taskqueue/go.sum
vendored
Normal file
14
plugins/testdata/test-taskqueue/go.sum
vendored
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/extism/go-pdk v1.1.3 h1:hfViMPWrqjN6u67cIYRALZTZLk/enSPpNKa+rZ9X2SQ=
|
||||||
|
github.com/extism/go-pdk v1.1.3/go.mod h1:Gz+LIU/YCKnKXhgge8yo5Yu1F/lbv7KtKFkiCSzW/P4=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
|
||||||
|
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
|
||||||
|
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||||
|
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||||
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||||
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
104
plugins/testdata/test-taskqueue/main.go
vendored
Normal file
104
plugins/testdata/test-taskqueue/main.go
vendored
Normal file
@@ -0,0 +1,104 @@
|
|||||||
|
// Test TaskQueue plugin for Navidrome plugin system integration tests.
|
||||||
|
// Build with: tinygo build -o ../test-taskqueue.wasm -target wasip1 -buildmode=c-shared .
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/navidrome/navidrome/plugins/pdk/go/host"
|
||||||
|
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
|
||||||
|
"github.com/navidrome/navidrome/plugins/pdk/go/taskworker"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
taskworker.Register(&handler{})
|
||||||
|
}
|
||||||
|
|
||||||
|
type handler struct{}
|
||||||
|
|
||||||
|
func (h *handler) OnTaskExecute(req taskworker.TaskExecuteRequest) (string, error) {
|
||||||
|
payload := string(req.Payload)
|
||||||
|
if payload == "fail" {
|
||||||
|
return "", fmt.Errorf("task failed as instructed")
|
||||||
|
}
|
||||||
|
if payload == "fail-then-succeed" && req.Attempt < 2 {
|
||||||
|
return "", fmt.Errorf("transient failure")
|
||||||
|
}
|
||||||
|
return "completed successfully", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test helper types
|
||||||
|
type TestInput struct {
|
||||||
|
Operation string `json:"operation"`
|
||||||
|
QueueName string `json:"queueName,omitempty"`
|
||||||
|
Config *host.QueueConfig `json:"config,omitempty"`
|
||||||
|
Payload []byte `json:"payload,omitempty"`
|
||||||
|
TaskID string `json:"taskId,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type TestOutput struct {
|
||||||
|
TaskID string `json:"taskId,omitempty"`
|
||||||
|
Status string `json:"status,omitempty"`
|
||||||
|
Message string `json:"message,omitempty"`
|
||||||
|
Attempt int32 `json:"attempt,omitempty"`
|
||||||
|
Error *string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
//go:wasmexport nd_test_taskqueue
|
||||||
|
func ndTestTaskQueue() int32 {
|
||||||
|
var input TestInput
|
||||||
|
if err := pdk.InputJSON(&input); err != nil {
|
||||||
|
errStr := err.Error()
|
||||||
|
pdk.OutputJSON(TestOutput{Error: &errStr})
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
switch input.Operation {
|
||||||
|
case "create_queue":
|
||||||
|
config := host.QueueConfig{}
|
||||||
|
if input.Config != nil {
|
||||||
|
config = *input.Config
|
||||||
|
}
|
||||||
|
err := host.TaskCreateQueue(input.QueueName, config)
|
||||||
|
if err != nil {
|
||||||
|
errStr := err.Error()
|
||||||
|
pdk.OutputJSON(TestOutput{Error: &errStr})
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
pdk.OutputJSON(TestOutput{})
|
||||||
|
|
||||||
|
case "enqueue":
|
||||||
|
taskID, err := host.TaskEnqueue(input.QueueName, input.Payload)
|
||||||
|
if err != nil {
|
||||||
|
errStr := err.Error()
|
||||||
|
pdk.OutputJSON(TestOutput{Error: &errStr})
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
pdk.OutputJSON(TestOutput{TaskID: taskID})
|
||||||
|
|
||||||
|
case "get_task_status":
|
||||||
|
info, err := host.TaskGet(input.TaskID)
|
||||||
|
if err != nil {
|
||||||
|
errStr := err.Error()
|
||||||
|
pdk.OutputJSON(TestOutput{Error: &errStr})
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
pdk.OutputJSON(TestOutput{Status: info.Status, Message: info.Message, Attempt: info.Attempt})
|
||||||
|
|
||||||
|
case "cancel_task":
|
||||||
|
err := host.TaskCancel(input.TaskID)
|
||||||
|
if err != nil {
|
||||||
|
errStr := err.Error()
|
||||||
|
pdk.OutputJSON(TestOutput{Error: &errStr})
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
pdk.OutputJSON(TestOutput{})
|
||||||
|
|
||||||
|
default:
|
||||||
|
errStr := "unknown operation: " + input.Operation
|
||||||
|
pdk.OutputJSON(TestOutput{Error: &errStr})
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {}
|
||||||
12
plugins/testdata/test-taskqueue/manifest.json
vendored
Normal file
12
plugins/testdata/test-taskqueue/manifest.json
vendored
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
{
|
||||||
|
"name": "Test TaskQueue Plugin",
|
||||||
|
"author": "Navidrome Test",
|
||||||
|
"version": "1.0.0",
|
||||||
|
"description": "A test plugin for TaskQueue integration testing",
|
||||||
|
"permissions": {
|
||||||
|
"taskqueue": {
|
||||||
|
"reason": "For testing task queue operations",
|
||||||
|
"maxConcurrency": 10
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -443,7 +443,7 @@ func (api *Router) buildArtist(r *http.Request, artist *model.Artist) (*response
|
|||||||
func (api *Router) buildAlbumDirectory(ctx context.Context, album *model.Album) (*responses.Directory, error) {
|
func (api *Router) buildAlbumDirectory(ctx context.Context, album *model.Album) (*responses.Directory, error) {
|
||||||
dir := &responses.Directory{}
|
dir := &responses.Directory{}
|
||||||
dir.Id = album.ID
|
dir.Id = album.ID
|
||||||
dir.Name = album.Name
|
dir.Name = album.FullName()
|
||||||
dir.Parent = album.AlbumArtistID
|
dir.Parent = album.AlbumArtistID
|
||||||
dir.PlayCount = album.PlayCount
|
dir.PlayCount = album.PlayCount
|
||||||
if album.PlayCount > 0 {
|
if album.PlayCount > 0 {
|
||||||
|
|||||||
@@ -197,7 +197,7 @@ func childFromMediaFile(ctx context.Context, mf model.MediaFile) responses.Child
|
|||||||
}
|
}
|
||||||
|
|
||||||
child.Parent = mf.AlbumID
|
child.Parent = mf.AlbumID
|
||||||
child.Album = mf.Album
|
child.Album = mf.FullAlbumName()
|
||||||
child.Year = int32(mf.Year)
|
child.Year = int32(mf.Year)
|
||||||
child.Artist = mf.Artist
|
child.Artist = mf.Artist
|
||||||
child.Genre = mf.Genre
|
child.Genre = mf.Genre
|
||||||
@@ -302,7 +302,7 @@ func artistRefs(participants model.ParticipantList) []responses.ArtistID3Ref {
|
|||||||
func fakePath(mf model.MediaFile) string {
|
func fakePath(mf model.MediaFile) string {
|
||||||
builder := strings.Builder{}
|
builder := strings.Builder{}
|
||||||
|
|
||||||
builder.WriteString(fmt.Sprintf("%s/%s/", sanitizeSlashes(mf.AlbumArtist), sanitizeSlashes(mf.Album)))
|
builder.WriteString(fmt.Sprintf("%s/%s/", sanitizeSlashes(mf.AlbumArtist), sanitizeSlashes(mf.FullAlbumName())))
|
||||||
if mf.DiscNumber != 0 {
|
if mf.DiscNumber != 0 {
|
||||||
builder.WriteString(fmt.Sprintf("%02d-", mf.DiscNumber))
|
builder.WriteString(fmt.Sprintf("%02d-", mf.DiscNumber))
|
||||||
}
|
}
|
||||||
@@ -321,9 +321,10 @@ func childFromAlbum(ctx context.Context, al model.Album) responses.Child {
|
|||||||
child := responses.Child{}
|
child := responses.Child{}
|
||||||
child.Id = al.ID
|
child.Id = al.ID
|
||||||
child.IsDir = true
|
child.IsDir = true
|
||||||
child.Title = al.Name
|
fullName := al.FullName()
|
||||||
child.Name = al.Name
|
child.Title = fullName
|
||||||
child.Album = al.Name
|
child.Name = fullName
|
||||||
|
child.Album = fullName
|
||||||
child.Artist = al.AlbumArtist
|
child.Artist = al.AlbumArtist
|
||||||
child.Year = int32(cmp.Or(al.MaxOriginalYear, al.MaxYear))
|
child.Year = int32(cmp.Or(al.MaxOriginalYear, al.MaxYear))
|
||||||
child.Genre = al.Genre
|
child.Genre = al.Genre
|
||||||
@@ -405,7 +406,7 @@ func buildDiscSubtitles(a model.Album) []responses.DiscTitle {
|
|||||||
func buildAlbumID3(ctx context.Context, album model.Album) responses.AlbumID3 {
|
func buildAlbumID3(ctx context.Context, album model.Album) responses.AlbumID3 {
|
||||||
dir := responses.AlbumID3{}
|
dir := responses.AlbumID3{}
|
||||||
dir.Id = album.ID
|
dir.Id = album.ID
|
||||||
dir.Name = album.Name
|
dir.Name = album.FullName()
|
||||||
dir.Artist = album.AlbumArtist
|
dir.Artist = album.AlbumArtist
|
||||||
dir.ArtistId = album.AlbumArtistID
|
dir.ArtistId = album.AlbumArtistID
|
||||||
dir.CoverArt = album.CoverArtID().String()
|
dir.CoverArt = album.CoverArtID().String()
|
||||||
|
|||||||
Reference in New Issue
Block a user