mirror of
https://github.com/navidrome/navidrome.git
synced 2026-02-27 04:16:03 -05:00
Compare commits
22 Commits
dependabot
...
feat/plugi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
28ebd754a1 | ||
|
|
7a4147c489 | ||
|
|
c40bf3540a | ||
|
|
c492ae19f3 | ||
|
|
f9beb3c2d7 | ||
|
|
2ea20f2511 | ||
|
|
6e8b826022 | ||
|
|
57b39685bc | ||
|
|
525aa0e861 | ||
|
|
11461d5f2c | ||
|
|
1cfc2d9741 | ||
|
|
f5dca3a2db | ||
|
|
7180952103 | ||
|
|
8238ed6a2c | ||
|
|
516e229b27 | ||
|
|
582d1b3cd9 | ||
|
|
cdd3432788 | ||
|
|
5bc2bbb70e | ||
|
|
14343d91b0 | ||
|
|
fc36f1daa6 | ||
|
|
652c27690b | ||
|
|
2bb13e5ff1 |
@@ -155,6 +155,7 @@ type scannerOptions struct {
|
||||
|
||||
type subsonicOptions struct {
|
||||
AppendSubtitle bool
|
||||
AppendAlbumVersion bool
|
||||
ArtistParticipations bool
|
||||
DefaultReportRealPath bool
|
||||
EnableAverageRating bool
|
||||
@@ -250,6 +251,7 @@ type pluginsOptions struct {
|
||||
type extAuthOptions struct {
|
||||
TrustedSources string
|
||||
UserHeader string
|
||||
LogoutURL string
|
||||
}
|
||||
|
||||
type searchOptions struct {
|
||||
@@ -345,6 +347,7 @@ func Load(noConfigDump bool) {
|
||||
validateBackupSchedule,
|
||||
validatePlaylistsPath,
|
||||
validatePurgeMissingOption,
|
||||
validateURL("ExtAuth.LogoutURL", Server.ExtAuth.LogoutURL),
|
||||
)
|
||||
if err != nil {
|
||||
os.Exit(1)
|
||||
@@ -548,6 +551,33 @@ func validateSchedule(schedule, field string) (string, error) {
|
||||
return schedule, err
|
||||
}
|
||||
|
||||
// validateURL checks if the provided URL is valid and has either http or https scheme.
|
||||
// It returns a function that can be used as a hook to validate URLs in the config.
|
||||
func validateURL(optionName, optionURL string) func() error {
|
||||
return func() error {
|
||||
if optionURL == "" {
|
||||
return nil
|
||||
}
|
||||
u, err := url.Parse(optionURL)
|
||||
if err != nil {
|
||||
log.Error(fmt.Sprintf("Invalid %s: it could not be parsed", optionName), "url", optionURL, "err", err)
|
||||
return err
|
||||
}
|
||||
if u.Scheme != "http" && u.Scheme != "https" {
|
||||
err := fmt.Errorf("invalid scheme for %s: '%s'. Only 'http' and 'https' are allowed", optionName, u.Scheme)
|
||||
log.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
// Require an absolute URL with a non-empty host and no opaque component.
|
||||
if u.Host == "" || u.Opaque != "" {
|
||||
err := fmt.Errorf("invalid %s: '%s'. A full http(s) URL with a non-empty host is required", optionName, optionURL)
|
||||
log.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func normalizeSearchBackend(value string) string {
|
||||
v := strings.ToLower(strings.TrimSpace(value))
|
||||
switch v {
|
||||
@@ -641,6 +671,7 @@ func setViperDefaults() {
|
||||
viper.SetDefault("passwordencryptionkey", "")
|
||||
viper.SetDefault("extauth.userheader", "Remote-User")
|
||||
viper.SetDefault("extauth.trustedsources", "")
|
||||
viper.SetDefault("extauth.logouturl", "")
|
||||
viper.SetDefault("prometheus.enabled", false)
|
||||
viper.SetDefault("prometheus.metricspath", consts.PrometheusDefaultPath)
|
||||
viper.SetDefault("prometheus.password", "")
|
||||
@@ -659,6 +690,7 @@ func setViperDefaults() {
|
||||
viper.SetDefault("scanner.followsymlinks", true)
|
||||
viper.SetDefault("scanner.purgemissing", consts.PurgeMissingNever)
|
||||
viper.SetDefault("subsonic.appendsubtitle", true)
|
||||
viper.SetDefault("subsonic.appendalbumversion", true)
|
||||
viper.SetDefault("subsonic.artistparticipations", false)
|
||||
viper.SetDefault("subsonic.defaultreportrealpath", false)
|
||||
viper.SetDefault("subsonic.enableaveragerating", true)
|
||||
|
||||
@@ -52,6 +52,48 @@ var _ = Describe("Configuration", func() {
|
||||
})
|
||||
})
|
||||
|
||||
Describe("ValidateURL", func() {
|
||||
It("accepts a valid http URL", func() {
|
||||
fn := conf.ValidateURL("TestOption", "http://example.com/path")
|
||||
Expect(fn()).To(Succeed())
|
||||
})
|
||||
|
||||
It("accepts a valid https URL", func() {
|
||||
fn := conf.ValidateURL("TestOption", "https://example.com/path")
|
||||
Expect(fn()).To(Succeed())
|
||||
})
|
||||
|
||||
It("rejects a URL with no scheme", func() {
|
||||
fn := conf.ValidateURL("TestOption", "example.com/path")
|
||||
Expect(fn()).To(MatchError(ContainSubstring("invalid scheme")))
|
||||
})
|
||||
|
||||
It("rejects a URL with an unsupported scheme", func() {
|
||||
fn := conf.ValidateURL("TestOption", "javascript://example.com/path")
|
||||
Expect(fn()).To(MatchError(ContainSubstring("invalid scheme")))
|
||||
})
|
||||
|
||||
It("accepts an empty URL (optional config)", func() {
|
||||
fn := conf.ValidateURL("TestOption", "")
|
||||
Expect(fn()).To(Succeed())
|
||||
})
|
||||
|
||||
It("includes the option name in the error message", func() {
|
||||
fn := conf.ValidateURL("MyOption", "ftp://example.com")
|
||||
Expect(fn()).To(MatchError(ContainSubstring("MyOption")))
|
||||
})
|
||||
|
||||
It("rejects a URL that cannot be parsed", func() {
|
||||
fn := conf.ValidateURL("TestOption", "://invalid")
|
||||
Expect(fn()).To(HaveOccurred())
|
||||
})
|
||||
|
||||
It("rejects a URL without a host", func() {
|
||||
fn := conf.ValidateURL("TestOption", "http:///path")
|
||||
Expect(fn()).To(MatchError(ContainSubstring("non-empty host is required")))
|
||||
})
|
||||
})
|
||||
|
||||
DescribeTable("NormalizeSearchBackend",
|
||||
func(input, expected string) {
|
||||
Expect(conf.NormalizeSearchBackend(input)).To(Equal(expected))
|
||||
|
||||
@@ -8,4 +8,6 @@ var SetViperDefaults = setViperDefaults
|
||||
|
||||
var ParseLanguages = parseLanguages
|
||||
|
||||
var ValidateURL = validateURL
|
||||
|
||||
var NormalizeSearchBackend = normalizeSearchBackend
|
||||
|
||||
9
go.mod
9
go.mod
@@ -1,13 +1,13 @@
|
||||
module github.com/navidrome/navidrome
|
||||
|
||||
go 1.25
|
||||
go 1.25.0
|
||||
|
||||
replace (
|
||||
// Fork to fix https://github.com/navidrome/navidrome/issues/3254
|
||||
github.com/dhowden/tag v0.0.0-20240417053706-3d75831295e8 => github.com/deluan/tag v0.0.0-20241002021117-dfe5e6ea396d
|
||||
|
||||
// Fork to implement raw tags support
|
||||
go.senan.xyz/taglib => github.com/deluan/go-taglib v0.0.0-20260221220301-2fab4903f48e
|
||||
go.senan.xyz/taglib => github.com/deluan/go-taglib v0.0.0-20260225021432-1699562530f1
|
||||
)
|
||||
|
||||
require (
|
||||
@@ -53,7 +53,7 @@ require (
|
||||
github.com/onsi/gomega v1.39.1
|
||||
github.com/pelletier/go-toml/v2 v2.2.4
|
||||
github.com/pocketbase/dbx v1.12.0
|
||||
github.com/pressly/goose/v3 v3.26.0
|
||||
github.com/pressly/goose/v3 v3.27.0
|
||||
github.com/prometheus/client_golang v1.23.2
|
||||
github.com/rjeczalik/notify v0.9.3
|
||||
github.com/robfig/cron/v3 v3.0.1
|
||||
@@ -88,7 +88,7 @@ require (
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/creack/pty v1.1.24 // indirect
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.1 // indirect
|
||||
github.com/dylibso/observe-sdk/go v0.0.0-20240828172851-9145d8ad07e1 // indirect
|
||||
github.com/fsnotify/fsnotify v1.9.0 // indirect
|
||||
github.com/go-logr/logr v1.4.3 // indirect
|
||||
@@ -140,7 +140,6 @@ require (
|
||||
go.yaml.in/yaml/v2 v2.4.3 // indirect
|
||||
go.yaml.in/yaml/v3 v3.0.4 // indirect
|
||||
golang.org/x/crypto v0.48.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20260112195511-716be5621a96 // indirect
|
||||
golang.org/x/mod v0.33.0 // indirect
|
||||
golang.org/x/telemetry v0.0.0-20260209163413-e7419c687ee4 // indirect
|
||||
golang.org/x/tools v0.42.0 // indirect
|
||||
|
||||
36
go.sum
36
go.sum
@@ -1,7 +1,7 @@
|
||||
dario.cat/mergo v1.0.2 h1:85+piFYR1tMbRrLcDwR18y4UKJ3aH1Tbzi24VRW1TK8=
|
||||
dario.cat/mergo v1.0.2/go.mod h1:E/hbnu0NxMFBjpMIE34DRGLWqDy0g5FuKDhCb31ngxA=
|
||||
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
|
||||
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
|
||||
filippo.io/edwards25519 v1.2.0 h1:crnVqOiS4jqYleHd9vaKZ+HKtHfllngJIiOpNpoJsjo=
|
||||
filippo.io/edwards25519 v1.2.0/go.mod h1:xzAOLCNug/yB62zG1bQ8uziwrIqIuxhctzJT18Q77mc=
|
||||
github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0=
|
||||
github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=
|
||||
github.com/Masterminds/squirrel v1.5.4 h1:uUcX/aBc8O7Fg9kaISIUsHXdKuqehiXAMQTYX8afzqM=
|
||||
@@ -34,10 +34,10 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 h1:NMZiJj8QnKe1LgsbDayM4UoHwbvwDRwnI3hwNaAHRnc=
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0/go.mod h1:ZXNYxsqcloTdSy/rNShjYzMhyjf0LaoftYK0p+A3h40=
|
||||
github.com/deluan/go-taglib v0.0.0-20260221220301-2fab4903f48e h1:yQF3eOcI2dMMtxqdKXm3cgfYZlDcq9SUDDv90bsMj2I=
|
||||
github.com/deluan/go-taglib v0.0.0-20260221220301-2fab4903f48e/go.mod h1:sKDN0U4qXDlq6LFK+aOAkDH4Me5nDV1V/A4B+B69xBA=
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.1 h1:5RVFMOWjMyRy8cARdy79nAmgYw3hK/4HUq48LQ6Wwqo=
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.1/go.mod h1:ZXNYxsqcloTdSy/rNShjYzMhyjf0LaoftYK0p+A3h40=
|
||||
github.com/deluan/go-taglib v0.0.0-20260225021432-1699562530f1 h1:seWJmkPAb+M1ysRNGzTGS7FfdrUe9wQTHhB9p2fxDWg=
|
||||
github.com/deluan/go-taglib v0.0.0-20260225021432-1699562530f1/go.mod h1:sKDN0U4qXDlq6LFK+aOAkDH4Me5nDV1V/A4B+B69xBA=
|
||||
github.com/deluan/rest v0.0.0-20211102003136-6260bc399cbf h1:tb246l2Zmpt/GpF9EcHCKTtwzrd0HGfEmoODFA/qnk4=
|
||||
github.com/deluan/rest v0.0.0-20211102003136-6260bc399cbf/go.mod h1:tSgDythFsl0QgS/PFWfIZqcJKnkADWneY80jaVRlqK8=
|
||||
github.com/deluan/sanitize v0.0.0-20241120162836-fdfd8fdfaa55 h1:wSCnggTs2f2ji6nFwQmfwgINcmSMj0xF0oHnoyRSPe4=
|
||||
@@ -143,8 +143,8 @@ github.com/kardianos/service v1.2.4 h1:XNlGtZOYNx2u91urOdg/Kfmc+gfmuIo1Dd3rEi2Og
|
||||
github.com/kardianos/service v1.2.4/go.mod h1:E4V9ufUuY82F7Ztlu1eN9VXWIQxg8NoLQlmFe0MtrXc=
|
||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
|
||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
|
||||
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
|
||||
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
|
||||
github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c=
|
||||
github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
|
||||
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
|
||||
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
@@ -193,8 +193,8 @@ github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQ
|
||||
github.com/mitchellh/go-wordwrap v1.0.1/go.mod h1:R62XHJLzvMFRBbcrT7m7WgmE1eOyTSsCt+hzestvNj0=
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
|
||||
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
|
||||
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
|
||||
github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w=
|
||||
github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
|
||||
github.com/ogier/pflag v0.0.1 h1:RW6JSWSu/RkSatfcLtogGfFgpim5p7ARQ10ECk5O750=
|
||||
github.com/ogier/pflag v0.0.1/go.mod h1:zkFki7tvTa0tafRvTBIZTvzYyAu6kQhPZFnshFFPE+g=
|
||||
github.com/onsi/ginkgo/v2 v2.28.1 h1:S4hj+HbZp40fNKuLUQOYLDgZLwNUVn19N3Atb98NCyI=
|
||||
@@ -212,8 +212,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/pocketbase/dbx v1.12.0 h1:/oLErM+A0b4xI0PWTGPqSDVjzix48PqI/bng2l0PzoA=
|
||||
github.com/pocketbase/dbx v1.12.0/go.mod h1:xXRCIAKTHMgUCyCKZm55pUOdvFziJjQfXaWKhu2vhMs=
|
||||
github.com/pressly/goose/v3 v3.26.0 h1:KJakav68jdH0WDvoAcj8+n61WqOIaPGgH0bJWS6jpmM=
|
||||
github.com/pressly/goose/v3 v3.26.0/go.mod h1:4hC1KrritdCxtuFsqgs1R4AU5bWtTAf+cnWvfhf2DNY=
|
||||
github.com/pressly/goose/v3 v3.27.0 h1:/D30gVTuQhu0WsNZYbJi4DMOsx1lNq+6SkLe+Wp59BM=
|
||||
github.com/pressly/goose/v3 v3.27.0/go.mod h1:3ZBeCXqzkgIRvrEMDkYh1guvtoJTU5oMMuDdkutoM78=
|
||||
github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o=
|
||||
github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg=
|
||||
github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
|
||||
@@ -321,8 +321,8 @@ golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v
|
||||
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
|
||||
golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts=
|
||||
golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos=
|
||||
golang.org/x/exp v0.0.0-20260112195511-716be5621a96 h1:Z/6YuSHTLOHfNFdb8zVZomZr7cqNgTJvA8+Qz75D8gU=
|
||||
golang.org/x/exp v0.0.0-20260112195511-716be5621a96/go.mod h1:nzimsREAkjBCIEFtHiYkrJyT+2uy9YZJB7H1k68CXZU=
|
||||
golang.org/x/exp v0.0.0-20260218203240-3dfff04db8fa h1:Zt3DZoOFFYkKhDT3v7Lm9FDMEV06GpzjG2jrqW+QTE0=
|
||||
golang.org/x/exp v0.0.0-20260218203240-3dfff04db8fa/go.mod h1:K79w1Vqn7PoiZn+TkNpx3BUWUQksGO3JcVX6qIjytmA=
|
||||
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
|
||||
golang.org/x/image v0.36.0 h1:Iknbfm1afbgtwPTmHnS2gTM/6PPZfH+z2EFuOkSbqwc=
|
||||
golang.org/x/image v0.36.0/go.mod h1:YsWD2TyyGKiIX1kZlu9QfKIsQ4nAAK9bdgdrIsE7xy4=
|
||||
@@ -423,11 +423,11 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
modernc.org/libc v1.66.3 h1:cfCbjTUcdsKyyZZfEUKfoHcP3S0Wkvz3jgSzByEWVCQ=
|
||||
modernc.org/libc v1.66.3/go.mod h1:XD9zO8kt59cANKvHPXpx7yS2ELPheAey0vjIuZOhOU8=
|
||||
modernc.org/libc v1.68.0 h1:PJ5ikFOV5pwpW+VqCK1hKJuEWsonkIJhhIXyuF/91pQ=
|
||||
modernc.org/libc v1.68.0/go.mod h1:NnKCYeoYgsEqnY3PgvNgAeaJnso968ygU8Z0DxjoEc0=
|
||||
modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
|
||||
modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
|
||||
modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI=
|
||||
modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw=
|
||||
modernc.org/sqlite v1.38.2 h1:Aclu7+tgjgcQVShZqim41Bbw9Cho0y/7WzYptXqkEek=
|
||||
modernc.org/sqlite v1.38.2/go.mod h1:cPTJYSlgg3Sfg046yBShXENNtPrWrDX8bsbAQBzgQ5E=
|
||||
modernc.org/sqlite v1.46.1 h1:eFJ2ShBLIEnUWlLy12raN0Z1plqmFX9Qe3rjQTKt6sU=
|
||||
modernc.org/sqlite v1.46.1/go.mod h1:CzbrU2lSB1DKUusvwGz7rqEKIq+NUd8GWuBBZDs9/nA=
|
||||
|
||||
@@ -1,11 +1,14 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"iter"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/navidrome/navidrome/conf"
|
||||
|
||||
"github.com/gohugoio/hashstructure"
|
||||
)
|
||||
|
||||
@@ -70,6 +73,13 @@ func (a Album) CoverArtID() ArtworkID {
|
||||
return artworkIDFromAlbum(a)
|
||||
}
|
||||
|
||||
func (a Album) FullName() string {
|
||||
if conf.Server.Subsonic.AppendAlbumVersion && len(a.Tags[TagAlbumVersion]) > 0 {
|
||||
return fmt.Sprintf("%s (%s)", a.Name, a.Tags[TagAlbumVersion][0])
|
||||
}
|
||||
return a.Name
|
||||
}
|
||||
|
||||
// Equals compares two Album structs, ignoring calculated fields
|
||||
func (a Album) Equals(other Album) bool {
|
||||
// Normalize float32 values to avoid false negatives
|
||||
|
||||
@@ -3,11 +3,30 @@ package model_test
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/navidrome/navidrome/conf"
|
||||
"github.com/navidrome/navidrome/conf/configtest"
|
||||
. "github.com/navidrome/navidrome/model"
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("Album", func() {
|
||||
BeforeEach(func() {
|
||||
DeferCleanup(configtest.SetupConfig())
|
||||
})
|
||||
DescribeTable("FullName",
|
||||
func(enabled bool, tags Tags, expected string) {
|
||||
conf.Server.Subsonic.AppendAlbumVersion = enabled
|
||||
a := Album{Name: "Album", Tags: tags}
|
||||
Expect(a.FullName()).To(Equal(expected))
|
||||
},
|
||||
Entry("appends version when enabled and tag is present", true, Tags{TagAlbumVersion: []string{"Remastered"}}, "Album (Remastered)"),
|
||||
Entry("returns just name when disabled", false, Tags{TagAlbumVersion: []string{"Remastered"}}, "Album"),
|
||||
Entry("returns just name when tag is absent", true, Tags{}, "Album"),
|
||||
Entry("returns just name when tag is an empty slice", true, Tags{TagAlbumVersion: []string{}}, "Album"),
|
||||
)
|
||||
})
|
||||
|
||||
var _ = Describe("Albums", func() {
|
||||
var albums Albums
|
||||
|
||||
|
||||
@@ -95,12 +95,19 @@ type MediaFile struct {
|
||||
}
|
||||
|
||||
func (mf MediaFile) FullTitle() string {
|
||||
if conf.Server.Subsonic.AppendSubtitle && mf.Tags[TagSubtitle] != nil {
|
||||
if conf.Server.Subsonic.AppendSubtitle && len(mf.Tags[TagSubtitle]) > 0 {
|
||||
return fmt.Sprintf("%s (%s)", mf.Title, mf.Tags[TagSubtitle][0])
|
||||
}
|
||||
return mf.Title
|
||||
}
|
||||
|
||||
func (mf MediaFile) FullAlbumName() string {
|
||||
if conf.Server.Subsonic.AppendAlbumVersion && len(mf.Tags[TagAlbumVersion]) > 0 {
|
||||
return fmt.Sprintf("%s (%s)", mf.Album, mf.Tags[TagAlbumVersion][0])
|
||||
}
|
||||
return mf.Album
|
||||
}
|
||||
|
||||
func (mf MediaFile) ContentType() string {
|
||||
return mime.TypeByExtension("." + mf.Suffix)
|
||||
}
|
||||
|
||||
@@ -475,7 +475,29 @@ var _ = Describe("MediaFile", func() {
|
||||
DeferCleanup(configtest.SetupConfig())
|
||||
conf.Server.EnableMediaFileCoverArt = true
|
||||
})
|
||||
Describe(".CoverArtId()", func() {
|
||||
DescribeTable("FullTitle",
|
||||
func(enabled bool, tags Tags, expected string) {
|
||||
conf.Server.Subsonic.AppendSubtitle = enabled
|
||||
mf := MediaFile{Title: "Song", Tags: tags}
|
||||
Expect(mf.FullTitle()).To(Equal(expected))
|
||||
},
|
||||
Entry("appends subtitle when enabled and tag is present", true, Tags{TagSubtitle: []string{"Live"}}, "Song (Live)"),
|
||||
Entry("returns just title when disabled", false, Tags{TagSubtitle: []string{"Live"}}, "Song"),
|
||||
Entry("returns just title when tag is absent", true, Tags{}, "Song"),
|
||||
Entry("returns just title when tag is an empty slice", true, Tags{TagSubtitle: []string{}}, "Song"),
|
||||
)
|
||||
DescribeTable("FullAlbumName",
|
||||
func(enabled bool, tags Tags, expected string) {
|
||||
conf.Server.Subsonic.AppendAlbumVersion = enabled
|
||||
mf := MediaFile{Album: "Album", Tags: tags}
|
||||
Expect(mf.FullAlbumName()).To(Equal(expected))
|
||||
},
|
||||
Entry("appends version when enabled and tag is present", true, Tags{TagAlbumVersion: []string{"Deluxe Edition"}}, "Album (Deluxe Edition)"),
|
||||
Entry("returns just album name when disabled", false, Tags{TagAlbumVersion: []string{"Deluxe Edition"}}, "Album"),
|
||||
Entry("returns just album name when tag is absent", true, Tags{}, "Album"),
|
||||
Entry("returns just album name when tag is an empty slice", true, Tags{TagAlbumVersion: []string{}}, "Album"),
|
||||
)
|
||||
Describe("CoverArtId()", func() {
|
||||
It("returns its own id if it HasCoverArt", func() {
|
||||
mf := MediaFile{ID: "111", AlbumID: "1", HasCoverArt: true}
|
||||
id := mf.CoverArtID()
|
||||
|
||||
33
plugins/capabilities/taskworker.go
Normal file
33
plugins/capabilities/taskworker.go
Normal file
@@ -0,0 +1,33 @@
|
||||
package capabilities
|
||||
|
||||
// TaskWorker provides task execution handling.
|
||||
// This capability allows plugins to receive callbacks when their queued tasks
|
||||
// are ready to execute. Plugins that use the taskqueue host service must
|
||||
// implement this capability.
|
||||
//
|
||||
//nd:capability name=taskworker
|
||||
type TaskWorker interface {
|
||||
// OnTaskExecute is called when a queued task is ready to run.
|
||||
// Return an error to trigger retry (if retries are configured).
|
||||
//nd:export name=nd_task_execute
|
||||
OnTaskExecute(TaskExecuteRequest) (TaskExecuteResponse, error)
|
||||
}
|
||||
|
||||
// TaskExecuteRequest is the request provided when a task is ready to execute.
|
||||
type TaskExecuteRequest struct {
|
||||
// QueueName is the name of the queue this task belongs to.
|
||||
QueueName string `json:"queueName"`
|
||||
// TaskID is the unique identifier for this task.
|
||||
TaskID string `json:"taskId"`
|
||||
// Payload is the opaque data provided when the task was enqueued.
|
||||
Payload []byte `json:"payload"`
|
||||
// Attempt is the current attempt number (1-based: first attempt = 1).
|
||||
Attempt int32 `json:"attempt"`
|
||||
}
|
||||
|
||||
// TaskExecuteResponse is the response from task execution.
|
||||
type TaskExecuteResponse struct {
|
||||
// Error, if non-empty, indicates the task failed. The task will be retried
|
||||
// if retries are configured and attempts remain.
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
45
plugins/capabilities/taskworker.yaml
Normal file
45
plugins/capabilities/taskworker.yaml
Normal file
@@ -0,0 +1,45 @@
|
||||
version: v1-draft
|
||||
exports:
|
||||
nd_task_execute:
|
||||
description: |-
|
||||
OnTaskExecute is called when a queued task is ready to run.
|
||||
Return an error to trigger retry (if retries are configured).
|
||||
input:
|
||||
$ref: '#/components/schemas/TaskExecuteRequest'
|
||||
contentType: application/json
|
||||
output:
|
||||
$ref: '#/components/schemas/TaskExecuteResponse'
|
||||
contentType: application/json
|
||||
components:
|
||||
schemas:
|
||||
TaskExecuteRequest:
|
||||
description: TaskExecuteRequest is the request provided when a task is ready to execute.
|
||||
properties:
|
||||
queueName:
|
||||
type: string
|
||||
description: QueueName is the name of the queue this task belongs to.
|
||||
taskId:
|
||||
type: string
|
||||
description: TaskID is the unique identifier for this task.
|
||||
payload:
|
||||
type: array
|
||||
description: Payload is the opaque data provided when the task was enqueued.
|
||||
items:
|
||||
type: object
|
||||
attempt:
|
||||
type: integer
|
||||
format: int32
|
||||
description: 'Attempt is the current attempt number (1-based: first attempt = 1).'
|
||||
required:
|
||||
- queueName
|
||||
- taskId
|
||||
- payload
|
||||
- attempt
|
||||
TaskExecuteResponse:
|
||||
description: TaskExecuteResponse is the response from task execution.
|
||||
properties:
|
||||
error:
|
||||
type: string
|
||||
description: |-
|
||||
Error, if non-empty, indicates the task failed. The task will be retried
|
||||
if retries are configured and attempts remain.
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/navidrome/navidrome/plugins/pdk/go/host"
|
||||
"github.com/navidrome/navidrome/plugins/pdk/go/metadata"
|
||||
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
|
||||
)
|
||||
@@ -77,21 +78,28 @@ func sparqlQuery(endpoint, query string) (*SPARQLResult, error) {
|
||||
form := url.Values{}
|
||||
form.Set("query", query)
|
||||
|
||||
req := pdk.NewHTTPRequest(pdk.MethodPost, endpoint)
|
||||
req.SetHeader("Accept", "application/sparql-results+json")
|
||||
req.SetHeader("Content-Type", "application/x-www-form-urlencoded")
|
||||
req.SetHeader("User-Agent", "NavidromeWikimediaPlugin/1.0")
|
||||
req.SetBody([]byte(form.Encode()))
|
||||
|
||||
pdk.Log(pdk.LogDebug, fmt.Sprintf("SPARQL query to %s: %s", endpoint, query))
|
||||
|
||||
resp := req.Send()
|
||||
if resp.Status() != 200 {
|
||||
return nil, fmt.Errorf("SPARQL HTTP error: status %d", resp.Status())
|
||||
resp, err := host.HTTPSend(host.HTTPRequest{
|
||||
Method: "POST",
|
||||
URL: endpoint,
|
||||
Headers: map[string]string{
|
||||
"Accept": "application/sparql-results+json",
|
||||
"Content-Type": "application/x-www-form-urlencoded",
|
||||
"User-Agent": "NavidromeWikimediaPlugin/1.0",
|
||||
},
|
||||
Body: []byte(form.Encode()),
|
||||
TimeoutMs: 10000,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("SPARQL HTTP error: %w", err)
|
||||
}
|
||||
if resp.StatusCode != 200 {
|
||||
return nil, fmt.Errorf("SPARQL HTTP error: status %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
var result SPARQLResult
|
||||
if err := json.Unmarshal(resp.Body(), &result); err != nil {
|
||||
if err := json.Unmarshal(resp.Body, &result); err != nil {
|
||||
return nil, fmt.Errorf("failed to parse SPARQL response: %w", err)
|
||||
}
|
||||
if len(result.Results.Bindings) == 0 {
|
||||
@@ -104,15 +112,22 @@ func sparqlQuery(endpoint, query string) (*SPARQLResult, error) {
|
||||
func mediawikiQuery(params url.Values) ([]byte, error) {
|
||||
apiURL := fmt.Sprintf("%s?%s", mediawikiAPIEndpoint, params.Encode())
|
||||
|
||||
req := pdk.NewHTTPRequest(pdk.MethodGet, apiURL)
|
||||
req.SetHeader("Accept", "application/json")
|
||||
req.SetHeader("User-Agent", "NavidromeWikimediaPlugin/1.0")
|
||||
|
||||
resp := req.Send()
|
||||
if resp.Status() != 200 {
|
||||
return nil, fmt.Errorf("MediaWiki HTTP error: status %d", resp.Status())
|
||||
resp, err := host.HTTPSend(host.HTTPRequest{
|
||||
Method: "GET",
|
||||
URL: apiURL,
|
||||
Headers: map[string]string{
|
||||
"Accept": "application/json",
|
||||
"User-Agent": "NavidromeWikimediaPlugin/1.0",
|
||||
},
|
||||
TimeoutMs: 10000,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("MediaWiki HTTP error: %w", err)
|
||||
}
|
||||
return resp.Body(), nil
|
||||
if resp.StatusCode != 200 {
|
||||
return nil, fmt.Errorf("MediaWiki HTTP error: status %d", resp.StatusCode)
|
||||
}
|
||||
return resp.Body, nil
|
||||
}
|
||||
|
||||
// getWikidataWikipediaURL fetches the Wikipedia URL from Wikidata using MBID or name
|
||||
|
||||
40
plugins/host/http.go
Normal file
40
plugins/host/http.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package host
|
||||
|
||||
import "context"
|
||||
|
||||
// HTTPRequest represents an outbound HTTP request from a plugin.
|
||||
type HTTPRequest struct {
|
||||
Method string `json:"method"`
|
||||
URL string `json:"url"`
|
||||
Headers map[string]string `json:"headers,omitempty"`
|
||||
Body []byte `json:"body,omitempty"`
|
||||
TimeoutMs int32 `json:"timeoutMs,omitempty"`
|
||||
}
|
||||
|
||||
// HTTPResponse represents the response from an outbound HTTP request.
|
||||
type HTTPResponse struct {
|
||||
StatusCode int32 `json:"statusCode"`
|
||||
Headers map[string]string `json:"headers,omitempty"`
|
||||
Body []byte `json:"body,omitempty"`
|
||||
}
|
||||
|
||||
// HTTPService provides outbound HTTP request capabilities for plugins.
|
||||
//
|
||||
// This service allows plugins to make HTTP requests to external services.
|
||||
// Requests are validated against the plugin's declared requiredHosts patterns
|
||||
// from the http permission in the manifest. Redirects are followed but each
|
||||
// redirect destination is also validated against the allowed hosts.
|
||||
//
|
||||
//nd:hostservice name=HTTP permission=http
|
||||
type HTTPService interface {
|
||||
// Send executes an HTTP request and returns the response.
|
||||
//
|
||||
// Parameters:
|
||||
// - request: The HTTP request to execute, including method, URL, headers, body, and timeout
|
||||
//
|
||||
// Returns the HTTP response with status code, headers, and body.
|
||||
// Network errors, timeouts, and permission failures are returned as Go errors.
|
||||
// Successful HTTP calls (including 4xx/5xx status codes) return a non-nil response with nil error.
|
||||
//nd:hostfunc
|
||||
Send(ctx context.Context, request HTTPRequest) (*HTTPResponse, error)
|
||||
}
|
||||
88
plugins/host/http_gen.go
Normal file
88
plugins/host/http_gen.go
Normal file
@@ -0,0 +1,88 @@
|
||||
// Code generated by ndpgen. DO NOT EDIT.
|
||||
|
||||
package host
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
extism "github.com/extism/go-sdk"
|
||||
)
|
||||
|
||||
// HTTPSendRequest is the request type for HTTP.Send.
|
||||
type HTTPSendRequest struct {
|
||||
Request HTTPRequest `json:"request"`
|
||||
}
|
||||
|
||||
// HTTPSendResponse is the response type for HTTP.Send.
|
||||
type HTTPSendResponse struct {
|
||||
Result *HTTPResponse `json:"result,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// RegisterHTTPHostFunctions registers HTTP service host functions.
|
||||
// The returned host functions should be added to the plugin's configuration.
|
||||
func RegisterHTTPHostFunctions(service HTTPService) []extism.HostFunction {
|
||||
return []extism.HostFunction{
|
||||
newHTTPSendHostFunction(service),
|
||||
}
|
||||
}
|
||||
|
||||
func newHTTPSendHostFunction(service HTTPService) extism.HostFunction {
|
||||
return extism.NewHostFunctionWithStack(
|
||||
"http_send",
|
||||
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
||||
// Read JSON request from plugin memory
|
||||
reqBytes, err := p.ReadBytes(stack[0])
|
||||
if err != nil {
|
||||
httpWriteError(p, stack, err)
|
||||
return
|
||||
}
|
||||
var req HTTPSendRequest
|
||||
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
||||
httpWriteError(p, stack, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Call the service method
|
||||
result, svcErr := service.Send(ctx, req.Request)
|
||||
if svcErr != nil {
|
||||
httpWriteError(p, stack, svcErr)
|
||||
return
|
||||
}
|
||||
|
||||
// Write JSON response to plugin memory
|
||||
resp := HTTPSendResponse{
|
||||
Result: result,
|
||||
}
|
||||
httpWriteResponse(p, stack, resp)
|
||||
},
|
||||
[]extism.ValueType{extism.ValueTypePTR},
|
||||
[]extism.ValueType{extism.ValueTypePTR},
|
||||
)
|
||||
}
|
||||
|
||||
// httpWriteResponse writes a JSON response to plugin memory.
|
||||
func httpWriteResponse(p *extism.CurrentPlugin, stack []uint64, resp any) {
|
||||
respBytes, err := json.Marshal(resp)
|
||||
if err != nil {
|
||||
httpWriteError(p, stack, err)
|
||||
return
|
||||
}
|
||||
respPtr, err := p.WriteBytes(respBytes)
|
||||
if err != nil {
|
||||
stack[0] = 0
|
||||
return
|
||||
}
|
||||
stack[0] = respPtr
|
||||
}
|
||||
|
||||
// httpWriteError writes an error response to plugin memory.
|
||||
func httpWriteError(p *extism.CurrentPlugin, stack []uint64, err error) {
|
||||
errResp := struct {
|
||||
Error string `json:"error"`
|
||||
}{Error: err.Error()}
|
||||
respBytes, _ := json.Marshal(errResp)
|
||||
respPtr, _ := p.WriteBytes(respBytes)
|
||||
stack[0] = respPtr
|
||||
}
|
||||
57
plugins/host/taskqueue.go
Normal file
57
plugins/host/taskqueue.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package host
|
||||
|
||||
import "context"
|
||||
|
||||
// QueueConfig holds configuration for a task queue.
|
||||
type QueueConfig struct {
|
||||
// Concurrency is the max number of parallel workers. Default: 1.
|
||||
// Capped by the plugin's manifest maxConcurrency.
|
||||
Concurrency int32 `json:"concurrency"`
|
||||
|
||||
// MaxRetries is the number of times to retry a failed task. Default: 0.
|
||||
MaxRetries int32 `json:"maxRetries"`
|
||||
|
||||
// BackoffMs is the initial backoff between retries in milliseconds.
|
||||
// Doubles each retry (exponential: backoffMs * 2^(attempt-1)). Default: 1000.
|
||||
BackoffMs int64 `json:"backoffMs"`
|
||||
|
||||
// DelayMs is the minimum delay between starting consecutive tasks
|
||||
// in milliseconds. Useful for rate limiting. Default: 0.
|
||||
DelayMs int64 `json:"delayMs"`
|
||||
|
||||
// RetentionMs is how long completed/failed/cancelled tasks are kept
|
||||
// in milliseconds. Default: 3600000 (1h). Min: 60000 (1m). Max: 604800000 (1w).
|
||||
RetentionMs int64 `json:"retentionMs"`
|
||||
}
|
||||
|
||||
// TaskQueueService provides persistent task queues for plugins.
|
||||
//
|
||||
// This service allows plugins to create named queues with configurable concurrency,
|
||||
// retry policies, and rate limiting. Tasks are persisted to SQLite and survive
|
||||
// server restarts. When a task is ready to execute, the host calls the plugin's
|
||||
// nd_task_execute callback function.
|
||||
//
|
||||
//nd:hostservice name=TaskQueue permission=taskqueue
|
||||
type TaskQueueService interface {
|
||||
// CreateQueue creates a named task queue with the given configuration.
|
||||
// Zero-value fields in config use sensible defaults.
|
||||
// If a queue with the same name already exists, returns an error.
|
||||
// On startup, this also recovers any stale "running" tasks from a previous crash.
|
||||
//nd:hostfunc
|
||||
CreateQueue(ctx context.Context, name string, config QueueConfig) error
|
||||
|
||||
// Enqueue adds a task to the named queue. Returns the task ID.
|
||||
// payload is opaque bytes passed back to the plugin on execution.
|
||||
//nd:hostfunc
|
||||
Enqueue(ctx context.Context, queueName string, payload []byte) (string, error)
|
||||
|
||||
// GetTaskStatus returns the status of a task: "pending", "running",
|
||||
// "completed", "failed", or "cancelled".
|
||||
//nd:hostfunc
|
||||
GetTaskStatus(ctx context.Context, taskID string) (string, error)
|
||||
|
||||
// CancelTask cancels a pending task. Returns error if already
|
||||
// running, completed, or failed.
|
||||
//nd:hostfunc
|
||||
CancelTask(ctx context.Context, taskID string) error
|
||||
}
|
||||
220
plugins/host/taskqueue_gen.go
Normal file
220
plugins/host/taskqueue_gen.go
Normal file
@@ -0,0 +1,220 @@
|
||||
// Code generated by ndpgen. DO NOT EDIT.
|
||||
|
||||
package host
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
extism "github.com/extism/go-sdk"
|
||||
)
|
||||
|
||||
// TaskQueueCreateQueueRequest is the request type for TaskQueue.CreateQueue.
|
||||
type TaskQueueCreateQueueRequest struct {
|
||||
Name string `json:"name"`
|
||||
Config QueueConfig `json:"config"`
|
||||
}
|
||||
|
||||
// TaskQueueCreateQueueResponse is the response type for TaskQueue.CreateQueue.
|
||||
type TaskQueueCreateQueueResponse struct {
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// TaskQueueEnqueueRequest is the request type for TaskQueue.Enqueue.
|
||||
type TaskQueueEnqueueRequest struct {
|
||||
QueueName string `json:"queueName"`
|
||||
Payload []byte `json:"payload"`
|
||||
}
|
||||
|
||||
// TaskQueueEnqueueResponse is the response type for TaskQueue.Enqueue.
|
||||
type TaskQueueEnqueueResponse struct {
|
||||
Result string `json:"result,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// TaskQueueGetTaskStatusRequest is the request type for TaskQueue.GetTaskStatus.
|
||||
type TaskQueueGetTaskStatusRequest struct {
|
||||
TaskID string `json:"taskId"`
|
||||
}
|
||||
|
||||
// TaskQueueGetTaskStatusResponse is the response type for TaskQueue.GetTaskStatus.
|
||||
type TaskQueueGetTaskStatusResponse struct {
|
||||
Result string `json:"result,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// TaskQueueCancelTaskRequest is the request type for TaskQueue.CancelTask.
|
||||
type TaskQueueCancelTaskRequest struct {
|
||||
TaskID string `json:"taskId"`
|
||||
}
|
||||
|
||||
// TaskQueueCancelTaskResponse is the response type for TaskQueue.CancelTask.
|
||||
type TaskQueueCancelTaskResponse struct {
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// RegisterTaskQueueHostFunctions registers TaskQueue service host functions.
|
||||
// The returned host functions should be added to the plugin's configuration.
|
||||
func RegisterTaskQueueHostFunctions(service TaskQueueService) []extism.HostFunction {
|
||||
return []extism.HostFunction{
|
||||
newTaskQueueCreateQueueHostFunction(service),
|
||||
newTaskQueueEnqueueHostFunction(service),
|
||||
newTaskQueueGetTaskStatusHostFunction(service),
|
||||
newTaskQueueCancelTaskHostFunction(service),
|
||||
}
|
||||
}
|
||||
|
||||
func newTaskQueueCreateQueueHostFunction(service TaskQueueService) extism.HostFunction {
|
||||
return extism.NewHostFunctionWithStack(
|
||||
"taskqueue_createqueue",
|
||||
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
||||
// Read JSON request from plugin memory
|
||||
reqBytes, err := p.ReadBytes(stack[0])
|
||||
if err != nil {
|
||||
taskqueueWriteError(p, stack, err)
|
||||
return
|
||||
}
|
||||
var req TaskQueueCreateQueueRequest
|
||||
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
||||
taskqueueWriteError(p, stack, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Call the service method
|
||||
if svcErr := service.CreateQueue(ctx, req.Name, req.Config); svcErr != nil {
|
||||
taskqueueWriteError(p, stack, svcErr)
|
||||
return
|
||||
}
|
||||
|
||||
// Write JSON response to plugin memory
|
||||
resp := TaskQueueCreateQueueResponse{}
|
||||
taskqueueWriteResponse(p, stack, resp)
|
||||
},
|
||||
[]extism.ValueType{extism.ValueTypePTR},
|
||||
[]extism.ValueType{extism.ValueTypePTR},
|
||||
)
|
||||
}
|
||||
|
||||
func newTaskQueueEnqueueHostFunction(service TaskQueueService) extism.HostFunction {
|
||||
return extism.NewHostFunctionWithStack(
|
||||
"taskqueue_enqueue",
|
||||
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
||||
// Read JSON request from plugin memory
|
||||
reqBytes, err := p.ReadBytes(stack[0])
|
||||
if err != nil {
|
||||
taskqueueWriteError(p, stack, err)
|
||||
return
|
||||
}
|
||||
var req TaskQueueEnqueueRequest
|
||||
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
||||
taskqueueWriteError(p, stack, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Call the service method
|
||||
result, svcErr := service.Enqueue(ctx, req.QueueName, req.Payload)
|
||||
if svcErr != nil {
|
||||
taskqueueWriteError(p, stack, svcErr)
|
||||
return
|
||||
}
|
||||
|
||||
// Write JSON response to plugin memory
|
||||
resp := TaskQueueEnqueueResponse{
|
||||
Result: result,
|
||||
}
|
||||
taskqueueWriteResponse(p, stack, resp)
|
||||
},
|
||||
[]extism.ValueType{extism.ValueTypePTR},
|
||||
[]extism.ValueType{extism.ValueTypePTR},
|
||||
)
|
||||
}
|
||||
|
||||
func newTaskQueueGetTaskStatusHostFunction(service TaskQueueService) extism.HostFunction {
|
||||
return extism.NewHostFunctionWithStack(
|
||||
"taskqueue_gettaskstatus",
|
||||
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
||||
// Read JSON request from plugin memory
|
||||
reqBytes, err := p.ReadBytes(stack[0])
|
||||
if err != nil {
|
||||
taskqueueWriteError(p, stack, err)
|
||||
return
|
||||
}
|
||||
var req TaskQueueGetTaskStatusRequest
|
||||
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
||||
taskqueueWriteError(p, stack, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Call the service method
|
||||
result, svcErr := service.GetTaskStatus(ctx, req.TaskID)
|
||||
if svcErr != nil {
|
||||
taskqueueWriteError(p, stack, svcErr)
|
||||
return
|
||||
}
|
||||
|
||||
// Write JSON response to plugin memory
|
||||
resp := TaskQueueGetTaskStatusResponse{
|
||||
Result: result,
|
||||
}
|
||||
taskqueueWriteResponse(p, stack, resp)
|
||||
},
|
||||
[]extism.ValueType{extism.ValueTypePTR},
|
||||
[]extism.ValueType{extism.ValueTypePTR},
|
||||
)
|
||||
}
|
||||
|
||||
func newTaskQueueCancelTaskHostFunction(service TaskQueueService) extism.HostFunction {
|
||||
return extism.NewHostFunctionWithStack(
|
||||
"taskqueue_canceltask",
|
||||
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
||||
// Read JSON request from plugin memory
|
||||
reqBytes, err := p.ReadBytes(stack[0])
|
||||
if err != nil {
|
||||
taskqueueWriteError(p, stack, err)
|
||||
return
|
||||
}
|
||||
var req TaskQueueCancelTaskRequest
|
||||
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
||||
taskqueueWriteError(p, stack, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Call the service method
|
||||
if svcErr := service.CancelTask(ctx, req.TaskID); svcErr != nil {
|
||||
taskqueueWriteError(p, stack, svcErr)
|
||||
return
|
||||
}
|
||||
|
||||
// Write JSON response to plugin memory
|
||||
resp := TaskQueueCancelTaskResponse{}
|
||||
taskqueueWriteResponse(p, stack, resp)
|
||||
},
|
||||
[]extism.ValueType{extism.ValueTypePTR},
|
||||
[]extism.ValueType{extism.ValueTypePTR},
|
||||
)
|
||||
}
|
||||
|
||||
// taskqueueWriteResponse writes a JSON response to plugin memory.
|
||||
func taskqueueWriteResponse(p *extism.CurrentPlugin, stack []uint64, resp any) {
|
||||
respBytes, err := json.Marshal(resp)
|
||||
if err != nil {
|
||||
taskqueueWriteError(p, stack, err)
|
||||
return
|
||||
}
|
||||
respPtr, err := p.WriteBytes(respBytes)
|
||||
if err != nil {
|
||||
stack[0] = 0
|
||||
return
|
||||
}
|
||||
stack[0] = respPtr
|
||||
}
|
||||
|
||||
// taskqueueWriteError writes an error response to plugin memory.
|
||||
func taskqueueWriteError(p *extism.CurrentPlugin, stack []uint64, err error) {
|
||||
errResp := struct {
|
||||
Error string `json:"error"`
|
||||
}{Error: err.Error()}
|
||||
respBytes, _ := json.Marshal(errResp)
|
||||
respPtr, _ := p.WriteBytes(respBytes)
|
||||
stack[0] = respPtr
|
||||
}
|
||||
190
plugins/host_httpclient.go
Normal file
190
plugins/host_httpclient.go
Normal file
@@ -0,0 +1,190 @@
|
||||
package plugins
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"cmp"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/navidrome/navidrome/log"
|
||||
"github.com/navidrome/navidrome/plugins/host"
|
||||
)
|
||||
|
||||
const (
|
||||
httpClientDefaultTimeout = 10 * time.Second
|
||||
httpClientMaxRedirects = 5
|
||||
httpClientMaxResponseBodyLen = 10 * 1024 * 1024 // 10 MB
|
||||
)
|
||||
|
||||
// httpServiceImpl implements host.HTTPService.
|
||||
type httpServiceImpl struct {
|
||||
pluginName string
|
||||
requiredHosts []string
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
// newHTTPService creates a new HTTPService for a plugin.
|
||||
func newHTTPService(pluginName string, permission *HTTPPermission) *httpServiceImpl {
|
||||
var requiredHosts []string
|
||||
if permission != nil {
|
||||
requiredHosts = permission.RequiredHosts
|
||||
}
|
||||
svc := &httpServiceImpl{
|
||||
pluginName: pluginName,
|
||||
requiredHosts: requiredHosts,
|
||||
}
|
||||
svc.client = &http.Client{
|
||||
Transport: http.DefaultTransport,
|
||||
// Timeout is set per-request via context deadline, not here.
|
||||
// CheckRedirect validates hosts and enforces redirect limits.
|
||||
CheckRedirect: func(req *http.Request, via []*http.Request) error {
|
||||
if len(via) >= httpClientMaxRedirects {
|
||||
log.Warn(req.Context(), "HTTP redirect limit exceeded", "plugin", svc.pluginName, "url", req.URL.String(), "redirectCount", len(via))
|
||||
return http.ErrUseLastResponse
|
||||
}
|
||||
if err := svc.validateHost(req.Context(), req.URL.Host); err != nil {
|
||||
log.Warn(req.Context(), "HTTP redirect blocked", "plugin", svc.pluginName, "url", req.URL.String(), "err", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
return svc
|
||||
}
|
||||
|
||||
func (s *httpServiceImpl) Send(ctx context.Context, request host.HTTPRequest) (*host.HTTPResponse, error) {
|
||||
// Parse and validate URL
|
||||
parsedURL, err := url.Parse(request.URL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid URL: %w", err)
|
||||
}
|
||||
|
||||
// Validate URL scheme
|
||||
if parsedURL.Scheme != "http" && parsedURL.Scheme != "https" {
|
||||
return nil, fmt.Errorf("invalid URL scheme %q: must be http or https", parsedURL.Scheme)
|
||||
}
|
||||
|
||||
// Validate host against allowed hosts and private IP restrictions
|
||||
if err := s.validateHost(ctx, parsedURL.Host); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Apply per-request timeout via context deadline
|
||||
timeout := cmp.Or(time.Duration(request.TimeoutMs)*time.Millisecond, httpClientDefaultTimeout)
|
||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
// Build request body
|
||||
method := strings.ToUpper(request.Method)
|
||||
var body io.Reader
|
||||
if len(request.Body) > 0 {
|
||||
body = bytes.NewReader(request.Body)
|
||||
}
|
||||
|
||||
// Create HTTP request
|
||||
httpReq, err := http.NewRequestWithContext(ctx, method, request.URL, body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("creating request: %w", err)
|
||||
}
|
||||
for k, v := range request.Headers {
|
||||
httpReq.Header.Set(k, v)
|
||||
}
|
||||
|
||||
// Execute request
|
||||
resp, err := s.client.Do(httpReq) //nolint:gosec // URL is validated against requiredHosts
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
log.Trace(ctx, "HTTP request", "plugin", s.pluginName, "method", method, "url", request.URL, "status", resp.StatusCode)
|
||||
|
||||
// Read response body (with size limit to prevent memory exhaustion)
|
||||
respBody, err := io.ReadAll(io.LimitReader(resp.Body, httpClientMaxResponseBodyLen))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("reading response body: %w", err)
|
||||
}
|
||||
|
||||
// Flatten response headers (first value only)
|
||||
headers := make(map[string]string, len(resp.Header))
|
||||
for k, v := range resp.Header {
|
||||
if len(v) > 0 {
|
||||
headers[k] = v[0]
|
||||
}
|
||||
}
|
||||
|
||||
return &host.HTTPResponse{
|
||||
StatusCode: int32(resp.StatusCode),
|
||||
Headers: headers,
|
||||
Body: respBody,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// validateHost checks whether a request to the given host is permitted.
|
||||
// When requiredHosts is set, it checks against the allowlist.
|
||||
// When requiredHosts is empty, it blocks private/loopback IPs to prevent SSRF.
|
||||
func (s *httpServiceImpl) validateHost(ctx context.Context, hostStr string) error {
|
||||
hostname := extractHostname(hostStr)
|
||||
|
||||
if len(s.requiredHosts) > 0 {
|
||||
if !s.isHostAllowed(hostname) {
|
||||
return fmt.Errorf("host %q is not allowed", hostStr)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// No explicit allowlist: block private/loopback IPs
|
||||
if isPrivateOrLoopback(hostname) {
|
||||
log.Warn(ctx, "HTTP request to private/loopback address blocked", "plugin", s.pluginName, "host", hostStr)
|
||||
return fmt.Errorf("host %q is not allowed: private/loopback addresses require explicit requiredHosts in manifest", hostStr)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *httpServiceImpl) isHostAllowed(hostname string) bool {
|
||||
for _, pattern := range s.requiredHosts {
|
||||
if matchHostPattern(pattern, hostname) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// extractHostname returns the hostname portion of a host string, stripping
|
||||
// any port number and IPv6 brackets. It handles IPv6 addresses correctly
|
||||
// (e.g. "[::1]:8080" → "::1", "[::1]" → "::1").
|
||||
func extractHostname(hostStr string) string {
|
||||
if h, _, err := net.SplitHostPort(hostStr); err == nil {
|
||||
return h
|
||||
}
|
||||
// Strip IPv6 brackets when no port is present (e.g. "[::1]" → "::1")
|
||||
if strings.HasPrefix(hostStr, "[") && strings.HasSuffix(hostStr, "]") {
|
||||
return hostStr[1 : len(hostStr)-1]
|
||||
}
|
||||
return hostStr
|
||||
}
|
||||
|
||||
// isPrivateOrLoopback returns true if the given hostname resolves to or is
|
||||
// a private, loopback, or link-local IP address. This includes:
|
||||
// IPv4: 127.0.0.0/8, 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16, 169.254.0.0/16
|
||||
// IPv6: ::1, fc00::/7, fe80::/10
|
||||
// It also blocks "localhost" by name.
|
||||
func isPrivateOrLoopback(hostname string) bool {
|
||||
if strings.EqualFold(hostname, "localhost") {
|
||||
return true
|
||||
}
|
||||
ip := net.ParseIP(hostname)
|
||||
if ip == nil {
|
||||
return false
|
||||
}
|
||||
return ip.IsLoopback() || ip.IsPrivate() || ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast()
|
||||
}
|
||||
|
||||
// Verify interface implementation
|
||||
var _ host.HTTPService = (*httpServiceImpl)(nil)
|
||||
565
plugins/host_httpclient_test.go
Normal file
565
plugins/host_httpclient_test.go
Normal file
@@ -0,0 +1,565 @@
|
||||
//go:build !windows
|
||||
|
||||
package plugins
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/navidrome/navidrome/plugins/host"
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("httpServiceImpl", func() {
|
||||
var (
|
||||
svc *httpServiceImpl
|
||||
ts *httptest.Server
|
||||
)
|
||||
|
||||
AfterEach(func() {
|
||||
if ts != nil {
|
||||
ts.Close()
|
||||
}
|
||||
})
|
||||
|
||||
Context("without host restrictions (default SSRF protection)", func() {
|
||||
BeforeEach(func() {
|
||||
svc = newHTTPService("test-plugin", nil)
|
||||
})
|
||||
|
||||
It("should block requests to loopback IPs", func() {
|
||||
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(200)
|
||||
}))
|
||||
_, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "GET",
|
||||
URL: ts.URL,
|
||||
TimeoutMs: 1000,
|
||||
})
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("private/loopback"))
|
||||
})
|
||||
|
||||
It("should block requests to localhost by name", func() {
|
||||
_, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "GET",
|
||||
URL: "http://localhost:12345/test",
|
||||
TimeoutMs: 1000,
|
||||
})
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("private/loopback"))
|
||||
})
|
||||
|
||||
It("should block requests to private IPs (10.x)", func() {
|
||||
_, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "GET",
|
||||
URL: "http://10.0.0.1/test",
|
||||
TimeoutMs: 1000,
|
||||
})
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("private/loopback"))
|
||||
})
|
||||
|
||||
It("should block requests to private IPs (192.168.x)", func() {
|
||||
_, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "GET",
|
||||
URL: "http://192.168.1.1/test",
|
||||
TimeoutMs: 1000,
|
||||
})
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("private/loopback"))
|
||||
})
|
||||
|
||||
It("should block requests to private IPs (172.16.x)", func() {
|
||||
_, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "GET",
|
||||
URL: "http://172.16.0.1/test",
|
||||
TimeoutMs: 1000,
|
||||
})
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("private/loopback"))
|
||||
})
|
||||
|
||||
It("should block requests to link-local IPs (169.254.x)", func() {
|
||||
_, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "GET",
|
||||
URL: "http://169.254.169.254/latest/meta-data/",
|
||||
TimeoutMs: 1000,
|
||||
})
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("private/loopback"))
|
||||
})
|
||||
|
||||
It("should block requests to IPv6 loopback with port", func() {
|
||||
_, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "GET",
|
||||
URL: "http://[::1]:8080/test",
|
||||
TimeoutMs: 1000,
|
||||
})
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("private/loopback"))
|
||||
})
|
||||
|
||||
It("should block requests to IPv6 loopback without port", func() {
|
||||
_, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "GET",
|
||||
URL: "http://[::1]/test",
|
||||
TimeoutMs: 1000,
|
||||
})
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("private/loopback"))
|
||||
})
|
||||
|
||||
It("should allow requests to public hostnames", func() {
|
||||
// This will fail at the network level (connection refused or DNS),
|
||||
// but it should NOT fail with a "private/loopback" error
|
||||
_, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "GET",
|
||||
URL: "http://203.0.113.1:1/test", // TEST-NET-3, non-routable but not private
|
||||
TimeoutMs: 100,
|
||||
})
|
||||
// Should get a network error, not a permission error
|
||||
if err != nil {
|
||||
Expect(err.Error()).ToNot(ContainSubstring("private/loopback"))
|
||||
}
|
||||
})
|
||||
|
||||
It("should return error for invalid URL", func() {
|
||||
_, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "GET",
|
||||
URL: "://bad-url",
|
||||
})
|
||||
Expect(err).To(HaveOccurred())
|
||||
})
|
||||
|
||||
It("should reject non-http/https URL schemes", func() {
|
||||
_, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "GET",
|
||||
URL: "ftp://example.com/file",
|
||||
})
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("must be http or https"))
|
||||
})
|
||||
})
|
||||
|
||||
Context("with explicit requiredHosts allowing loopback", func() {
|
||||
BeforeEach(func() {
|
||||
svc = newHTTPService("test-plugin", &HTTPPermission{
|
||||
RequiredHosts: []string{"127.0.0.1"},
|
||||
})
|
||||
})
|
||||
|
||||
It("should handle GET requests", func() {
|
||||
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
Expect(r.Method).To(Equal("GET"))
|
||||
w.Header().Set("X-Test", "ok")
|
||||
w.WriteHeader(201)
|
||||
_, _ = w.Write([]byte("hello"))
|
||||
}))
|
||||
resp, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "GET",
|
||||
URL: ts.URL,
|
||||
Headers: map[string]string{"Accept": "text/plain"},
|
||||
TimeoutMs: 1000,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(resp.StatusCode).To(Equal(int32(201)))
|
||||
Expect(string(resp.Body)).To(Equal("hello"))
|
||||
Expect(resp.Headers["X-Test"]).To(Equal("ok"))
|
||||
})
|
||||
|
||||
It("should handle POST requests with body", func() {
|
||||
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
Expect(r.Method).To(Equal("POST"))
|
||||
b, _ := io.ReadAll(r.Body)
|
||||
_, _ = w.Write([]byte("got:" + string(b)))
|
||||
}))
|
||||
resp, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "POST",
|
||||
URL: ts.URL,
|
||||
Body: []byte("abc"),
|
||||
TimeoutMs: 1000,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(string(resp.Body)).To(Equal("got:abc"))
|
||||
})
|
||||
|
||||
It("should handle PUT requests with body", func() {
|
||||
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
Expect(r.Method).To(Equal("PUT"))
|
||||
b, _ := io.ReadAll(r.Body)
|
||||
_, _ = w.Write([]byte("put:" + string(b)))
|
||||
}))
|
||||
resp, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "PUT",
|
||||
URL: ts.URL,
|
||||
Body: []byte("xyz"),
|
||||
TimeoutMs: 1000,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(string(resp.Body)).To(Equal("put:xyz"))
|
||||
})
|
||||
|
||||
It("should handle DELETE requests", func() {
|
||||
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
Expect(r.Method).To(Equal("DELETE"))
|
||||
w.WriteHeader(204)
|
||||
}))
|
||||
resp, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "DELETE",
|
||||
URL: ts.URL,
|
||||
TimeoutMs: 1000,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(resp.StatusCode).To(Equal(int32(204)))
|
||||
})
|
||||
|
||||
It("should handle DELETE requests with body", func() {
|
||||
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
Expect(r.Method).To(Equal("DELETE"))
|
||||
b, _ := io.ReadAll(r.Body)
|
||||
_, _ = w.Write([]byte("del:" + string(b)))
|
||||
}))
|
||||
resp, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "DELETE",
|
||||
URL: ts.URL,
|
||||
Body: []byte(`{"id":"123"}`),
|
||||
TimeoutMs: 1000,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(string(resp.Body)).To(Equal(`del:{"id":"123"}`))
|
||||
})
|
||||
|
||||
It("should handle PATCH requests with body", func() {
|
||||
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
Expect(r.Method).To(Equal("PATCH"))
|
||||
b, _ := io.ReadAll(r.Body)
|
||||
_, _ = w.Write([]byte("patch:" + string(b)))
|
||||
}))
|
||||
resp, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "PATCH",
|
||||
URL: ts.URL,
|
||||
Body: []byte("data"),
|
||||
TimeoutMs: 1000,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(string(resp.Body)).To(Equal("patch:data"))
|
||||
})
|
||||
|
||||
It("should handle HEAD requests", func() {
|
||||
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
Expect(r.Method).To(Equal("HEAD"))
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(200)
|
||||
}))
|
||||
resp, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "HEAD",
|
||||
URL: ts.URL,
|
||||
TimeoutMs: 1000,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(resp.StatusCode).To(Equal(int32(200)))
|
||||
Expect(resp.Headers["Content-Type"]).To(Equal("application/json"))
|
||||
Expect(resp.Body).To(BeEmpty())
|
||||
})
|
||||
|
||||
It("should use default timeout when TimeoutMs is 0", func() {
|
||||
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(200)
|
||||
}))
|
||||
resp, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "GET",
|
||||
URL: ts.URL,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(resp.StatusCode).To(Equal(int32(200)))
|
||||
})
|
||||
|
||||
It("should return error on timeout", func() {
|
||||
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}))
|
||||
_, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "GET",
|
||||
URL: ts.URL,
|
||||
TimeoutMs: 1,
|
||||
})
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("deadline exceeded"))
|
||||
})
|
||||
|
||||
It("should return error on context cancellation", func() {
|
||||
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}))
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go func() {
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
cancel()
|
||||
}()
|
||||
_, err := svc.Send(ctx, host.HTTPRequest{
|
||||
Method: "GET",
|
||||
URL: ts.URL,
|
||||
TimeoutMs: 5000,
|
||||
})
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("context canceled"))
|
||||
})
|
||||
|
||||
It("should send request headers", func() {
|
||||
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = w.Write([]byte(r.Header.Get("X-Custom")))
|
||||
}))
|
||||
resp, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "GET",
|
||||
URL: ts.URL,
|
||||
Headers: map[string]string{"X-Custom": "myvalue"},
|
||||
TimeoutMs: 1000,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(string(resp.Body)).To(Equal("myvalue"))
|
||||
})
|
||||
})
|
||||
|
||||
Context("with host restrictions", func() {
|
||||
BeforeEach(func() {
|
||||
svc = newHTTPService("test-plugin", &HTTPPermission{
|
||||
RequiredHosts: []string{"allowed.example.com", "*.allowed.org"},
|
||||
})
|
||||
})
|
||||
|
||||
It("should block requests to non-allowed hosts", func() {
|
||||
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(200)
|
||||
}))
|
||||
// httptest server is on 127.0.0.1 which is not in requiredHosts
|
||||
_, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "GET",
|
||||
URL: ts.URL,
|
||||
TimeoutMs: 1000,
|
||||
})
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("not allowed"))
|
||||
})
|
||||
|
||||
It("should follow redirects to allowed hosts", func() {
|
||||
// Create a destination server
|
||||
dest := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = w.Write([]byte("final"))
|
||||
}))
|
||||
defer dest.Close()
|
||||
// Create a redirect server
|
||||
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
http.Redirect(w, r, dest.URL, http.StatusFound)
|
||||
}))
|
||||
// Allow both servers (both on 127.0.0.1)
|
||||
svc.requiredHosts = []string{"127.0.0.1"}
|
||||
resp, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "GET",
|
||||
URL: ts.URL,
|
||||
TimeoutMs: 1000,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(resp.StatusCode).To(Equal(int32(200)))
|
||||
Expect(string(resp.Body)).To(Equal("final"))
|
||||
})
|
||||
|
||||
It("should block redirects to non-allowed hosts", func() {
|
||||
// Server that redirects to a disallowed host
|
||||
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
http.Redirect(w, r, "http://evil.example.com/steal", http.StatusFound)
|
||||
}))
|
||||
// Override requiredHosts to allow the test server
|
||||
svc.requiredHosts = []string{"127.0.0.1"}
|
||||
_, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "GET",
|
||||
URL: ts.URL,
|
||||
TimeoutMs: 1000,
|
||||
})
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("not allowed"))
|
||||
})
|
||||
|
||||
It("should block redirects to private IPs when allowlist is set", func() {
|
||||
// Server that redirects to a private IP
|
||||
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
http.Redirect(w, r, "http://10.0.0.1/internal", http.StatusFound)
|
||||
}))
|
||||
// Allow the test server; redirect to 10.0.0.1 is blocked by allowlist
|
||||
svc.requiredHosts = []string{"127.0.0.1"}
|
||||
resp, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "GET",
|
||||
URL: ts.URL,
|
||||
TimeoutMs: 1000,
|
||||
})
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(resp).To(BeNil())
|
||||
})
|
||||
|
||||
It("should allow wildcard host patterns", func() {
|
||||
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = w.Write([]byte("wildcard"))
|
||||
}))
|
||||
// *.allowed.org is in the requiredHosts from BeforeEach, but test server is 127.0.0.1
|
||||
// Override with a wildcard that matches the test server
|
||||
svc.requiredHosts = []string{"*.0.0.1"}
|
||||
resp, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "GET",
|
||||
URL: ts.URL,
|
||||
TimeoutMs: 1000,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(string(resp.Body)).To(Equal("wildcard"))
|
||||
})
|
||||
|
||||
It("should reject hosts not matching wildcard patterns", func() {
|
||||
svc.requiredHosts = []string{"*.example.com"}
|
||||
_, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "GET",
|
||||
URL: "http://evil.other.com/test",
|
||||
TimeoutMs: 1000,
|
||||
})
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("not allowed"))
|
||||
})
|
||||
})
|
||||
|
||||
Context("response body size limit", func() {
|
||||
BeforeEach(func() {
|
||||
svc = newHTTPService("test-plugin", &HTTPPermission{
|
||||
RequiredHosts: []string{"127.0.0.1"},
|
||||
})
|
||||
})
|
||||
|
||||
It("should truncate response body at the size limit", func() {
|
||||
// Serve a body larger than the limit
|
||||
oversizedBody := strings.Repeat("x", httpClientMaxResponseBodyLen+1024)
|
||||
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = w.Write([]byte(oversizedBody))
|
||||
}))
|
||||
resp, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "GET",
|
||||
URL: ts.URL,
|
||||
TimeoutMs: 5000,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(len(resp.Body)).To(Equal(httpClientMaxResponseBodyLen))
|
||||
})
|
||||
})
|
||||
|
||||
Context("edge cases", func() {
|
||||
BeforeEach(func() {
|
||||
svc = newHTTPService("test-plugin", &HTTPPermission{
|
||||
RequiredHosts: []string{"127.0.0.1"},
|
||||
})
|
||||
})
|
||||
|
||||
It("should default empty method to GET", func() {
|
||||
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = w.Write([]byte("method:" + r.Method))
|
||||
}))
|
||||
// Empty method — Go's http.NewRequestWithContext normalizes "" to "GET"
|
||||
resp, err := svc.Send(context.Background(), host.HTTPRequest{
|
||||
Method: "",
|
||||
URL: ts.URL,
|
||||
TimeoutMs: 1000,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(string(resp.Body)).To(Equal("method:GET"))
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("extractHostname", func() {
|
||||
It("should extract hostname from host:port", func() {
|
||||
Expect(extractHostname("example.com:8080")).To(Equal("example.com"))
|
||||
})
|
||||
|
||||
It("should return hostname when no port", func() {
|
||||
Expect(extractHostname("example.com")).To(Equal("example.com"))
|
||||
})
|
||||
|
||||
It("should handle IPv6 with port", func() {
|
||||
Expect(extractHostname("[::1]:8080")).To(Equal("::1"))
|
||||
})
|
||||
|
||||
It("should handle IPv6 without port", func() {
|
||||
Expect(extractHostname("::1")).To(Equal("::1"))
|
||||
})
|
||||
|
||||
It("should strip brackets from IPv6 without port", func() {
|
||||
Expect(extractHostname("[::1]")).To(Equal("::1"))
|
||||
})
|
||||
|
||||
It("should handle IPv4 with port", func() {
|
||||
Expect(extractHostname("127.0.0.1:9090")).To(Equal("127.0.0.1"))
|
||||
})
|
||||
|
||||
It("should handle IPv4 without port", func() {
|
||||
Expect(extractHostname("127.0.0.1")).To(Equal("127.0.0.1"))
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("isPrivateOrLoopback", func() {
|
||||
It("should detect IPv4 loopback", func() {
|
||||
Expect(isPrivateOrLoopback("127.0.0.1")).To(BeTrue())
|
||||
Expect(isPrivateOrLoopback("127.0.0.2")).To(BeTrue())
|
||||
})
|
||||
|
||||
It("should detect IPv6 loopback", func() {
|
||||
Expect(isPrivateOrLoopback("::1")).To(BeTrue())
|
||||
})
|
||||
|
||||
It("should detect localhost by name", func() {
|
||||
Expect(isPrivateOrLoopback("localhost")).To(BeTrue())
|
||||
Expect(isPrivateOrLoopback("LOCALHOST")).To(BeTrue())
|
||||
})
|
||||
|
||||
It("should detect 10.x.x.x private range", func() {
|
||||
Expect(isPrivateOrLoopback("10.0.0.1")).To(BeTrue())
|
||||
Expect(isPrivateOrLoopback("10.255.255.255")).To(BeTrue())
|
||||
})
|
||||
|
||||
It("should detect 172.16.x.x private range", func() {
|
||||
Expect(isPrivateOrLoopback("172.16.0.1")).To(BeTrue())
|
||||
Expect(isPrivateOrLoopback("172.31.255.255")).To(BeTrue())
|
||||
})
|
||||
|
||||
It("should detect 192.168.x.x private range", func() {
|
||||
Expect(isPrivateOrLoopback("192.168.0.1")).To(BeTrue())
|
||||
Expect(isPrivateOrLoopback("192.168.255.255")).To(BeTrue())
|
||||
})
|
||||
|
||||
It("should detect link-local addresses", func() {
|
||||
Expect(isPrivateOrLoopback("169.254.169.254")).To(BeTrue())
|
||||
Expect(isPrivateOrLoopback("169.254.0.1")).To(BeTrue())
|
||||
})
|
||||
|
||||
It("should detect IPv6 private (fc00::/7)", func() {
|
||||
Expect(isPrivateOrLoopback("fd00::1")).To(BeTrue())
|
||||
})
|
||||
|
||||
It("should detect IPv6 link-local (fe80::/10)", func() {
|
||||
Expect(isPrivateOrLoopback("fe80::1")).To(BeTrue())
|
||||
})
|
||||
|
||||
It("should allow public IPs", func() {
|
||||
Expect(isPrivateOrLoopback("8.8.8.8")).To(BeFalse())
|
||||
Expect(isPrivateOrLoopback("203.0.113.1")).To(BeFalse())
|
||||
Expect(isPrivateOrLoopback("2001:db8::1")).To(BeFalse())
|
||||
})
|
||||
|
||||
It("should allow non-IP hostnames (DNS names)", func() {
|
||||
Expect(isPrivateOrLoopback("example.com")).To(BeFalse())
|
||||
Expect(isPrivateOrLoopback("api.example.com")).To(BeFalse())
|
||||
})
|
||||
|
||||
It("should not treat 172.32.x.x as private", func() {
|
||||
Expect(isPrivateOrLoopback("172.32.0.1")).To(BeFalse())
|
||||
})
|
||||
})
|
||||
@@ -188,12 +188,6 @@ func (s *schedulerServiceImpl) invokeCallback(ctx context.Context, scheduleID st
|
||||
return
|
||||
}
|
||||
|
||||
// Check if plugin has the scheduler capability
|
||||
if !hasCapability(instance.capabilities, CapabilityScheduler) {
|
||||
log.Warn(ctx, "Plugin does not have scheduler capability", "plugin", s.pluginName, "scheduleID", scheduleID)
|
||||
return
|
||||
}
|
||||
|
||||
// Prepare callback input
|
||||
input := capabilities.SchedulerCallbackRequest{
|
||||
ScheduleID: scheduleID,
|
||||
|
||||
562
plugins/host_taskqueue.go
Normal file
562
plugins/host_taskqueue.go
Normal file
@@ -0,0 +1,562 @@
|
||||
package plugins
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"github.com/navidrome/navidrome/conf"
|
||||
"github.com/navidrome/navidrome/log"
|
||||
"github.com/navidrome/navidrome/model/id"
|
||||
"github.com/navidrome/navidrome/plugins/capabilities"
|
||||
"github.com/navidrome/navidrome/plugins/host"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultConcurrency int32 = 1
|
||||
defaultBackoffMs int64 = 1000
|
||||
defaultRetentionMs int64 = 3_600_000 // 1 hour
|
||||
minRetentionMs int64 = 60_000 // 1 minute
|
||||
maxRetentionMs int64 = 604_800_000 // 1 week
|
||||
maxQueueNameLength = 128
|
||||
maxPayloadSize = 1 * 1024 * 1024 // 1MB
|
||||
maxBackoffMs int64 = 3_600_000 // 1 hour
|
||||
cleanupInterval = 5 * time.Minute
|
||||
pollInterval = 5 * time.Second
|
||||
shutdownTimeout = 10 * time.Second
|
||||
|
||||
taskStatusPending = "pending"
|
||||
taskStatusRunning = "running"
|
||||
taskStatusCompleted = "completed"
|
||||
taskStatusFailed = "failed"
|
||||
taskStatusCancelled = "cancelled"
|
||||
)
|
||||
|
||||
// CapabilityTaskWorker indicates the plugin can receive task execution callbacks.
|
||||
const CapabilityTaskWorker Capability = "TaskWorker"
|
||||
|
||||
const FuncTaskWorkerCallback = "nd_task_execute"
|
||||
|
||||
func init() {
|
||||
registerCapability(CapabilityTaskWorker, FuncTaskWorkerCallback)
|
||||
}
|
||||
|
||||
type queueState struct {
|
||||
config host.QueueConfig
|
||||
signal chan struct{}
|
||||
limiter *rate.Limiter
|
||||
}
|
||||
|
||||
// notifyWorkers sends a non-blocking signal to wake up queue workers.
|
||||
func (qs *queueState) notifyWorkers() {
|
||||
select {
|
||||
case qs.signal <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// taskQueueServiceImpl implements host.TaskQueueService with SQLite persistence
|
||||
// and background worker goroutines for task execution.
|
||||
type taskQueueServiceImpl struct {
|
||||
pluginName string
|
||||
manager *Manager
|
||||
maxConcurrency int32
|
||||
db *sql.DB
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
mu sync.Mutex
|
||||
queues map[string]*queueState
|
||||
|
||||
// For testing: override how callbacks are invoked
|
||||
invokeCallbackFn func(ctx context.Context, queueName, taskID string, payload []byte, attempt int32) error
|
||||
}
|
||||
|
||||
// newTaskQueueService creates a new taskQueueServiceImpl with its own SQLite database.
|
||||
func newTaskQueueService(pluginName string, manager *Manager, maxConcurrency int32) (*taskQueueServiceImpl, error) {
|
||||
dataDir := filepath.Join(conf.Server.DataFolder, "plugins", pluginName)
|
||||
if err := os.MkdirAll(dataDir, 0700); err != nil {
|
||||
return nil, fmt.Errorf("creating plugin data directory: %w", err)
|
||||
}
|
||||
|
||||
dbPath := filepath.Join(dataDir, "taskqueue.db")
|
||||
db, err := sql.Open("sqlite3", dbPath+"?_busy_timeout=5000&_journal_mode=WAL&_foreign_keys=off")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("opening taskqueue database: %w", err)
|
||||
}
|
||||
|
||||
db.SetMaxOpenConns(3)
|
||||
db.SetMaxIdleConns(1)
|
||||
|
||||
if err := createTaskQueueSchema(db); err != nil {
|
||||
db.Close()
|
||||
return nil, fmt.Errorf("creating taskqueue schema: %w", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(manager.ctx)
|
||||
|
||||
s := &taskQueueServiceImpl{
|
||||
pluginName: pluginName,
|
||||
manager: manager,
|
||||
maxConcurrency: maxConcurrency,
|
||||
db: db,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
queues: make(map[string]*queueState),
|
||||
}
|
||||
s.invokeCallbackFn = s.defaultInvokeCallback
|
||||
|
||||
s.wg.Go(s.cleanupLoop)
|
||||
|
||||
log.Debug("Initialized plugin taskqueue", "plugin", pluginName, "path", dbPath, "maxConcurrency", maxConcurrency)
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func createTaskQueueSchema(db *sql.DB) error {
|
||||
_, err := db.Exec(`
|
||||
CREATE TABLE IF NOT EXISTS queues (
|
||||
name TEXT PRIMARY KEY,
|
||||
concurrency INTEGER NOT NULL DEFAULT 1,
|
||||
max_retries INTEGER NOT NULL DEFAULT 0,
|
||||
backoff_ms INTEGER NOT NULL DEFAULT 1000,
|
||||
delay_ms INTEGER NOT NULL DEFAULT 0,
|
||||
retention_ms INTEGER NOT NULL DEFAULT 3600000
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS tasks (
|
||||
id TEXT PRIMARY KEY,
|
||||
queue_name TEXT NOT NULL REFERENCES queues(name),
|
||||
payload BLOB NOT NULL,
|
||||
status TEXT NOT NULL DEFAULT 'pending',
|
||||
attempt INTEGER NOT NULL DEFAULT 0,
|
||||
max_retries INTEGER NOT NULL,
|
||||
next_run_at INTEGER NOT NULL,
|
||||
created_at INTEGER NOT NULL,
|
||||
updated_at INTEGER NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_tasks_dequeue ON tasks(queue_name, status, next_run_at);
|
||||
`)
|
||||
return err
|
||||
}
|
||||
|
||||
// applyConfigDefaults fills zero-value config fields with sensible defaults
|
||||
// and clamps values to valid ranges, logging warnings for clamped values.
|
||||
func (s *taskQueueServiceImpl) applyConfigDefaults(ctx context.Context, name string, config *host.QueueConfig) {
|
||||
if config.Concurrency <= 0 {
|
||||
config.Concurrency = defaultConcurrency
|
||||
}
|
||||
if config.BackoffMs <= 0 {
|
||||
config.BackoffMs = defaultBackoffMs
|
||||
}
|
||||
if config.RetentionMs <= 0 {
|
||||
config.RetentionMs = defaultRetentionMs
|
||||
}
|
||||
|
||||
if config.RetentionMs < minRetentionMs {
|
||||
log.Warn(ctx, "TaskQueue retention clamped to minimum", "plugin", s.pluginName, "queue", name,
|
||||
"requested", config.RetentionMs, "min", minRetentionMs)
|
||||
config.RetentionMs = minRetentionMs
|
||||
}
|
||||
if config.RetentionMs > maxRetentionMs {
|
||||
log.Warn(ctx, "TaskQueue retention clamped to maximum", "plugin", s.pluginName, "queue", name,
|
||||
"requested", config.RetentionMs, "max", maxRetentionMs)
|
||||
config.RetentionMs = maxRetentionMs
|
||||
}
|
||||
}
|
||||
|
||||
// clampConcurrency reduces config.Concurrency if it exceeds the remaining budget.
|
||||
// Returns an error when the concurrency budget is fully exhausted.
|
||||
// Must be called with s.mu held.
|
||||
func (s *taskQueueServiceImpl) clampConcurrency(ctx context.Context, name string, config *host.QueueConfig) error {
|
||||
var allocated int32
|
||||
for _, qs := range s.queues {
|
||||
allocated += qs.config.Concurrency
|
||||
}
|
||||
available := s.maxConcurrency - allocated
|
||||
if available <= 0 {
|
||||
log.Warn(ctx, "TaskQueue concurrency budget exhausted", "plugin", s.pluginName, "queue", name,
|
||||
"allocated", allocated, "maxConcurrency", s.maxConcurrency)
|
||||
return fmt.Errorf("concurrency budget exhausted (%d/%d allocated)", allocated, s.maxConcurrency)
|
||||
}
|
||||
if config.Concurrency > available {
|
||||
log.Warn(ctx, "TaskQueue concurrency clamped", "plugin", s.pluginName, "queue", name,
|
||||
"requested", config.Concurrency, "available", available, "maxConcurrency", s.maxConcurrency)
|
||||
config.Concurrency = available
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *taskQueueServiceImpl) CreateQueue(ctx context.Context, name string, config host.QueueConfig) error {
|
||||
if len(name) == 0 {
|
||||
return fmt.Errorf("queue name cannot be empty")
|
||||
}
|
||||
if len(name) > maxQueueNameLength {
|
||||
return fmt.Errorf("queue name exceeds maximum length of %d bytes", maxQueueNameLength)
|
||||
}
|
||||
|
||||
s.applyConfigDefaults(ctx, name, &config)
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if err := s.clampConcurrency(ctx, name, &config); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, exists := s.queues[name]; exists {
|
||||
return fmt.Errorf("queue %q already exists", name)
|
||||
}
|
||||
|
||||
// Upsert into queues table (idempotent across restarts)
|
||||
_, err := s.db.ExecContext(ctx, `
|
||||
INSERT INTO queues (name, concurrency, max_retries, backoff_ms, delay_ms, retention_ms)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(name) DO UPDATE SET
|
||||
concurrency = excluded.concurrency,
|
||||
max_retries = excluded.max_retries,
|
||||
backoff_ms = excluded.backoff_ms,
|
||||
delay_ms = excluded.delay_ms,
|
||||
retention_ms = excluded.retention_ms
|
||||
`, name, config.Concurrency, config.MaxRetries, config.BackoffMs, config.DelayMs, config.RetentionMs)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating queue: %w", err)
|
||||
}
|
||||
|
||||
// Reset stale running tasks from previous crash
|
||||
now := time.Now().UnixMilli()
|
||||
_, err = s.db.ExecContext(ctx, `
|
||||
UPDATE tasks SET status = ?, updated_at = ? WHERE queue_name = ? AND status = ?
|
||||
`, taskStatusPending, now, name, taskStatusRunning)
|
||||
if err != nil {
|
||||
return fmt.Errorf("resetting stale tasks: %w", err)
|
||||
}
|
||||
|
||||
qs := &queueState{
|
||||
config: config,
|
||||
signal: make(chan struct{}, 1),
|
||||
}
|
||||
if config.DelayMs > 0 {
|
||||
// Rate limit dispatches to enforce delay between tasks.
|
||||
// Burst of 1 allows one immediate dispatch, then enforces the delay interval.
|
||||
qs.limiter = rate.NewLimiter(rate.Every(time.Duration(config.DelayMs)*time.Millisecond), 1)
|
||||
}
|
||||
s.queues[name] = qs
|
||||
|
||||
for i := int32(0); i < config.Concurrency; i++ {
|
||||
s.wg.Go(func() { s.worker(name, qs) })
|
||||
}
|
||||
|
||||
log.Debug(ctx, "Created task queue", "plugin", s.pluginName, "queue", name,
|
||||
"concurrency", config.Concurrency, "maxRetries", config.MaxRetries,
|
||||
"backoffMs", config.BackoffMs, "delayMs", config.DelayMs, "retentionMs", config.RetentionMs)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *taskQueueServiceImpl) Enqueue(ctx context.Context, queueName string, payload []byte) (string, error) {
|
||||
s.mu.Lock()
|
||||
qs, exists := s.queues[queueName]
|
||||
s.mu.Unlock()
|
||||
|
||||
if !exists {
|
||||
return "", fmt.Errorf("queue %q does not exist", queueName)
|
||||
}
|
||||
if len(payload) > maxPayloadSize {
|
||||
return "", fmt.Errorf("payload size %d exceeds maximum of %d bytes", len(payload), maxPayloadSize)
|
||||
}
|
||||
|
||||
taskID := id.NewRandom()
|
||||
now := time.Now().UnixMilli()
|
||||
|
||||
_, err := s.db.ExecContext(ctx, `
|
||||
INSERT INTO tasks (id, queue_name, payload, status, attempt, max_retries, next_run_at, created_at, updated_at)
|
||||
VALUES (?, ?, ?, ?, 0, ?, ?, ?, ?)
|
||||
`, taskID, queueName, payload, taskStatusPending, qs.config.MaxRetries, now, now, now)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("enqueuing task: %w", err)
|
||||
}
|
||||
|
||||
qs.notifyWorkers()
|
||||
log.Trace(ctx, "Enqueued task", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
|
||||
return taskID, nil
|
||||
}
|
||||
|
||||
// GetTaskStatus returns the status of a task.
|
||||
func (s *taskQueueServiceImpl) GetTaskStatus(ctx context.Context, taskID string) (string, error) {
|
||||
var status string
|
||||
err := s.db.QueryRowContext(ctx, `SELECT status FROM tasks WHERE id = ?`, taskID).Scan(&status)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return "", fmt.Errorf("task %q not found", taskID)
|
||||
}
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("getting task status: %w", err)
|
||||
}
|
||||
return status, nil
|
||||
}
|
||||
|
||||
// CancelTask cancels a pending task.
|
||||
func (s *taskQueueServiceImpl) CancelTask(ctx context.Context, taskID string) error {
|
||||
now := time.Now().UnixMilli()
|
||||
result, err := s.db.ExecContext(ctx, `
|
||||
UPDATE tasks SET status = ?, updated_at = ? WHERE id = ? AND status = ?
|
||||
`, taskStatusCancelled, now, taskID, taskStatusPending)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cancelling task: %w", err)
|
||||
}
|
||||
|
||||
rowsAffected, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return fmt.Errorf("checking cancel result: %w", err)
|
||||
}
|
||||
|
||||
if rowsAffected == 0 {
|
||||
// Check if task exists at all
|
||||
var status string
|
||||
err := s.db.QueryRowContext(ctx, `SELECT status FROM tasks WHERE id = ?`, taskID).Scan(&status)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return fmt.Errorf("task %q not found", taskID)
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("checking task existence: %w", err)
|
||||
}
|
||||
return fmt.Errorf("task %q cannot be cancelled (status: %s)", taskID, status)
|
||||
}
|
||||
|
||||
log.Trace(ctx, "Cancelled task", "plugin", s.pluginName, "taskID", taskID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// worker is the main loop for a single worker goroutine.
|
||||
func (s *taskQueueServiceImpl) worker(queueName string, qs *queueState) {
|
||||
// Process any existing pending tasks immediately on startup
|
||||
s.drainQueue(queueName, qs)
|
||||
|
||||
ticker := time.NewTicker(pollInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
case <-qs.signal:
|
||||
s.drainQueue(queueName, qs)
|
||||
case <-ticker.C:
|
||||
s.drainQueue(queueName, qs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *taskQueueServiceImpl) drainQueue(queueName string, qs *queueState) {
|
||||
for s.ctx.Err() == nil && s.processTask(queueName, qs) {
|
||||
}
|
||||
}
|
||||
|
||||
// processTask dequeues and processes a single task. Returns true if a task was processed.
|
||||
func (s *taskQueueServiceImpl) processTask(queueName string, qs *queueState) bool {
|
||||
now := time.Now().UnixMilli()
|
||||
|
||||
// Atomically dequeue a task
|
||||
var taskID string
|
||||
var payload []byte
|
||||
var attempt, maxRetries int32
|
||||
err := s.db.QueryRowContext(s.ctx, `
|
||||
UPDATE tasks SET status = ?, attempt = attempt + 1, updated_at = ?
|
||||
WHERE id = (
|
||||
SELECT id FROM tasks
|
||||
WHERE queue_name = ? AND status = ? AND next_run_at <= ?
|
||||
ORDER BY next_run_at, created_at LIMIT 1
|
||||
)
|
||||
RETURNING id, payload, attempt, max_retries
|
||||
`, taskStatusRunning, now, queueName, taskStatusPending, now).Scan(&taskID, &payload, &attempt, &maxRetries)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return false
|
||||
}
|
||||
if err != nil {
|
||||
log.Error(s.ctx, "Failed to dequeue task", "plugin", s.pluginName, "queue", queueName, err)
|
||||
return false
|
||||
}
|
||||
|
||||
// Enforce delay between task dispatches using a rate limiter.
|
||||
// This is done after dequeue so that empty polls don't consume rate tokens.
|
||||
if qs.limiter != nil {
|
||||
if err := qs.limiter.Wait(s.ctx); err != nil {
|
||||
// Context cancelled during wait — revert task to pending for recovery
|
||||
s.revertTaskToPending(taskID)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// Invoke callback
|
||||
log.Debug(s.ctx, "Executing task", "plugin", s.pluginName, "queue", queueName, "taskID", taskID, "attempt", attempt)
|
||||
callbackErr := s.invokeCallbackFn(s.ctx, queueName, taskID, payload, attempt)
|
||||
|
||||
// If context was cancelled (shutdown), revert task to pending for recovery
|
||||
if s.ctx.Err() != nil {
|
||||
s.revertTaskToPending(taskID)
|
||||
return false
|
||||
}
|
||||
|
||||
if callbackErr == nil {
|
||||
s.completeTask(queueName, taskID)
|
||||
} else {
|
||||
s.handleTaskFailure(queueName, taskID, attempt, maxRetries, qs, callbackErr)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *taskQueueServiceImpl) completeTask(queueName, taskID string) {
|
||||
now := time.Now().UnixMilli()
|
||||
if _, err := s.db.ExecContext(s.ctx, `UPDATE tasks SET status = ?, updated_at = ? WHERE id = ?`, taskStatusCompleted, now, taskID); err != nil {
|
||||
log.Error(s.ctx, "Failed to mark task as completed", "plugin", s.pluginName, "taskID", taskID, err)
|
||||
}
|
||||
log.Debug(s.ctx, "Task completed", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
|
||||
}
|
||||
|
||||
func (s *taskQueueServiceImpl) handleTaskFailure(queueName, taskID string, attempt, maxRetries int32, qs *queueState, callbackErr error) {
|
||||
log.Warn(s.ctx, "Task execution failed", "plugin", s.pluginName, "queue", queueName,
|
||||
"taskID", taskID, "attempt", attempt, "maxRetries", maxRetries, "err", callbackErr)
|
||||
|
||||
now := time.Now().UnixMilli()
|
||||
if attempt > maxRetries {
|
||||
if _, err := s.db.ExecContext(s.ctx, `UPDATE tasks SET status = ?, updated_at = ? WHERE id = ?`, taskStatusFailed, now, taskID); err != nil {
|
||||
log.Error(s.ctx, "Failed to mark task as failed", "plugin", s.pluginName, "taskID", taskID, err)
|
||||
}
|
||||
log.Warn(s.ctx, "Task failed after all retries", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
|
||||
return
|
||||
}
|
||||
|
||||
// Exponential backoff: backoffMs * 2^(attempt-1)
|
||||
backoff := qs.config.BackoffMs << (attempt - 1)
|
||||
if backoff <= 0 || backoff > maxBackoffMs {
|
||||
backoff = maxBackoffMs
|
||||
}
|
||||
nextRunAt := now + backoff
|
||||
if _, err := s.db.ExecContext(s.ctx, `
|
||||
UPDATE tasks SET status = ?, next_run_at = ?, updated_at = ? WHERE id = ?
|
||||
`, taskStatusPending, nextRunAt, now, taskID); err != nil {
|
||||
log.Error(s.ctx, "Failed to reschedule task for retry", "plugin", s.pluginName, "taskID", taskID, err)
|
||||
}
|
||||
|
||||
// Wake worker after backoff expires
|
||||
time.AfterFunc(time.Duration(backoff)*time.Millisecond, func() {
|
||||
qs.notifyWorkers()
|
||||
})
|
||||
}
|
||||
|
||||
// revertTaskToPending puts a running task back to pending status and decrements the attempt
|
||||
// counter (used during shutdown to ensure the interrupted attempt doesn't count).
|
||||
func (s *taskQueueServiceImpl) revertTaskToPending(taskID string) {
|
||||
now := time.Now().UnixMilli()
|
||||
_, err := s.db.Exec(`UPDATE tasks SET status = ?, attempt = MAX(attempt - 1, 0), updated_at = ? WHERE id = ? AND status = ?`, taskStatusPending, now, taskID, taskStatusRunning)
|
||||
if err != nil {
|
||||
log.Error("Failed to revert task to pending", "plugin", s.pluginName, "taskID", taskID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// defaultInvokeCallback calls the plugin's nd_task_execute function.
|
||||
func (s *taskQueueServiceImpl) defaultInvokeCallback(ctx context.Context, queueName, taskID string, payload []byte, attempt int32) error {
|
||||
s.manager.mu.RLock()
|
||||
p, ok := s.manager.plugins[s.pluginName]
|
||||
s.manager.mu.RUnlock()
|
||||
|
||||
if !ok {
|
||||
return fmt.Errorf("plugin %s not loaded", s.pluginName)
|
||||
}
|
||||
|
||||
input := capabilities.TaskExecuteRequest{
|
||||
QueueName: queueName,
|
||||
TaskID: taskID,
|
||||
Payload: payload,
|
||||
Attempt: attempt,
|
||||
}
|
||||
|
||||
result, err := callPluginFunction[capabilities.TaskExecuteRequest, capabilities.TaskExecuteResponse](ctx, p, FuncTaskWorkerCallback, input)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if result.Error != "" {
|
||||
return fmt.Errorf("%s", result.Error)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// cleanupLoop periodically removes terminal tasks past their retention period.
|
||||
func (s *taskQueueServiceImpl) cleanupLoop() {
|
||||
ticker := time.NewTicker(cleanupInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
s.runCleanup()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// runCleanup deletes terminal tasks past their retention period.
|
||||
func (s *taskQueueServiceImpl) runCleanup() {
|
||||
s.mu.Lock()
|
||||
queues := make(map[string]*queueState, len(s.queues))
|
||||
for k, v := range s.queues {
|
||||
queues[k] = v
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
now := time.Now().UnixMilli()
|
||||
for name, qs := range queues {
|
||||
result, err := s.db.ExecContext(s.ctx, `
|
||||
DELETE FROM tasks WHERE queue_name = ? AND status IN (?, ?, ?) AND updated_at + ? < ?
|
||||
`, name, taskStatusCompleted, taskStatusFailed, taskStatusCancelled, qs.config.RetentionMs, now)
|
||||
if err != nil {
|
||||
log.Error(s.ctx, "Failed to cleanup tasks", "plugin", s.pluginName, "queue", name, err)
|
||||
continue
|
||||
}
|
||||
if deleted, _ := result.RowsAffected(); deleted > 0 {
|
||||
log.Debug(s.ctx, "Cleaned up terminal tasks", "plugin", s.pluginName, "queue", name, "deleted", deleted)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close shuts down the task queue service, stopping all workers and closing the database.
|
||||
func (s *taskQueueServiceImpl) Close() error {
|
||||
// Cancel context to signal all goroutines
|
||||
s.cancel()
|
||||
|
||||
// Wait for goroutines with timeout
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
s.wg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(shutdownTimeout):
|
||||
log.Warn("TaskQueue shutdown timed out", "plugin", s.pluginName)
|
||||
}
|
||||
|
||||
// Mark running tasks as pending for recovery on next startup
|
||||
if s.db != nil {
|
||||
now := time.Now().UnixMilli()
|
||||
if _, err := s.db.Exec(`UPDATE tasks SET status = ?, updated_at = ? WHERE status = ?`, taskStatusPending, now, taskStatusRunning); err != nil {
|
||||
log.Error("Failed to reset running tasks on shutdown", "plugin", s.pluginName, err)
|
||||
}
|
||||
log.Debug("Closing plugin taskqueue", "plugin", s.pluginName)
|
||||
return s.db.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Compile-time verification
|
||||
var _ host.TaskQueueService = (*taskQueueServiceImpl)(nil)
|
||||
var _ io.Closer = (*taskQueueServiceImpl)(nil)
|
||||
968
plugins/host_taskqueue_test.go
Normal file
968
plugins/host_taskqueue_test.go
Normal file
@@ -0,0 +1,968 @@
|
||||
//go:build !windows
|
||||
|
||||
package plugins
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/navidrome/navidrome/conf"
|
||||
"github.com/navidrome/navidrome/conf/configtest"
|
||||
"github.com/navidrome/navidrome/model"
|
||||
"github.com/navidrome/navidrome/plugins/host"
|
||||
"github.com/navidrome/navidrome/tests"
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("TaskQueueService", func() {
|
||||
var tmpDir string
|
||||
var service *taskQueueServiceImpl
|
||||
var ctx context.Context
|
||||
var manager *Manager
|
||||
|
||||
BeforeEach(func() {
|
||||
ctx = GinkgoT().Context()
|
||||
var err error
|
||||
tmpDir, err = os.MkdirTemp("", "taskqueue-test-*")
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
DeferCleanup(configtest.SetupConfig())
|
||||
conf.Server.DataFolder = tmpDir
|
||||
|
||||
// Create a mock manager with context
|
||||
managerCtx, cancel := context.WithCancel(ctx)
|
||||
manager = &Manager{
|
||||
plugins: make(map[string]*plugin),
|
||||
ctx: managerCtx,
|
||||
}
|
||||
DeferCleanup(cancel)
|
||||
|
||||
service, err = newTaskQueueService("test_plugin", manager, 5)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
if service != nil {
|
||||
service.Close()
|
||||
}
|
||||
os.RemoveAll(tmpDir)
|
||||
})
|
||||
|
||||
Describe("CreateQueue", func() {
|
||||
It("creates a queue successfully", func() {
|
||||
err := service.CreateQueue(ctx, "my-queue", host.QueueConfig{
|
||||
Concurrency: 2,
|
||||
MaxRetries: 3,
|
||||
BackoffMs: 2000,
|
||||
RetentionMs: 7200000,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
service.mu.Lock()
|
||||
qs, exists := service.queues["my-queue"]
|
||||
service.mu.Unlock()
|
||||
Expect(exists).To(BeTrue())
|
||||
Expect(qs.config.Concurrency).To(Equal(int32(2)))
|
||||
Expect(qs.config.MaxRetries).To(Equal(int32(3)))
|
||||
Expect(qs.config.BackoffMs).To(Equal(int64(2000)))
|
||||
Expect(qs.config.RetentionMs).To(Equal(int64(7200000)))
|
||||
})
|
||||
|
||||
It("returns error for duplicate queue name", func() {
|
||||
err := service.CreateQueue(ctx, "dup-queue", host.QueueConfig{})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
err = service.CreateQueue(ctx, "dup-queue", host.QueueConfig{})
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("already exists"))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("CreateQueue name validation", func() {
|
||||
It("rejects empty queue name", func() {
|
||||
err := service.CreateQueue(ctx, "", host.QueueConfig{})
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("queue name cannot be empty"))
|
||||
})
|
||||
|
||||
It("rejects over-length queue name", func() {
|
||||
longName := strings.Repeat("a", maxQueueNameLength+1)
|
||||
err := service.CreateQueue(ctx, longName, host.QueueConfig{})
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("exceeds maximum length"))
|
||||
})
|
||||
|
||||
It("accepts queue name at maximum length", func() {
|
||||
service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error {
|
||||
return nil
|
||||
}
|
||||
exactName := strings.Repeat("a", maxQueueNameLength)
|
||||
err := service.CreateQueue(ctx, exactName, host.QueueConfig{})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
})
|
||||
})
|
||||
|
||||
Describe("CreateQueue defaults", func() {
|
||||
It("applies defaults for zero-value config", func() {
|
||||
err := service.CreateQueue(ctx, "defaults-queue", host.QueueConfig{})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
service.mu.Lock()
|
||||
qs := service.queues["defaults-queue"]
|
||||
service.mu.Unlock()
|
||||
Expect(qs.config.Concurrency).To(Equal(defaultConcurrency))
|
||||
Expect(qs.config.BackoffMs).To(Equal(defaultBackoffMs))
|
||||
Expect(qs.config.RetentionMs).To(Equal(defaultRetentionMs))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("CreateQueue defaults with negative values", func() {
|
||||
It("applies default RetentionMs for negative value", func() {
|
||||
service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error {
|
||||
return nil
|
||||
}
|
||||
err := service.CreateQueue(ctx, "neg-retention", host.QueueConfig{
|
||||
RetentionMs: -500,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
service.mu.Lock()
|
||||
qs := service.queues["neg-retention"]
|
||||
service.mu.Unlock()
|
||||
Expect(qs.config.RetentionMs).To(Equal(defaultRetentionMs))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("CreateQueue clamping", func() {
|
||||
It("clamps concurrency exceeding maxConcurrency", func() {
|
||||
// maxConcurrency is 5; request 10
|
||||
err := service.CreateQueue(ctx, "clamped-queue", host.QueueConfig{
|
||||
Concurrency: 10,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
service.mu.Lock()
|
||||
qs := service.queues["clamped-queue"]
|
||||
service.mu.Unlock()
|
||||
Expect(qs.config.Concurrency).To(Equal(int32(5)))
|
||||
})
|
||||
|
||||
It("returns error when concurrency budget is exhausted", func() {
|
||||
// maxConcurrency is 5; create a queue that uses all 5
|
||||
err := service.CreateQueue(ctx, "full-budget", host.QueueConfig{
|
||||
Concurrency: 5,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
// Next queue should fail — no budget remaining
|
||||
err = service.CreateQueue(ctx, "over-budget", host.QueueConfig{
|
||||
Concurrency: 1,
|
||||
})
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("concurrency budget exhausted"))
|
||||
})
|
||||
|
||||
It("clamps retention below minimum", func() {
|
||||
err := service.CreateQueue(ctx, "low-retention", host.QueueConfig{
|
||||
RetentionMs: 100, // below minRetentionMs
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
service.mu.Lock()
|
||||
qs := service.queues["low-retention"]
|
||||
service.mu.Unlock()
|
||||
Expect(qs.config.RetentionMs).To(Equal(minRetentionMs))
|
||||
})
|
||||
|
||||
It("clamps retention above maximum", func() {
|
||||
err := service.CreateQueue(ctx, "high-retention", host.QueueConfig{
|
||||
RetentionMs: 999_999_999_999, // above maxRetentionMs
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
service.mu.Lock()
|
||||
qs := service.queues["high-retention"]
|
||||
service.mu.Unlock()
|
||||
Expect(qs.config.RetentionMs).To(Equal(maxRetentionMs))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("Enqueue", func() {
|
||||
BeforeEach(func() {
|
||||
// Use a no-op callback to prevent actual execution attempts
|
||||
service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error {
|
||||
return nil
|
||||
}
|
||||
err := service.CreateQueue(ctx, "enqueue-test", host.QueueConfig{})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
})
|
||||
|
||||
It("enqueues a task and returns task ID", func() {
|
||||
taskID, err := service.Enqueue(ctx, "enqueue-test", []byte("payload"))
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(taskID).ToNot(BeEmpty())
|
||||
})
|
||||
|
||||
It("returns error for non-existent queue", func() {
|
||||
_, err := service.Enqueue(ctx, "no-such-queue", []byte("payload"))
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("does not exist"))
|
||||
})
|
||||
|
||||
It("rejects payload exceeding maximum size", func() {
|
||||
bigPayload := make([]byte, maxPayloadSize+1)
|
||||
_, err := service.Enqueue(ctx, "enqueue-test", bigPayload)
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("exceeds maximum"))
|
||||
})
|
||||
|
||||
It("accepts payload at maximum size", func() {
|
||||
exactPayload := make([]byte, maxPayloadSize)
|
||||
taskID, err := service.Enqueue(ctx, "enqueue-test", exactPayload)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(taskID).ToNot(BeEmpty())
|
||||
})
|
||||
})
|
||||
|
||||
Describe("GetTaskStatus", func() {
|
||||
BeforeEach(func() {
|
||||
// Use a callback that blocks until context is cancelled so tasks stay pending
|
||||
service.invokeCallbackFn = func(ctx context.Context, _, _ string, _ []byte, _ int32) error {
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
}
|
||||
})
|
||||
|
||||
It("returns pending for a new task", func() {
|
||||
err := service.CreateQueue(ctx, "status-test", host.QueueConfig{})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
taskID, err := service.Enqueue(ctx, "status-test", []byte("data"))
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
// The task may get picked up quickly; check initial status
|
||||
// Since the callback blocks, it should be either pending or running
|
||||
status, err := service.GetTaskStatus(ctx, taskID)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(status).To(BeElementOf("pending", "running"))
|
||||
})
|
||||
|
||||
It("returns error for unknown task ID", func() {
|
||||
_, err := service.GetTaskStatus(ctx, "nonexistent-id")
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("not found"))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("CancelTask", func() {
|
||||
BeforeEach(func() {
|
||||
// Block callback so tasks stay in pending/running
|
||||
service.invokeCallbackFn = func(ctx context.Context, _, _ string, _ []byte, _ int32) error {
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
}
|
||||
})
|
||||
|
||||
It("cancels a pending task", func() {
|
||||
// Block the callback so the first task occupies the worker
|
||||
started := make(chan struct{})
|
||||
service.invokeCallbackFn = func(ctx context.Context, _, _ string, _ []byte, _ int32) error {
|
||||
close(started)
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
err := service.CreateQueue(ctx, "cancel-test", host.QueueConfig{
|
||||
Concurrency: 1,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
// Enqueue a blocker task to occupy the single worker
|
||||
_, err = service.Enqueue(ctx, "cancel-test", []byte("blocker"))
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
// Wait for the blocker task to start running
|
||||
Eventually(started).WithTimeout(5 * time.Second).Should(BeClosed())
|
||||
|
||||
// Enqueue a second task — it stays pending since the worker is busy
|
||||
taskID, err := service.Enqueue(ctx, "cancel-test", []byte("cancel-me"))
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
err = service.CancelTask(ctx, taskID)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
status, err := service.GetTaskStatus(ctx, taskID)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(status).To(Equal("cancelled"))
|
||||
})
|
||||
|
||||
It("returns error for unknown task ID", func() {
|
||||
err := service.CancelTask(ctx, "nonexistent-id")
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("not found"))
|
||||
})
|
||||
|
||||
It("returns error for non-pending task", func() {
|
||||
// Create a queue where tasks complete immediately
|
||||
service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error {
|
||||
return nil
|
||||
}
|
||||
err := service.CreateQueue(ctx, "completed-test", host.QueueConfig{})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
taskID, err := service.Enqueue(ctx, "completed-test", []byte("data"))
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
// Wait for task to complete
|
||||
Eventually(func() string {
|
||||
status, _ := service.GetTaskStatus(ctx, taskID)
|
||||
return status
|
||||
}).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed"))
|
||||
|
||||
// Try to cancel completed task
|
||||
err = service.CancelTask(ctx, taskID)
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("cannot be cancelled"))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("Worker execution", func() {
|
||||
It("invokes callback and completes task", func() {
|
||||
var callCount atomic.Int32
|
||||
var receivedQueueName, receivedTaskID string
|
||||
var receivedPayload []byte
|
||||
var receivedAttempt int32
|
||||
|
||||
service.invokeCallbackFn = func(_ context.Context, queueName, taskID string, payload []byte, attempt int32) error {
|
||||
callCount.Add(1)
|
||||
receivedQueueName = queueName
|
||||
receivedTaskID = taskID
|
||||
receivedPayload = payload
|
||||
receivedAttempt = attempt
|
||||
return nil
|
||||
}
|
||||
|
||||
err := service.CreateQueue(ctx, "worker-test", host.QueueConfig{})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
taskID, err := service.Enqueue(ctx, "worker-test", []byte("test-payload"))
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
Eventually(func() string {
|
||||
status, _ := service.GetTaskStatus(ctx, taskID)
|
||||
return status
|
||||
}).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed"))
|
||||
|
||||
Expect(callCount.Load()).To(Equal(int32(1)))
|
||||
Expect(receivedQueueName).To(Equal("worker-test"))
|
||||
Expect(receivedTaskID).To(Equal(taskID))
|
||||
Expect(receivedPayload).To(Equal([]byte("test-payload")))
|
||||
Expect(receivedAttempt).To(Equal(int32(1)))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("Retry on failure", func() {
|
||||
It("retries and eventually fails after exhausting retries", func() {
|
||||
var callCount atomic.Int32
|
||||
|
||||
service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error {
|
||||
callCount.Add(1)
|
||||
return fmt.Errorf("task failed")
|
||||
}
|
||||
|
||||
err := service.CreateQueue(ctx, "retry-test", host.QueueConfig{
|
||||
MaxRetries: 2,
|
||||
BackoffMs: 10, // Very short for testing
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
taskID, err := service.Enqueue(ctx, "retry-test", []byte("retry-payload"))
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
Eventually(func() string {
|
||||
status, _ := service.GetTaskStatus(ctx, taskID)
|
||||
return status
|
||||
}).WithTimeout(10 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("failed"))
|
||||
|
||||
// 1 initial attempt + 2 retries = 3 total calls
|
||||
Expect(callCount.Load()).To(Equal(int32(3)))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("Retry then succeed", func() {
|
||||
It("retries and succeeds on second attempt", func() {
|
||||
var callCount atomic.Int32
|
||||
|
||||
service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, attempt int32) error {
|
||||
callCount.Add(1)
|
||||
if attempt == 1 {
|
||||
return fmt.Errorf("temporary error")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
err := service.CreateQueue(ctx, "retry-succeed", host.QueueConfig{
|
||||
MaxRetries: 1,
|
||||
BackoffMs: 10, // Very short for testing
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
taskID, err := service.Enqueue(ctx, "retry-succeed", []byte("data"))
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
Eventually(func() string {
|
||||
status, _ := service.GetTaskStatus(ctx, taskID)
|
||||
return status
|
||||
}).WithTimeout(10 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed"))
|
||||
|
||||
Expect(callCount.Load()).To(Equal(int32(2)))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("Backoff overflow cap", func() {
|
||||
It("caps backoff at maxRetentionMs to prevent overflow", func() {
|
||||
var callCount atomic.Int32
|
||||
|
||||
service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error {
|
||||
callCount.Add(1)
|
||||
return fmt.Errorf("always fail")
|
||||
}
|
||||
|
||||
err := service.CreateQueue(ctx, "backoff-overflow", host.QueueConfig{
|
||||
MaxRetries: 3,
|
||||
BackoffMs: 1_000_000_000, // Very large backoff to trigger overflow on exponentiation
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
taskID, err := service.Enqueue(ctx, "backoff-overflow", []byte("overflow-test"))
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
// Wait for first attempt to fail
|
||||
Eventually(func() int32 {
|
||||
return callCount.Load()
|
||||
}).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(BeNumerically(">=", int32(1)))
|
||||
|
||||
// Check next_run_at is positive and reasonable (capped at maxRetentionMs from now)
|
||||
var nextRunAt int64
|
||||
err = service.db.QueryRow(`SELECT next_run_at FROM tasks WHERE id = ?`, taskID).Scan(&nextRunAt)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
now := time.Now().UnixMilli()
|
||||
Expect(nextRunAt).To(BeNumerically(">", int64(0)), "next_run_at should be positive")
|
||||
Expect(nextRunAt).To(BeNumerically("<=", now+maxBackoffMs+1000), "next_run_at should be at most maxBackoffMs from now")
|
||||
})
|
||||
})
|
||||
|
||||
Describe("Delay enforcement with concurrent workers", func() {
|
||||
It("enforces delay between dispatches even with multiple workers", func() {
|
||||
var mu sync.Mutex
|
||||
var dispatchTimes []time.Time
|
||||
|
||||
service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error {
|
||||
mu.Lock()
|
||||
dispatchTimes = append(dispatchTimes, time.Now())
|
||||
mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
err := service.CreateQueue(ctx, "delay-concurrent", host.QueueConfig{
|
||||
Concurrency: 3,
|
||||
DelayMs: 200,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
// Enqueue 5 tasks
|
||||
for i := 0; i < 5; i++ {
|
||||
_, err := service.Enqueue(ctx, "delay-concurrent", []byte(fmt.Sprintf("task-%d", i)))
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
}
|
||||
|
||||
// Wait for all tasks to complete
|
||||
Eventually(func() int {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
return len(dispatchTimes)
|
||||
}).WithTimeout(10 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal(5))
|
||||
|
||||
// Sort dispatch times and verify gaps
|
||||
mu.Lock()
|
||||
sort.Slice(dispatchTimes, func(i, j int) bool {
|
||||
return dispatchTimes[i].Before(dispatchTimes[j])
|
||||
})
|
||||
times := make([]time.Time, len(dispatchTimes))
|
||||
copy(times, dispatchTimes)
|
||||
mu.Unlock()
|
||||
|
||||
// Consecutive dispatches should have at least ~160ms gap (80% of 200ms)
|
||||
for i := 1; i < len(times); i++ {
|
||||
gap := times[i].Sub(times[i-1])
|
||||
Expect(gap).To(BeNumerically(">=", 160*time.Millisecond),
|
||||
fmt.Sprintf("gap between dispatch %d and %d was %v, expected >= 160ms", i-1, i, gap))
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
Describe("Shutdown recovery", func() {
|
||||
It("resets stale running tasks on CreateQueue", func() {
|
||||
// Create a first service and queue, enqueue a task
|
||||
service.invokeCallbackFn = func(ctx context.Context, _, _ string, _ []byte, _ int32) error {
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
}
|
||||
err := service.CreateQueue(ctx, "recovery-queue", host.QueueConfig{})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
taskID, err := service.Enqueue(ctx, "recovery-queue", []byte("stale-task"))
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
// Wait for the task to start running
|
||||
Eventually(func() string {
|
||||
status, _ := service.GetTaskStatus(ctx, taskID)
|
||||
return status
|
||||
}).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("running"))
|
||||
|
||||
// Close the service (simulates crash - tasks left in running state)
|
||||
service.Close()
|
||||
|
||||
// Create a new service pointing to the same DB
|
||||
managerCtx2, cancel2 := context.WithCancel(ctx)
|
||||
DeferCleanup(cancel2)
|
||||
manager2 := &Manager{
|
||||
plugins: make(map[string]*plugin),
|
||||
ctx: managerCtx2,
|
||||
}
|
||||
|
||||
service, err = newTaskQueueService("test_plugin", manager2, 5)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
// Override callback to succeed
|
||||
service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Re-create the queue - the upsert handles the existing row from the old service
|
||||
err = service.CreateQueue(ctx, "recovery-queue", host.QueueConfig{})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
// The stale running task should now be reset to pending and eventually completed
|
||||
Eventually(func() string {
|
||||
status, _ := service.GetTaskStatus(ctx, taskID)
|
||||
return status
|
||||
}).WithTimeout(10 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed"))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("Close", func() {
|
||||
It("prevents subsequent operations after close", func() {
|
||||
err := service.CreateQueue(ctx, "close-test", host.QueueConfig{})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
service.Close()
|
||||
|
||||
// After close, operations should fail
|
||||
_, err = service.Enqueue(ctx, "close-test", []byte("data"))
|
||||
Expect(err).To(HaveOccurred())
|
||||
})
|
||||
})
|
||||
|
||||
Describe("Plugin isolation", func() {
|
||||
It("uses separate databases for different plugins", func() {
|
||||
managerCtx2, cancel2 := context.WithCancel(ctx)
|
||||
DeferCleanup(cancel2)
|
||||
manager2 := &Manager{
|
||||
plugins: make(map[string]*plugin),
|
||||
ctx: managerCtx2,
|
||||
}
|
||||
|
||||
service2, err := newTaskQueueService("other_plugin", manager2, 5)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
defer service2.Close()
|
||||
|
||||
// Check that separate database files exist
|
||||
_, err = os.Stat(filepath.Join(tmpDir, "plugins", "test_plugin", "taskqueue.db"))
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
_, err = os.Stat(filepath.Join(tmpDir, "plugins", "other_plugin", "taskqueue.db"))
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
// Both services should be able to create queues with the same name independently
|
||||
service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error { return nil }
|
||||
service2.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error { return nil }
|
||||
|
||||
err = service.CreateQueue(ctx, "shared-name", host.QueueConfig{})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = service2.CreateQueue(ctx, "shared-name", host.QueueConfig{})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
// Enqueue to each and verify they work independently
|
||||
taskID1, err := service.Enqueue(ctx, "shared-name", []byte("plugin1"))
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
taskID2, err := service2.Enqueue(ctx, "shared-name", []byte("plugin2"))
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
Expect(taskID1).ToNot(Equal(taskID2))
|
||||
|
||||
// Both should complete
|
||||
Eventually(func() string {
|
||||
status, _ := service.GetTaskStatus(ctx, taskID1)
|
||||
return status
|
||||
}).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed"))
|
||||
|
||||
Eventually(func() string {
|
||||
status, _ := service2.GetTaskStatus(ctx, taskID2)
|
||||
return status
|
||||
}).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed"))
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("TaskQueueService Integration", Ordered, func() {
|
||||
var manager *Manager
|
||||
var tmpDir string
|
||||
|
||||
BeforeAll(func() {
|
||||
var err error
|
||||
tmpDir, err = os.MkdirTemp("", "taskqueue-integration-test-*")
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
// Copy the test-taskqueue plugin
|
||||
srcPath := filepath.Join(testdataDir, "test-taskqueue"+PackageExtension)
|
||||
destPath := filepath.Join(tmpDir, "test-taskqueue"+PackageExtension)
|
||||
data, err := os.ReadFile(srcPath)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = os.WriteFile(destPath, data, 0600)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
// Compute SHA256 for the plugin
|
||||
hash := sha256.Sum256(data)
|
||||
hashHex := hex.EncodeToString(hash[:])
|
||||
|
||||
// Setup config
|
||||
DeferCleanup(configtest.SetupConfig())
|
||||
conf.Server.Plugins.Enabled = true
|
||||
conf.Server.Plugins.Folder = tmpDir
|
||||
conf.Server.Plugins.AutoReload = false
|
||||
conf.Server.CacheFolder = filepath.Join(tmpDir, "cache")
|
||||
conf.Server.DataFolder = tmpDir
|
||||
|
||||
// Setup mock DataStore with pre-enabled plugin
|
||||
mockPluginRepo := tests.CreateMockPluginRepo()
|
||||
mockPluginRepo.Permitted = true
|
||||
mockPluginRepo.SetData(model.Plugins{{
|
||||
ID: "test-taskqueue",
|
||||
Path: destPath,
|
||||
SHA256: hashHex,
|
||||
Enabled: true,
|
||||
}})
|
||||
dataStore := &tests.MockDataStore{MockedPlugin: mockPluginRepo}
|
||||
|
||||
// Create and start manager
|
||||
manager = &Manager{
|
||||
plugins: make(map[string]*plugin),
|
||||
ds: dataStore,
|
||||
metrics: noopMetricsRecorder{},
|
||||
subsonicRouter: http.NotFoundHandler(),
|
||||
}
|
||||
err = manager.Start(GinkgoT().Context())
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
DeferCleanup(func() {
|
||||
_ = manager.Stop()
|
||||
_ = os.RemoveAll(tmpDir)
|
||||
})
|
||||
})
|
||||
|
||||
// Helper types for calling the test plugin
|
||||
type testQueueConfig struct {
|
||||
Concurrency int32 `json:"concurrency,omitempty"`
|
||||
MaxRetries int32 `json:"maxRetries,omitempty"`
|
||||
BackoffMs int64 `json:"backoffMs,omitempty"`
|
||||
DelayMs int64 `json:"delayMs,omitempty"`
|
||||
RetentionMs int64 `json:"retentionMs,omitempty"`
|
||||
}
|
||||
|
||||
type testTaskQueueInput struct {
|
||||
Operation string `json:"operation"`
|
||||
QueueName string `json:"queueName,omitempty"`
|
||||
Config *testQueueConfig `json:"config,omitempty"`
|
||||
Payload []byte `json:"payload,omitempty"`
|
||||
TaskID string `json:"taskId,omitempty"`
|
||||
}
|
||||
|
||||
type testTaskQueueOutput struct {
|
||||
TaskID string `json:"taskId,omitempty"`
|
||||
Status string `json:"status,omitempty"`
|
||||
Error *string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
callTestTaskQueue := func(ctx context.Context, input testTaskQueueInput) (*testTaskQueueOutput, error) {
|
||||
manager.mu.RLock()
|
||||
p := manager.plugins["test-taskqueue"]
|
||||
manager.mu.RUnlock()
|
||||
|
||||
instance, err := p.instance(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer instance.Close(ctx)
|
||||
|
||||
inputBytes, _ := json.Marshal(input)
|
||||
_, outputBytes, err := instance.Call("nd_test_taskqueue", inputBytes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var output testTaskQueueOutput
|
||||
if err := json.Unmarshal(outputBytes, &output); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if output.Error != nil {
|
||||
return nil, errors.New(*output.Error)
|
||||
}
|
||||
return &output, nil
|
||||
}
|
||||
|
||||
Describe("Plugin Loading", func() {
|
||||
It("should load plugin with taskqueue permission and TaskWorker capability", func() {
|
||||
manager.mu.RLock()
|
||||
p, ok := manager.plugins["test-taskqueue"]
|
||||
manager.mu.RUnlock()
|
||||
Expect(ok).To(BeTrue())
|
||||
Expect(p.manifest.Permissions).ToNot(BeNil())
|
||||
Expect(p.manifest.Permissions.Taskqueue).ToNot(BeNil())
|
||||
Expect(p.manifest.Permissions.Taskqueue.MaxConcurrency).To(Equal(10))
|
||||
Expect(p.capabilities).To(ContainElement(CapabilityTaskWorker))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("Create Queue", func() {
|
||||
It("should create a queue without error", func() {
|
||||
ctx := GinkgoT().Context()
|
||||
_, err := callTestTaskQueue(ctx, testTaskQueueInput{
|
||||
Operation: "create_queue",
|
||||
QueueName: "test-create",
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
})
|
||||
|
||||
It("should return error for duplicate queue name", func() {
|
||||
ctx := GinkgoT().Context()
|
||||
_, err := callTestTaskQueue(ctx, testTaskQueueInput{
|
||||
Operation: "create_queue",
|
||||
QueueName: "test-dup",
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
_, err = callTestTaskQueue(ctx, testTaskQueueInput{
|
||||
Operation: "create_queue",
|
||||
QueueName: "test-dup",
|
||||
})
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("already exists"))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("Enqueue and Task Completion", func() {
|
||||
It("should enqueue a task and complete successfully", func() {
|
||||
ctx := GinkgoT().Context()
|
||||
|
||||
// Create queue
|
||||
_, err := callTestTaskQueue(ctx, testTaskQueueInput{
|
||||
Operation: "create_queue",
|
||||
QueueName: "test-complete",
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
// Enqueue task with payload "hello"
|
||||
output, err := callTestTaskQueue(ctx, testTaskQueueInput{
|
||||
Operation: "enqueue",
|
||||
QueueName: "test-complete",
|
||||
Payload: []byte("hello"),
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(output.TaskID).ToNot(BeEmpty())
|
||||
|
||||
taskID := output.TaskID
|
||||
|
||||
// Poll until completed
|
||||
Eventually(func() string {
|
||||
out, err := callTestTaskQueue(ctx, testTaskQueueInput{
|
||||
Operation: "get_task_status",
|
||||
TaskID: taskID,
|
||||
})
|
||||
if err != nil {
|
||||
return "error"
|
||||
}
|
||||
return out.Status
|
||||
}).WithTimeout(5 * time.Second).WithPolling(100 * time.Millisecond).Should(Equal("completed"))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("Enqueue with Failure, No Retries", func() {
|
||||
It("should fail when payload is 'fail' and maxRetries is 0", func() {
|
||||
ctx := GinkgoT().Context()
|
||||
|
||||
// Create queue with no retries
|
||||
_, err := callTestTaskQueue(ctx, testTaskQueueInput{
|
||||
Operation: "create_queue",
|
||||
QueueName: "test-fail-no-retry",
|
||||
Config: &testQueueConfig{
|
||||
MaxRetries: 0,
|
||||
},
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
// Enqueue task that will fail
|
||||
output, err := callTestTaskQueue(ctx, testTaskQueueInput{
|
||||
Operation: "enqueue",
|
||||
QueueName: "test-fail-no-retry",
|
||||
Payload: []byte("fail"),
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
taskID := output.TaskID
|
||||
|
||||
// Poll until failed
|
||||
Eventually(func() string {
|
||||
out, err := callTestTaskQueue(ctx, testTaskQueueInput{
|
||||
Operation: "get_task_status",
|
||||
TaskID: taskID,
|
||||
})
|
||||
if err != nil {
|
||||
return "error"
|
||||
}
|
||||
return out.Status
|
||||
}).WithTimeout(5 * time.Second).WithPolling(100 * time.Millisecond).Should(Equal("failed"))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("Enqueue with Retry Then Success", func() {
|
||||
It("should retry and eventually succeed with 'fail-then-succeed' payload", func() {
|
||||
ctx := GinkgoT().Context()
|
||||
|
||||
// Create queue with retries and short backoff
|
||||
_, err := callTestTaskQueue(ctx, testTaskQueueInput{
|
||||
Operation: "create_queue",
|
||||
QueueName: "test-retry-succeed",
|
||||
Config: &testQueueConfig{
|
||||
MaxRetries: 2,
|
||||
BackoffMs: 100,
|
||||
},
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
// Enqueue task that fails on attempt < 2, then succeeds
|
||||
output, err := callTestTaskQueue(ctx, testTaskQueueInput{
|
||||
Operation: "enqueue",
|
||||
QueueName: "test-retry-succeed",
|
||||
Payload: []byte("fail-then-succeed"),
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
taskID := output.TaskID
|
||||
|
||||
// Poll until completed
|
||||
Eventually(func() string {
|
||||
out, err := callTestTaskQueue(ctx, testTaskQueueInput{
|
||||
Operation: "get_task_status",
|
||||
TaskID: taskID,
|
||||
})
|
||||
if err != nil {
|
||||
return "error"
|
||||
}
|
||||
return out.Status
|
||||
}).WithTimeout(5 * time.Second).WithPolling(100 * time.Millisecond).Should(Equal("completed"))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("Cancel Pending Task", func() {
|
||||
It("should cancel a pending task", func() {
|
||||
ctx := GinkgoT().Context()
|
||||
|
||||
// Create queue with concurrency=1 and a large delay between dispatches.
|
||||
// The first task completes immediately (burst token), the second is dequeued
|
||||
// but blocks on the rate limiter. Tasks 3+ remain in 'pending' and can be cancelled.
|
||||
_, err := callTestTaskQueue(ctx, testTaskQueueInput{
|
||||
Operation: "create_queue",
|
||||
QueueName: "test-cancel",
|
||||
Config: &testQueueConfig{
|
||||
Concurrency: 1,
|
||||
DelayMs: 60000,
|
||||
},
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
// Enqueue several tasks - the first will complete immediately,
|
||||
// the second will be dequeued but block on the rate limiter (status=running),
|
||||
// the rest will stay pending.
|
||||
var taskIDs []string
|
||||
for i := 0; i < 5; i++ {
|
||||
output, err := callTestTaskQueue(ctx, testTaskQueueInput{
|
||||
Operation: "enqueue",
|
||||
QueueName: "test-cancel",
|
||||
Payload: []byte("hello"),
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
taskIDs = append(taskIDs, output.TaskID)
|
||||
}
|
||||
|
||||
// Wait for the first task to complete (it has no delay)
|
||||
Eventually(func() string {
|
||||
out, err := callTestTaskQueue(ctx, testTaskQueueInput{
|
||||
Operation: "get_task_status",
|
||||
TaskID: taskIDs[0],
|
||||
})
|
||||
if err != nil {
|
||||
return "error"
|
||||
}
|
||||
return out.Status
|
||||
}).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed"))
|
||||
|
||||
// Give the worker a moment to dequeue the second task (which will
|
||||
// block on the delay) so tasks 3+ stay in 'pending'
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Cancel the last task - it should still be pending
|
||||
lastTaskID := taskIDs[len(taskIDs)-1]
|
||||
_, err = callTestTaskQueue(ctx, testTaskQueueInput{
|
||||
Operation: "cancel_task",
|
||||
TaskID: lastTaskID,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
// Verify status is cancelled
|
||||
statusOut, err := callTestTaskQueue(ctx, testTaskQueueInput{
|
||||
Operation: "get_task_status",
|
||||
TaskID: lastTaskID,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(statusOut.Status).To(Equal("cancelled"))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("Enqueue to Non-Existent Queue", func() {
|
||||
It("should return error when enqueueing to a queue that does not exist", func() {
|
||||
ctx := GinkgoT().Context()
|
||||
|
||||
_, err := callTestTaskQueue(ctx, testTaskQueueInput{
|
||||
Operation: "enqueue",
|
||||
QueueName: "nonexistent-queue",
|
||||
Payload: []byte("payload"),
|
||||
})
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("does not exist"))
|
||||
})
|
||||
})
|
||||
})
|
||||
@@ -256,8 +256,11 @@ func (s *webSocketServiceImpl) isHostAllowed(host string) bool {
|
||||
}
|
||||
|
||||
// matchHostPattern matches a host against a pattern.
|
||||
// Supports wildcards like *.example.com
|
||||
// Supports "*" (allow all) and wildcards like "*.example.com".
|
||||
func matchHostPattern(pattern, host string) bool {
|
||||
if pattern == "*" {
|
||||
return true
|
||||
}
|
||||
if pattern == host {
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -575,6 +575,12 @@ var _ = Describe("WebSocketService", Ordered, func() {
|
||||
Expect(matchHostPattern("*.example.com", "deep.api.example.com")).To(BeTrue())
|
||||
})
|
||||
|
||||
It("should match bare '*' as allow-all", func() {
|
||||
Expect(matchHostPattern("*", "anything.example.com")).To(BeTrue())
|
||||
Expect(matchHostPattern("*", "127.0.0.1")).To(BeTrue())
|
||||
Expect(matchHostPattern("*", "::1")).To(BeTrue())
|
||||
})
|
||||
|
||||
It("should not match partial patterns", func() {
|
||||
Expect(matchHostPattern("*.example.com", "example.com.evil.org")).To(BeFalse())
|
||||
})
|
||||
|
||||
@@ -119,6 +119,32 @@ var hostServices = []hostServiceEntry{
|
||||
return host.RegisterUsersHostFunctions(service), nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "HTTP",
|
||||
hasPermission: func(p *Permissions) bool { return p != nil && p.Http != nil },
|
||||
create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) {
|
||||
perm := ctx.permissions.Http
|
||||
service := newHTTPService(ctx.pluginName, perm)
|
||||
return host.RegisterHTTPHostFunctions(service), nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "TaskQueue",
|
||||
hasPermission: func(p *Permissions) bool { return p != nil && p.Taskqueue != nil },
|
||||
create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) {
|
||||
perm := ctx.permissions.Taskqueue
|
||||
maxConcurrency := int32(1)
|
||||
if perm.MaxConcurrency > 0 {
|
||||
maxConcurrency = int32(perm.MaxConcurrency)
|
||||
}
|
||||
service, err := newTaskQueueService(ctx.pluginName, ctx.manager, maxConcurrency)
|
||||
if err != nil {
|
||||
log.Error("Failed to create TaskQueue service", "plugin", ctx.pluginName, err)
|
||||
return nil, nil
|
||||
}
|
||||
return host.RegisterTaskQueueHostFunctions(service), service
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// extractManifest reads manifest from an .ndp package and computes its SHA-256 hash.
|
||||
|
||||
@@ -110,6 +110,9 @@
|
||||
},
|
||||
"users": {
|
||||
"$ref": "#/$defs/UsersPermission"
|
||||
},
|
||||
"taskqueue": {
|
||||
"$ref": "#/$defs/TaskQueuePermission"
|
||||
}
|
||||
}
|
||||
},
|
||||
@@ -224,6 +227,23 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"TaskQueuePermission": {
|
||||
"type": "object",
|
||||
"description": "Task queue permissions for background task processing",
|
||||
"additionalProperties": false,
|
||||
"properties": {
|
||||
"reason": {
|
||||
"type": "string",
|
||||
"description": "Explanation for why task queue access is needed"
|
||||
},
|
||||
"maxConcurrency": {
|
||||
"type": "integer",
|
||||
"description": "Maximum total concurrent workers across all queues. Default: 1",
|
||||
"minimum": 1,
|
||||
"default": 1
|
||||
}
|
||||
}
|
||||
},
|
||||
"UsersPermission": {
|
||||
"type": "object",
|
||||
"description": "Users service permissions for accessing user information",
|
||||
|
||||
@@ -64,6 +64,21 @@ func ValidateWithCapabilities(m *Manifest, capabilities []Capability) error {
|
||||
return fmt.Errorf("scrobbler capability requires 'users' permission to be declared in manifest")
|
||||
}
|
||||
}
|
||||
|
||||
// Scheduler permission requires SchedulerCallback capability
|
||||
if m.Permissions != nil && m.Permissions.Scheduler != nil {
|
||||
if !hasCapability(capabilities, CapabilityScheduler) {
|
||||
return fmt.Errorf("'scheduler' permission requires plugin to export '%s' function", FuncSchedulerCallback)
|
||||
}
|
||||
}
|
||||
|
||||
// TaskQueue permission requires TaskWorker capability
|
||||
if m.Permissions != nil && m.Permissions.Taskqueue != nil {
|
||||
if !hasCapability(capabilities, CapabilityTaskWorker) {
|
||||
return fmt.Errorf("'taskqueue' permission requires plugin to export '%s' function", FuncTaskWorkerCallback)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -181,6 +181,9 @@ type Permissions struct {
|
||||
// Subsonicapi corresponds to the JSON schema field "subsonicapi".
|
||||
Subsonicapi *SubsonicAPIPermission `json:"subsonicapi,omitempty" yaml:"subsonicapi,omitempty" mapstructure:"subsonicapi,omitempty"`
|
||||
|
||||
// Taskqueue corresponds to the JSON schema field "taskqueue".
|
||||
Taskqueue *TaskQueuePermission `json:"taskqueue,omitempty" yaml:"taskqueue,omitempty" mapstructure:"taskqueue,omitempty"`
|
||||
|
||||
// Users corresponds to the JSON schema field "users".
|
||||
Users *UsersPermission `json:"users,omitempty" yaml:"users,omitempty" mapstructure:"users,omitempty"`
|
||||
|
||||
@@ -200,6 +203,36 @@ type SubsonicAPIPermission struct {
|
||||
Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"`
|
||||
}
|
||||
|
||||
// Task queue permissions for background task processing
|
||||
type TaskQueuePermission struct {
|
||||
// Maximum total concurrent workers across all queues. Default: 1
|
||||
MaxConcurrency int `json:"maxConcurrency,omitempty" yaml:"maxConcurrency,omitempty" mapstructure:"maxConcurrency,omitempty"`
|
||||
|
||||
// Explanation for why task queue access is needed
|
||||
Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"`
|
||||
}
|
||||
|
||||
// UnmarshalJSON implements json.Unmarshaler.
|
||||
func (j *TaskQueuePermission) UnmarshalJSON(value []byte) error {
|
||||
var raw map[string]interface{}
|
||||
if err := json.Unmarshal(value, &raw); err != nil {
|
||||
return err
|
||||
}
|
||||
type Plain TaskQueuePermission
|
||||
var plain Plain
|
||||
if err := json.Unmarshal(value, &plain); err != nil {
|
||||
return err
|
||||
}
|
||||
if v, ok := raw["maxConcurrency"]; !ok || v == nil {
|
||||
plain.MaxConcurrency = 1.0
|
||||
}
|
||||
if 1 > plain.MaxConcurrency {
|
||||
return fmt.Errorf("field %s: must be >= %v", "maxConcurrency", 1)
|
||||
}
|
||||
*j = TaskQueuePermission(plain)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Enable experimental WebAssembly threads support
|
||||
type ThreadsFeature struct {
|
||||
// Explanation for why threads support is needed
|
||||
|
||||
@@ -6,3 +6,10 @@ require (
|
||||
github.com/extism/go-pdk v1.1.3
|
||||
github.com/stretchr/testify v1.11.1
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/stretchr/objx v0.5.2 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
||||
@@ -38,10 +38,12 @@ The following host services are available:
|
||||
- Artwork: provides artwork public URL generation capabilities for plugins.
|
||||
- Cache: provides in-memory TTL-based caching capabilities for plugins.
|
||||
- Config: provides access to plugin configuration values.
|
||||
- HTTP: provides outbound HTTP request capabilities for plugins.
|
||||
- KVStore: provides persistent key-value storage for plugins.
|
||||
- Library: provides access to music library metadata for plugins.
|
||||
- Scheduler: provides task scheduling capabilities for plugins.
|
||||
- SubsonicAPI: provides access to Navidrome's Subsonic API from plugins.
|
||||
- TaskQueue: provides persistent task queues for plugins.
|
||||
- Users: provides access to user information for plugins.
|
||||
- WebSocket: provides WebSocket communication capabilities for plugins.
|
||||
|
||||
|
||||
89
plugins/pdk/go/host/nd_host_http.go
Normal file
89
plugins/pdk/go/host/nd_host_http.go
Normal file
@@ -0,0 +1,89 @@
|
||||
// Code generated by ndpgen. DO NOT EDIT.
|
||||
//
|
||||
// This file contains client wrappers for the HTTP host service.
|
||||
// It is intended for use in Navidrome plugins built with TinyGo.
|
||||
//
|
||||
//go:build wasip1
|
||||
|
||||
package host
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
|
||||
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
|
||||
)
|
||||
|
||||
// HTTPRequest represents the HTTPRequest data structure.
|
||||
// HTTPRequest represents an outbound HTTP request from a plugin.
|
||||
type HTTPRequest struct {
|
||||
Method string `json:"method"`
|
||||
URL string `json:"url"`
|
||||
Headers map[string]string `json:"headers"`
|
||||
Body []byte `json:"body"`
|
||||
TimeoutMs int32 `json:"timeoutMs"`
|
||||
}
|
||||
|
||||
// HTTPResponse represents the HTTPResponse data structure.
|
||||
// HTTPResponse represents the response from an outbound HTTP request.
|
||||
type HTTPResponse struct {
|
||||
StatusCode int32 `json:"statusCode"`
|
||||
Headers map[string]string `json:"headers"`
|
||||
Body []byte `json:"body"`
|
||||
}
|
||||
|
||||
// http_send is the host function provided by Navidrome.
|
||||
//
|
||||
//go:wasmimport extism:host/user http_send
|
||||
func http_send(uint64) uint64
|
||||
|
||||
type hTTPSendRequest struct {
|
||||
Request HTTPRequest `json:"request"`
|
||||
}
|
||||
|
||||
type hTTPSendResponse struct {
|
||||
Result *HTTPResponse `json:"result,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// HTTPSend calls the http_send host function.
|
||||
// Send executes an HTTP request and returns the response.
|
||||
//
|
||||
// Parameters:
|
||||
// - request: The HTTP request to execute, including method, URL, headers, body, and timeout
|
||||
//
|
||||
// Returns the HTTP response with status code, headers, and body.
|
||||
// Network errors, timeouts, and permission failures are returned as Go errors.
|
||||
// Successful HTTP calls (including 4xx/5xx status codes) return a non-nil response with nil error.
|
||||
func HTTPSend(request HTTPRequest) (*HTTPResponse, error) {
|
||||
// Marshal request to JSON
|
||||
req := hTTPSendRequest{
|
||||
Request: request,
|
||||
}
|
||||
reqBytes, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
reqMem := pdk.AllocateBytes(reqBytes)
|
||||
defer reqMem.Free()
|
||||
|
||||
// Call the host function
|
||||
responsePtr := http_send(reqMem.Offset())
|
||||
|
||||
// Read the response from memory
|
||||
responseMem := pdk.FindMemory(responsePtr)
|
||||
responseBytes := responseMem.ReadBytes()
|
||||
|
||||
// Parse the response
|
||||
var response hTTPSendResponse
|
||||
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Convert Error field to Go error
|
||||
if response.Error != "" {
|
||||
return nil, errors.New(response.Error)
|
||||
}
|
||||
|
||||
return response.Result, nil
|
||||
}
|
||||
57
plugins/pdk/go/host/nd_host_http_stub.go
Normal file
57
plugins/pdk/go/host/nd_host_http_stub.go
Normal file
@@ -0,0 +1,57 @@
|
||||
// Code generated by ndpgen. DO NOT EDIT.
|
||||
//
|
||||
// This file contains mock implementations for non-WASM builds.
|
||||
// These mocks allow IDE support, compilation, and unit testing on non-WASM platforms.
|
||||
// Plugin authors can use the exported mock instances to set expectations in tests.
|
||||
//
|
||||
//go:build !wasip1
|
||||
|
||||
package host
|
||||
|
||||
import "github.com/stretchr/testify/mock"
|
||||
|
||||
// HTTPRequest represents the HTTPRequest data structure.
|
||||
// HTTPRequest represents an outbound HTTP request from a plugin.
|
||||
type HTTPRequest struct {
|
||||
Method string `json:"method"`
|
||||
URL string `json:"url"`
|
||||
Headers map[string]string `json:"headers"`
|
||||
Body []byte `json:"body"`
|
||||
TimeoutMs int32 `json:"timeoutMs"`
|
||||
}
|
||||
|
||||
// HTTPResponse represents the HTTPResponse data structure.
|
||||
// HTTPResponse represents the response from an outbound HTTP request.
|
||||
type HTTPResponse struct {
|
||||
StatusCode int32 `json:"statusCode"`
|
||||
Headers map[string]string `json:"headers"`
|
||||
Body []byte `json:"body"`
|
||||
}
|
||||
|
||||
// mockHTTPService is the mock implementation for testing.
|
||||
type mockHTTPService struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// HTTPMock is the auto-instantiated mock instance for testing.
|
||||
// Use this to set expectations: host.HTTPMock.On("MethodName", args...).Return(values...)
|
||||
var HTTPMock = &mockHTTPService{}
|
||||
|
||||
// Send is the mock method for HTTPSend.
|
||||
func (m *mockHTTPService) Send(request HTTPRequest) (*HTTPResponse, error) {
|
||||
args := m.Called(request)
|
||||
return args.Get(0).(*HTTPResponse), args.Error(1)
|
||||
}
|
||||
|
||||
// HTTPSend delegates to the mock instance.
|
||||
// Send executes an HTTP request and returns the response.
|
||||
//
|
||||
// Parameters:
|
||||
// - request: The HTTP request to execute, including method, URL, headers, body, and timeout
|
||||
//
|
||||
// Returns the HTTP response with status code, headers, and body.
|
||||
// Network errors, timeouts, and permission failures are returned as Go errors.
|
||||
// Successful HTTP calls (including 4xx/5xx status codes) return a non-nil response with nil error.
|
||||
func HTTPSend(request HTTPRequest) (*HTTPResponse, error) {
|
||||
return HTTPMock.Send(request)
|
||||
}
|
||||
219
plugins/pdk/go/host/nd_host_taskqueue.go
Normal file
219
plugins/pdk/go/host/nd_host_taskqueue.go
Normal file
@@ -0,0 +1,219 @@
|
||||
// Code generated by ndpgen. DO NOT EDIT.
|
||||
//
|
||||
// This file contains client wrappers for the TaskQueue host service.
|
||||
// It is intended for use in Navidrome plugins built with TinyGo.
|
||||
//
|
||||
//go:build wasip1
|
||||
|
||||
package host
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
|
||||
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
|
||||
)
|
||||
|
||||
// QueueConfig represents the QueueConfig data structure.
|
||||
// QueueConfig holds configuration for a task queue.
|
||||
type QueueConfig struct {
|
||||
Concurrency int32 `json:"concurrency"`
|
||||
MaxRetries int32 `json:"maxRetries"`
|
||||
BackoffMs int64 `json:"backoffMs"`
|
||||
DelayMs int64 `json:"delayMs"`
|
||||
RetentionMs int64 `json:"retentionMs"`
|
||||
}
|
||||
|
||||
// taskqueue_createqueue is the host function provided by Navidrome.
|
||||
//
|
||||
//go:wasmimport extism:host/user taskqueue_createqueue
|
||||
func taskqueue_createqueue(uint64) uint64
|
||||
|
||||
// taskqueue_enqueue is the host function provided by Navidrome.
|
||||
//
|
||||
//go:wasmimport extism:host/user taskqueue_enqueue
|
||||
func taskqueue_enqueue(uint64) uint64
|
||||
|
||||
// taskqueue_gettaskstatus is the host function provided by Navidrome.
|
||||
//
|
||||
//go:wasmimport extism:host/user taskqueue_gettaskstatus
|
||||
func taskqueue_gettaskstatus(uint64) uint64
|
||||
|
||||
// taskqueue_canceltask is the host function provided by Navidrome.
|
||||
//
|
||||
//go:wasmimport extism:host/user taskqueue_canceltask
|
||||
func taskqueue_canceltask(uint64) uint64
|
||||
|
||||
type taskQueueCreateQueueRequest struct {
|
||||
Name string `json:"name"`
|
||||
Config QueueConfig `json:"config"`
|
||||
}
|
||||
|
||||
type taskQueueEnqueueRequest struct {
|
||||
QueueName string `json:"queueName"`
|
||||
Payload []byte `json:"payload"`
|
||||
}
|
||||
|
||||
type taskQueueEnqueueResponse struct {
|
||||
Result string `json:"result,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
type taskQueueGetTaskStatusRequest struct {
|
||||
TaskID string `json:"taskId"`
|
||||
}
|
||||
|
||||
type taskQueueGetTaskStatusResponse struct {
|
||||
Result string `json:"result,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
type taskQueueCancelTaskRequest struct {
|
||||
TaskID string `json:"taskId"`
|
||||
}
|
||||
|
||||
// TaskQueueCreateQueue calls the taskqueue_createqueue host function.
|
||||
// CreateQueue creates a named task queue with the given configuration.
|
||||
// Zero-value fields in config use sensible defaults.
|
||||
// If a queue with the same name already exists, returns an error.
|
||||
// On startup, this also recovers any stale "running" tasks from a previous crash.
|
||||
func TaskQueueCreateQueue(name string, config QueueConfig) error {
|
||||
// Marshal request to JSON
|
||||
req := taskQueueCreateQueueRequest{
|
||||
Name: name,
|
||||
Config: config,
|
||||
}
|
||||
reqBytes, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
reqMem := pdk.AllocateBytes(reqBytes)
|
||||
defer reqMem.Free()
|
||||
|
||||
// Call the host function
|
||||
responsePtr := taskqueue_createqueue(reqMem.Offset())
|
||||
|
||||
// Read the response from memory
|
||||
responseMem := pdk.FindMemory(responsePtr)
|
||||
responseBytes := responseMem.ReadBytes()
|
||||
|
||||
// Parse error-only response
|
||||
var response struct {
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
||||
return err
|
||||
}
|
||||
if response.Error != "" {
|
||||
return errors.New(response.Error)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// TaskQueueEnqueue calls the taskqueue_enqueue host function.
|
||||
// Enqueue adds a task to the named queue. Returns the task ID.
|
||||
// payload is opaque bytes passed back to the plugin on execution.
|
||||
func TaskQueueEnqueue(queueName string, payload []byte) (string, error) {
|
||||
// Marshal request to JSON
|
||||
req := taskQueueEnqueueRequest{
|
||||
QueueName: queueName,
|
||||
Payload: payload,
|
||||
}
|
||||
reqBytes, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
reqMem := pdk.AllocateBytes(reqBytes)
|
||||
defer reqMem.Free()
|
||||
|
||||
// Call the host function
|
||||
responsePtr := taskqueue_enqueue(reqMem.Offset())
|
||||
|
||||
// Read the response from memory
|
||||
responseMem := pdk.FindMemory(responsePtr)
|
||||
responseBytes := responseMem.ReadBytes()
|
||||
|
||||
// Parse the response
|
||||
var response taskQueueEnqueueResponse
|
||||
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Convert Error field to Go error
|
||||
if response.Error != "" {
|
||||
return "", errors.New(response.Error)
|
||||
}
|
||||
|
||||
return response.Result, nil
|
||||
}
|
||||
|
||||
// TaskQueueGetTaskStatus calls the taskqueue_gettaskstatus host function.
|
||||
// GetTaskStatus returns the status of a task: "pending", "running",
|
||||
// "completed", "failed", or "cancelled".
|
||||
func TaskQueueGetTaskStatus(taskID string) (string, error) {
|
||||
// Marshal request to JSON
|
||||
req := taskQueueGetTaskStatusRequest{
|
||||
TaskID: taskID,
|
||||
}
|
||||
reqBytes, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
reqMem := pdk.AllocateBytes(reqBytes)
|
||||
defer reqMem.Free()
|
||||
|
||||
// Call the host function
|
||||
responsePtr := taskqueue_gettaskstatus(reqMem.Offset())
|
||||
|
||||
// Read the response from memory
|
||||
responseMem := pdk.FindMemory(responsePtr)
|
||||
responseBytes := responseMem.ReadBytes()
|
||||
|
||||
// Parse the response
|
||||
var response taskQueueGetTaskStatusResponse
|
||||
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Convert Error field to Go error
|
||||
if response.Error != "" {
|
||||
return "", errors.New(response.Error)
|
||||
}
|
||||
|
||||
return response.Result, nil
|
||||
}
|
||||
|
||||
// TaskQueueCancelTask calls the taskqueue_canceltask host function.
|
||||
// CancelTask cancels a pending task. Returns error if already
|
||||
// running, completed, or failed.
|
||||
func TaskQueueCancelTask(taskID string) error {
|
||||
// Marshal request to JSON
|
||||
req := taskQueueCancelTaskRequest{
|
||||
TaskID: taskID,
|
||||
}
|
||||
reqBytes, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
reqMem := pdk.AllocateBytes(reqBytes)
|
||||
defer reqMem.Free()
|
||||
|
||||
// Call the host function
|
||||
responsePtr := taskqueue_canceltask(reqMem.Offset())
|
||||
|
||||
// Read the response from memory
|
||||
responseMem := pdk.FindMemory(responsePtr)
|
||||
responseBytes := responseMem.ReadBytes()
|
||||
|
||||
// Parse error-only response
|
||||
var response struct {
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
||||
return err
|
||||
}
|
||||
if response.Error != "" {
|
||||
return errors.New(response.Error)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
84
plugins/pdk/go/host/nd_host_taskqueue_stub.go
Normal file
84
plugins/pdk/go/host/nd_host_taskqueue_stub.go
Normal file
@@ -0,0 +1,84 @@
|
||||
// Code generated by ndpgen. DO NOT EDIT.
|
||||
//
|
||||
// This file contains mock implementations for non-WASM builds.
|
||||
// These mocks allow IDE support, compilation, and unit testing on non-WASM platforms.
|
||||
// Plugin authors can use the exported mock instances to set expectations in tests.
|
||||
//
|
||||
//go:build !wasip1
|
||||
|
||||
package host
|
||||
|
||||
import "github.com/stretchr/testify/mock"
|
||||
|
||||
// QueueConfig represents the QueueConfig data structure.
|
||||
// QueueConfig holds configuration for a task queue.
|
||||
type QueueConfig struct {
|
||||
Concurrency int32 `json:"concurrency"`
|
||||
MaxRetries int32 `json:"maxRetries"`
|
||||
BackoffMs int64 `json:"backoffMs"`
|
||||
DelayMs int64 `json:"delayMs"`
|
||||
RetentionMs int64 `json:"retentionMs"`
|
||||
}
|
||||
|
||||
// mockTaskQueueService is the mock implementation for testing.
|
||||
type mockTaskQueueService struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// TaskQueueMock is the auto-instantiated mock instance for testing.
|
||||
// Use this to set expectations: host.TaskQueueMock.On("MethodName", args...).Return(values...)
|
||||
var TaskQueueMock = &mockTaskQueueService{}
|
||||
|
||||
// CreateQueue is the mock method for TaskQueueCreateQueue.
|
||||
func (m *mockTaskQueueService) CreateQueue(name string, config QueueConfig) error {
|
||||
args := m.Called(name, config)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
// TaskQueueCreateQueue delegates to the mock instance.
|
||||
// CreateQueue creates a named task queue with the given configuration.
|
||||
// Zero-value fields in config use sensible defaults.
|
||||
// If a queue with the same name already exists, returns an error.
|
||||
// On startup, this also recovers any stale "running" tasks from a previous crash.
|
||||
func TaskQueueCreateQueue(name string, config QueueConfig) error {
|
||||
return TaskQueueMock.CreateQueue(name, config)
|
||||
}
|
||||
|
||||
// Enqueue is the mock method for TaskQueueEnqueue.
|
||||
func (m *mockTaskQueueService) Enqueue(queueName string, payload []byte) (string, error) {
|
||||
args := m.Called(queueName, payload)
|
||||
return args.String(0), args.Error(1)
|
||||
}
|
||||
|
||||
// TaskQueueEnqueue delegates to the mock instance.
|
||||
// Enqueue adds a task to the named queue. Returns the task ID.
|
||||
// payload is opaque bytes passed back to the plugin on execution.
|
||||
func TaskQueueEnqueue(queueName string, payload []byte) (string, error) {
|
||||
return TaskQueueMock.Enqueue(queueName, payload)
|
||||
}
|
||||
|
||||
// GetTaskStatus is the mock method for TaskQueueGetTaskStatus.
|
||||
func (m *mockTaskQueueService) GetTaskStatus(taskID string) (string, error) {
|
||||
args := m.Called(taskID)
|
||||
return args.String(0), args.Error(1)
|
||||
}
|
||||
|
||||
// TaskQueueGetTaskStatus delegates to the mock instance.
|
||||
// GetTaskStatus returns the status of a task: "pending", "running",
|
||||
// "completed", "failed", or "cancelled".
|
||||
func TaskQueueGetTaskStatus(taskID string) (string, error) {
|
||||
return TaskQueueMock.GetTaskStatus(taskID)
|
||||
}
|
||||
|
||||
// CancelTask is the mock method for TaskQueueCancelTask.
|
||||
func (m *mockTaskQueueService) CancelTask(taskID string) error {
|
||||
args := m.Called(taskID)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
// TaskQueueCancelTask delegates to the mock instance.
|
||||
// CancelTask cancels a pending task. Returns error if already
|
||||
// running, completed, or failed.
|
||||
func TaskQueueCancelTask(taskID string) error {
|
||||
return TaskQueueMock.CancelTask(taskID)
|
||||
}
|
||||
86
plugins/pdk/go/taskworker/taskworker.go
Normal file
86
plugins/pdk/go/taskworker/taskworker.go
Normal file
@@ -0,0 +1,86 @@
|
||||
// Code generated by ndpgen. DO NOT EDIT.
|
||||
//
|
||||
// This file contains export wrappers for the TaskWorker capability.
|
||||
// It is intended for use in Navidrome plugins built with TinyGo.
|
||||
//
|
||||
//go:build wasip1
|
||||
|
||||
package taskworker
|
||||
|
||||
import (
|
||||
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
|
||||
)
|
||||
|
||||
// TaskExecuteRequest is the request provided when a task is ready to execute.
|
||||
type TaskExecuteRequest struct {
|
||||
// QueueName is the name of the queue this task belongs to.
|
||||
QueueName string `json:"queueName"`
|
||||
// TaskID is the unique identifier for this task.
|
||||
TaskID string `json:"taskId"`
|
||||
// Payload is the opaque data provided when the task was enqueued.
|
||||
Payload []byte `json:"payload"`
|
||||
// Attempt is the current attempt number (1-based: first attempt = 1).
|
||||
Attempt int32 `json:"attempt"`
|
||||
}
|
||||
|
||||
// TaskExecuteResponse is the response from task execution.
|
||||
type TaskExecuteResponse struct {
|
||||
// Error, if non-empty, indicates the task failed. The task will be retried
|
||||
// if retries are configured and attempts remain.
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// TaskWorker is the marker interface for taskworker plugins.
|
||||
// Implement one or more of the provider interfaces below.
|
||||
// TaskWorker provides task execution handling.
|
||||
// This capability allows plugins to receive callbacks when their queued tasks
|
||||
// are ready to execute. Plugins that use the taskqueue host service must
|
||||
// implement this capability.
|
||||
type TaskWorker interface{}
|
||||
|
||||
// TaskExecuteProvider provides the OnTaskExecute function.
|
||||
type TaskExecuteProvider interface {
|
||||
OnTaskExecute(TaskExecuteRequest) (TaskExecuteResponse, error)
|
||||
} // Internal implementation holders
|
||||
var (
|
||||
taskExecuteImpl func(TaskExecuteRequest) (TaskExecuteResponse, error)
|
||||
)
|
||||
|
||||
// Register registers a taskworker implementation.
|
||||
// The implementation is checked for optional provider interfaces.
|
||||
func Register(impl TaskWorker) {
|
||||
if p, ok := impl.(TaskExecuteProvider); ok {
|
||||
taskExecuteImpl = p.OnTaskExecute
|
||||
}
|
||||
}
|
||||
|
||||
// NotImplementedCode is the standard return code for unimplemented functions.
|
||||
// The host recognizes this and skips the plugin gracefully.
|
||||
const NotImplementedCode int32 = -2
|
||||
|
||||
//go:wasmexport nd_task_execute
|
||||
func _NdTaskExecute() int32 {
|
||||
if taskExecuteImpl == nil {
|
||||
// Return standard code - host will skip this plugin gracefully
|
||||
return NotImplementedCode
|
||||
}
|
||||
|
||||
var input TaskExecuteRequest
|
||||
if err := pdk.InputJSON(&input); err != nil {
|
||||
pdk.SetError(err)
|
||||
return -1
|
||||
}
|
||||
|
||||
output, err := taskExecuteImpl(input)
|
||||
if err != nil {
|
||||
pdk.SetError(err)
|
||||
return -1
|
||||
}
|
||||
|
||||
if err := pdk.OutputJSON(output); err != nil {
|
||||
pdk.SetError(err)
|
||||
return -1
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
48
plugins/pdk/go/taskworker/taskworker_stub.go
Normal file
48
plugins/pdk/go/taskworker/taskworker_stub.go
Normal file
@@ -0,0 +1,48 @@
|
||||
// Code generated by ndpgen. DO NOT EDIT.
|
||||
//
|
||||
// This file provides stub implementations for non-WASM platforms.
|
||||
// It allows Go plugins to compile and run tests outside of WASM,
|
||||
// but the actual functionality is only available in WASM builds.
|
||||
//
|
||||
//go:build !wasip1
|
||||
|
||||
package taskworker
|
||||
|
||||
// TaskExecuteRequest is the request provided when a task is ready to execute.
|
||||
type TaskExecuteRequest struct {
|
||||
// QueueName is the name of the queue this task belongs to.
|
||||
QueueName string `json:"queueName"`
|
||||
// TaskID is the unique identifier for this task.
|
||||
TaskID string `json:"taskId"`
|
||||
// Payload is the opaque data provided when the task was enqueued.
|
||||
Payload []byte `json:"payload"`
|
||||
// Attempt is the current attempt number (1-based: first attempt = 1).
|
||||
Attempt int32 `json:"attempt"`
|
||||
}
|
||||
|
||||
// TaskExecuteResponse is the response from task execution.
|
||||
type TaskExecuteResponse struct {
|
||||
// Error, if non-empty, indicates the task failed. The task will be retried
|
||||
// if retries are configured and attempts remain.
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// TaskWorker is the marker interface for taskworker plugins.
|
||||
// Implement one or more of the provider interfaces below.
|
||||
// TaskWorker provides task execution handling.
|
||||
// This capability allows plugins to receive callbacks when their queued tasks
|
||||
// are ready to execute. Plugins that use the taskqueue host service must
|
||||
// implement this capability.
|
||||
type TaskWorker interface{}
|
||||
|
||||
// TaskExecuteProvider provides the OnTaskExecute function.
|
||||
type TaskExecuteProvider interface {
|
||||
OnTaskExecute(TaskExecuteRequest) (TaskExecuteResponse, error)
|
||||
}
|
||||
|
||||
// NotImplementedCode is the standard return code for unimplemented functions.
|
||||
const NotImplementedCode int32 = -2
|
||||
|
||||
// Register is a no-op on non-WASM platforms.
|
||||
// This stub allows code to compile outside of WASM.
|
||||
func Register(_ TaskWorker) {}
|
||||
59
plugins/pdk/python/host/nd_host_http.py
Normal file
59
plugins/pdk/python/host/nd_host_http.py
Normal file
@@ -0,0 +1,59 @@
|
||||
# Code generated by ndpgen. DO NOT EDIT.
|
||||
#
|
||||
# This file contains client wrappers for the HTTP host service.
|
||||
# It is intended for use in Navidrome plugins built with extism-py.
|
||||
#
|
||||
# IMPORTANT: Due to a limitation in extism-py, you cannot import this file directly.
|
||||
# The @extism.import_fn decorators are only detected when defined in the plugin's
|
||||
# main __init__.py file. Copy the needed functions from this file into your plugin.
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
import extism
|
||||
import json
|
||||
|
||||
|
||||
class HostFunctionError(Exception):
|
||||
"""Raised when a host function returns an error."""
|
||||
pass
|
||||
|
||||
|
||||
@extism.import_fn("extism:host/user", "http_send")
|
||||
def _http_send(offset: int) -> int:
|
||||
"""Raw host function - do not call directly."""
|
||||
...
|
||||
|
||||
|
||||
def http_send(request: Any) -> Any:
|
||||
"""Send executes an HTTP request and returns the response.
|
||||
|
||||
Parameters:
|
||||
- request: The HTTP request to execute, including method, URL, headers, body, and timeout
|
||||
|
||||
Returns the HTTP response with status code, headers, and body.
|
||||
Network errors, timeouts, and permission failures are returned as Go errors.
|
||||
Successful HTTP calls (including 4xx/5xx status codes) return a non-nil response with nil error.
|
||||
|
||||
Args:
|
||||
request: Any parameter.
|
||||
|
||||
Returns:
|
||||
Any: The result value.
|
||||
|
||||
Raises:
|
||||
HostFunctionError: If the host function returns an error.
|
||||
"""
|
||||
request = {
|
||||
"request": request,
|
||||
}
|
||||
request_bytes = json.dumps(request).encode("utf-8")
|
||||
request_mem = extism.memory.alloc(request_bytes)
|
||||
response_offset = _http_send(request_mem.offset)
|
||||
response_mem = extism.memory.find(response_offset)
|
||||
response = json.loads(extism.memory.string(response_mem))
|
||||
|
||||
if response.get("error"):
|
||||
raise HostFunctionError(response["error"])
|
||||
|
||||
return response.get("result", None)
|
||||
59
plugins/pdk/python/host/nd_host_httpclient.py
Normal file
59
plugins/pdk/python/host/nd_host_httpclient.py
Normal file
@@ -0,0 +1,59 @@
|
||||
# Code generated by ndpgen. DO NOT EDIT.
|
||||
#
|
||||
# This file contains client wrappers for the HTTP host service.
|
||||
# It is intended for use in Navidrome plugins built with extism-py.
|
||||
#
|
||||
# IMPORTANT: Due to a limitation in extism-py, you cannot import this file directly.
|
||||
# The @extism.import_fn decorators are only detected when defined in the plugin's
|
||||
# main __init__.py file. Copy the needed functions from this file into your plugin.
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
import extism
|
||||
import json
|
||||
|
||||
|
||||
class HostFunctionError(Exception):
|
||||
"""Raised when a host function returns an error."""
|
||||
pass
|
||||
|
||||
|
||||
@extism.import_fn("extism:host/user", "http_send")
|
||||
def _http_send(offset: int) -> int:
|
||||
"""Raw host function - do not call directly."""
|
||||
...
|
||||
|
||||
|
||||
def http_send(request: Any) -> Any:
|
||||
"""Send executes an HTTP request and returns the response.
|
||||
|
||||
Parameters:
|
||||
- request: The HTTP request to execute, including method, URL, headers, body, and timeout
|
||||
|
||||
Returns the HTTP response with status code, headers, and body.
|
||||
Network errors, timeouts, and permission failures are returned as errors.
|
||||
Successful HTTP calls (including 4xx/5xx status codes) return a non-nil response with nil error.
|
||||
|
||||
Args:
|
||||
request: Any parameter.
|
||||
|
||||
Returns:
|
||||
Any: The result value.
|
||||
|
||||
Raises:
|
||||
HostFunctionError: If the host function returns an error.
|
||||
"""
|
||||
request = {
|
||||
"request": request,
|
||||
}
|
||||
request_bytes = json.dumps(request).encode("utf-8")
|
||||
request_mem = extism.memory.alloc(request_bytes)
|
||||
response_offset = _http_send(request_mem.offset)
|
||||
response_mem = extism.memory.find(response_offset)
|
||||
response = json.loads(extism.memory.string(response_mem))
|
||||
|
||||
if response.get("error"):
|
||||
raise HostFunctionError(response["error"])
|
||||
|
||||
return response.get("result", None)
|
||||
153
plugins/pdk/python/host/nd_host_taskqueue.py
Normal file
153
plugins/pdk/python/host/nd_host_taskqueue.py
Normal file
@@ -0,0 +1,153 @@
|
||||
# Code generated by ndpgen. DO NOT EDIT.
|
||||
#
|
||||
# This file contains client wrappers for the TaskQueue host service.
|
||||
# It is intended for use in Navidrome plugins built with extism-py.
|
||||
#
|
||||
# IMPORTANT: Due to a limitation in extism-py, you cannot import this file directly.
|
||||
# The @extism.import_fn decorators are only detected when defined in the plugin's
|
||||
# main __init__.py file. Copy the needed functions from this file into your plugin.
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
import extism
|
||||
import json
|
||||
|
||||
|
||||
class HostFunctionError(Exception):
|
||||
"""Raised when a host function returns an error."""
|
||||
pass
|
||||
|
||||
|
||||
@extism.import_fn("extism:host/user", "taskqueue_createqueue")
|
||||
def _taskqueue_createqueue(offset: int) -> int:
|
||||
"""Raw host function - do not call directly."""
|
||||
...
|
||||
|
||||
|
||||
@extism.import_fn("extism:host/user", "taskqueue_enqueue")
|
||||
def _taskqueue_enqueue(offset: int) -> int:
|
||||
"""Raw host function - do not call directly."""
|
||||
...
|
||||
|
||||
|
||||
@extism.import_fn("extism:host/user", "taskqueue_gettaskstatus")
|
||||
def _taskqueue_gettaskstatus(offset: int) -> int:
|
||||
"""Raw host function - do not call directly."""
|
||||
...
|
||||
|
||||
|
||||
@extism.import_fn("extism:host/user", "taskqueue_canceltask")
|
||||
def _taskqueue_canceltask(offset: int) -> int:
|
||||
"""Raw host function - do not call directly."""
|
||||
...
|
||||
|
||||
|
||||
def taskqueue_create_queue(name: str, config: Any) -> None:
|
||||
"""CreateQueue creates a named task queue with the given configuration.
|
||||
Zero-value fields in config use sensible defaults.
|
||||
If a queue with the same name already exists, returns an error.
|
||||
On startup, this also recovers any stale "running" tasks from a previous crash.
|
||||
|
||||
Args:
|
||||
name: str parameter.
|
||||
config: Any parameter.
|
||||
|
||||
Raises:
|
||||
HostFunctionError: If the host function returns an error.
|
||||
"""
|
||||
request = {
|
||||
"name": name,
|
||||
"config": config,
|
||||
}
|
||||
request_bytes = json.dumps(request).encode("utf-8")
|
||||
request_mem = extism.memory.alloc(request_bytes)
|
||||
response_offset = _taskqueue_createqueue(request_mem.offset)
|
||||
response_mem = extism.memory.find(response_offset)
|
||||
response = json.loads(extism.memory.string(response_mem))
|
||||
|
||||
if response.get("error"):
|
||||
raise HostFunctionError(response["error"])
|
||||
|
||||
|
||||
|
||||
def taskqueue_enqueue(queue_name: str, payload: bytes) -> str:
|
||||
"""Enqueue adds a task to the named queue. Returns the task ID.
|
||||
payload is opaque bytes passed back to the plugin on execution.
|
||||
|
||||
Args:
|
||||
queue_name: str parameter.
|
||||
payload: bytes parameter.
|
||||
|
||||
Returns:
|
||||
str: The result value.
|
||||
|
||||
Raises:
|
||||
HostFunctionError: If the host function returns an error.
|
||||
"""
|
||||
request = {
|
||||
"queueName": queue_name,
|
||||
"payload": payload,
|
||||
}
|
||||
request_bytes = json.dumps(request).encode("utf-8")
|
||||
request_mem = extism.memory.alloc(request_bytes)
|
||||
response_offset = _taskqueue_enqueue(request_mem.offset)
|
||||
response_mem = extism.memory.find(response_offset)
|
||||
response = json.loads(extism.memory.string(response_mem))
|
||||
|
||||
if response.get("error"):
|
||||
raise HostFunctionError(response["error"])
|
||||
|
||||
return response.get("result", "")
|
||||
|
||||
|
||||
def taskqueue_get_task_status(task_id: str) -> str:
|
||||
"""GetTaskStatus returns the status of a task: "pending", "running",
|
||||
"completed", "failed", or "cancelled".
|
||||
|
||||
Args:
|
||||
task_id: str parameter.
|
||||
|
||||
Returns:
|
||||
str: The result value.
|
||||
|
||||
Raises:
|
||||
HostFunctionError: If the host function returns an error.
|
||||
"""
|
||||
request = {
|
||||
"taskId": task_id,
|
||||
}
|
||||
request_bytes = json.dumps(request).encode("utf-8")
|
||||
request_mem = extism.memory.alloc(request_bytes)
|
||||
response_offset = _taskqueue_gettaskstatus(request_mem.offset)
|
||||
response_mem = extism.memory.find(response_offset)
|
||||
response = json.loads(extism.memory.string(response_mem))
|
||||
|
||||
if response.get("error"):
|
||||
raise HostFunctionError(response["error"])
|
||||
|
||||
return response.get("result", "")
|
||||
|
||||
|
||||
def taskqueue_cancel_task(task_id: str) -> None:
|
||||
"""CancelTask cancels a pending task. Returns error if already
|
||||
running, completed, or failed.
|
||||
|
||||
Args:
|
||||
task_id: str parameter.
|
||||
|
||||
Raises:
|
||||
HostFunctionError: If the host function returns an error.
|
||||
"""
|
||||
request = {
|
||||
"taskId": task_id,
|
||||
}
|
||||
request_bytes = json.dumps(request).encode("utf-8")
|
||||
request_mem = extism.memory.alloc(request_bytes)
|
||||
response_offset = _taskqueue_canceltask(request_mem.offset)
|
||||
response_mem = extism.memory.find(response_offset)
|
||||
response = json.loads(extism.memory.string(response_mem))
|
||||
|
||||
if response.get("error"):
|
||||
raise HostFunctionError(response["error"])
|
||||
|
||||
@@ -9,4 +9,5 @@ pub mod lifecycle;
|
||||
pub mod metadata;
|
||||
pub mod scheduler;
|
||||
pub mod scrobbler;
|
||||
pub mod taskworker;
|
||||
pub mod websocket;
|
||||
|
||||
87
plugins/pdk/rust/nd-pdk-capabilities/src/taskworker.rs
Normal file
87
plugins/pdk/rust/nd-pdk-capabilities/src/taskworker.rs
Normal file
@@ -0,0 +1,87 @@
|
||||
// Code generated by ndpgen. DO NOT EDIT.
|
||||
//
|
||||
// This file contains export wrappers for the TaskWorker capability.
|
||||
// It is intended for use in Navidrome plugins built with extism-pdk.
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
// Helper functions for skip_serializing_if with numeric types
|
||||
#[allow(dead_code)]
|
||||
fn is_zero_i32(value: &i32) -> bool { *value == 0 }
|
||||
#[allow(dead_code)]
|
||||
fn is_zero_u32(value: &u32) -> bool { *value == 0 }
|
||||
#[allow(dead_code)]
|
||||
fn is_zero_i64(value: &i64) -> bool { *value == 0 }
|
||||
#[allow(dead_code)]
|
||||
fn is_zero_u64(value: &u64) -> bool { *value == 0 }
|
||||
#[allow(dead_code)]
|
||||
fn is_zero_f32(value: &f32) -> bool { *value == 0.0 }
|
||||
#[allow(dead_code)]
|
||||
fn is_zero_f64(value: &f64) -> bool { *value == 0.0 }
|
||||
/// TaskExecuteRequest is the request provided when a task is ready to execute.
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct TaskExecuteRequest {
|
||||
/// QueueName is the name of the queue this task belongs to.
|
||||
#[serde(default)]
|
||||
pub queue_name: String,
|
||||
/// TaskID is the unique identifier for this task.
|
||||
#[serde(default)]
|
||||
pub task_id: String,
|
||||
/// Payload is the opaque data provided when the task was enqueued.
|
||||
#[serde(default)]
|
||||
pub payload: Vec<u8>,
|
||||
/// Attempt is the current attempt number (1-based: first attempt = 1).
|
||||
#[serde(default)]
|
||||
pub attempt: i32,
|
||||
}
|
||||
/// TaskExecuteResponse is the response from task execution.
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct TaskExecuteResponse {
|
||||
/// Error, if non-empty, indicates the task failed. The task will be retried
|
||||
/// if retries are configured and attempts remain.
|
||||
#[serde(default, skip_serializing_if = "String::is_empty")]
|
||||
pub error: String,
|
||||
}
|
||||
|
||||
/// Error represents an error from a capability method.
|
||||
#[derive(Debug)]
|
||||
pub struct Error {
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Error {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.message)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for Error {}
|
||||
|
||||
impl Error {
|
||||
pub fn new(message: impl Into<String>) -> Self {
|
||||
Self { message: message.into() }
|
||||
}
|
||||
}
|
||||
|
||||
/// TaskExecuteProvider provides the OnTaskExecute function.
|
||||
pub trait TaskExecuteProvider {
|
||||
fn on_task_execute(&self, req: TaskExecuteRequest) -> Result<TaskExecuteResponse, Error>;
|
||||
}
|
||||
|
||||
/// Register the on_task_execute export.
|
||||
/// This macro generates the WASM export function for this method.
|
||||
#[macro_export]
|
||||
macro_rules! register_taskworker_task_execute {
|
||||
($plugin_type:ty) => {
|
||||
#[extism_pdk::plugin_fn]
|
||||
pub fn nd_task_execute(
|
||||
req: extism_pdk::Json<$crate::taskworker::TaskExecuteRequest>
|
||||
) -> extism_pdk::FnResult<extism_pdk::Json<$crate::taskworker::TaskExecuteResponse>> {
|
||||
let plugin = <$plugin_type>::default();
|
||||
let result = $crate::taskworker::TaskExecuteProvider::on_task_execute(&plugin, req.into_inner())?;
|
||||
Ok(extism_pdk::Json(result))
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -35,10 +35,12 @@
|
||||
//! - [`artwork`] - provides artwork public URL generation capabilities for plugins.
|
||||
//! - [`cache`] - provides in-memory TTL-based caching capabilities for plugins.
|
||||
//! - [`config`] - provides access to plugin configuration values.
|
||||
//! - [`http`] - provides outbound HTTP request capabilities for plugins.
|
||||
//! - [`kvstore`] - provides persistent key-value storage for plugins.
|
||||
//! - [`library`] - provides access to music library metadata for plugins.
|
||||
//! - [`scheduler`] - provides task scheduling capabilities for plugins.
|
||||
//! - [`subsonicapi`] - provides access to Navidrome's Subsonic API from plugins.
|
||||
//! - [`taskqueue`] - provides persistent task queues for plugins.
|
||||
//! - [`users`] - provides access to user information for plugins.
|
||||
//! - [`websocket`] - provides WebSocket communication capabilities for plugins.
|
||||
|
||||
@@ -63,6 +65,13 @@ pub mod config {
|
||||
pub use super::nd_host_config::*;
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
mod nd_host_http;
|
||||
/// provides outbound HTTP request capabilities for plugins.
|
||||
pub mod http {
|
||||
pub use super::nd_host_http::*;
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
mod nd_host_kvstore;
|
||||
/// provides persistent key-value storage for plugins.
|
||||
@@ -91,6 +100,13 @@ pub mod subsonicapi {
|
||||
pub use super::nd_host_subsonicapi::*;
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
mod nd_host_taskqueue;
|
||||
/// provides persistent task queues for plugins.
|
||||
pub mod taskqueue {
|
||||
pub use super::nd_host_taskqueue::*;
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
mod nd_host_users;
|
||||
/// provides access to user information for plugins.
|
||||
|
||||
83
plugins/pdk/rust/nd-pdk-host/src/nd_host_http.rs
Normal file
83
plugins/pdk/rust/nd-pdk-host/src/nd_host_http.rs
Normal file
@@ -0,0 +1,83 @@
|
||||
// Code generated by ndpgen. DO NOT EDIT.
|
||||
//
|
||||
// This file contains client wrappers for the HTTP host service.
|
||||
// It is intended for use in Navidrome plugins built with extism-pdk.
|
||||
|
||||
use extism_pdk::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// HTTPRequest represents an outbound HTTP request from a plugin.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct HTTPRequest {
|
||||
pub method: String,
|
||||
pub url: String,
|
||||
#[serde(default)]
|
||||
pub headers: std::collections::HashMap<String, String>,
|
||||
#[serde(default)]
|
||||
pub body: Vec<u8>,
|
||||
#[serde(default)]
|
||||
pub timeout_ms: i32,
|
||||
}
|
||||
|
||||
/// HTTPResponse represents the response from an outbound HTTP request.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct HTTPResponse {
|
||||
pub status_code: i32,
|
||||
#[serde(default)]
|
||||
pub headers: std::collections::HashMap<String, String>,
|
||||
#[serde(default)]
|
||||
pub body: Vec<u8>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct HTTPSendRequest {
|
||||
request: HTTPRequest,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct HTTPSendResponse {
|
||||
#[serde(default)]
|
||||
result: Option<HTTPResponse>,
|
||||
#[serde(default)]
|
||||
error: Option<String>,
|
||||
}
|
||||
|
||||
#[host_fn]
|
||||
extern "ExtismHost" {
|
||||
fn http_send(input: Json<HTTPSendRequest>) -> Json<HTTPSendResponse>;
|
||||
}
|
||||
|
||||
/// Send executes an HTTP request and returns the response.
|
||||
///
|
||||
/// Parameters:
|
||||
/// - request: The HTTP request to execute, including method, URL, headers, body, and timeout
|
||||
///
|
||||
/// Returns the HTTP response with status code, headers, and body.
|
||||
/// Network errors, timeouts, and permission failures are returned as Go errors.
|
||||
/// Successful HTTP calls (including 4xx/5xx status codes) return a non-nil response with nil error.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `request` - HTTPRequest parameter.
|
||||
///
|
||||
/// # Returns
|
||||
/// The result value.
|
||||
///
|
||||
/// # Errors
|
||||
/// Returns an error if the host function call fails.
|
||||
pub fn send(request: HTTPRequest) -> Result<Option<HTTPResponse>, Error> {
|
||||
let response = unsafe {
|
||||
http_send(Json(HTTPSendRequest {
|
||||
request: request,
|
||||
}))?
|
||||
};
|
||||
|
||||
if let Some(err) = response.0.error {
|
||||
return Err(Error::msg(err));
|
||||
}
|
||||
|
||||
Ok(response.0.result)
|
||||
}
|
||||
184
plugins/pdk/rust/nd-pdk-host/src/nd_host_taskqueue.rs
Normal file
184
plugins/pdk/rust/nd-pdk-host/src/nd_host_taskqueue.rs
Normal file
@@ -0,0 +1,184 @@
|
||||
// Code generated by ndpgen. DO NOT EDIT.
|
||||
//
|
||||
// This file contains client wrappers for the TaskQueue host service.
|
||||
// It is intended for use in Navidrome plugins built with extism-pdk.
|
||||
|
||||
use extism_pdk::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// QueueConfig holds configuration for a task queue.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct QueueConfig {
|
||||
pub concurrency: i32,
|
||||
pub max_retries: i32,
|
||||
pub backoff_ms: i64,
|
||||
pub delay_ms: i64,
|
||||
pub retention_ms: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct TaskQueueCreateQueueRequest {
|
||||
name: String,
|
||||
config: QueueConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct TaskQueueCreateQueueResponse {
|
||||
#[serde(default)]
|
||||
error: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct TaskQueueEnqueueRequest {
|
||||
queue_name: String,
|
||||
payload: Vec<u8>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct TaskQueueEnqueueResponse {
|
||||
#[serde(default)]
|
||||
result: String,
|
||||
#[serde(default)]
|
||||
error: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct TaskQueueGetTaskStatusRequest {
|
||||
task_id: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct TaskQueueGetTaskStatusResponse {
|
||||
#[serde(default)]
|
||||
result: String,
|
||||
#[serde(default)]
|
||||
error: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct TaskQueueCancelTaskRequest {
|
||||
task_id: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct TaskQueueCancelTaskResponse {
|
||||
#[serde(default)]
|
||||
error: Option<String>,
|
||||
}
|
||||
|
||||
#[host_fn]
|
||||
extern "ExtismHost" {
|
||||
fn taskqueue_createqueue(input: Json<TaskQueueCreateQueueRequest>) -> Json<TaskQueueCreateQueueResponse>;
|
||||
fn taskqueue_enqueue(input: Json<TaskQueueEnqueueRequest>) -> Json<TaskQueueEnqueueResponse>;
|
||||
fn taskqueue_gettaskstatus(input: Json<TaskQueueGetTaskStatusRequest>) -> Json<TaskQueueGetTaskStatusResponse>;
|
||||
fn taskqueue_canceltask(input: Json<TaskQueueCancelTaskRequest>) -> Json<TaskQueueCancelTaskResponse>;
|
||||
}
|
||||
|
||||
/// CreateQueue creates a named task queue with the given configuration.
|
||||
/// Zero-value fields in config use sensible defaults.
|
||||
/// If a queue with the same name already exists, returns an error.
|
||||
/// On startup, this also recovers any stale "running" tasks from a previous crash.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `name` - String parameter.
|
||||
/// * `config` - QueueConfig parameter.
|
||||
///
|
||||
/// # Errors
|
||||
/// Returns an error if the host function call fails.
|
||||
pub fn create_queue(name: &str, config: QueueConfig) -> Result<(), Error> {
|
||||
let response = unsafe {
|
||||
taskqueue_createqueue(Json(TaskQueueCreateQueueRequest {
|
||||
name: name.to_owned(),
|
||||
config: config,
|
||||
}))?
|
||||
};
|
||||
|
||||
if let Some(err) = response.0.error {
|
||||
return Err(Error::msg(err));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Enqueue adds a task to the named queue. Returns the task ID.
|
||||
/// payload is opaque bytes passed back to the plugin on execution.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `queue_name` - String parameter.
|
||||
/// * `payload` - Vec<u8> parameter.
|
||||
///
|
||||
/// # Returns
|
||||
/// The result value.
|
||||
///
|
||||
/// # Errors
|
||||
/// Returns an error if the host function call fails.
|
||||
pub fn enqueue(queue_name: &str, payload: Vec<u8>) -> Result<String, Error> {
|
||||
let response = unsafe {
|
||||
taskqueue_enqueue(Json(TaskQueueEnqueueRequest {
|
||||
queue_name: queue_name.to_owned(),
|
||||
payload: payload,
|
||||
}))?
|
||||
};
|
||||
|
||||
if let Some(err) = response.0.error {
|
||||
return Err(Error::msg(err));
|
||||
}
|
||||
|
||||
Ok(response.0.result)
|
||||
}
|
||||
|
||||
/// GetTaskStatus returns the status of a task: "pending", "running",
|
||||
/// "completed", "failed", or "cancelled".
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `task_id` - String parameter.
|
||||
///
|
||||
/// # Returns
|
||||
/// The result value.
|
||||
///
|
||||
/// # Errors
|
||||
/// Returns an error if the host function call fails.
|
||||
pub fn get_task_status(task_id: &str) -> Result<String, Error> {
|
||||
let response = unsafe {
|
||||
taskqueue_gettaskstatus(Json(TaskQueueGetTaskStatusRequest {
|
||||
task_id: task_id.to_owned(),
|
||||
}))?
|
||||
};
|
||||
|
||||
if let Some(err) = response.0.error {
|
||||
return Err(Error::msg(err));
|
||||
}
|
||||
|
||||
Ok(response.0.result)
|
||||
}
|
||||
|
||||
/// CancelTask cancels a pending task. Returns error if already
|
||||
/// running, completed, or failed.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `task_id` - String parameter.
|
||||
///
|
||||
/// # Errors
|
||||
/// Returns an error if the host function call fails.
|
||||
pub fn cancel_task(task_id: &str) -> Result<(), Error> {
|
||||
let response = unsafe {
|
||||
taskqueue_canceltask(Json(TaskQueueCancelTaskRequest {
|
||||
task_id: task_id.to_owned(),
|
||||
}))?
|
||||
};
|
||||
|
||||
if let Some(err) = response.0.error {
|
||||
return Err(Error::msg(err));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
16
plugins/testdata/test-taskqueue/go.mod
vendored
Normal file
16
plugins/testdata/test-taskqueue/go.mod
vendored
Normal file
@@ -0,0 +1,16 @@
|
||||
module test-taskqueue
|
||||
|
||||
go 1.25
|
||||
|
||||
require github.com/navidrome/navidrome/plugins/pdk/go v0.0.0
|
||||
|
||||
require (
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/extism/go-pdk v1.1.3 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/stretchr/objx v0.5.2 // indirect
|
||||
github.com/stretchr/testify v1.11.1 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
||||
replace github.com/navidrome/navidrome/plugins/pdk/go => ../../pdk/go
|
||||
14
plugins/testdata/test-taskqueue/go.sum
vendored
Normal file
14
plugins/testdata/test-taskqueue/go.sum
vendored
Normal file
@@ -0,0 +1,14 @@
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/extism/go-pdk v1.1.3 h1:hfViMPWrqjN6u67cIYRALZTZLk/enSPpNKa+rZ9X2SQ=
|
||||
github.com/extism/go-pdk v1.1.3/go.mod h1:Gz+LIU/YCKnKXhgge8yo5Yu1F/lbv7KtKFkiCSzW/P4=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
|
||||
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
|
||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
100
plugins/testdata/test-taskqueue/main.go
vendored
Normal file
100
plugins/testdata/test-taskqueue/main.go
vendored
Normal file
@@ -0,0 +1,100 @@
|
||||
// Test TaskQueue plugin for Navidrome plugin system integration tests.
|
||||
// Build with: tinygo build -o ../test-taskqueue.wasm -target wasip1 -buildmode=c-shared .
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/navidrome/navidrome/plugins/pdk/go/host"
|
||||
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
|
||||
"github.com/navidrome/navidrome/plugins/pdk/go/taskworker"
|
||||
)
|
||||
|
||||
func init() {
|
||||
taskworker.Register(&handler{})
|
||||
}
|
||||
|
||||
type handler struct{}
|
||||
|
||||
func (h *handler) OnTaskExecute(req taskworker.TaskExecuteRequest) (taskworker.TaskExecuteResponse, error) {
|
||||
payload := string(req.Payload)
|
||||
if payload == "fail" {
|
||||
return taskworker.TaskExecuteResponse{Error: "task failed as instructed"}, nil
|
||||
}
|
||||
if payload == "fail-then-succeed" && req.Attempt < 2 {
|
||||
return taskworker.TaskExecuteResponse{Error: "transient failure"}, nil
|
||||
}
|
||||
return taskworker.TaskExecuteResponse{}, nil
|
||||
}
|
||||
|
||||
// Test helper types
|
||||
type TestInput struct {
|
||||
Operation string `json:"operation"`
|
||||
QueueName string `json:"queueName,omitempty"`
|
||||
Config *host.QueueConfig `json:"config,omitempty"`
|
||||
Payload []byte `json:"payload,omitempty"`
|
||||
TaskID string `json:"taskId,omitempty"`
|
||||
}
|
||||
|
||||
type TestOutput struct {
|
||||
TaskID string `json:"taskId,omitempty"`
|
||||
Status string `json:"status,omitempty"`
|
||||
Error *string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
//go:wasmexport nd_test_taskqueue
|
||||
func ndTestTaskQueue() int32 {
|
||||
var input TestInput
|
||||
if err := pdk.InputJSON(&input); err != nil {
|
||||
errStr := err.Error()
|
||||
pdk.OutputJSON(TestOutput{Error: &errStr})
|
||||
return 0
|
||||
}
|
||||
|
||||
switch input.Operation {
|
||||
case "create_queue":
|
||||
config := host.QueueConfig{}
|
||||
if input.Config != nil {
|
||||
config = *input.Config
|
||||
}
|
||||
err := host.TaskQueueCreateQueue(input.QueueName, config)
|
||||
if err != nil {
|
||||
errStr := err.Error()
|
||||
pdk.OutputJSON(TestOutput{Error: &errStr})
|
||||
return 0
|
||||
}
|
||||
pdk.OutputJSON(TestOutput{})
|
||||
|
||||
case "enqueue":
|
||||
taskID, err := host.TaskQueueEnqueue(input.QueueName, input.Payload)
|
||||
if err != nil {
|
||||
errStr := err.Error()
|
||||
pdk.OutputJSON(TestOutput{Error: &errStr})
|
||||
return 0
|
||||
}
|
||||
pdk.OutputJSON(TestOutput{TaskID: taskID})
|
||||
|
||||
case "get_task_status":
|
||||
status, err := host.TaskQueueGetTaskStatus(input.TaskID)
|
||||
if err != nil {
|
||||
errStr := err.Error()
|
||||
pdk.OutputJSON(TestOutput{Error: &errStr})
|
||||
return 0
|
||||
}
|
||||
pdk.OutputJSON(TestOutput{Status: status})
|
||||
|
||||
case "cancel_task":
|
||||
err := host.TaskQueueCancelTask(input.TaskID)
|
||||
if err != nil {
|
||||
errStr := err.Error()
|
||||
pdk.OutputJSON(TestOutput{Error: &errStr})
|
||||
return 0
|
||||
}
|
||||
pdk.OutputJSON(TestOutput{})
|
||||
|
||||
default:
|
||||
errStr := "unknown operation: " + input.Operation
|
||||
pdk.OutputJSON(TestOutput{Error: &errStr})
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func main() {}
|
||||
12
plugins/testdata/test-taskqueue/manifest.json
vendored
Normal file
12
plugins/testdata/test-taskqueue/manifest.json
vendored
Normal file
@@ -0,0 +1,12 @@
|
||||
{
|
||||
"name": "Test TaskQueue Plugin",
|
||||
"author": "Navidrome Test",
|
||||
"version": "1.0.0",
|
||||
"description": "A test plugin for TaskQueue integration testing",
|
||||
"permissions": {
|
||||
"taskqueue": {
|
||||
"reason": "For testing task queue operations",
|
||||
"maxConcurrency": 10
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -76,6 +76,7 @@ func serveIndex(ds model.DataStore, fs fs.FS, shareInfo *model.Share) http.Handl
|
||||
"separator": string(os.PathSeparator),
|
||||
"enableInspect": conf.Server.Inspect.Enabled,
|
||||
"pluginsEnabled": conf.Server.Plugins.Enabled,
|
||||
"extAuthLogoutURL": conf.Server.ExtAuth.LogoutURL,
|
||||
}
|
||||
if strings.HasPrefix(conf.Server.UILoginBackgroundURL, "/") {
|
||||
appConfig["loginBackgroundURL"] = path.Join(conf.Server.BasePath, conf.Server.UILoginBackgroundURL)
|
||||
|
||||
@@ -104,6 +104,7 @@ var _ = Describe("serveIndex", func() {
|
||||
Entry("enableUserEditing", func() { conf.Server.EnableUserEditing = false }, "enableUserEditing", false),
|
||||
Entry("enableSharing", func() { conf.Server.EnableSharing = true }, "enableSharing", true),
|
||||
Entry("devNewEventStream", func() { conf.Server.DevNewEventStream = true }, "devNewEventStream", true),
|
||||
Entry("extAuthLogoutURL", func() { conf.Server.ExtAuth.LogoutURL = "https://auth.example.com/logout" }, "extAuthLogoutURL", "https://auth.example.com/logout"),
|
||||
)
|
||||
|
||||
DescribeTable("sets other UI configuration values",
|
||||
|
||||
@@ -443,7 +443,7 @@ func (api *Router) buildArtist(r *http.Request, artist *model.Artist) (*response
|
||||
func (api *Router) buildAlbumDirectory(ctx context.Context, album *model.Album) (*responses.Directory, error) {
|
||||
dir := &responses.Directory{}
|
||||
dir.Id = album.ID
|
||||
dir.Name = album.Name
|
||||
dir.Name = album.FullName()
|
||||
dir.Parent = album.AlbumArtistID
|
||||
dir.PlayCount = album.PlayCount
|
||||
if album.PlayCount > 0 {
|
||||
|
||||
@@ -197,7 +197,7 @@ func childFromMediaFile(ctx context.Context, mf model.MediaFile) responses.Child
|
||||
}
|
||||
|
||||
child.Parent = mf.AlbumID
|
||||
child.Album = mf.Album
|
||||
child.Album = mf.FullAlbumName()
|
||||
child.Year = int32(mf.Year)
|
||||
child.Artist = mf.Artist
|
||||
child.Genre = mf.Genre
|
||||
@@ -302,7 +302,7 @@ func artistRefs(participants model.ParticipantList) []responses.ArtistID3Ref {
|
||||
func fakePath(mf model.MediaFile) string {
|
||||
builder := strings.Builder{}
|
||||
|
||||
builder.WriteString(fmt.Sprintf("%s/%s/", sanitizeSlashes(mf.AlbumArtist), sanitizeSlashes(mf.Album)))
|
||||
builder.WriteString(fmt.Sprintf("%s/%s/", sanitizeSlashes(mf.AlbumArtist), sanitizeSlashes(mf.FullAlbumName())))
|
||||
if mf.DiscNumber != 0 {
|
||||
builder.WriteString(fmt.Sprintf("%02d-", mf.DiscNumber))
|
||||
}
|
||||
@@ -321,9 +321,10 @@ func childFromAlbum(ctx context.Context, al model.Album) responses.Child {
|
||||
child := responses.Child{}
|
||||
child.Id = al.ID
|
||||
child.IsDir = true
|
||||
child.Title = al.Name
|
||||
child.Name = al.Name
|
||||
child.Album = al.Name
|
||||
fullName := al.FullName()
|
||||
child.Title = fullName
|
||||
child.Name = fullName
|
||||
child.Album = fullName
|
||||
child.Artist = al.AlbumArtist
|
||||
child.Year = int32(cmp.Or(al.MaxOriginalYear, al.MaxYear))
|
||||
child.Genre = al.Genre
|
||||
@@ -405,7 +406,7 @@ func buildDiscSubtitles(a model.Album) []responses.DiscTitle {
|
||||
func buildAlbumID3(ctx context.Context, album model.Album) responses.AlbumID3 {
|
||||
dir := responses.AlbumID3{}
|
||||
dir.Id = album.ID
|
||||
dir.Name = album.Name
|
||||
dir.Name = album.FullName()
|
||||
dir.Artist = album.AlbumArtist
|
||||
dir.ArtistId = album.AlbumArtistID
|
||||
dir.CoverArt = album.CoverArtID().String()
|
||||
|
||||
@@ -66,6 +66,10 @@ const authProvider = {
|
||||
|
||||
logout: () => {
|
||||
removeItems()
|
||||
if (config.extAuthLogoutURL) {
|
||||
window.location.href = config.extAuthLogoutURL
|
||||
return Promise.resolve(false)
|
||||
}
|
||||
return Promise.resolve()
|
||||
},
|
||||
|
||||
|
||||
@@ -122,7 +122,7 @@ const UserMenu = (props) => {
|
||||
})
|
||||
: null,
|
||||
)}
|
||||
{!config.auth && logout}
|
||||
{(!config.auth || !!config.extAuthLogoutURL) && logout}
|
||||
</MenuList>
|
||||
</Popover>
|
||||
</div>
|
||||
|
||||
Reference in New Issue
Block a user