Compare commits

..

10 Commits

Author SHA1 Message Date
Viktor Scharf
8288abf63e deleting user by userId in test 2025-09-26 16:27:37 +02:00
Christian Richter
b1bc63274f attempt to fix broken soft delete
Signed-off-by: Christian Richter <c.richter@opencloud.eu>
2025-09-25 17:08:15 +02:00
Christian Richter
d76f528c00 use standard errors package
Signed-off-by: Christian Richter <c.richter@opencloud.eu>
2025-09-24 15:03:31 +02:00
Christian Richter
c15bb0f99b remove obsolete properties
Signed-off-by: Christian Richter <c.richter@opencloud.eu>
2025-09-24 15:03:31 +02:00
Christian Richter
11900601d2 revert faulty replaces
Signed-off-by: Christian Richter <c.richter@opencloud.eu>
2025-09-24 15:03:31 +02:00
Christian Richter
16d600cffe add missing pointer
Signed-off-by: Christian Richter <c.richter@opencloud.eu>
2025-09-24 15:03:31 +02:00
Christian Richter
3272b4862a respect ldap settings, add comments
Signed-off-by: Christian Richter <c.richter@opencloud.eu>
2025-09-24 15:03:31 +02:00
Christian Richter
29804e355b add persistance function & userstate
Signed-off-by: Christian Richter <c.richter@opencloud.eu>
2025-09-24 15:03:31 +02:00
Christian Richter
9c4f74d394 add nats-js-kv connection to graph
Signed-off-by: Christian Richter <c.richter@opencloud.eu>

# Conflicts:
#	services/graph/pkg/service/v0/service.go
2025-09-24 15:03:31 +02:00
Christian Richter
55a6f057e5 add nats-js-kv persistance to graph
Signed-off-by: Christian Richter <c.richter@opencloud.eu>
2025-09-24 15:03:31 +02:00
57 changed files with 651 additions and 1238 deletions

8
go.mod
View File

@@ -57,7 +57,7 @@ require (
github.com/mna/pigeon v1.3.0
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.11.9
github.com/nats-io/nats.go v1.46.0
github.com/nats-io/nats.go v1.45.0
github.com/oklog/run v1.2.0
github.com/olekukonko/tablewriter v1.0.9
github.com/onsi/ginkgo v1.16.5
@@ -65,7 +65,7 @@ require (
github.com/onsi/gomega v1.38.2
github.com/open-policy-agent/opa v1.8.0
github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76
github.com/opencloud-eu/reva/v2 v2.38.1-0.20250924125540-eaa2437c36b2
github.com/opencloud-eu/reva/v2 v2.38.1-0.20250922152322-476bb1f0070a
github.com/opensearch-project/opensearch-go/v4 v4.5.0
github.com/orcaman/concurrent-map v1.0.0
github.com/pkg/errors v0.9.1
@@ -104,8 +104,8 @@ require (
go.opentelemetry.io/otel/trace v1.38.0
golang.org/x/crypto v0.42.0
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac
golang.org/x/image v0.31.0
golang.org/x/net v0.44.0
golang.org/x/image v0.30.0
golang.org/x/net v0.43.0
golang.org/x/oauth2 v0.31.0
golang.org/x/sync v0.17.0
golang.org/x/term v0.35.0

16
go.sum
View File

@@ -885,8 +885,8 @@ github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI=
github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.11.9 h1:k7nzHZjUf51W1b08xiQih63Rdxh0yr5O4K892Mx5gQA=
github.com/nats-io/nats-server/v2 v2.11.9/go.mod h1:1MQgsAQX1tVjpf3Yzrk3x2pzdsZiNL/TVP3Amhp3CR8=
github.com/nats-io/nats.go v1.46.0 h1:iUcX+MLT0HHXskGkz+Sg20sXrPtJLsOojMDTDzOHSb8=
github.com/nats-io/nats.go v1.46.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nats.go v1.45.0 h1:/wGPbnYXDM0pLKFjZTX+2JOw9TQPoIgTFrUaH97giwA=
github.com/nats-io/nats.go v1.45.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
@@ -930,8 +930,8 @@ github.com/opencloud-eu/go-micro-plugins/v4/store/nats-js-kv v0.0.0-202505121527
github.com/opencloud-eu/go-micro-plugins/v4/store/nats-js-kv v0.0.0-20250512152754-23325793059a/go.mod h1:pjcozWijkNPbEtX5SIQaxEW/h8VAVZYTLx+70bmB3LY=
github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76 h1:vD/EdfDUrv4omSFjrinT8Mvf+8D7f9g4vgQ2oiDrVUI=
github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76/go.mod h1:pzatilMEHZFT3qV7C/X3MqOa3NlRQuYhlRhZTL+hN6Q=
github.com/opencloud-eu/reva/v2 v2.38.1-0.20250924125540-eaa2437c36b2 h1:e3B6KbWMjloKpqoTwTwvBLoCETRyyCDkQsqwRQMUdxc=
github.com/opencloud-eu/reva/v2 v2.38.1-0.20250924125540-eaa2437c36b2/go.mod h1:8mGCM9tLIPsC5aEKS022Z5u89u6jKuOl0znK0gNFReM=
github.com/opencloud-eu/reva/v2 v2.38.1-0.20250922152322-476bb1f0070a h1:LwqYIxoBH26sWdk2xHzgEdWlqVnQJ2lC8MuwBMjkuC0=
github.com/opencloud-eu/reva/v2 v2.38.1-0.20250922152322-476bb1f0070a/go.mod h1:jykpTTQ0QWw0EzLop+6neuKCPccE0rj2asOwzls283U=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040=
@@ -1343,8 +1343,8 @@ golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac/go.mod h1:hH+7mtFmImwwcMvScy
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/image v0.18.0/go.mod h1:4yyo5vMFQjVjUcVk4jEQcU9MGy/rulF5WvUILseCM2E=
golang.org/x/image v0.31.0 h1:mLChjE2MV6g1S7oqbXC0/UcKijjm5fnJLUYKIYrLESA=
golang.org/x/image v0.31.0/go.mod h1:R9ec5Lcp96v9FTF+ajwaH3uGxPH4fKfHHAVbUILxghA=
golang.org/x/image v0.30.0 h1:jD5RhkmVAnjqaCUXfbGBrn3lpxbknfN9w2UhHHU+5B4=
golang.org/x/image v0.30.0/go.mod h1:SAEUTxCCMWSrJcCy/4HwavEsfZZJlYxeHLc6tTiAe/c=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@@ -1424,8 +1424,8 @@ golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I=
golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE=
golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=

View File

@@ -28,7 +28,7 @@ dev-docker-multiarch:
docker buildx rm opencloudbuilder || true
docker buildx create --platform linux/arm64,linux/amd64 --name opencloudbuilder
docker buildx use opencloudbuilder
docker buildx build --platform linux/arm64,linux/amd64 --output type=docker --file docker/Dockerfile.multiarch --tag opencloudeu/opencloud:dev-multiarch ../..
cd .. && docker buildx build --platform linux/arm64,linux/amd64 --output type=docker --file opencloud/docker/Dockerfile.multiarch --tag opencloudeu/opencloud:dev-multiarch .
docker buildx rm opencloudbuilder
.PHONY: debug-docker

View File

@@ -7,11 +7,11 @@ ARG STRING
RUN apk add bash make git curl gcc musl-dev libc-dev binutils-gold inotify-tools vips-dev
WORKDIR /opencloud
RUN --mount=type=bind,target=/opencloud,rw\
RUN --mount=type=bind,target=/opencloud \
--mount=type=cache,target=/go/pkg/mod \
--mount=type=cache,target=/root/.cache \
GOOS="${TARGETOS:-linux}" GOARCH="${TARGETARCH:-amd64}" ; \
cd opencloud && make -C opencloud release-linux-docker-${TARGETARCH} ENABLE_VIPS=true DIST=/dist
make -C opencloud release-linux-docker-${TARGETARCH} ENABLE_VIPS=true DIST=/dist
FROM alpine:3.21
ARG VERSION

View File

@@ -7,6 +7,7 @@ import (
"net"
"net/http"
"net/rpc"
"os/signal"
"sort"
"strings"
"sync"
@@ -30,7 +31,6 @@ import (
authmachine "github.com/opencloud-eu/opencloud/services/auth-machine/pkg/command"
authservice "github.com/opencloud-eu/opencloud/services/auth-service/pkg/command"
clientlog "github.com/opencloud-eu/opencloud/services/clientlog/pkg/command"
collaboration "github.com/opencloud-eu/opencloud/services/collaboration/pkg/command"
eventhistory "github.com/opencloud-eu/opencloud/services/eventhistory/pkg/command"
frontend "github.com/opencloud-eu/opencloud/services/frontend/pkg/command"
gateway "github.com/opencloud-eu/opencloud/services/gateway/pkg/command"
@@ -329,11 +329,6 @@ func NewService(ctx context.Context, options ...Option) (*Service, error) {
cfg.Audit.Commons = cfg.Commons
return audit.Execute(cfg.Audit)
})
areg(opts.Config.Collaboration.Service.Name, func(ctx context.Context, cfg *occfg.Config) error {
cfg.Collaboration.Context = ctx
cfg.Collaboration.Commons = cfg.Commons
return collaboration.Execute(cfg.Collaboration)
})
areg(opts.Config.Policies.Service.Name, func(ctx context.Context, cfg *occfg.Config) error {
cfg.Policies.Context = ctx
cfg.Policies.Commons = cfg.Commons
@@ -365,9 +360,12 @@ func Start(ctx context.Context, o ...Option) error {
return err
}
// create a context that will be cancelled when too many backoff cycles on one of the services happens
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// cancel the context when a signal is received.
var cancel context.CancelFunc
if ctx == nil {
ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...)
defer cancel()
}
// tolerance controls backoff cycles from the supervisor.
tolerance := 5

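The reworked Start only installs its own signal handling when the caller did not supply a context. A minimal, self-contained sketch of that guard, using SIGINT/SIGTERM as stand-ins for the runner.StopSignals list referenced above:

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// startWithContext mirrors the guard above: if the caller passed no context,
// derive one that is cancelled on SIGINT/SIGTERM (stand-ins for runner.StopSignals).
func startWithContext(ctx context.Context) error {
	if ctx == nil {
		var cancel context.CancelFunc
		ctx, cancel = signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
		defer cancel()
	}

	// ... start the supervised services here ...

	<-ctx.Done() // block until a signal arrives or the caller cancels
	return ctx.Err()
}

func main() {
	if err := startWithContext(nil); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
```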
View File

@@ -69,26 +69,16 @@ func Server(cfg *config.Config) *cli.Command {
return err
}
// use the AppURLs helper (an atomic pointer) to fetch and store the app URLs
// this is required as the app URLs are fetched periodically in the background
// and read when handling requests
appURLs := helpers.NewAppURLs()
appUrls, err := helpers.GetAppURLs(cfg, logger)
if err != nil {
return err
}
ticker := time.NewTicker(cfg.CS3Api.APPRegistrationInterval)
defer ticker.Stop()
go func() {
for ; true; <-ticker.C {
// fetch and store the app URLs
v, err := helpers.GetAppURLs(cfg, logger)
if err != nil {
logger.Warn().Err(err).Msg("Failed to get app URLs")
// empty map to clear previous URLs
v = make(map[string]map[string]string)
}
appURLs.Store(v)
// register the app provider
if err := helpers.RegisterAppProvider(ctx, cfg, logger, gatewaySelector, appURLs); err != nil {
if err := helpers.RegisterAppProvider(ctx, cfg, logger, gatewaySelector, appUrls); err != nil {
logger.Warn().Err(err).Msg("Failed to register app provider")
}
}
@@ -107,7 +97,7 @@ func Server(cfg *config.Config) *cli.Command {
// start GRPC server
grpcServer, teardown, err := grpc.Server(
grpc.AppURLs(appURLs),
grpc.AppURLs(appUrls),
grpc.Config(cfg),
grpc.Logger(logger),
grpc.TraceProvider(traceProvider),

View File

@@ -6,75 +6,13 @@ import (
"net/http"
"net/url"
"strings"
"sync/atomic"
"github.com/beevik/etree"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/collaboration/pkg/config"
"github.com/opencloud-eu/reva/v2/pkg/mime"
"github.com/pkg/errors"
)
// AppURLs holds the app urls fetched from the WOPI app discovery endpoint
// It is a type safe wrapper around an atomic pointer to a map
type AppURLs struct {
urls atomic.Pointer[map[string]map[string]string]
}
func NewAppURLs() *AppURLs {
a := &AppURLs{}
a.urls.Store(&map[string]map[string]string{})
return a
}
func (a *AppURLs) Store(urls map[string]map[string]string) {
a.urls.Store(&urls)
}
func (a *AppURLs) GetMimeTypes() []string {
currentURLs := a.urls.Load()
if currentURLs == nil {
return []string{}
}
mimeTypesMap := make(map[string]bool)
for _, extensions := range *currentURLs {
for ext := range extensions {
m := mime.Detect(false, ext)
// skip the default
if m == "application/octet-stream" {
continue
}
mimeTypesMap[m] = true
}
}
// Convert map to slice
mimeTypes := make([]string, 0, len(mimeTypesMap))
for mimeType := range mimeTypesMap {
mimeTypes = append(mimeTypes, mimeType)
}
return mimeTypes
}
// GetAppURLFor gets the appURL from the list of appURLs based on the
// action and file extension provided. If there is no match, an empty
// string will be returned.
func (a *AppURLs) GetAppURLFor(action, fileExt string) string {
currentURLs := a.urls.Load()
if currentURLs == nil {
return ""
}
if actionURL, ok := (*currentURLs)[action]; ok {
if actionExtensionURL, ok := actionURL[fileExt]; ok {
return actionExtensionURL
}
}
return ""
}
// GetAppURLs gets the edit and view urls for different file types from the
// target WOPI app (onlyoffice, collabora, etc) via their "/hosting/discovery"
// endpoint.
@@ -92,6 +30,10 @@ func GetAppURLs(cfg *config.Config, logger log.Logger) (map[string]map[string]st
httpResp, err := httpClient.Get(wopiAppUrl)
if err != nil {
logger.Error().
Err(err).
Str("WopiAppUrl", wopiAppUrl).
Msg("WopiDiscovery: failed to access wopi app url")
return nil, err
}

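The helper removed above wrapped the discovery results in an atomic pointer so a background refresher could swap the whole map while request handlers read it. A small sketch of that pattern for reference, assuming Go 1.19+ (generic atomic.Pointer); the names are illustrative, not the original API:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// urlCache keeps the action->extension->URL map behind an atomic pointer so
// readers never see a partially written map and writers never block readers.
type urlCache struct {
	urls atomic.Pointer[map[string]map[string]string]
}

func newURLCache() *urlCache {
	c := &urlCache{}
	empty := map[string]map[string]string{}
	c.urls.Store(&empty)
	return c
}

// Store atomically replaces the whole map, e.g. after a periodic re-discovery.
func (c *urlCache) Store(m map[string]map[string]string) { c.urls.Store(&m) }

// Lookup returns the URL for an action/extension pair, or "" if unknown.
func (c *urlCache) Lookup(action, ext string) string {
	m := c.urls.Load()
	if m == nil {
		return ""
	}
	return (*m)[action][ext]
}

func main() {
	c := newURLCache()
	c.Store(map[string]map[string]string{"view": {".pdf": "https://wopi.example/view"}})
	fmt.Println(c.Lookup("view", ".pdf")) // https://wopi.example/view
}
```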
View File

@@ -12,265 +12,6 @@ import (
"github.com/opencloud-eu/opencloud/services/collaboration/pkg/helpers"
)
var _ = Describe("AppURLs", func() {
var appURLs *helpers.AppURLs
BeforeEach(func() {
appURLs = helpers.NewAppURLs()
})
Describe("NewAppURLs", func() {
It("should create a new AppURLs instance with empty map", func() {
Expect(appURLs).NotTo(BeNil())
Expect(appURLs.GetMimeTypes()).To(BeEmpty())
Expect(appURLs.GetAppURLFor("view", ".pdf")).To(BeEmpty())
})
})
Describe("Store and GetAppURLFor", func() {
It("should store and retrieve app URLs correctly", func() {
testURLs := map[string]map[string]string{
"view": {
".pdf": "https://example.com/view/pdf",
".docx": "https://example.com/view/docx",
".xlsx": "https://example.com/view/xlsx",
},
"edit": {
".docx": "https://example.com/edit/docx",
".xlsx": "https://example.com/edit/xlsx",
},
}
appURLs.Store(testURLs)
// Test successful lookups
Expect(appURLs.GetAppURLFor("view", ".pdf")).To(Equal("https://example.com/view/pdf"))
Expect(appURLs.GetAppURLFor("view", ".docx")).To(Equal("https://example.com/view/docx"))
Expect(appURLs.GetAppURLFor("edit", ".docx")).To(Equal("https://example.com/edit/docx"))
})
It("should return empty string for non-existent action", func() {
testURLs := map[string]map[string]string{
"view": {".pdf": "https://example.com/view/pdf"},
}
appURLs.Store(testURLs)
Expect(appURLs.GetAppURLFor("nonexistent", ".pdf")).To(BeEmpty())
})
It("should return empty string for non-existent extension", func() {
testURLs := map[string]map[string]string{
"view": {".pdf": "https://example.com/view/pdf"},
}
appURLs.Store(testURLs)
Expect(appURLs.GetAppURLFor("view", ".nonexistent")).To(BeEmpty())
})
It("should handle empty maps gracefully", func() {
emptyURLs := map[string]map[string]string{}
appURLs.Store(emptyURLs)
Expect(appURLs.GetAppURLFor("view", ".pdf")).To(BeEmpty())
})
It("should handle nil action maps gracefully", func() {
testURLs := map[string]map[string]string{
"view": nil,
}
appURLs.Store(testURLs)
Expect(appURLs.GetAppURLFor("view", ".pdf")).To(BeEmpty())
})
})
Describe("GetMimeTypes", func() {
It("should return empty slice for empty AppURLs", func() {
mimeTypes := appURLs.GetMimeTypes()
Expect(mimeTypes).To(BeEmpty())
})
It("should return correct mime types for known extensions", func() {
testURLs := map[string]map[string]string{
"view": {
".pdf": "https://example.com/view/pdf",
".docx": "https://example.com/view/docx",
".xlsx": "https://example.com/view/xlsx",
".pptx": "https://example.com/view/pptx",
},
"edit": {
".txt": "https://example.com/edit/txt",
".html": "https://example.com/edit/html",
},
}
appURLs.Store(testURLs)
mimeTypes := appURLs.GetMimeTypes()
// Should contain expected mime types (order doesn't matter)
Expect(mimeTypes).To(ContainElements(
"application/pdf",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"application/vnd.openxmlformats-officedocument.presentationml.presentation",
"text/plain",
"text/html",
))
// Should not contain application/octet-stream (filtered out)
Expect(mimeTypes).NotTo(ContainElement("application/octet-stream"))
})
It("should deduplicate mime types across actions", func() {
testURLs := map[string]map[string]string{
"view": {
".pdf": "https://example.com/view/pdf",
".txt": "https://example.com/view/txt",
},
"edit": {
".pdf": "https://example.com/edit/pdf", // Same extension as view
".txt": "https://example.com/edit/txt", // Same extension as view
},
}
appURLs.Store(testURLs)
mimeTypes := appURLs.GetMimeTypes()
// Should only have unique mime types
Expect(mimeTypes).To(ContainElements("application/pdf", "text/plain"))
Expect(len(mimeTypes)).To(Equal(2)) // No duplicates
})
It("should filter out application/octet-stream mime types", func() {
testURLs := map[string]map[string]string{
"view": {
".pdf": "https://example.com/view/pdf",
".unknown": "https://example.com/view/unknown", // This might return application/octet-stream
".fake-ext": "https://example.com/view/fake", // This might return application/octet-stream
},
}
appURLs.Store(testURLs)
mimeTypes := appURLs.GetMimeTypes()
// Should contain PDF but not octet-stream
Expect(mimeTypes).To(ContainElement("application/pdf"))
Expect(mimeTypes).NotTo(ContainElement("application/octet-stream"))
})
It("should handle empty extension maps", func() {
testURLs := map[string]map[string]string{
"view": {},
"edit": {},
}
appURLs.Store(testURLs)
mimeTypes := appURLs.GetMimeTypes()
Expect(mimeTypes).To(BeEmpty())
})
It("should handle nil extension maps", func() {
testURLs := map[string]map[string]string{
"view": nil,
"edit": nil,
}
appURLs.Store(testURLs)
mimeTypes := appURLs.GetMimeTypes()
Expect(mimeTypes).To(BeEmpty())
})
})
Describe("Concurrent Access", func() {
It("should handle concurrent reads and writes safely", func() {
// This is a basic smoke test for concurrent access
// In practice, you'd want more sophisticated race testing
initialURLs := map[string]map[string]string{
"view": {".pdf": "https://example.com/view/pdf"},
}
appURLs.Store(initialURLs)
done := make(chan bool, 10)
// Start multiple readers
for i := 0; i < 5; i++ {
go func() {
defer GinkgoRecover()
for j := 0; j < 100; j++ {
_ = appURLs.GetAppURLFor("view", ".pdf")
_ = appURLs.GetMimeTypes()
}
done <- true
}()
}
// Start multiple writers
for i := 0; i < 5; i++ {
go func(id int) {
defer GinkgoRecover()
for j := 0; j < 100; j++ {
newURLs := map[string]map[string]string{
"view": {".pdf": "https://example.com/updated/pdf"},
}
appURLs.Store(newURLs)
}
done <- true
}(i)
}
// Wait for all goroutines to complete
for i := 0; i < 10; i++ {
<-done
}
// Should still be functional after concurrent access
Expect(appURLs.GetAppURLFor("view", ".pdf")).NotTo(BeEmpty())
})
})
Describe("Real-world scenarios", func() {
It("should handle realistic WOPI discovery data", func() {
// Based on the test data from the discovery tests
realisticURLs := map[string]map[string]string{
"view": {
".pdf": "https://cloud.opencloud.test/hosting/wopi/word/view",
".djvu": "https://cloud.opencloud.test/hosting/wopi/word/view",
".docx": "https://cloud.opencloud.test/hosting/wopi/word/view",
".xls": "https://cloud.opencloud.test/hosting/wopi/cell/view",
".xlsb": "https://cloud.opencloud.test/hosting/wopi/cell/view",
},
"edit": {
".docx": "https://cloud.opencloud.test/hosting/wopi/word/edit",
},
}
appURLs.Store(realisticURLs)
// Test specific lookups
Expect(appURLs.GetAppURLFor("view", ".pdf")).To(Equal("https://cloud.opencloud.test/hosting/wopi/word/view"))
Expect(appURLs.GetAppURLFor("edit", ".docx")).To(Equal("https://cloud.opencloud.test/hosting/wopi/word/edit"))
Expect(appURLs.GetAppURLFor("edit", ".pdf")).To(BeEmpty()) // No edit for PDF
// Test mime types
mimeTypes := appURLs.GetMimeTypes()
Expect(mimeTypes).To(ContainElements(
"application/pdf",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"application/vnd.ms-excel",
))
})
})
})
var _ = Describe("Discovery", func() {
var (
discoveryContent1 string

View File

@@ -7,6 +7,7 @@ import (
registryv1beta1 "github.com/cs3org/go-cs3apis/cs3/app/registry/v1beta1"
gatewayv1beta1 "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
"github.com/opencloud-eu/reva/v2/pkg/mime"
"github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool"
"github.com/opencloud-eu/opencloud/pkg/log"
@@ -34,9 +35,24 @@ func RegisterAppProvider(
cfg *config.Config,
logger log.Logger,
gws pool.Selectable[gatewayv1beta1.GatewayAPIClient],
appUrls *AppURLs,
appUrls map[string]map[string]string,
) error {
mimeTypes := appUrls.GetMimeTypes()
mimeTypesMap := make(map[string]bool)
for _, extensions := range appUrls {
for ext := range extensions {
m := mime.Detect(false, ext)
// skip the default
if m == "application/octet-stream" {
continue
}
mimeTypesMap[m] = true
}
}
mimeTypes := make([]string, 0, len(mimeTypesMap))
for m := range mimeTypesMap {
mimeTypes = append(mimeTypes, m)
}
logger.Debug().
Str("AppName", cfg.App.Name).

View File

@@ -5,7 +5,6 @@ import (
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/collaboration/pkg/config"
"github.com/opencloud-eu/opencloud/services/collaboration/pkg/helpers"
microstore "go-micro.dev/v4/store"
"go.opentelemetry.io/otel/trace"
)
@@ -15,7 +14,7 @@ type Option func(o *Options)
// Options defines the available options for this package.
type Options struct {
AppURLs *helpers.AppURLs
AppURLs map[string]map[string]string
Name string
Logger log.Logger
Context context.Context
@@ -36,7 +35,7 @@ func newOptions(opts ...Option) Options {
}
// AppURLs provides app urls based on mimetypes.
func AppURLs(val *helpers.AppURLs) Option {
func AppURLs(val map[string]map[string]string) Option {
return func(o *Options) {
o.AppURLs = val
}

View File

@@ -7,7 +7,6 @@ import (
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/collaboration/pkg/config"
"github.com/opencloud-eu/opencloud/services/collaboration/pkg/helpers"
)
// Option defines a single option function.
@@ -17,7 +16,7 @@ type Option func(o *Options)
type Options struct {
Logger log.Logger
Config *config.Config
AppURLs *helpers.AppURLs
AppURLs map[string]map[string]string
GatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient]
Store microstore.Store
}
@@ -48,7 +47,7 @@ func Config(val *config.Config) Option {
}
// AppURLs provides a function to set the AppURLs option.
func AppURLs(val *helpers.AppURLs) Option {
func AppURLs(val map[string]map[string]string) Option {
return func(o *Options) {
o.AppURLs = val
}

View File

@@ -42,10 +42,6 @@ func NewHandler(opts ...Option) (*Service, func(), error) {
}
}
if options.AppURLs == nil {
return nil, teardown, errors.New("AppURLs option is required")
}
return &Service{
id: options.Config.GRPC.Namespace + "." + options.Config.Service.Name + "." + options.Config.App.Name,
appURLs: options.AppURLs,
@@ -59,7 +55,7 @@ func NewHandler(opts ...Option) (*Service, func(), error) {
// Service implements the OpenInApp interface
type Service struct {
id string
appURLs *helpers.AppURLs
appURLs map[string]map[string]string
logger log.Logger
config *config.Config
gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient]
@@ -177,6 +173,18 @@ func (s *Service) OpenInApp(
}, nil
}
// getAppUrlFor gets the appURL from the list of appURLs based on the
// action and file extension provided. If there is no match, an empty
// string will be returned.
func (s *Service) getAppUrlFor(action, fileExt string) string {
if actionURL, ok := s.appURLs[action]; ok {
if actionExtensionURL, ok := actionURL[fileExt]; ok {
return actionExtensionURL
}
}
return ""
}
// getAppUrl will get the appURL that should be used based on the extension
// and the provided view mode.
// "view" urls will be chosen first, then if the view mode is "read/write",
@@ -184,17 +192,17 @@ func (s *Service) OpenInApp(
// "read/write" view mode if no "edit" url is found.
func (s *Service) getAppUrl(fileExt string, viewMode appproviderv1beta1.ViewMode) string {
// prioritize view action if possible
appURL := s.appURLs.GetAppURLFor("view", fileExt)
appURL := s.getAppUrlFor("view", fileExt)
if strings.ToLower(s.config.App.Product) == "collabora" {
// collabora provides only one action per extension. usual options
// are "view" (checked above), "edit" or "view_comment" (this last one
// is exclusive of collabora)
if appURL == "" {
if editURL := s.appURLs.GetAppURLFor("edit", fileExt); editURL != "" {
if editURL := s.getAppUrlFor("edit", fileExt); editURL != "" {
return editURL
}
if commentURL := s.appURLs.GetAppURLFor("view_comment", fileExt); commentURL != "" {
if commentURL := s.getAppUrlFor("view_comment", fileExt); commentURL != "" {
return commentURL
}
}
@@ -202,7 +210,7 @@ func (s *Service) getAppUrl(fileExt string, viewMode appproviderv1beta1.ViewMode
// If not collabora, there might be an edit action for the extension.
// If read/write mode has been requested, prioritize edit action.
if viewMode == appproviderv1beta1.ViewMode_VIEW_MODE_READ_WRITE {
if editAppURL := s.appURLs.GetAppURLFor("edit", fileExt); editAppURL != "" {
if editAppURL := s.getAppUrlFor("edit", fileExt); editAppURL != "" {
appURL = editAppURL
}
}

View File

@@ -23,7 +23,6 @@ import (
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/collaboration/mocks"
"github.com/opencloud-eu/opencloud/services/collaboration/pkg/config"
"github.com/opencloud-eu/opencloud/services/collaboration/pkg/helpers"
service "github.com/opencloud-eu/opencloud/services/collaboration/pkg/service/grpc/v0"
)
@@ -81,25 +80,22 @@ var _ = Describe("Discovery", func() {
gatewaySelector := mocks.NewSelectable[gatewayv1beta1.GatewayAPIClient](GinkgoT())
gatewaySelector.On("Next").Return(gatewayClient, nil)
appURLs := helpers.NewAppURLs()
appURLs.Store(map[string]map[string]string{
"view": {
".pdf": "https://cloud.opencloud.test/hosting/wopi/word/view",
".djvu": "https://cloud.opencloud.test/hosting/wopi/word/view",
".docx": "https://cloud.opencloud.test/hosting/wopi/word/view",
".xls": "https://cloud.opencloud.test/hosting/wopi/cell/view",
".xlsb": "https://cloud.opencloud.test/hosting/wopi/cell/view",
},
"edit": {
".docx": "https://cloud.opencloud.test/hosting/wopi/word/edit",
".invalid": "://cloud.opencloud.test/hosting/wopi/cell/edit",
},
})
srv, srvTear, _ = service.NewHandler(
service.Logger(log.NopLogger()),
service.Config(cfg),
service.AppURLs(appURLs),
service.AppURLs(map[string]map[string]string{
"view": {
".pdf": "https://cloud.opencloud.test/hosting/wopi/word/view",
".djvu": "https://cloud.opencloud.test/hosting/wopi/word/view",
".docx": "https://cloud.opencloud.test/hosting/wopi/word/view",
".xls": "https://cloud.opencloud.test/hosting/wopi/cell/view",
".xlsb": "https://cloud.opencloud.test/hosting/wopi/cell/view",
},
"edit": {
".docx": "https://cloud.opencloud.test/hosting/wopi/word/edit",
".invalid": "://cloud.opencloud.test/hosting/wopi/cell/edit",
},
}),
service.GatewaySelector(gatewaySelector),
)
})

View File

@@ -49,7 +49,6 @@ type Config struct {
LDAPServerWriteEnabled bool `yaml:"ldap_server_write_enabled" env:"OC_LDAP_SERVER_WRITE_ENABLED;FRONTEND_LDAP_SERVER_WRITE_ENABLED" desc:"Allow creating, modifying and deleting LDAP users via the GRAPH API. This can only be set to 'true' when keeping default settings for the LDAP user and group attribute types (the 'OC_LDAP_USER_SCHEMA_* and 'OC_LDAP_GROUP_SCHEMA_* variables)." introductionVersion:"1.0.0"`
EditLoginAllowedDisabled bool `yaml:"edit_login_allowed_disabled" env:"FRONTEND_EDIT_LOGIN_ALLOWED_DISABLED" desc:"Used to set if login is allowed/forbidden for the user." introductionVersion:"3.4.0"`
FullTextSearch bool `yaml:"full_text_search" env:"FRONTEND_FULL_TEXT_SEARCH_ENABLED" desc:"Set to true to signal the web client that full-text search is enabled." introductionVersion:"1.0.0"`
CheckForUpdates bool `yaml:"check_for_updates" env:"FRONTEND_CHECK_FOR_UPDATES" desc:"Enable automatic checking for updates. Defaults to true." introductionVersion:"3.6.0"`
Middleware Middleware `yaml:"middleware"`

View File

@@ -88,7 +88,6 @@ func DefaultConfig() *config.Config {
DefaultLinkPermissions: 1,
SearchMinLength: 3,
Edition: "",
CheckForUpdates: true,
Checksums: config.Checksums{
SupportedTypes: []string{"sha1", "md5", "adler32"},
PreferredUploadType: "sha1",

View File

@@ -214,7 +214,6 @@ func FrontendConfigFromStruct(cfg *config.Config, logger log.Logger) (map[string
"productversion": version.GetString(),
"hostname": "",
},
"check_for_updates": cfg.CheckForUpdates,
"support_url_signing": true,
"support_sse": !cfg.DisableSSE,
},

View File

@@ -42,6 +42,8 @@ type Config struct {
Metadata Metadata `yaml:"metadata_config"`
UserSoftDeleteRetentionTime time.Duration `yaml:"user_soft_delete_retention_time" env:"GRAPH_USER_SOFT_DELETE_RETENTION_TIME" desc:"The time after which a soft-deleted user is permanently deleted. If set to 0 (default), there is no soft delete retention time and users are deleted immediately after being soft-deleted. If set to a positive value, the user will be kept in the system for that duration before being permanently deleted." introductionVersion:"%%NEXT%%"`
Store Store `yaml:"store"`
}
type Spaces struct {
@@ -168,3 +170,11 @@ type Metadata struct {
SystemUserIDP string `yaml:"system_user_idp" env:"OC_SYSTEM_USER_IDP;GRAPH_SYSTEM_USER_IDP" desc:"IDP of the OpenCloud STORAGE-SYSTEM system user." introductionVersion:"%%NEXT%%"`
SystemUserAPIKey string `yaml:"system_user_api_key" env:"OC_SYSTEM_USER_API_KEY" desc:"API key for the STORAGE-SYSTEM system user." introductionVersion:"%%NEXT%%"`
}
// Store configures the store to use
type Store struct {
Nodes []string `yaml:"nodes" env:"OC_PERSISTENT_STORE_NODES;GRAPH_STORE_NODES" desc:"A list of nodes to access the configured store. This has no effect when 'memory' store is configured. Note that the behaviour how nodes are used is dependent on the library of the configured store. See the Environment Variable Types description for more details." introductionVersion:"1.0.0"`
Database string `yaml:"database" env:"GRAPH_STORE_DATABASE" desc:"The database name the configured store should use." introductionVersion:"1.0.0"`
AuthUsername string `yaml:"username" env:"OC_PERSISTENT_STORE_AUTH_USERNAME;GRAPH_STORE_AUTH_USERNAME" desc:"The username to authenticate with the store. Only applies when store type 'nats-js-kv' is configured." introductionVersion:"1.0.0"`
AuthPassword string `yaml:"password" env:"OC_PERSISTENT_STORE_AUTH_PASSWORD;GRAPH_STORE_AUTH_PASSWORD" desc:"The password to authenticate with the store. Only applies when store type 'nats-js-kv' is configured." introductionVersion:"1.0.0"`
}

View File

@@ -131,6 +131,10 @@ func DefaultConfig() *config.Config {
SystemUserIDP: "internal",
},
UserSoftDeleteRetentionTime: 0,
Store: config.Store{
Nodes: []string{"127.0.0.1:9233"},
Database: "graph",
},
}
}

View File

@@ -1,142 +0,0 @@
# SOME DESCRIPTIVE TITLE.
# Copyright (C) YEAR THE PACKAGE'S COPYRIGHT HOLDER
# This file is distributed under the same license as the PACKAGE package.
# FIRST AUTHOR <EMAIL@ADDRESS>, YEAR.
#
# Translators:
# LinkinWires <darkinsonic13@gmail.com>, 2025
#
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2025-09-26 00:01+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: LinkinWires <darkinsonic13@gmail.com>, 2025\n"
"Language-Team: Ukrainian (https://app.transifex.com/opencloud-eu/teams/204053/uk/)\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Language: uk\n"
"Plural-Forms: nplurals=4; plural=(n % 1 == 0 && n % 10 == 1 && n % 100 != 11 ? 0 : n % 1 == 0 && n % 10 >= 2 && n % 10 <= 4 && (n % 100 < 12 || n % 100 > 14) ? 1 : n % 1 == 0 && (n % 10 ==0 || (n % 10 >=5 && n % 10 <=9) || (n % 100 >=11 && n % 100 <=14 )) ? 2: 3);\n"
#. UnifiedRole Editor, Role DisplayName (resolves directly)
#. UnifiedRole EditorListGrants, Role DisplayName (resolves directly)
#. UnifiedRole SpaseEditor, Role DisplayName (resolves directly)
#. UnifiedRole FileEditor, Role DisplayName (resolves directly)
#. UnifiedRole FileEditorListGrants, Role DisplayName (resolves directly)
#: pkg/unifiedrole/roles.go:116 pkg/unifiedrole/roles.go:122
#: pkg/unifiedrole/roles.go:128 pkg/unifiedrole/roles.go:140
#: pkg/unifiedrole/roles.go:146
msgid "Can edit"
msgstr "Може редагувати"
#. UnifiedRole SpaseEditorWithoutVersions, Role DisplayName (resolves
#. directly)
#: pkg/unifiedrole/roles.go:134
msgid "Can edit without versions"
msgstr "Може редагувати (без версій)"
#. UnifiedRole Manager, Role DisplayName (resolves directly)
#: pkg/unifiedrole/roles.go:158
msgid "Can manage"
msgstr "Може керувати"
#. UnifiedRole EditorLite, Role DisplayName (resolves directly)
#: pkg/unifiedrole/roles.go:152
msgid "Can upload"
msgstr "Може вивантажувати"
#. UnifiedRole Viewer, Role DisplayName (resolves directly)
#. UnifiedRole Viewer, Role DisplayName (resolves directly)
#. UnifiedRole SpaseViewer, Role DisplayName (resolves directly)
#: pkg/unifiedrole/roles.go:98 pkg/unifiedrole/roles.go:104
#: pkg/unifiedrole/roles.go:110
msgid "Can view"
msgstr "Може переглядати"
#. UnifiedRole SecureViewer, Role DisplayName (resolves directly)
#: pkg/unifiedrole/roles.go:164
msgid "Can view (secure)"
msgstr "Може переглядати (обмежено)"
#. UnifiedRole FullDenial, Role DisplayName (resolves directly)
#: pkg/unifiedrole/roles.go:170
msgid "Cannot access"
msgstr "Не має доступу"
#. UnifiedRole FullDenial, Role Description (resolves directly)
#: pkg/unifiedrole/roles.go:167
msgid "Deny all access."
msgstr "Заборонити будь-який доступ."
#. default description for new spaces
#: pkg/service/v0/spacetemplates.go:32
msgid "Here you can add a description for this Space."
msgstr "Тут можна додати опис до цього простору."
#. UnifiedRole Viewer, Role Description (resolves directly)
#. UnifiedRole SpaceViewer, Role Description (resolves directly)
#: pkg/unifiedrole/roles.go:95 pkg/unifiedrole/roles.go:107
msgid "View and download."
msgstr "Може переглядати та завантажувати файли."
#. UnifiedRole SecureViewer, Role Description (resolves directly)
#: pkg/unifiedrole/roles.go:161
msgid "View only documents, images and PDFs. Watermarks will be applied."
msgstr ""
"Може переглядати лише документи, зображення та PDF-файли. Будуть застосовані"
" водяні знаки."
#. UnifiedRole FileEditor, Role Description (resolves directly)
#: pkg/unifiedrole/roles.go:137
msgid "View, download and edit."
msgstr "Може переглядати, завантажувати та редагувати файли."
#. UnifiedRole ViewerListGrants, Role Description (resolves directly)
#: pkg/unifiedrole/roles.go:101
msgid "View, download and show all invited people."
msgstr "Може переглядати, завантажувати та показувати усіх запрошених людей."
#. UnifiedRole EditorLite, Role Description (resolves directly)
#: pkg/unifiedrole/roles.go:149
msgid "View, download and upload."
msgstr "Може переглядати, завантажувати та вивантажувати файли."
#. UnifiedRole FileEditorListGrants, Role Description (resolves directly)
#: pkg/unifiedrole/roles.go:143
msgid "View, download, edit and show all invited people."
msgstr ""
"Може переглядати, завантажувати, редагувати та показувати усіх запрошених "
"людей."
#. UnifiedRole Editor, Role Description (resolves directly)
#. UnifiedRole SpaseEditorWithoutVersions, Role Description (resolves
#. directly)
#: pkg/unifiedrole/roles.go:113 pkg/unifiedrole/roles.go:131
msgid "View, download, upload, edit, add and delete."
msgstr ""
"Може переглядати, завантажувати, вивантажувати, редагувати, додавати та "
"видаляти файли."
#. UnifiedRole Manager, Role Description (resolves directly)
#: pkg/unifiedrole/roles.go:155
msgid "View, download, upload, edit, add, delete and manage members."
msgstr ""
"Може переглядати, завантажувати, вивантажувати, редагувати, додавати та "
"видаляти файли, а також може керувати учасниками."
#. UnifiedRoleListGrants Editor, Role Description (resolves directly)
#: pkg/unifiedrole/roles.go:119
msgid "View, download, upload, edit, add, delete and show all invited people."
msgstr ""
"Може переглядати, завантажувати, редагувати, додавати, видаляти та "
"показувати усіх запрошених людей."
#. UnifiedRole SpaseEditor, Role Description (resolves directly)
#: pkg/unifiedrole/roles.go:125
msgid "View, download, upload, edit, add, delete including the history."
msgstr ""
"Може переглядати, завантажувати, вивантажувати, редагувати, додавати та "
"видаляти файли, у тому числі їх історію."

View File

@@ -12,6 +12,7 @@ import (
storageprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/go-chi/chi/v5"
"github.com/jellydator/ttlcache/v3"
"github.com/nats-io/nats.go"
"go-micro.dev/v4/client"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/types/known/emptypb"
@@ -67,6 +68,7 @@ type Graph struct {
keycloakClient keycloak.Client
historyClient ehsvc.EventHistoryService
traceProvider trace.TracerProvider
natskv nats.KeyValue
}
// ServeHTTP implements the Service interface.

View File

@@ -15,6 +15,7 @@ import (
"github.com/go-chi/chi/v5/middleware"
ldapv3 "github.com/go-ldap/ldap/v3"
"github.com/jellydator/ttlcache/v3"
"github.com/nats-io/nats.go"
"github.com/riandyrn/otelchi"
microstore "go-micro.dev/v4/store"
@@ -153,6 +154,38 @@ func NewService(opts ...Option) (Graph, error) { //nolint:maintidx
identity.IdentityCacheWithGroupsTTL(time.Duration(options.Config.Spaces.GroupsCacheTTL)),
)
// Connect to NATS servers
natsOptions := nats.Options{
Servers: options.Config.Store.Nodes,
}
conn, err := natsOptions.Connect()
if err != nil {
return Graph{}, err
}
js, err := conn.JetStream()
if err != nil {
return Graph{}, err
}
kv, err := js.KeyValue(options.Config.Store.Database)
if err != nil {
if !errors.Is(err, nats.ErrBucketNotFound) {
return Graph{}, fmt.Errorf("Failed to get bucket (%s): %w", options.Config.Store.Database, err)
}
kv, err = js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: options.Config.Store.Database,
})
if err != nil {
return Graph{}, fmt.Errorf("Failed to create bucket (%s): %w", options.Config.Store.Database, err)
}
}
if err != nil {
return Graph{}, err
}
baseGraphService := BaseGraphService{
logger: &options.Logger,
identityCache: identityCache,
@@ -198,6 +231,7 @@ func NewService(opts ...Option) (Graph, error) { //nolint:maintidx
historyClient: options.EventHistoryClient,
traceProvider: options.TraceProvider,
valueService: options.ValueService,
natskv: kv,
}
if err := setIdentityBackends(options, &svc); err != nil {

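The graph service now obtains a JetStream key-value bucket at startup and creates it on first use. A self-contained sketch of that get-or-create step with the nats.go client, assuming a server reachable at the 127.0.0.1:9233 default from the store config:

```go
package main

import (
	"errors"
	"fmt"
	"log"

	"github.com/nats-io/nats.go"
)

// openUserStateBucket connects to NATS and returns the named JetStream
// key-value bucket, creating it if it does not exist yet.
func openUserStateBucket(servers []string, bucket string) (nats.KeyValue, error) {
	opts := nats.Options{Servers: servers}
	nc, err := opts.Connect()
	if err != nil {
		return nil, fmt.Errorf("connect to nats: %w", err)
	}

	js, err := nc.JetStream()
	if err != nil {
		return nil, fmt.Errorf("get jetstream context: %w", err)
	}

	kv, err := js.KeyValue(bucket)
	if errors.Is(err, nats.ErrBucketNotFound) {
		// the bucket does not exist yet, create it on the fly
		return js.CreateKeyValue(&nats.KeyValueConfig{Bucket: bucket})
	}
	return kv, err
}

func main() {
	kv, err := openUserStateBucket([]string{"127.0.0.1:9233"}, "graph")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("bucket ready:", kv.Bucket())
}
```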
View File

@@ -2,6 +2,7 @@ package svc
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
@@ -22,12 +23,14 @@ import (
"github.com/go-chi/chi/v5"
"github.com/go-chi/render"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
libregraph "github.com/opencloud-eu/libre-graph-api-go"
settingsmsg "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/messages/settings/v0"
settingssvc "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/services/settings/v0"
"github.com/opencloud-eu/opencloud/services/graph/pkg/errorcode"
"github.com/opencloud-eu/opencloud/services/graph/pkg/identity"
"github.com/opencloud-eu/opencloud/services/graph/pkg/odata"
"github.com/opencloud-eu/opencloud/services/graph/pkg/userstate"
ocsettingssvc "github.com/opencloud-eu/opencloud/services/settings/pkg/service/v0"
"github.com/opencloud-eu/opencloud/services/settings/pkg/store/defaults"
revactx "github.com/opencloud-eu/reva/v2/pkg/ctx"
@@ -642,7 +645,30 @@ func (g Graph) DeleteUser(w http.ResponseWriter, r *http.Request) {
return
}
if g.config.UserSoftDeleteRetentionTime > 0 && purgeUser && user.GetAccountEnabled() {
us, err := g.getUserStateFromNatsKeyValue(r.Context(), userID)
if err != nil {
logger.Error().Err(err).Str("id", userID).Msg("could not get user state")
us = userstate.UserState{
UserId: userID,
State: userstate.UserStateUnspecified,
}
}
if us.State == userstate.UserStateHardDeleted {
logger.Debug().Str("id", userID).Msg("could not delete user: user already hard deleted")
errorcode.ItemNotFound.Render(w, r, http.StatusNotFound, "user not found")
return
}
if us.State == userstate.UserStateUnspecified {
if user.GetAccountEnabled() {
us.State = userstate.UserStateEnabled
} else {
us.State = userstate.UserStateSoftDeleted
}
}
if g.config.UserSoftDeleteRetentionTime > 0 && purgeUser && us.State == userstate.UserStateEnabled {
logger.Debug().Msg("could not delete user: purgeUser is set but user is still enabled")
errorcode.InvalidRequest.Render(w, r, http.StatusBadRequest, "user should be hard deleted, but is still enabled, please soft delete first")
return
@@ -684,7 +710,9 @@ func (g Graph) DeleteUser(w http.ResponseWriter, r *http.Request) {
return
}
for _, sp := range lspr.GetStorageSpaces() {
if !(sp.SpaceType == _spaceTypePersonal && sp.Owner.Id.OpaqueId == user.GetId()) {
// if the spacetype equals _spaceTypePersonal and the owner id equals the user id
// then we found the personal space of the user to be deleted
if !(sp.GetSpaceType() == _spaceTypePersonal && sp.Owner.GetId().GetOpaqueId() == user.GetId()) {
continue
}
// TODO: check if request contains a homespace and if, check if requesting user has the privilege to
@@ -706,7 +734,7 @@ func (g Graph) DeleteUser(w http.ResponseWriter, r *http.Request) {
}
}
// the space will be purged if the system does not have a UserSoftDeleteRetentionTime configured, i.e. SoftDelete is disabled
if g.config.UserSoftDeleteRetentionTime == 0 || (purgeUser && !user.GetAccountEnabled()) {
if g.config.UserSoftDeleteRetentionTime == 0 || (purgeUser && us.State == userstate.UserStateSoftDeleted) {
purgeSpaceFlag := utils.AppendPlainToOpaque(nil, "purge", "")
_, err := client.DeleteStorageSpace(r.Context(), &storageprovider.DeleteStorageSpaceRequest{
Opaque: purgeSpaceFlag,
@@ -725,24 +753,41 @@ func (g Graph) DeleteUser(w http.ResponseWriter, r *http.Request) {
}
}
if g.config.UserSoftDeleteRetentionTime == 0 || (purgeUser && !user.GetAccountEnabled()) {
if (g.config.UserSoftDeleteRetentionTime > 0 && us.State == userstate.UserStateSoftDeleted && purgeUser) ||
(g.config.UserSoftDeleteRetentionTime == 0) {
logger.Debug().Str("id", user.GetId()).Msg("calling delete user on backend")
err = g.identityBackend.DeleteUser(r.Context(), user.GetId())
if err != nil {
logger.Debug().Err(err).Msg("could not delete user: backend error")
errorcode.RenderError(w, r, err)
return
}
us.State = userstate.UserStateHardDeleted
err = g.setUserStateToNatsKeyValue(r.Context(), userID, us)
if err != nil {
logger.Error().Err(err).Str("id", userID).Msg("could not set user state")
errorcode.RenderError(w, r, err)
}
} else {
logger.Debug().Str("id", user.GetId()).Msg("calling soft delete user on backend")
userUpdate := *libregraph.NewUserUpdate()
userUpdate.AccountEnabled = libregraph.PtrBool(false)
us.State = userstate.UserStateSoftDeleted
us.RetentionPeriod = g.config.UserSoftDeleteRetentionTime
us.Reason = "User soft deleted via Graph API" // TODO: this needs a proper implementation through the request
us.TimeStamp = time.Now()
err = g.setUserStateToNatsKeyValue(r.Context(), userID, us)
if err != nil {
logger.Error().Err(err).Str("id", userID).Msg("could not set user state")
errorcode.RenderError(w, r, err)
return
}
g.identityBackend.UpdateUser(r.Context(), user.GetId(), userUpdate)
}
if g.config.UserSoftDeleteRetentionTime == 0 ||
(g.config.UserSoftDeleteRetentionTime > 0 && purgeUser && !user.GetAccountEnabled()) {
(g.config.UserSoftDeleteRetentionTime > 0 && purgeUser && us.State == userstate.UserStateSoftDeleted) {
e := events.UserDeleted{UserID: user.GetId()}
e.Executant = currentUser.GetId()
g.publishEvent(r.Context(), e)
@@ -1103,3 +1148,69 @@ func (g Graph) searchOCMAcceptedUsers(ctx context.Context, odataReq *godata.GoDa
}
return users, nil
}
// getUserStateFromNatsKeyValue gets the user state from the nats key value store.
func (g Graph) getUserStateFromNatsKeyValue(ctx context.Context, userID string) (userstate.UserState, error) {
logger := g.logger.SubloggerWithRequestID(ctx)
if g.natskv == nil {
logger.Debug().Msg("nats connection or user state key value store not configured")
return userstate.UserState{
UserId: userID,
State: userstate.UserStateUnspecified,
}, nil
}
entry, err := g.natskv.Get(userID)
if err != nil {
if errors.Is(err, nats.ErrKeyNotFound) {
logger.Debug().Str("userid", userID).Msg("no user state found in nats key value store")
return userstate.UserState{
UserId: userID,
State: userstate.UserStateUnspecified,
}, nil
}
logger.Error().Err(err).Str("userid", userID).Msg("error getting user state from nats key value store")
return userstate.UserState{
State: userstate.UserStateUnspecified,
}, err
}
userState := userstate.UserState{}
if err := json.Unmarshal(entry.Value(), &userState); err != nil {
logger.Error().Err(err).Str("userid", userID).Msg("error unmarshalling user state from nats key value store")
return userstate.UserState{
UserId: userID,
State: userstate.UserStateUnspecified,
}, err
}
return userState, nil
}
// setUserStateToNatsKeyValue sets the user state in the nats key value store.
func (g Graph) setUserStateToNatsKeyValue(ctx context.Context, userID string, us userstate.UserState) error {
logger := g.logger.SubloggerWithRequestID(ctx)
if ok, err := userstate.IsValidUserState(&us); !ok {
logger.Debug().Str("userid", userID).Msg("invalid user state")
return fmt.Errorf("invalid user state: %w", err)
}
if g.natskv == nil {
logger.Debug().Msg("nats connection or user state key value store not configured")
return nil
}
data, err := json.Marshal(us)
if err != nil {
logger.Error().Err(err).Str("userid", userID).Msg("error marshalling user state to nats key value store")
return err
}
if _, err := g.natskv.Put(userID, data); err != nil {
logger.Error().Err(err).Str("userid", userID).Msg("error putting user state to nats key value store")
return err
}
return nil
}

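The branching in DeleteUser above reduces to a small decision on the retention time, the purge flag and the persisted state. A distilled, illustrative sketch (not the handler itself), using the userstate constants added in this changeset; the "purge while still enabled" rejection happens before this point in the handler:

```go
package main

import (
	"fmt"
	"time"

	"github.com/opencloud-eu/opencloud/services/graph/pkg/userstate"
)

// decideDeletion distils the branching in DeleteUser into a pure function:
// given the configured retention time, the purge flag and the persisted
// state, it returns the next state and whether the identity backend delete
// (hard delete) runs now.
func decideDeletion(retention time.Duration, purge bool, state uint8) (next uint8, hardDelete bool) {
	switch {
	case retention == 0:
		// soft delete disabled: delete immediately
		return userstate.UserStateHardDeleted, true
	case purge && state == userstate.UserStateSoftDeleted:
		// second delete with purge on an already soft-deleted user
		return userstate.UserStateHardDeleted, true
	default:
		// first delete: disable the account and record the soft delete
		return userstate.UserStateSoftDeleted, false
	}
}

func main() {
	retention := 30 * 24 * time.Hour
	fmt.Println(decideDeletion(retention, false, userstate.UserStateEnabled))    // 4 false (soft delete)
	fmt.Println(decideDeletion(retention, true, userstate.UserStateSoftDeleted)) // 5 true  (hard delete)
	fmt.Println(decideDeletion(0, false, userstate.UserStateEnabled))            // 5 true  (soft delete disabled)
}
```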
View File

@@ -0,0 +1,37 @@
package userstate
import (
"fmt"
"time"
)
const (
_ = iota
UserStateUnspecified
UserStateEnabled
UserStateDisabled
UserStateSoftDeleted
UserStateHardDeleted
)
// UserState represents the state of a user account.
// Note: This does not reflect state changes, these need to be read from the audit logs.
type UserState struct {
UserId string `json:"userid"`
State uint8 `json:"state"`
TimeStamp time.Time `json:"timestamp,omitempty"`
RetentionPeriod time.Duration `json:"retentionPeriod,omitempty"`
Reason string `json:"reason,omitempty"`
}
func IsValidUserState(us *UserState) (bool, error) {
if us.State == UserStateSoftDeleted {
if us.RetentionPeriod <= 0 {
return false, fmt.Errorf("retention period must be greater than 0 for soft deleted users")
}
if us.Reason == "" {
return false, fmt.Errorf("reason must be provided for soft deleted users")
}
}
return true, nil
}

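DeleteUser serialises these records to JSON before writing them into the nats-js-kv bucket. A short usage sketch of the package above; the KV Put call itself is omitted:

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/opencloud-eu/opencloud/services/graph/pkg/userstate"
)

func main() {
	us := userstate.UserState{
		UserId:          "some-user-id",
		State:           userstate.UserStateSoftDeleted,
		TimeStamp:       time.Now(),
		RetentionPeriod: 30 * 24 * time.Hour,
		Reason:          "User soft deleted via Graph API",
	}

	// Soft-deleted entries must carry a retention period and a reason.
	if ok, err := userstate.IsValidUserState(&us); !ok {
		log.Fatalf("invalid user state: %v", err)
	}

	// DeleteUser marshals the state to JSON before putting it into the
	// nats-js-kv bucket keyed by the user ID; the Put call is left out here.
	data, err := json.Marshal(us)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(data))
}
```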
View File

@@ -272,12 +272,6 @@ func DefaultPolicies() []config.Policy {
Endpoint: "/auth-app/tokens",
Service: "eu.opencloud.web.auth-app",
},
{
Endpoint: "/wopi",
Service: "eu.opencloud.web.collaboration.Collabora",
Unprotected: true,
SkipXAccessToken: true,
},
},
},
}

View File

@@ -1,146 +0,0 @@
# SOME DESCRIPTIVE TITLE.
# Copyright (C) YEAR THE PACKAGE'S COPYRIGHT HOLDER
# This file is distributed under the same license as the PACKAGE package.
# FIRST AUTHOR <EMAIL@ADDRESS>, YEAR.
#
# Translators:
# LinkinWires <darkinsonic13@gmail.com>, 2025
#
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2025-09-26 00:01+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: LinkinWires <darkinsonic13@gmail.com>, 2025\n"
"Language-Team: Ukrainian (https://app.transifex.com/opencloud-eu/teams/204053/uk/)\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Language: uk\n"
"Plural-Forms: nplurals=4; plural=(n % 1 == 0 && n % 10 == 1 && n % 100 != 11 ? 0 : n % 1 == 0 && n % 10 >= 2 && n % 10 <= 4 && (n % 100 < 12 || n % 100 > 14) ? 1 : n % 1 == 0 && (n % 10 ==0 || (n % 10 >=5 && n % 10 <=9) || (n % 100 >=11 && n % 100 <=14 )) ? 2: 3);\n"
#. name of the notification option 'Space Shared'
#: pkg/store/defaults/templates.go:20
msgid "Added as space member"
msgstr "Додавання до простору у якості учасника (-ці)"
#. translation for the 'daily' email interval option
#: pkg/store/defaults/templates.go:50
msgid "Daily"
msgstr "Щоденно"
#. name of the notification option 'Email Interval'
#: pkg/store/defaults/templates.go:44
msgid "Email sending interval"
msgstr "Інтервал надсилання електронних листів"
#. name of the notification option 'File Rejected'
#: pkg/store/defaults/templates.go:40
msgid "File rejected"
msgstr "Відхилення файлу"
#. translation for the 'instant' email interval option
#: pkg/store/defaults/templates.go:48
msgid "Instant"
msgstr "Миттєво"
#. translation for the 'never' email interval option
#: pkg/store/defaults/templates.go:54
msgid "Never"
msgstr "Ніколи"
#. description of the notification option 'Space Shared'
#: pkg/store/defaults/templates.go:22
msgid "Notify when I have been added as a member to a space"
msgstr "Сповіщати, коли мене додають як учасника (-цю) до простору"
#. description of the notification option 'Space Unshared'
#: pkg/store/defaults/templates.go:26
msgid "Notify when I have been removed as member from a space"
msgstr "Сповіщати, коли мене видаляють з учасників простору"
#. description of the notification option 'Share Received'
#: pkg/store/defaults/templates.go:10
msgid "Notify when I have received a share"
msgstr "Сповіщати, коли зі мною поділилися файлом або папкою"
#. description of the notification option 'File Rejected'
#: pkg/store/defaults/templates.go:42
msgid ""
"Notify when a file I uploaded was rejected because of a virus infection or "
"policy violation"
msgstr ""
"Сповіщати, коли завантажений мною файл було відхилено через знайдений вірус "
"або порушення політик"
#. description of the notification option 'Share Removed'
#: pkg/store/defaults/templates.go:14
msgid "Notify when a received share has been removed"
msgstr "Сповіщати, коли зі мною більше не діляться файлом або папкою"
#. description of the notification option 'Share Expired'
#: pkg/store/defaults/templates.go:18
msgid "Notify when a received share has expired"
msgstr "Сповіщати, коли термін доступу до файлу або папки сплинув"
#. description of the notification option 'Space Deleted'
#: pkg/store/defaults/templates.go:38
msgid "Notify when a space I am member of has been deleted"
msgstr "Сповіщати, коли простір, учасником (-цею) якого я є, був видалений"
#. description of the notification option 'Space Disabled'
#: pkg/store/defaults/templates.go:34
msgid "Notify when a space I am member of has been disabled"
msgstr "Сповіщати, коли простір, учасником (-цею) якого я є, був вимкнений"
#. description of the notification option 'Space Membership Expired'
#: pkg/store/defaults/templates.go:30
msgid "Notify when a space membership has expired"
msgstr "Сповіщати, коли термін доступу до простору сплинув"
#. name of the notification option 'Space Unshared'
#: pkg/store/defaults/templates.go:24
msgid "Removed as space member"
msgstr "Видалення з простору"
#. description of the notification option 'Email Interval'
#: pkg/store/defaults/templates.go:46
msgid "Selected value:"
msgstr "Обране значення:"
#. name of the notification option 'Share Expired'
#: pkg/store/defaults/templates.go:16
msgid "Share Expired"
msgstr "Сплинення доступу до файлу / папки"
#. name of the notification option 'Share Received'
#: pkg/store/defaults/templates.go:8
msgid "Share Received"
msgstr "Надання доступу"
#. name of the notification option 'Share Removed'
#: pkg/store/defaults/templates.go:12
msgid "Share Removed"
msgstr "Видалення доступу"
#. name of the notification option 'Space Deleted'
#: pkg/store/defaults/templates.go:36
msgid "Space deleted"
msgstr "Видалення простору"
#. name of the notification option 'Space Disabled'
#: pkg/store/defaults/templates.go:32
msgid "Space disabled"
msgstr "Вимкнення простору"
#. name of the notification option 'Space Membership Expired'
#: pkg/store/defaults/templates.go:28
msgid "Space membership expired"
msgstr "Сплинення доступу до простору"
#. translation for the 'weekly' email interval option
#: pkg/store/defaults/templates.go:52
msgid "Weekly"
msgstr "Щотижнево"

View File

@@ -289,12 +289,13 @@ class GraphContext implements Context {
*/
public function adminDeletesUserUsingTheGraphApi(string $user, ?string $byUser = null): ResponseInterface {
$credentials = $this->getAdminOrUserCredentials($byUser);
return GraphHelper::deleteUser(
$userId = $this->featureContext->getAttributeOfCreatedUser($user, 'id');
return GraphHelper::deleteUserByUserId(
$this->featureContext->getBaseUrl(),
$this->featureContext->getStepLineRef(),
$credentials["username"],
$credentials["password"],
$user
$userId
);
}

View File

@@ -21,9 +21,6 @@ _testmain.go
*.exe
# Git backup files
*.orig
# Emacs
*~
\#*\#

View File

@@ -23,7 +23,7 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io
go get github.com/nats-io/nats.go@latest
# To get a specific version:
go get github.com/nats-io/nats.go@v1.46.0
go get github.com/nats-io/nats.go@v1.45.0
# Note that the latest major version for NATS Server is v2:
go get github.com/nats-io/nats-server/v2@latest

View File

@@ -1,22 +1,22 @@
module github.com/nats-io/nats.go
go 1.24.0
go 1.23.0
require (
github.com/golang/protobuf v1.4.2
github.com/klauspost/compress v1.18.0
github.com/nats-io/jwt/v2 v2.8.0
github.com/nats-io/nats-server/v2 v2.12.0
github.com/nats-io/jwt v1.2.2
github.com/nats-io/nats-server/v2 v2.11.2
github.com/nats-io/nkeys v0.4.11
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
)
require (
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op // indirect
github.com/google/go-tpm v0.9.5 // indirect
github.com/google/go-tpm v0.9.3 // indirect
github.com/minio/highwayhash v1.0.3 // indirect
golang.org/x/crypto v0.42.0 // indirect
golang.org/x/sys v0.36.0 // indirect
golang.org/x/time v0.13.0 // indirect
github.com/nats-io/jwt/v2 v2.7.4 // indirect
golang.org/x/crypto v0.37.0 // indirect
golang.org/x/sys v0.32.0 // indirect
golang.org/x/time v0.11.0 // indirect
)

View File

@@ -12,27 +12,36 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-tpm v0.9.5 h1:ocUmnDebX54dnW+MQWGQRbdaAcJELsa6PqZhJ48KwVU=
github.com/google/go-tpm v0.9.5/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/google/go-tpm v0.9.3 h1:+yx0/anQuGzi+ssRqeD6WpXjW2L/V0dItUayO0i9sRc=
github.com/google/go-tpm v0.9.3/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g=
github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.12.0 h1:OIwe8jZUqJFrh+hhiyKu8snNib66qsx806OslqJuo74=
github.com/nats-io/nats-server/v2 v2.12.0/go.mod h1:nr8dhzqkP5E/lDwmn+A2CvQPMd1yDKXQI7iGg3lAvww=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI=
github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.11.2 h1:k5KBAuRpJW9qAF11Io2txNhR5m1KUmqVkalLAw2yLfk=
github.com/nats-io/nats-server/v2 v2.11.2/go.mod h1:6Z6Fd+JgckqzKig7DYwhgrE7bJ6fypPHnGPND+DqgMY=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI=
golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE=
golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/time v0.13.0 h1:eUlYslOIt32DgYD6utsuUeHs4d7AsEYLuIAdg7FlYgI=
golang.org/x/time v0.13.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0=
golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=

View File

@@ -314,10 +314,6 @@ func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg Consu
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
return nil, ErrStreamNotFound
}
if resp.Error.ErrorCode == JSErrCodeMaximumConsumersLimit {
return nil, ErrMaximumConsumersLimit
}
return nil, resp.Error
}
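With the downgrade, the dedicated maximum-consumers mapping above disappears; a hedged sketch of how calling code can still branch on the sentinels that remain (stream and consumer names are illustrative):

package example

import (
	"context"
	"errors"
	"log"

	"github.com/nats-io/nats.go/jetstream"
)

// ensureConsumer shows the error handling available without ErrMaximumConsumersLimit:
// unknown-stream errors keep their sentinel, everything else surfaces as a generic API error.
func ensureConsumer(ctx context.Context, js jetstream.JetStream) (jetstream.Consumer, error) {
	cons, err := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
		Durable:   "worker",
		AckPolicy: jetstream.AckExplicitPolicy,
	})
	switch {
	case errors.Is(err, jetstream.ErrStreamNotFound):
		return nil, errors.New("create the ORDERS stream before adding consumers")
	case err != nil:
		log.Printf("consumer create failed (limit errors included): %v", err)
		return nil, err
	}
	return cons, nil
}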

View File

@@ -325,12 +325,6 @@ type (
// associating metadata on the consumer. This feature requires
// nats-server v2.10.0 or later.
Metadata map[string]string `json:"metadata,omitempty"`
// NamePrefix is an optional custom prefix for the consumer name.
// If provided, ordered consumer names will be generated as:
// {NamePrefix}_{sequence_number} (e.g., "custom_1", "custom_2").
// If not provided, a unique ID (NUID) will be used as the prefix.
NamePrefix string `json:"-"`
}
// DeliverPolicy determines from which point to start delivering messages.
@@ -368,11 +362,6 @@ const (
// restricting when a consumer will receive messages based on the number of
// pending messages or acks.
PriorityPolicyOverflow
// PriorityPolicyPrioritized is the priority policy that allows for the
// server to deliver messages to clients based on their priority (instead
// of round-robin). Requires nats-server v2.12.0 or later.
PriorityPolicyPrioritized
)
func (p *PriorityPolicy) UnmarshalJSON(data []byte) error {
@@ -383,8 +372,6 @@ func (p *PriorityPolicy) UnmarshalJSON(data []byte) error {
*p = PriorityPolicyPinned
case jsonString("overflow"):
*p = PriorityPolicyOverflow
case jsonString("prioritized"):
*p = PriorityPolicyPrioritized
default:
return fmt.Errorf("nats: can not unmarshal %q", data)
}
@@ -399,8 +386,6 @@ func (p PriorityPolicy) MarshalJSON() ([]byte, error) {
return json.Marshal("pinned_client")
case PriorityPolicyOverflow:
return json.Marshal("overflow")
case PriorityPolicyPrioritized:
return json.Marshal("prioritized")
}
return nil, fmt.Errorf("nats: unknown priority policy %v", p)
}
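After this change the ordered-consumer name prefix and the prioritized priority policy are gone again; a hedged sketch of an ordered consumer using only fields present in v1.45.0 (stream and subject names are illustrative):

package example

import (
	"context"
	"log"

	"github.com/nats-io/nats.go/jetstream"
)

// watchEvents creates an ordered consumer; in v1.45.0 its name is always derived
// from a NUID, since OrderedConsumerConfig has no NamePrefix field there.
func watchEvents(ctx context.Context, js jetstream.JetStream) error {
	cons, err := js.OrderedConsumer(ctx, "EVENTS", jetstream.OrderedConsumerConfig{
		FilterSubjects: []string{"events.>"},
		DeliverPolicy:  jetstream.DeliverAllPolicy,
	})
	if err != nil {
		return err
	}
	cc, err := cons.Consume(func(msg jetstream.Msg) {
		log.Printf("received %s", msg.Subject())
	})
	if err != nil {
		return err
	}
	defer cc.Stop()
	<-ctx.Done()
	return nil
}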

View File

@@ -43,28 +43,27 @@ type (
)
const (
JSErrCodeBadRequest ErrorCode = 10003
JSErrCodeConsumerCreate ErrorCode = 10012
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeMaximumConsumersLimit ErrorCode = 10026
JSErrCodeMessageNotFound ErrorCode = 10037
JSErrCodeJetStreamNotEnabledForAccount ErrorCode = 10039
JSErrCodeJetStreamNotEnabled ErrorCode = 10076
JSErrCodeStreamNameInUse ErrorCode = 10058
JSErrCodeStreamNotFound ErrorCode = 10059
JSErrCodeStreamNameInUse ErrorCode = 10058
JSErrCodeStreamWrongLastSequence ErrorCode = 10071
JSErrCodeJetStreamNotEnabled ErrorCode = 10076
JSErrCodeConsumerAlreadyExists ErrorCode = 10105
JSErrCodeConsumerCreate ErrorCode = 10012
JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerAlreadyExists ErrorCode = 10105
JSErrCodeConsumerExists ErrorCode = 10148
JSErrCodeDuplicateFilterSubjects ErrorCode = 10136
JSErrCodeOverlappingFilterSubjects ErrorCode = 10138
JSErrCodeConsumerEmptyFilter ErrorCode = 10139
JSErrCodeConsumerExists ErrorCode = 10148
JSErrCodeConsumerDoesNotExist ErrorCode = 10149
JSErrCodeMessageNotFound ErrorCode = 10037
JSErrCodeBadRequest ErrorCode = 10003
JSErrCodeStreamWrongLastSequence ErrorCode = 10071
)
var (
@@ -143,10 +142,6 @@ var (
// creating consumer (e.g. illegal update).
ErrConsumerCreate JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerCreate, Description: "could not create consumer", Code: 500}}
// ErrMaximumConsumersLimit is returned when user limit of allowed
// consumers for stream is reached
ErrMaximumConsumersLimit JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeMaximumConsumersLimit, Description: "maximum consumers limit reached", Code: 400}}
// ErrDuplicateFilterSubjects is returned when both FilterSubject and
// FilterSubjects are specified when creating consumer.
ErrDuplicateFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeDuplicateFilterSubjects, Description: "consumer cannot have both FilterSubject and FilterSubjects specified", Code: 500}}
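Because the ErrMaximumConsumersLimit sentinel is removed here, code that needs that distinction has to inspect the numeric API error code itself; a hedged sketch:

package example

import (
	"errors"

	"github.com/nats-io/nats.go/jetstream"
)

// hasJSErrCode reports whether err wraps a JetStream API error with the given code,
// e.g. hasJSErrCode(err, jetstream.JSErrCodeStreamNotFound).
func hasJSErrCode(err error, code jetstream.ErrorCode) bool {
	var apiErr *jetstream.APIError
	if errors.As(err, &apiErr) {
		return apiErr.ErrorCode == code
	}
	return false
}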

View File

@@ -864,15 +864,11 @@ func (js *jetStream) OrderedConsumer(ctx context.Context, stream string, cfg Ord
if err := validateStreamName(stream); err != nil {
return nil, err
}
namePrefix := cfg.NamePrefix
if namePrefix == "" {
namePrefix = nuid.Next()
}
oc := &orderedConsumer{
js: js,
cfg: &cfg,
stream: stream,
namePrefix: namePrefix,
namePrefix: nuid.Next(),
doReset: make(chan struct{}, 1),
}
consCfg := oc.getConsumerConfig()

View File

@@ -14,7 +14,6 @@
package jetstream
import (
"context"
"fmt"
"time"
)
@@ -348,26 +347,6 @@ func (min PullMinAckPending) configureMessages(opts *consumeOpts) error {
return nil
}
// PullPrioritized sets the priority used when sending pull requests for consumer with
// PriorityPolicyPrioritized. Lower values indicate higher priority (0 is the
// highest priority). Maximum priority value is 9.
//
// If provided, PullPriorityGroup must be set as well and the consumer has to
// have PriorityPolicy set to PriorityPolicyPrioritized.
//
// PullPrioritized implements both PullConsumeOpt and PullMessagesOpt, allowing
// it to configure Consumer.Consume and Consumer.Messages.
type PullPrioritized uint8
func (p PullPrioritized) configureConsume(opts *consumeOpts) error {
opts.Priority = uint8(p)
return nil
}
func (p PullPrioritized) configureMessages(opts *consumeOpts) error {
opts.Priority = uint8(p)
return nil
}
// PullPriorityGroup sets the priority group for a consumer.
// It has to match one of the priority groups set on the consumer.
//
@@ -489,19 +468,6 @@ func FetchMinAckPending(min int64) FetchOpt {
}
}
// FetchPrioritized sets the priority used when sending fetch requests for consumer with
// PriorityPolicyPrioritized. Lower values indicate higher priority (0 is the
// highest priority). Maximum priority value is 9.
//
// If provided, FetchPriorityGroup must be set as well and the consumer has to
// have PriorityPolicy set to PriorityPolicyPrioritized.
func FetchPrioritized(priority uint8) FetchOpt {
return func(req *pullRequest) error {
req.Priority = priority
return nil
}
}
// FetchPriorityGroup sets the priority group for a consumer.
// It has to match one of the priority groups set on the consumer.
func FetchPriorityGroup(group string) FetchOpt {
@@ -520,7 +486,6 @@ func FetchMaxWait(timeout time.Duration) FetchOpt {
return fmt.Errorf("%w: timeout value must be greater than 0", ErrInvalidOption)
}
req.Expires = timeout
req.maxWaitSet = true
return nil
}
}
@@ -543,31 +508,6 @@ func FetchHeartbeat(hb time.Duration) FetchOpt {
}
}
// FetchContext sets a context for the Fetch operation.
// The Fetch operation will be canceled if the context is canceled.
// If the context has a deadline, it will be used to set expiry on pull request.
func FetchContext(ctx context.Context) FetchOpt {
return func(req *pullRequest) error {
req.ctx = ctx
// If context has a deadline, use it to set expiry
if deadline, ok := ctx.Deadline(); ok {
remaining := time.Until(deadline)
if remaining <= 0 {
return fmt.Errorf("%w: context deadline already exceeded", ErrInvalidOption)
}
// Use 90% of remaining time for server (capped at 1s)
buffer := time.Duration(float64(remaining) * 0.1)
if buffer > time.Second {
buffer = time.Second
}
req.Expires = remaining - buffer
}
return nil
}
}
// WithDeletedDetails can be used to display the information about messages
// deleted from a stream on a stream info request
func WithDeletedDetails(deletedDetails bool) StreamInfoOpt {
@@ -708,25 +648,3 @@ func WithStallWait(ttl time.Duration) PublishOpt {
return nil
}
}
type nextOptFunc func(*nextOpts)
func (fn nextOptFunc) configureNext(opts *nextOpts) {
fn(opts)
}
// NextMaxWait sets a timeout for the Next operation.
// If the timeout is reached before a message is available, a timeout error is returned.
func NextMaxWait(timeout time.Duration) NextOpt {
return nextOptFunc(func(opts *nextOpts) {
opts.timeout = timeout
})
}
// NextContext sets a context for the Next operation.
// The Next operation will be canceled if the context is canceled.
func NextContext(ctx context.Context) NextOpt {
return nextOptFunc(func(opts *nextOpts) {
opts.ctx = ctx
})
}
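With FetchContext, NextMaxWait/NextContext and the prioritized pull options removed, fetch timeouts in v1.45.0 are expressed through FetchMaxWait (plus FetchHeartbeat for longer pulls); a hedged sketch:

package example

import (
	"time"

	"github.com/nats-io/nats.go/jetstream"
)

// fetchBatch pulls up to 10 messages, waiting at most two seconds for them.
func fetchBatch(cons jetstream.Consumer) error {
	batch, err := cons.Fetch(10, jetstream.FetchMaxWait(2*time.Second))
	if err != nil {
		return err
	}
	for msg := range batch.Messages() {
		msg.Ack()
	}
	return batch.Error()
}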

View File

@@ -256,11 +256,7 @@ type (
// removed by the TTL setting.
// It is required for per-key TTL to work and for watcher to notify
// about TTL expirations (both per key and per bucket)
LimitMarkerTTL time.Duration `json:"limit_marker_ttl,omitempty"`
// Metadata is a set of application-defined key-value pairs that can be
// used to store arbitrary metadata about the bucket.
Metadata map[string]string `json:"metadata,omitempty"`
LimitMarkerTTL time.Duration
}
// KeyLister is used to retrieve a list of key value store keys. It returns
@@ -320,9 +316,6 @@ type (
// LimitMarkerTTL is how long the bucket keeps markers when keys are
// removed by the TTL setting, 0 meaning markers are not supported.
LimitMarkerTTL() time.Duration
// Metadata returns the metadata associated with the bucket.
Metadata() map[string]string
}
// KeyWatcher is what is returned when doing a watch. It can be used to
@@ -674,7 +667,6 @@ func (js *jetStream) prepareKeyValueConfig(ctx context.Context, cfg KeyValueConf
Discard: DiscardNew,
AllowMsgTTL: allowMsgTTL,
SubjectDeleteMarkerTTL: subjectDeleteMarkerTTL,
Metadata: cfg.Metadata,
}
if cfg.Mirror != nil {
// Copy in case we need to make changes so we do not change caller's version.
@@ -821,11 +813,6 @@ func (s *KeyValueBucketStatus) LimitMarkerTTL() time.Duration {
return s.info.Config.SubjectDeleteMarkerTTL
}
// Metadata returns the metadata associated with the bucket.
func (s *KeyValueBucketStatus) Metadata() map[string]string {
return s.info.Config.Metadata
}
type kvLister struct {
kvs chan KeyValueStatus
kvNames chan string
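Relevant to the nats-js-kv persistence added in this changeset: the bucket-level Metadata accessor is no longer available after the downgrade. A hedged sketch of creating a KV bucket with the fields v1.45.0 does support (bucket name and TTL are illustrative):

package example

import (
	"context"
	"time"

	"github.com/nats-io/nats.go/jetstream"
)

// createBucket provisions a key-value bucket; note there is no Metadata field
// on KeyValueConfig in v1.45.0.
func createBucket(ctx context.Context, js jetstream.JetStream) (jetstream.KeyValue, error) {
	return js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
		Bucket:  "user-state",
		History: 5,
		TTL:     24 * time.Hour,
	})
}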

View File

@@ -282,21 +282,10 @@ func (c *orderedConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, er
return sub, nil
}
func (s *orderedSubscription) Next(opts ...NextOpt) (Msg, error) {
func (s *orderedSubscription) Next() (Msg, error) {
for {
msg, err := s.consumer.currentSub.Next(opts...)
msg, err := s.consumer.currentSub.Next()
if err != nil {
// Check for errors which should be returned directly
// without resetting the consumer
if errors.Is(err, ErrInvalidOption) {
return nil, err
}
if errors.Is(err, nats.ErrTimeout) {
return nil, err
}
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return nil, err
}
if errors.Is(err, ErrMsgIteratorClosed) {
s.Stop()
return nil, err
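Since Next no longer accepts options in v1.45.0, per-call timeouts are unavailable and iteration is bounded by Stop or the iterator errors above; a hedged sketch of the iterator usage:

package example

import "github.com/nats-io/nats.go/jetstream"

// drainSome reads up to n messages from the iterator; cancellation happens via Stop.
func drainSome(cons jetstream.Consumer, n int) error {
	it, err := cons.Messages()
	if err != nil {
		return err
	}
	defer it.Stop()
	for i := 0; i < n; i++ {
		msg, err := it.Next()
		if err != nil {
			return err
		}
		msg.Ack()
	}
	return nil
}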

View File

@@ -130,8 +130,6 @@ type (
// Domain is the domain the message was published to.
Domain string `json:"domain,omitempty"`
Value string `json:"val,omitempty"`
}
)

View File

@@ -14,7 +14,6 @@
package jetstream
import (
"context"
"encoding/json"
"errors"
"fmt"
@@ -35,11 +34,8 @@ type (
MessagesContext interface {
// Next retrieves next message on a stream. It will block until the next
// message is available. If the context is canceled, Next will return
// ErrMsgIteratorClosed error. An optional timeout or context can be
// provided using NextOpt options. If none are provided, Next will block
// indefinitely until a message is available, iterator is closed or a
// heartbeat error occurs.
Next(opts ...NextOpt) (Msg, error)
// ErrMsgIteratorClosed error.
Next() (Msg, error)
// Stop unsubscribes from the stream and cancels subscription. Calling
// Next after calling Stop will return ErrMsgIteratorClosed error.
@@ -96,18 +92,15 @@ type (
}
pullRequest struct {
Expires time.Duration `json:"expires,omitempty"`
Batch int `json:"batch,omitempty"`
MaxBytes int `json:"max_bytes,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
MinPending int64 `json:"min_pending,omitempty"`
MinAckPending int64 `json:"min_ack_pending,omitempty"`
PinID string `json:"id,omitempty"`
Group string `json:"group,omitempty"`
Priority uint8 `json:"priority,omitempty"`
ctx context.Context `json:"-"`
maxWaitSet bool `json:"-"`
Expires time.Duration `json:"expires,omitempty"`
Batch int `json:"batch,omitempty"`
MaxBytes int `json:"max_bytes,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
MinPending int64 `json:"min_pending,omitempty"`
MinAckPending int64 `json:"min_ack_pending,omitempty"`
PinID string `json:"id,omitempty"`
Group string `json:"group,omitempty"`
}
consumeOpts struct {
@@ -117,7 +110,6 @@ type (
LimitSize bool
MinPending int64
MinAckPending int64
Priority uint8
Group string
Heartbeat time.Duration
ErrHandler ConsumeErrHandler
@@ -175,16 +167,6 @@ type (
timer *time.Timer
sync.Mutex
}
// NextOpt is an option for configuring the behavior of MessagesContext.Next.
NextOpt interface {
configureNext(*nextOpts)
}
nextOpts struct {
timeout time.Duration
ctx context.Context
}
)
const (
@@ -332,7 +314,6 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (
Heartbeat: consumeOpts.Heartbeat,
MinPending: consumeOpts.MinPending,
MinAckPending: consumeOpts.MinAckPending,
Priority: consumeOpts.Priority,
Group: consumeOpts.Group,
PinID: p.getPinID(),
}, subject); err != nil {
@@ -372,7 +353,6 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (
Heartbeat: sub.consumeOpts.Heartbeat,
MinPending: sub.consumeOpts.MinPending,
MinAckPending: sub.consumeOpts.MinAckPending,
Priority: sub.consumeOpts.Priority,
Group: sub.consumeOpts.Group,
PinID: p.getPinID(),
}
@@ -403,7 +383,6 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (
Heartbeat: sub.consumeOpts.Heartbeat,
MinPending: sub.consumeOpts.MinPending,
MinAckPending: sub.consumeOpts.MinAckPending,
Priority: sub.consumeOpts.Priority,
Group: sub.consumeOpts.Group,
PinID: p.getPinID(),
}
@@ -489,7 +468,6 @@ func (s *pullSubscription) checkPending() {
Group: s.consumeOpts.Group,
MinPending: s.consumeOpts.MinPending,
MinAckPending: s.consumeOpts.MinAckPending,
Priority: s.consumeOpts.Priority,
}
s.pending.msgCount = s.consumeOpts.MaxMessages
@@ -591,30 +569,7 @@ var (
// Next retrieves next message on a stream. It will block until the next
// message is available. If the context is canceled, Next will return
// ErrMsgIteratorClosed error.
func (s *pullSubscription) Next(opts ...NextOpt) (Msg, error) {
var nextOpts nextOpts
for _, opt := range opts {
opt.configureNext(&nextOpts)
}
if nextOpts.timeout > 0 && nextOpts.ctx != nil {
return nil, fmt.Errorf("%w: cannot specify both NextMaxWait and NextContext", ErrInvalidOption)
}
// Create timeout channel if needed
var timeoutCh <-chan time.Time
if nextOpts.timeout > 0 {
timer := time.NewTimer(nextOpts.timeout)
defer timer.Stop()
timeoutCh = timer.C
}
// Use context if provided
var ctxDone <-chan struct{}
if nextOpts.ctx != nil {
ctxDone = nextOpts.ctx.Done()
}
func (s *pullSubscription) Next() (Msg, error) {
s.Lock()
defer s.Unlock()
drainMode := s.draining.Load() == 1
@@ -705,10 +660,6 @@ func (s *pullSubscription) Next(opts ...NextOpt) (Msg, error) {
}
isConnected = false
}
case <-timeoutCh:
return nil, nats.ErrTimeout
case <-ctxDone:
return nil, nextOpts.ctx.Err()
}
}
}
@@ -828,11 +779,6 @@ func (p *pullConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, error)
return nil, err
}
}
if req.ctx != nil && req.maxWaitSet {
return nil, fmt.Errorf("%w: cannot specify both FetchContext and FetchMaxWait", ErrInvalidOption)
}
// if heartbeat was not explicitly set, set it to 5 seconds for longer pulls
// and disable it for shorter pulls
if req.Heartbeat == unset {
@@ -862,11 +808,6 @@ func (p *pullConsumer) FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch,
return nil, err
}
}
if req.ctx != nil && req.maxWaitSet {
return nil, fmt.Errorf("%w: cannot specify both FetchContext and FetchMaxWait", ErrInvalidOption)
}
// if heartbeat was not explicitly set, set it to 5 seconds for longer pulls
// and disable it for shorter pulls
if req.Heartbeat == unset {
@@ -921,13 +862,6 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) {
var receivedMsgs, receivedBytes int
hbTimer := sub.scheduleHeartbeatCheck(req.Heartbeat)
// Use context if provided
var ctxDone <-chan struct{}
if req.ctx != nil {
ctxDone = req.ctx.Done()
}
go func(res *fetchResult) {
defer sub.subscription.Unsubscribe()
defer close(res.msgs)
@@ -988,12 +922,6 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) {
res.done = true
res.Unlock()
return
case <-ctxDone:
res.Lock()
res.err = req.ctx.Err()
res.done = true
res.Unlock()
return
}
}
}(res)

View File

@@ -349,15 +349,11 @@ func (s *stream) UpdatePushConsumer(ctx context.Context, cfg ConsumerConfig) (Pu
// messages from a stream. Ordered consumers are ephemeral in-memory
// pull consumers and are resilient to deletes and restarts.
func (s *stream) OrderedConsumer(ctx context.Context, cfg OrderedConsumerConfig) (Consumer, error) {
namePrefix := cfg.NamePrefix
if namePrefix == "" {
namePrefix = nuid.Next()
}
oc := &orderedConsumer{
js: s.js,
cfg: &cfg,
stream: s.name,
namePrefix: namePrefix,
namePrefix: nuid.Next(),
doReset: make(chan struct{}, 1),
}
consCfg := oc.getConsumerConfig()
@@ -532,7 +528,6 @@ func (s *stream) getMsg(ctx context.Context, mreq *apiMsgGetRequest) (*RawStream
if err != nil {
return nil, err
}
var gmSubj string
// handle direct gets
@@ -603,7 +598,6 @@ func convertDirectGetMsgResponseToMsg(r *nats.Msg) (*RawStreamMsg, error) {
}
}
}
// Check for headers that give us the required information to
// reconstruct the message.
if len(r.Header) == 0 {
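The direct-get handling above reconstructs messages from response headers; on the caller side, reading a raw message by sequence can be sketched as follows (the sequence value is illustrative):

package example

import (
	"context"
	"log"

	"github.com/nats-io/nats.go/jetstream"
)

// peek reads one raw message by stream sequence, e.g. peek(ctx, s, 42).
func peek(ctx context.Context, s jetstream.Stream, seq uint64) error {
	raw, err := s.GetMsg(ctx, seq)
	if err != nil {
		return err
	}
	log.Printf("seq=%d subject=%s bytes=%d", raw.Sequence, raw.Subject, len(raw.Data))
	return nil
}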

View File

@@ -201,18 +201,6 @@ type (
// Enables and sets a duration for adding server markers for delete, purge and max age limits.
// This feature requires nats-server v2.11.0 or later.
SubjectDeleteMarkerTTL time.Duration `json:"subject_delete_marker_ttl,omitempty"`
// AllowMsgCounter enables the feature
AllowMsgCounter bool `json:"allow_msg_counter"`
// AllowAtomicPublish allows atomic batch publishing into the stream.
AllowAtomicPublish bool `json:"allow_atomic,omitempty"`
// AllowMsgSchedules enables the scheduling of messages
AllowMsgSchedules bool `json:"allow_msg_schedules,omitempty"`
// PersistMode allows to opt-in to different persistence mode settings.
PersistMode PersistModeType `json:"persist_mode,omitempty"`
}
// StreamSourceInfo shows information about an upstream stream
@@ -288,25 +276,10 @@ type (
// Name is the name of the cluster.
Name string `json:"name,omitempty"`
// RaftGroup is the name of the Raft group managing the asset (in
// clustered environments).
RaftGroup string `json:"raft_group,omitempty"`
// Leader is the server name of the RAFT leader.
Leader string `json:"leader,omitempty"`
// LeaderSince is the time that it was elected as leader in RFC3339
// format, absent when not the leader.
LeaderSince *time.Time `json:"leader_since,omitempty"`
// SystemAcc indicates if the traffic_account is the system account.
// When true, replication traffic goes over the system account.
SystemAcc bool `json:"system_account,omitempty"`
// TrafficAcc is the account where the replication traffic goes over.
TrafficAcc string `json:"traffic_account,omitempty"`
// Replicas is the list of members of the RAFT cluster.
// Replicas is the list of members of the RAFT cluster
Replicas []*PeerInfo `json:"replicas,omitempty"`
}
@@ -434,9 +407,6 @@ type (
// StoreCompression determines how messages are compressed.
StoreCompression uint8
// PersistModeType determines what persistence mode the stream uses.
PersistModeType int
)
const (
@@ -468,16 +438,6 @@ const (
workQueuePolicyString = "workqueue"
)
const (
// DefaultPersistMode specifies the default persist mode. Writes to the stream will immediately be flushed.
// The publish acknowledgement will be sent after the persisting completes.
DefaultPersistMode = PersistModeType(iota)
// AsyncPersistMode specifies writes to the stream will be flushed asynchronously.
// The publish acknowledgement may be sent before the persisting completes.
// This means writes could be lost if they weren't flushed prior to a hard kill of the server.
AsyncPersistMode
)
func (rp RetentionPolicy) String() string {
switch rp {
case LimitsPolicy:
@@ -552,40 +512,6 @@ func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error {
return nil
}
func (pm PersistModeType) String() string {
switch pm {
case DefaultPersistMode:
return "Default"
case AsyncPersistMode:
return "Async"
default:
return "Unknown Persist Mode"
}
}
func (pm PersistModeType) MarshalJSON() ([]byte, error) {
switch pm {
case DefaultPersistMode:
return json.Marshal("default")
case AsyncPersistMode:
return json.Marshal("async")
default:
return nil, fmt.Errorf("nats: can not marshal %v", pm)
}
}
func (pm *PersistModeType) UnmarshalJSON(data []byte) error {
switch strings.ToLower(string(data)) {
case jsonString("default"):
*pm = DefaultPersistMode
case jsonString("async"):
*pm = AsyncPersistMode
default:
return fmt.Errorf("nats: can not unmarshal %q", data)
}
return nil
}
const (
// FileStorage specifies on disk storage. It's the default.
FileStorage StorageType = iota
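With the counter, atomic-publish and persist-mode fields dropped again, stream creation in v1.45.0 is limited to the established configuration surface; a hedged sketch (names and limits are illustrative):

package example

import (
	"context"
	"time"

	"github.com/nats-io/nats.go/jetstream"
)

// ensureStream creates or updates a file-backed, limits-retained stream.
func ensureStream(ctx context.Context, js jetstream.JetStream) (jetstream.Stream, error) {
	return js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
		Name:      "EVENTS",
		Subjects:  []string{"events.>"},
		Storage:   jetstream.FileStorage,
		Retention: jetstream.LimitsPolicy,
		MaxAge:    7 * 24 * time.Hour,
	})
}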

View File

@@ -2836,10 +2836,6 @@ func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
if sub.jsi == nil || sub.jsi.consumer == _EMPTY_ {
if sub.jsi.ordered {
sub.mu.Unlock()
return nil, ErrConsumerInfoOnOrderedReset
}
sub.mu.Unlock()
return nil, ErrTypeSubscription
}

View File

@@ -150,9 +150,6 @@ var (
// ErrConsumerLeadershipChanged is returned when pending requests are no longer valid after leadership has changed
ErrConsumerLeadershipChanged JetStreamError = &jsError{message: "Leadership Changed"}
// ErrConsumerInfoOnOrderedReset is returned when attempting to fetch consumer info for an ordered consumer that is currently being recreated.
ErrConsumerInfoOnOrderedReset JetStreamError = &jsError{message: "cannot fetch consumer info; ordered consumer is being reset"}
// ErrNoHeartbeat is returned when no heartbeat is received from server when sending requests with pull consumer.
ErrNoHeartbeat JetStreamError = &jsError{message: "no heartbeat received"}

View File

@@ -1093,13 +1093,9 @@ type StreamState struct {
// ClusterInfo shows information about the underlying set of servers
// that make up the stream or consumer.
type ClusterInfo struct {
Name string `json:"name,omitempty"`
RaftGroup string `json:"raft_group,omitempty"`
Leader string `json:"leader,omitempty"`
LeaderSince *time.Time `json:"leader_since,omitempty"`
SystemAcc bool `json:"system_account,omitempty"`
TrafficAcc string `json:"traffic_account,omitempty"`
Replicas []*PeerInfo `json:"replicas,omitempty"`
Name string `json:"name,omitempty"`
Leader string `json:"leader,omitempty"`
Replicas []*PeerInfo `json:"replicas,omitempty"`
}
// PeerInfo shows information about all the peers in the cluster that

View File

@@ -48,7 +48,7 @@ import (
// Default Constants
const (
Version = "1.46.0"
Version = "1.45.0"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60

View File

@@ -106,7 +106,6 @@ type CapabilitiesCore struct {
PollInterval int `json:"pollinterval" xml:"pollinterval" mapstructure:"poll_interval"`
WebdavRoot string `json:"webdav-root,omitempty" xml:"webdav-root,omitempty" mapstructure:"webdav_root"`
Status *Status `json:"status" xml:"status"`
CheckForUpdates ocsBool `json:"check-for-updates" xml:"check-for-updates" mapstructure:"check_for_updates"`
SupportURLSigning ocsBool `json:"support-url-signing" xml:"support-url-signing" mapstructure:"support_url_signing"`
SupportSSE ocsBool `json:"support-sse" xml:"support-sse" mapstructure:"support_sse"`
}

View File

@@ -21,7 +21,6 @@ package tree
import (
"context"
"crypto/sha256"
"fmt"
"io"
"io/fs"
@@ -394,17 +393,6 @@ func (t *Tree) findSpaceId(path string) (string, error) {
return "", fmt.Errorf("could not find space for path %s", path)
}
func (t *Tree) generateTempNodeId(path string) string {
if len(path) < 240 {
return strings.ReplaceAll(strings.TrimPrefix(path, "/"), "/", "-")
} else {
// Use sha256 if path too long
pathHash := fmt.Sprintf("%x", sha256.Sum256([]byte(path)))[:240]
t.log.Info().Str("path", path).Msg("path too long, using sha256 as lock: " + pathHash)
return pathHash
}
}
func (t *Tree) assimilate(item scanItem) error {
t.log.Debug().Str("path", item.Path).Bool("recurse", item.Recurse).Msg("assimilate")
var err error
@@ -544,8 +532,7 @@ func (t *Tree) assimilate(item scanItem) error {
assimilationNode := &assimilationNode{
spaceID: spaceID,
// Use the path as the node ID (which is used for calculating the lock file path) since we do not have an ID yet
// In the case path is too long, we do sha256 and extract first 240 characters
nodeId: t.generateTempNodeId(item.Path),
nodeId: strings.ReplaceAll(strings.TrimPrefix(item.Path, "/"), "/", "-"),
}
unlock, err := t.lookup.MetadataBackend().Lock(assimilationNode)
if err != nil {
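The reva revision pinned here goes back to deriving the lock ID straight from the path; the newer revision's long-path fallback removed above can be sketched like this, with the 240-character threshold taken from the removed helper (a SHA-256 hex digest is 64 characters, well under that limit):

package example

import (
	"crypto/sha256"
	"fmt"
	"strings"
)

// tempNodeID sketches the removed fallback: short paths map directly to an ID,
// longer ones fall back to a hash of the full path.
func tempNodeID(path string) string {
	const maxLen = 240 // threshold used by the removed helper
	if len(path) < maxLen {
		return strings.ReplaceAll(strings.TrimPrefix(path, "/"), "/", "-")
	}
	return fmt.Sprintf("%x", sha256.Sum256([]byte(path)))
}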

View File

@@ -6,7 +6,7 @@
// cancellation signals, and other request-scoped values across API boundaries
// and between processes.
// As of Go 1.7 this package is available in the standard library under the
// name [context].
// name [context], and migrating to it can be done automatically with [go fix].
//
// Incoming requests to a server should create a [Context], and outgoing
// calls to servers should accept a Context. The chain of function
@@ -38,6 +38,8 @@
//
// See https://go.dev/blog/context for example code for a server that uses
// Contexts.
//
// [go fix]: https://go.dev/cmd/go#hdr-Update_packages_to_use_new_APIs
package context
import (
@@ -49,37 +51,36 @@ import (
// API boundaries.
//
// Context's methods may be called by multiple goroutines simultaneously.
//
//go:fix inline
type Context = context.Context
// Canceled is the error returned by [Context.Err] when the context is canceled
// for some reason other than its deadline passing.
//
//go:fix inline
var Canceled = context.Canceled
// DeadlineExceeded is the error returned by [Context.Err] when the context is canceled
// due to its deadline passing.
//
//go:fix inline
var DeadlineExceeded = context.DeadlineExceeded
// Background returns a non-nil, empty Context. It is never canceled, has no
// values, and has no deadline. It is typically used by the main function,
// initialization, and tests, and as the top-level Context for incoming
// requests.
//
//go:fix inline
func Background() Context { return context.Background() }
func Background() Context {
return background
}
// TODO returns a non-nil, empty Context. Code should use context.TODO when
// it's unclear which Context to use or it is not yet available (because the
// surrounding function has not yet been extended to accept a Context
// parameter).
//
//go:fix inline
func TODO() Context { return context.TODO() }
func TODO() Context {
return todo
}
var (
background = context.Background()
todo = context.TODO()
)
// A CancelFunc tells an operation to abandon its work.
// A CancelFunc does not wait for the work to stop.
@@ -94,8 +95,6 @@ type CancelFunc = context.CancelFunc
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this [Context] complete.
//
//go:fix inline
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
return context.WithCancel(parent)
}
@@ -109,8 +108,6 @@ func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this [Context] complete.
//
//go:fix inline
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
return context.WithDeadline(parent, d)
}
@@ -125,8 +122,6 @@ func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
// defer cancel() // releases resources if slowOperation completes before timeout elapses
// return slowOperation(ctx)
// }
//
//go:fix inline
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return context.WithTimeout(parent, timeout)
}
@@ -144,8 +139,6 @@ func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
// interface{}, context keys often have concrete type
// struct{}. Alternatively, exported context key variables' static
// type should be a pointer or interface.
//
//go:fix inline
func WithValue(parent Context, key, val interface{}) Context {
return context.WithValue(parent, key, val)
}

View File

@@ -55,7 +55,7 @@ func configFromServer(h1 *http.Server, h2 *Server) http2Config {
PermitProhibitedCipherSuites: h2.PermitProhibitedCipherSuites,
CountError: h2.CountError,
}
fillNetHTTPConfig(&conf, h1.HTTP2)
fillNetHTTPServerConfig(&conf, h1)
setConfigDefaults(&conf, true)
return conf
}
@@ -81,7 +81,7 @@ func configFromTransport(h2 *Transport) http2Config {
}
if h2.t1 != nil {
fillNetHTTPConfig(&conf, h2.t1.HTTP2)
fillNetHTTPTransportConfig(&conf, h2.t1)
}
setConfigDefaults(&conf, false)
return conf
@@ -120,45 +120,3 @@ func adjustHTTP1MaxHeaderSize(n int64) int64 {
const typicalHeaders = 10 // conservative
return n + typicalHeaders*perFieldOverhead
}
func fillNetHTTPConfig(conf *http2Config, h2 *http.HTTP2Config) {
if h2 == nil {
return
}
if h2.MaxConcurrentStreams != 0 {
conf.MaxConcurrentStreams = uint32(h2.MaxConcurrentStreams)
}
if h2.MaxEncoderHeaderTableSize != 0 {
conf.MaxEncoderHeaderTableSize = uint32(h2.MaxEncoderHeaderTableSize)
}
if h2.MaxDecoderHeaderTableSize != 0 {
conf.MaxDecoderHeaderTableSize = uint32(h2.MaxDecoderHeaderTableSize)
}
if h2.MaxConcurrentStreams != 0 {
conf.MaxConcurrentStreams = uint32(h2.MaxConcurrentStreams)
}
if h2.MaxReadFrameSize != 0 {
conf.MaxReadFrameSize = uint32(h2.MaxReadFrameSize)
}
if h2.MaxReceiveBufferPerConnection != 0 {
conf.MaxUploadBufferPerConnection = int32(h2.MaxReceiveBufferPerConnection)
}
if h2.MaxReceiveBufferPerStream != 0 {
conf.MaxUploadBufferPerStream = int32(h2.MaxReceiveBufferPerStream)
}
if h2.SendPingTimeout != 0 {
conf.SendPingTimeout = h2.SendPingTimeout
}
if h2.PingTimeout != 0 {
conf.PingTimeout = h2.PingTimeout
}
if h2.WriteByteTimeout != 0 {
conf.WriteByteTimeout = h2.WriteByteTimeout
}
if h2.PermitProhibitedCipherSuites {
conf.PermitProhibitedCipherSuites = true
}
if h2.CountError != nil {
conf.CountError = h2.CountError
}
}

61
vendor/golang.org/x/net/http2/config_go124.go generated vendored Normal file
View File

@@ -0,0 +1,61 @@
// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.24
package http2
import "net/http"
// fillNetHTTPServerConfig sets fields in conf from srv.HTTP2.
func fillNetHTTPServerConfig(conf *http2Config, srv *http.Server) {
fillNetHTTPConfig(conf, srv.HTTP2)
}
// fillNetHTTPTransportConfig sets fields in conf from tr.HTTP2.
func fillNetHTTPTransportConfig(conf *http2Config, tr *http.Transport) {
fillNetHTTPConfig(conf, tr.HTTP2)
}
func fillNetHTTPConfig(conf *http2Config, h2 *http.HTTP2Config) {
if h2 == nil {
return
}
if h2.MaxConcurrentStreams != 0 {
conf.MaxConcurrentStreams = uint32(h2.MaxConcurrentStreams)
}
if h2.MaxEncoderHeaderTableSize != 0 {
conf.MaxEncoderHeaderTableSize = uint32(h2.MaxEncoderHeaderTableSize)
}
if h2.MaxDecoderHeaderTableSize != 0 {
conf.MaxDecoderHeaderTableSize = uint32(h2.MaxDecoderHeaderTableSize)
}
if h2.MaxConcurrentStreams != 0 {
conf.MaxConcurrentStreams = uint32(h2.MaxConcurrentStreams)
}
if h2.MaxReadFrameSize != 0 {
conf.MaxReadFrameSize = uint32(h2.MaxReadFrameSize)
}
if h2.MaxReceiveBufferPerConnection != 0 {
conf.MaxUploadBufferPerConnection = int32(h2.MaxReceiveBufferPerConnection)
}
if h2.MaxReceiveBufferPerStream != 0 {
conf.MaxUploadBufferPerStream = int32(h2.MaxReceiveBufferPerStream)
}
if h2.SendPingTimeout != 0 {
conf.SendPingTimeout = h2.SendPingTimeout
}
if h2.PingTimeout != 0 {
conf.PingTimeout = h2.PingTimeout
}
if h2.WriteByteTimeout != 0 {
conf.WriteByteTimeout = h2.WriteByteTimeout
}
if h2.PermitProhibitedCipherSuites {
conf.PermitProhibitedCipherSuites = true
}
if h2.CountError != nil {
conf.CountError = h2.CountError
}
}
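On Go 1.24 and later these values come from the standard library's HTTP/2 settings; a hedged sketch of a server wired up so fillNetHTTPServerConfig picks the limits up (address and limit values are illustrative):

package example

import "net/http"

// newServer configures HTTP/2 limits via the net/http fields added in Go 1.24;
// the vendored http2 package reads them through fillNetHTTPServerConfig.
func newServer(handler http.Handler) *http.Server {
	return &http.Server{
		Addr:    ":8443",
		Handler: handler,
		HTTP2: &http.HTTP2Config{
			MaxConcurrentStreams: 250,
			MaxReadFrameSize:     1 << 20,
		},
	}
}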

16
vendor/golang.org/x/net/http2/config_pre_go124.go generated vendored Normal file
View File

@@ -0,0 +1,16 @@
// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build !go1.24
package http2
import "net/http"
// Pre-Go 1.24 fallback.
// The Server.HTTP2 and Transport.HTTP2 config fields were added in Go 1.24.
func fillNetHTTPServerConfig(conf *http2Config, srv *http.Server) {}
func fillNetHTTPTransportConfig(conf *http2Config, tr *http.Transport) {}

View File

@@ -15,32 +15,21 @@ import (
"runtime"
"strconv"
"sync"
"sync/atomic"
)
var DebugGoroutines = os.Getenv("DEBUG_HTTP2_GOROUTINES") == "1"
// Setting DebugGoroutines to false during a test to disable goroutine debugging
// results in race detector complaints when a test leaves goroutines running before
// returning. Tests shouldn't do this, of course, but when they do it generally shows
// up as infrequent, hard-to-debug flakes. (See #66519.)
//
// Disable goroutine debugging during individual tests with an atomic bool.
// (Note that it's safe to enable/disable debugging mid-test, so the actual race condition
// here is harmless.)
var disableDebugGoroutines atomic.Bool
type goroutineLock uint64
func newGoroutineLock() goroutineLock {
if !DebugGoroutines || disableDebugGoroutines.Load() {
if !DebugGoroutines {
return 0
}
return goroutineLock(curGoroutineID())
}
func (g goroutineLock) check() {
if !DebugGoroutines || disableDebugGoroutines.Load() {
if !DebugGoroutines {
return
}
if curGoroutineID() != uint64(g) {
@@ -49,7 +38,7 @@ func (g goroutineLock) check() {
}
func (g goroutineLock) checkNotOn() {
if !DebugGoroutines || disableDebugGoroutines.Load() {
if !DebugGoroutines {
return
}
if curGoroutineID() == uint64(g) {

View File

@@ -15,6 +15,7 @@ package http2 // import "golang.org/x/net/http2"
import (
"bufio"
"context"
"crypto/tls"
"errors"
"fmt"
@@ -254,13 +255,15 @@ func (cw closeWaiter) Wait() {
// idle memory usage with many connections.
type bufferedWriter struct {
_ incomparable
conn net.Conn // immutable
bw *bufio.Writer // non-nil when data is buffered
byteTimeout time.Duration // immutable, WriteByteTimeout
group synctestGroupInterface // immutable
conn net.Conn // immutable
bw *bufio.Writer // non-nil when data is buffered
byteTimeout time.Duration // immutable, WriteByteTimeout
}
func newBufferedWriter(conn net.Conn, timeout time.Duration) *bufferedWriter {
func newBufferedWriter(group synctestGroupInterface, conn net.Conn, timeout time.Duration) *bufferedWriter {
return &bufferedWriter{
group: group,
conn: conn,
byteTimeout: timeout,
}
@@ -311,18 +314,24 @@ func (w *bufferedWriter) Flush() error {
type bufferedWriterTimeoutWriter bufferedWriter
func (w *bufferedWriterTimeoutWriter) Write(p []byte) (n int, err error) {
return writeWithByteTimeout(w.conn, w.byteTimeout, p)
return writeWithByteTimeout(w.group, w.conn, w.byteTimeout, p)
}
// writeWithByteTimeout writes to conn.
// If more than timeout passes without any bytes being written to the connection,
// the write fails.
func writeWithByteTimeout(conn net.Conn, timeout time.Duration, p []byte) (n int, err error) {
func writeWithByteTimeout(group synctestGroupInterface, conn net.Conn, timeout time.Duration, p []byte) (n int, err error) {
if timeout <= 0 {
return conn.Write(p)
}
for {
conn.SetWriteDeadline(time.Now().Add(timeout))
var now time.Time
if group == nil {
now = time.Now()
} else {
now = group.Now()
}
conn.SetWriteDeadline(now.Add(timeout))
nn, err := conn.Write(p[n:])
n += nn
if n == len(p) || nn == 0 || !errors.Is(err, os.ErrDeadlineExceeded) {
@@ -408,3 +417,14 @@ func (s *sorter) SortStrings(ss []string) {
// makes that struct also non-comparable, and generally doesn't add
// any size (as long as it's first).
type incomparable [0]func()
// synctestGroupInterface is the methods of synctestGroup used by Server and Transport.
// It's defined as an interface here to let us keep synctestGroup entirely test-only
// and not a part of non-test builds.
type synctestGroupInterface interface {
Join()
Now() time.Time
NewTimer(d time.Duration) timer
AfterFunc(d time.Duration, f func()) timer
ContextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc)
}
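Outside of tests the synctest group is nil and writes fall back to real deadlines; the byte-timeout write pattern used above can be sketched in isolation like this:

package example

import (
	"errors"
	"net"
	"os"
	"time"
)

// writeAll keeps extending the write deadline as long as each attempt makes progress,
// mirroring writeWithByteTimeout for the non-test (nil group) path.
func writeAll(conn net.Conn, timeout time.Duration, p []byte) (int, error) {
	if timeout <= 0 {
		return conn.Write(p)
	}
	n := 0
	for {
		conn.SetWriteDeadline(time.Now().Add(timeout))
		nn, err := conn.Write(p[n:])
		n += nn
		if n == len(p) || nn == 0 || !errors.Is(err, os.ErrDeadlineExceeded) {
			return n, err
		}
	}
}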

View File

@@ -176,6 +176,39 @@ type Server struct {
// so that we don't embed a Mutex in this struct, which will make the
// struct non-copyable, which might break some callers.
state *serverInternalState
// Synchronization group used for testing.
// Outside of tests, this is nil.
group synctestGroupInterface
}
func (s *Server) markNewGoroutine() {
if s.group != nil {
s.group.Join()
}
}
func (s *Server) now() time.Time {
if s.group != nil {
return s.group.Now()
}
return time.Now()
}
// newTimer creates a new time.Timer, or a synthetic timer in tests.
func (s *Server) newTimer(d time.Duration) timer {
if s.group != nil {
return s.group.NewTimer(d)
}
return timeTimer{time.NewTimer(d)}
}
// afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests.
func (s *Server) afterFunc(d time.Duration, f func()) timer {
if s.group != nil {
return s.group.AfterFunc(d, f)
}
return timeTimer{time.AfterFunc(d, f)}
}
type serverInternalState struct {
@@ -390,9 +423,6 @@ func (o *ServeConnOpts) handler() http.Handler {
//
// The opts parameter is optional. If nil, default values are used.
func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
if opts == nil {
opts = &ServeConnOpts{}
}
s.serveConn(c, opts, nil)
}
@@ -408,7 +438,7 @@ func (s *Server) serveConn(c net.Conn, opts *ServeConnOpts, newf func(*serverCon
conn: c,
baseCtx: baseCtx,
remoteAddrStr: c.RemoteAddr().String(),
bw: newBufferedWriter(c, conf.WriteByteTimeout),
bw: newBufferedWriter(s.group, c, conf.WriteByteTimeout),
handler: opts.handler(),
streams: make(map[uint32]*stream),
readFrameCh: make(chan readFrameResult),
@@ -608,11 +638,11 @@ type serverConn struct {
pingSent bool
sentPingData [8]byte
goAwayCode ErrCode
shutdownTimer *time.Timer // nil until used
idleTimer *time.Timer // nil if unused
shutdownTimer timer // nil until used
idleTimer timer // nil if unused
readIdleTimeout time.Duration
pingTimeout time.Duration
readIdleTimer *time.Timer // nil if unused
readIdleTimer timer // nil if unused
// Owned by the writeFrameAsync goroutine:
headerWriteBuf bytes.Buffer
@@ -657,12 +687,12 @@ type stream struct {
flow outflow // limits writing from Handler to client
inflow inflow // what the client is allowed to POST/etc to us
state streamState
resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
gotTrailerHeader bool // HEADER frame for trailers was seen
wroteHeaders bool // whether we wrote headers (not status 100)
readDeadline *time.Timer // nil if unused
writeDeadline *time.Timer // nil if unused
closeErr error // set before cw is closed
resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
gotTrailerHeader bool // HEADER frame for trailers was seen
wroteHeaders bool // whether we wrote headers (not status 100)
readDeadline timer // nil if unused
writeDeadline timer // nil if unused
closeErr error // set before cw is closed
trailer http.Header // accumulated trailers
reqTrailer http.Header // handler's Request.Trailer
@@ -818,6 +848,7 @@ type readFrameResult struct {
// consumer is done with the frame.
// It's run on its own goroutine.
func (sc *serverConn) readFrames() {
sc.srv.markNewGoroutine()
gate := make(chan struct{})
gateDone := func() { gate <- struct{}{} }
for {
@@ -850,6 +881,7 @@ type frameWriteResult struct {
// At most one goroutine can be running writeFrameAsync at a time per
// serverConn.
func (sc *serverConn) writeFrameAsync(wr FrameWriteRequest, wd *writeData) {
sc.srv.markNewGoroutine()
var err error
if wd == nil {
err = wr.write.writeFrame(sc)
@@ -933,22 +965,22 @@ func (sc *serverConn) serve(conf http2Config) {
sc.setConnState(http.StateIdle)
if sc.srv.IdleTimeout > 0 {
sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
sc.idleTimer = sc.srv.afterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
defer sc.idleTimer.Stop()
}
if conf.SendPingTimeout > 0 {
sc.readIdleTimeout = conf.SendPingTimeout
sc.readIdleTimer = time.AfterFunc(conf.SendPingTimeout, sc.onReadIdleTimer)
sc.readIdleTimer = sc.srv.afterFunc(conf.SendPingTimeout, sc.onReadIdleTimer)
defer sc.readIdleTimer.Stop()
}
go sc.readFrames() // closed by defer sc.conn.Close above
settingsTimer := time.AfterFunc(firstSettingsTimeout, sc.onSettingsTimer)
settingsTimer := sc.srv.afterFunc(firstSettingsTimeout, sc.onSettingsTimer)
defer settingsTimer.Stop()
lastFrameTime := time.Now()
lastFrameTime := sc.srv.now()
loopNum := 0
for {
loopNum++
@@ -962,7 +994,7 @@ func (sc *serverConn) serve(conf http2Config) {
case res := <-sc.wroteFrameCh:
sc.wroteFrame(res)
case res := <-sc.readFrameCh:
lastFrameTime = time.Now()
lastFrameTime = sc.srv.now()
// Process any written frames before reading new frames from the client since a
// written frame could have triggered a new stream to be started.
if sc.writingFrameAsync {
@@ -1045,7 +1077,7 @@ func (sc *serverConn) handlePingTimer(lastFrameReadTime time.Time) {
}
pingAt := lastFrameReadTime.Add(sc.readIdleTimeout)
now := time.Now()
now := sc.srv.now()
if pingAt.After(now) {
// We received frames since arming the ping timer.
// Reset it for the next possible timeout.
@@ -1109,10 +1141,10 @@ func (sc *serverConn) readPreface() error {
errc <- nil
}
}()
timer := time.NewTimer(prefaceTimeout) // TODO: configurable on *Server?
timer := sc.srv.newTimer(prefaceTimeout) // TODO: configurable on *Server?
defer timer.Stop()
select {
case <-timer.C:
case <-timer.C():
return errPrefaceTimeout
case err := <-errc:
if err == nil {
@@ -1128,21 +1160,6 @@ var errChanPool = sync.Pool{
New: func() interface{} { return make(chan error, 1) },
}
func getErrChan() chan error {
if inTests {
// Channels cannot be reused across synctest tests.
return make(chan error, 1)
} else {
return errChanPool.Get().(chan error)
}
}
func putErrChan(ch chan error) {
if !inTests {
errChanPool.Put(ch)
}
}
var writeDataPool = sync.Pool{
New: func() interface{} { return new(writeData) },
}
@@ -1150,7 +1167,7 @@ var writeDataPool = sync.Pool{
// writeDataFromHandler writes DATA response frames from a handler on
// the given stream.
func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStream bool) error {
ch := getErrChan()
ch := errChanPool.Get().(chan error)
writeArg := writeDataPool.Get().(*writeData)
*writeArg = writeData{stream.id, data, endStream}
err := sc.writeFrameFromHandler(FrameWriteRequest{
@@ -1182,7 +1199,7 @@ func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStrea
return errStreamClosed
}
}
putErrChan(ch)
errChanPool.Put(ch)
if frameWriteDone {
writeDataPool.Put(writeArg)
}
@@ -1496,7 +1513,7 @@ func (sc *serverConn) goAway(code ErrCode) {
func (sc *serverConn) shutDownIn(d time.Duration) {
sc.serveG.check()
sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer)
sc.shutdownTimer = sc.srv.afterFunc(d, sc.onShutdownTimer)
}
func (sc *serverConn) resetStream(se StreamError) {
@@ -2101,7 +2118,7 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
// (in Go 1.8), though. That's a more sane option anyway.
if sc.hs.ReadTimeout > 0 {
sc.conn.SetReadDeadline(time.Time{})
st.readDeadline = time.AfterFunc(sc.hs.ReadTimeout, st.onReadTimeout)
st.readDeadline = sc.srv.afterFunc(sc.hs.ReadTimeout, st.onReadTimeout)
}
return sc.scheduleHandler(id, rw, req, handler)
@@ -2199,7 +2216,7 @@ func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream
st.flow.add(sc.initialStreamSendWindowSize)
st.inflow.init(sc.initialStreamRecvWindowSize)
if sc.hs.WriteTimeout > 0 {
st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
st.writeDeadline = sc.srv.afterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
}
sc.streams[id] = st
@@ -2388,6 +2405,7 @@ func (sc *serverConn) handlerDone() {
// Run on its own goroutine.
func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) {
sc.srv.markNewGoroutine()
defer sc.sendServeMsg(handlerDoneMsg)
didPanic := true
defer func() {
@@ -2436,7 +2454,7 @@ func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) erro
// waiting for this frame to be written, so an http.Flush mid-handler
// writes out the correct value of keys, before a handler later potentially
// mutates it.
errc = getErrChan()
errc = errChanPool.Get().(chan error)
}
if err := sc.writeFrameFromHandler(FrameWriteRequest{
write: headerData,
@@ -2448,7 +2466,7 @@ func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) erro
if errc != nil {
select {
case err := <-errc:
putErrChan(errc)
errChanPool.Put(errc)
return err
case <-sc.doneServing:
return errClientDisconnected
@@ -2555,7 +2573,7 @@ func (b *requestBody) Read(p []byte) (n int, err error) {
if err == io.EOF {
b.sawEOF = true
}
if b.conn == nil {
if b.conn == nil && inTests {
return
}
b.conn.noteBodyReadFromHandler(b.stream, n, err)
@@ -2684,7 +2702,7 @@ func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
var date string
if _, ok := rws.snapHeader["Date"]; !ok {
// TODO(bradfitz): be faster here, like net/http? measure.
date = time.Now().UTC().Format(http.TimeFormat)
date = rws.conn.srv.now().UTC().Format(http.TimeFormat)
}
for _, v := range rws.snapHeader["Trailer"] {
@@ -2806,7 +2824,7 @@ func (rws *responseWriterState) promoteUndeclaredTrailers() {
func (w *responseWriter) SetReadDeadline(deadline time.Time) error {
st := w.rws.stream
if !deadline.IsZero() && deadline.Before(time.Now()) {
if !deadline.IsZero() && deadline.Before(w.rws.conn.srv.now()) {
// If we're setting a deadline in the past, reset the stream immediately
// so writes after SetWriteDeadline returns will fail.
st.onReadTimeout()
@@ -2822,9 +2840,9 @@ func (w *responseWriter) SetReadDeadline(deadline time.Time) error {
if deadline.IsZero() {
st.readDeadline = nil
} else if st.readDeadline == nil {
st.readDeadline = time.AfterFunc(deadline.Sub(time.Now()), st.onReadTimeout)
st.readDeadline = sc.srv.afterFunc(deadline.Sub(sc.srv.now()), st.onReadTimeout)
} else {
st.readDeadline.Reset(deadline.Sub(time.Now()))
st.readDeadline.Reset(deadline.Sub(sc.srv.now()))
}
})
return nil
@@ -2832,7 +2850,7 @@ func (w *responseWriter) SetReadDeadline(deadline time.Time) error {
func (w *responseWriter) SetWriteDeadline(deadline time.Time) error {
st := w.rws.stream
if !deadline.IsZero() && deadline.Before(time.Now()) {
if !deadline.IsZero() && deadline.Before(w.rws.conn.srv.now()) {
// If we're setting a deadline in the past, reset the stream immediately
// so writes after SetWriteDeadline returns will fail.
st.onWriteTimeout()
@@ -2848,9 +2866,9 @@ func (w *responseWriter) SetWriteDeadline(deadline time.Time) error {
if deadline.IsZero() {
st.writeDeadline = nil
} else if st.writeDeadline == nil {
st.writeDeadline = time.AfterFunc(deadline.Sub(time.Now()), st.onWriteTimeout)
st.writeDeadline = sc.srv.afterFunc(deadline.Sub(sc.srv.now()), st.onWriteTimeout)
} else {
st.writeDeadline.Reset(deadline.Sub(time.Now()))
st.writeDeadline.Reset(deadline.Sub(sc.srv.now()))
}
})
return nil
@@ -3129,7 +3147,7 @@ func (w *responseWriter) Push(target string, opts *http.PushOptions) error {
method: opts.Method,
url: u,
header: cloneHeader(opts.Header),
done: getErrChan(),
done: errChanPool.Get().(chan error),
}
select {
@@ -3146,7 +3164,7 @@ func (w *responseWriter) Push(target string, opts *http.PushOptions) error {
case <-st.cw:
return errStreamClosed
case err := <-msg.done:
putErrChan(msg.done)
errChanPool.Put(msg.done)
return err
}
}

20
vendor/golang.org/x/net/http2/timer.go generated vendored Normal file
View File

@@ -0,0 +1,20 @@
// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package http2
import "time"
// A timer is a time.Timer, as an interface which can be replaced in tests.
type timer = interface {
C() <-chan time.Time
Reset(d time.Duration) bool
Stop() bool
}
// timeTimer adapts a time.Timer to the timer interface.
type timeTimer struct {
*time.Timer
}
func (t timeTimer) C() <-chan time.Time { return t.Timer.C }

View File

@@ -193,6 +193,50 @@ type Transport struct {
type transportTestHooks struct {
newclientconn func(*ClientConn)
group synctestGroupInterface
}
func (t *Transport) markNewGoroutine() {
if t != nil && t.transportTestHooks != nil {
t.transportTestHooks.group.Join()
}
}
func (t *Transport) now() time.Time {
if t != nil && t.transportTestHooks != nil {
return t.transportTestHooks.group.Now()
}
return time.Now()
}
func (t *Transport) timeSince(when time.Time) time.Duration {
if t != nil && t.transportTestHooks != nil {
return t.now().Sub(when)
}
return time.Since(when)
}
// newTimer creates a new time.Timer, or a synthetic timer in tests.
func (t *Transport) newTimer(d time.Duration) timer {
if t.transportTestHooks != nil {
return t.transportTestHooks.group.NewTimer(d)
}
return timeTimer{time.NewTimer(d)}
}
// afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests.
func (t *Transport) afterFunc(d time.Duration, f func()) timer {
if t.transportTestHooks != nil {
return t.transportTestHooks.group.AfterFunc(d, f)
}
return timeTimer{time.AfterFunc(d, f)}
}
func (t *Transport) contextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) {
if t.transportTestHooks != nil {
return t.transportTestHooks.group.ContextWithTimeout(ctx, d)
}
return context.WithTimeout(ctx, d)
}
func (t *Transport) maxHeaderListSize() uint32 {
@@ -322,7 +366,7 @@ type ClientConn struct {
readerErr error // set before readerDone is closed
idleTimeout time.Duration // or 0 for never
idleTimer *time.Timer
idleTimer timer
mu sync.Mutex // guards following
cond *sync.Cond // hold mu; broadcast on flow/closed changes
@@ -490,12 +534,14 @@ func (cs *clientStream) closeReqBodyLocked() {
cs.reqBodyClosed = make(chan struct{})
reqBodyClosed := cs.reqBodyClosed
go func() {
cs.cc.t.markNewGoroutine()
cs.reqBody.Close()
close(reqBodyClosed)
}()
}
type stickyErrWriter struct {
group synctestGroupInterface
conn net.Conn
timeout time.Duration
err *error
@@ -505,7 +551,7 @@ func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
if *sew.err != nil {
return 0, *sew.err
}
n, err = writeWithByteTimeout(sew.conn, sew.timeout, p)
n, err = writeWithByteTimeout(sew.group, sew.conn, sew.timeout, p)
*sew.err = err
return n, err
}
@@ -604,9 +650,9 @@ func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Res
backoff := float64(uint(1) << (uint(retry) - 1))
backoff += backoff * (0.1 * mathrand.Float64())
d := time.Second * time.Duration(backoff)
tm := time.NewTimer(d)
tm := t.newTimer(d)
select {
case <-tm.C:
case <-tm.C():
t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
continue
case <-req.Context().Done():
@@ -653,7 +699,6 @@ var (
errClientConnUnusable = errors.New("http2: client conn not usable")
errClientConnNotEstablished = errors.New("http2: client conn could not be established")
errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
errClientConnForceClosed = errors.New("http2: client connection force closed via ClientConn.Close")
)
// shouldRetryRequest is called by RoundTrip when a request fails to get
@@ -793,11 +838,14 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
pingTimeout: conf.PingTimeout,
pings: make(map[[8]byte]chan struct{}),
reqHeaderMu: make(chan struct{}, 1),
lastActive: time.Now(),
lastActive: t.now(),
}
var group synctestGroupInterface
if t.transportTestHooks != nil {
t.markNewGoroutine()
t.transportTestHooks.newclientconn(cc)
c = cc.tconn
group = t.group
}
if VerboseLogs {
t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
@@ -809,6 +857,7 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
// TODO: adjust this writer size to account for frame size +
// MTU + crypto/tls record padding.
cc.bw = bufio.NewWriter(stickyErrWriter{
group: group,
conn: c,
timeout: conf.WriteByteTimeout,
err: &cc.werr,
@@ -857,7 +906,7 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
// Start the idle timer after the connection is fully initialized.
if d := t.idleConnTimeout(); d != 0 {
cc.idleTimeout = d
cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout)
cc.idleTimer = t.afterFunc(d, cc.onIdleTimeout)
}
go cc.readLoop()
@@ -868,7 +917,7 @@ func (cc *ClientConn) healthCheck() {
pingTimeout := cc.pingTimeout
// We don't need to periodically ping in the health check, because the readLoop of ClientConn will
// trigger the healthCheck again if there is no frame received.
ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
ctx, cancel := cc.t.contextWithTimeout(context.Background(), pingTimeout)
defer cancel()
cc.vlogf("http2: Transport sending health check")
err := cc.Ping(ctx)
@@ -1071,7 +1120,7 @@ func (cc *ClientConn) tooIdleLocked() bool {
// times are compared based on their wall time. We don't want
// to reuse a connection that's been sitting idle during
// VM/laptop suspend if monotonic time was also frozen.
return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && time.Since(cc.lastIdle.Round(0)) > cc.idleTimeout
return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && cc.t.timeSince(cc.lastIdle.Round(0)) > cc.idleTimeout
}
// onIdleTimeout is called from a time.AfterFunc goroutine. It will
@@ -1137,6 +1186,7 @@ func (cc *ClientConn) Shutdown(ctx context.Context) error {
done := make(chan struct{})
cancelled := false // guarded by cc.mu
go func() {
cc.t.markNewGoroutine()
cc.mu.Lock()
defer cc.mu.Unlock()
for {
@@ -1207,7 +1257,8 @@ func (cc *ClientConn) closeForError(err error) {
//
// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
func (cc *ClientConn) Close() error {
cc.closeForError(errClientConnForceClosed)
err := errors.New("http2: client connection force closed via ClientConn.Close")
cc.closeForError(err)
return nil
}
@@ -1376,6 +1427,7 @@ func (cc *ClientConn) roundTrip(req *http.Request, streamf func(*clientStream))
//
// It sends the request and performs post-request cleanup (closing Request.Body, etc.).
func (cs *clientStream) doRequest(req *http.Request, streamf func(*clientStream)) {
cs.cc.t.markNewGoroutine()
err := cs.writeRequest(req, streamf)
cs.cleanupWriteRequest(err)
}
@@ -1506,9 +1558,9 @@ func (cs *clientStream) writeRequest(req *http.Request, streamf func(*clientStre
var respHeaderTimer <-chan time.Time
var respHeaderRecv chan struct{}
if d := cc.responseHeaderTimeout(); d != 0 {
timer := time.NewTimer(d)
timer := cc.t.newTimer(d)
defer timer.Stop()
respHeaderTimer = timer.C
respHeaderTimer = timer.C()
respHeaderRecv = cs.respHeaderRecv
}
// Wait until the peer half-closes its end of the stream,
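Apart from the abstract timer exposing its channel via C() instead of a C field, the logic stays the usual race between a timer and a signal channel. Roughly:

package main

import (
	"errors"
	"fmt"
	"time"
)

// waitForHeaders blocks until headerRecv is closed (response headers arrived)
// or the timeout fires, whichever comes first.
func waitForHeaders(headerRecv <-chan struct{}, timeout time.Duration) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-headerRecv:
		return nil
	case <-timer.C: // with the abstract timer this becomes timer.C()
		return errors.New("timeout awaiting response headers")
	}
}

func main() {
	recv := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(recv) // headers "arrive"
	}()
	fmt.Println(waitForHeaders(recv, time.Second))
	fmt.Println(waitForHeaders(make(chan struct{}), 20*time.Millisecond))
}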
@@ -1701,7 +1753,7 @@ func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
// Return a fatal error which aborts the retry loop.
return errClientConnNotEstablished
}
cc.lastActive = time.Now()
cc.lastActive = cc.t.now()
if cc.closed || !cc.canTakeNewRequestLocked() {
return errClientConnUnusable
}
@@ -2040,10 +2092,10 @@ func (cc *ClientConn) forgetStreamID(id uint32) {
if len(cc.streams) != slen-1 {
panic("forgetting unknown stream id")
}
cc.lastActive = time.Now()
cc.lastActive = cc.t.now()
if len(cc.streams) == 0 && cc.idleTimer != nil {
cc.idleTimer.Reset(cc.idleTimeout)
cc.lastIdle = time.Now()
cc.lastIdle = cc.t.now()
}
// Wake up writeRequestBody via clientStream.awaitFlowControl and
// wake up RoundTrip if there is a pending request.
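forgetStreamID keeps the idle bookkeeping consistent: activity is stamped on every removal, and the idle timer is re-armed only once the last stream is gone. A reduced sketch of that bookkeeping with assumed names:

package main

import (
	"fmt"
	"sync"
	"time"
)

// conn tracks active streams and arms an idle timer only while it has none.
type conn struct {
	mu         sync.Mutex
	streams    map[uint32]struct{}
	idleTimer  *time.Timer
	idleExpiry time.Duration
	lastActive time.Time
	lastIdle   time.Time
}

func (c *conn) forgetStream(id uint32) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.streams, id)
	c.lastActive = time.Now()
	if len(c.streams) == 0 && c.idleTimer != nil {
		c.idleTimer.Reset(c.idleExpiry) // start counting idleness from now
		c.lastIdle = time.Now()
	}
}

func main() {
	c := &conn{
		streams:    map[uint32]struct{}{1: {}, 3: {}},
		idleExpiry: 30 * time.Second,
	}
	c.idleTimer = time.AfterFunc(c.idleExpiry, func() { fmt.Println("idle: close connection") })

	c.forgetStream(1)
	fmt.Println("streams left:", len(c.streams), "idle:", !c.lastIdle.IsZero())
	c.forgetStream(3)
	fmt.Println("streams left:", len(c.streams), "idle:", !c.lastIdle.IsZero())
}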
@@ -2069,6 +2121,7 @@ type clientConnReadLoop struct {
// readLoop runs in its own goroutine and reads and dispatches frames.
func (cc *ClientConn) readLoop() {
cc.t.markNewGoroutine()
rl := &clientConnReadLoop{cc: cc}
defer rl.cleanup()
cc.readerErr = rl.run()
@@ -2135,9 +2188,9 @@ func (rl *clientConnReadLoop) cleanup() {
if cc.idleTimeout > 0 && unusedWaitTime > cc.idleTimeout {
unusedWaitTime = cc.idleTimeout
}
idleTime := time.Now().Sub(cc.lastActive)
idleTime := cc.t.now().Sub(cc.lastActive)
if atomic.LoadUint32(&cc.atomicReused) == 0 && idleTime < unusedWaitTime && !cc.closedOnIdle {
cc.idleTimer = time.AfterFunc(unusedWaitTime-idleTime, func() {
cc.idleTimer = cc.t.afterFunc(unusedWaitTime-idleTime, func() {
cc.t.connPool().MarkDead(cc)
})
} else {
@@ -2197,9 +2250,9 @@ func (rl *clientConnReadLoop) run() error {
cc := rl.cc
gotSettings := false
readIdleTimeout := cc.readIdleTimeout
var t *time.Timer
var t timer
if readIdleTimeout != 0 {
t = time.AfterFunc(readIdleTimeout, cc.healthCheck)
t = cc.t.afterFunc(readIdleTimeout, cc.healthCheck)
}
for {
f, err := cc.fr.ReadFrame()
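Holding a timer value instead of a *time.Timer is what lets a test clock substitute its own implementation. A minimal sketch of such an interface and its real-clock adapter (the interface shape is assumed, not copied from the package):

package main

import (
	"fmt"
	"time"
)

// timer is the subset of *time.Timer the caller needs, expressed as an
// interface so a fake (test) timer can be substituted.
type timer interface {
	C() <-chan time.Time
	Stop() bool
	Reset(d time.Duration) bool
}

// realTimer adapts *time.Timer to the interface; the C field becomes a method.
type realTimer struct{ t *time.Timer }

func (r realTimer) C() <-chan time.Time        { return r.t.C }
func (r realTimer) Stop() bool                 { return r.t.Stop() }
func (r realTimer) Reset(d time.Duration) bool { return r.t.Reset(d) }

func newTimer(d time.Duration) timer { return realTimer{t: time.NewTimer(d)} }

func main() {
	t := newTimer(20 * time.Millisecond)
	defer t.Stop()
	select {
	case at := <-t.C():
		fmt.Println("timer fired at", at)
	case <-time.After(time.Second):
		fmt.Println("should not happen")
	}
}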
@@ -2945,6 +2998,7 @@ func (cc *ClientConn) Ping(ctx context.Context) error {
var pingError error
errc := make(chan struct{})
go func() {
cc.t.markNewGoroutine()
cc.wmu.Lock()
defer cc.wmu.Unlock()
if pingError = cc.fr.WritePing(false, p); pingError != nil {
@@ -3174,7 +3228,7 @@ func traceGotConn(req *http.Request, cc *ClientConn, reused bool) {
cc.mu.Lock()
ci.WasIdle = len(cc.streams) == 0 && reused
if ci.WasIdle && !cc.lastActive.IsZero() {
ci.IdleTime = time.Since(cc.lastActive)
ci.IdleTime = cc.t.timeSince(cc.lastActive)
}
cc.mu.Unlock()
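The IdleTime computed here surfaces in httptrace.GotConnInfo, so the change is observable from ordinary client code. Reading those fields via net/http/httptrace looks roughly like this (an HTTP/1.1 test server for brevity; the HTTP/2 transport populates the same fields):

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/http/httptrace"
)

func main() {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "ok")
	}))
	defer srv.Close()

	trace := &httptrace.ClientTrace{
		GotConn: func(info httptrace.GotConnInfo) {
			// WasIdle / IdleTime are the fields populated by traceGotConn above.
			fmt.Printf("reused=%v wasIdle=%v idleTime=%v\n", info.Reused, info.WasIdle, info.IdleTime)
		},
	}

	for i := 0; i < 2; i++ { // the second request should reuse the idle connection
		req, _ := http.NewRequest("GET", srv.URL, nil)
		req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace))
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			panic(err)
		}
		resp.Body.Close()
	}
}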

12
vendor/modules.txt vendored

@@ -1125,7 +1125,7 @@ github.com/nats-io/nats-server/v2/server/stree
github.com/nats-io/nats-server/v2/server/sysmem
github.com/nats-io/nats-server/v2/server/thw
github.com/nats-io/nats-server/v2/server/tpm
# github.com/nats-io/nats.go v1.46.0
# github.com/nats-io/nats.go v1.45.0
## explicit; go 1.23.0
github.com/nats-io/nats.go
github.com/nats-io/nats.go/encoders/builtin
@@ -1319,7 +1319,7 @@ github.com/open-policy-agent/opa/v1/version
# github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76
## explicit; go 1.18
github.com/opencloud-eu/libre-graph-api-go
# github.com/opencloud-eu/reva/v2 v2.38.1-0.20250924125540-eaa2437c36b2
# github.com/opencloud-eu/reva/v2 v2.38.1-0.20250922152322-476bb1f0070a
## explicit; go 1.24.1
github.com/opencloud-eu/reva/v2/cmd/revad/internal/grace
github.com/opencloud-eu/reva/v2/cmd/revad/runtime
@@ -2371,8 +2371,8 @@ golang.org/x/exp/slices
golang.org/x/exp/slog
golang.org/x/exp/slog/internal
golang.org/x/exp/slog/internal/buffer
# golang.org/x/image v0.31.0
## explicit; go 1.24.0
# golang.org/x/image v0.30.0
## explicit; go 1.23.0
golang.org/x/image/bmp
golang.org/x/image/ccitt
golang.org/x/image/font
@@ -2388,8 +2388,8 @@ golang.org/x/image/vector
golang.org/x/mod/internal/lazyregexp
golang.org/x/mod/module
golang.org/x/mod/semver
# golang.org/x/net v0.44.0
## explicit; go 1.24.0
# golang.org/x/net v0.43.0
## explicit; go 1.23.0
golang.org/x/net/bpf
golang.org/x/net/context
golang.org/x/net/context/ctxhttp