Compare commits

...

11 Commits

Author SHA1 Message Date
Michael Barz
1cb980c307 fix: make dev docker builds possible with reva replace 2025-09-26 15:35:43 +02:00
Michael Barz
e7b7ceafd5 feat: make it possible to start the collaboration service in the single process (#1569)
* feat: make it possible to start the collaboration service in the single process

* feat: add proxy policy
2025-09-26 09:08:20 +02:00
opencloudeu
a2f59ce15b [tx] updated from transifex 2025-09-26 00:02:08 +00:00
dependabot[bot]
dc123fb11d build(deps): bump golang.org/x/net from 0.43.0 to 0.44.0
Bumps [golang.org/x/net](https://github.com/golang/net) from 0.43.0 to 0.44.0.
- [Commits](https://github.com/golang/net/compare/v0.43.0...v0.44.0)

---
updated-dependencies:
- dependency-name: golang.org/x/net
  dependency-version: 0.44.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-09-25 17:47:14 +02:00
Jörn Friedrich Dreyer
a29f911272 let the runtime always create a cancel context
Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
2025-09-25 16:49:40 +02:00
Jörn Friedrich Dreyer
79516892bd Merge pull request #1542 from opencloud-eu/non-blocking-collaborations-service
introduce AppURLs helper for atomic background updates
2025-09-25 12:32:26 +02:00
Alex
5c39fd5e53 chore: add config for capability CheckForUpdates (#1556) 2025-09-25 10:19:33 +02:00
Viktor Scharf
59bc215f96 reva bump (#1555) 2025-09-24 18:06:33 +02:00
dependabot[bot]
a2f4106bca build(deps): bump golang.org/x/image from 0.30.0 to 0.31.0
Bumps [golang.org/x/image](https://github.com/golang/image) from 0.30.0 to 0.31.0.
- [Commits](https://github.com/golang/image/compare/v0.30.0...v0.31.0)

---
updated-dependencies:
- dependency-name: golang.org/x/image
  dependency-version: 0.31.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-09-24 17:26:58 +02:00
dependabot[bot]
865d4b6980 build(deps): bump github.com/nats-io/nats.go from 1.45.0 to 1.46.0
Bumps [github.com/nats-io/nats.go](https://github.com/nats-io/nats.go) from 1.45.0 to 1.46.0.
- [Release notes](https://github.com/nats-io/nats.go/releases)
- [Commits](https://github.com/nats-io/nats.go/compare/v1.45.0...v1.46.0)

---
updated-dependencies:
- dependency-name: github.com/nats-io/nats.go
  dependency-version: 1.46.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-09-24 17:26:25 +02:00
Jörn Friedrich Dreyer
314390f302 introduce AppURLs helper for atomic background updates
Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
2025-09-23 12:54:05 +02:00
50 changed files with 1230 additions and 444 deletions

8
go.mod
View File

@@ -57,7 +57,7 @@ require (
github.com/mna/pigeon v1.3.0
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.11.9
github.com/nats-io/nats.go v1.45.0
github.com/nats-io/nats.go v1.46.0
github.com/oklog/run v1.2.0
github.com/olekukonko/tablewriter v1.0.9
github.com/onsi/ginkgo v1.16.5
@@ -65,7 +65,7 @@ require (
github.com/onsi/gomega v1.38.2
github.com/open-policy-agent/opa v1.8.0
github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76
github.com/opencloud-eu/reva/v2 v2.38.1-0.20250922152322-476bb1f0070a
github.com/opencloud-eu/reva/v2 v2.38.1-0.20250924125540-eaa2437c36b2
github.com/opensearch-project/opensearch-go/v4 v4.5.0
github.com/orcaman/concurrent-map v1.0.0
github.com/pkg/errors v0.9.1
@@ -104,8 +104,8 @@ require (
go.opentelemetry.io/otel/trace v1.38.0
golang.org/x/crypto v0.42.0
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac
golang.org/x/image v0.30.0
golang.org/x/net v0.43.0
golang.org/x/image v0.31.0
golang.org/x/net v0.44.0
golang.org/x/oauth2 v0.31.0
golang.org/x/sync v0.17.0
golang.org/x/term v0.35.0

16
go.sum
View File

@@ -885,8 +885,8 @@ github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI=
github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.11.9 h1:k7nzHZjUf51W1b08xiQih63Rdxh0yr5O4K892Mx5gQA=
github.com/nats-io/nats-server/v2 v2.11.9/go.mod h1:1MQgsAQX1tVjpf3Yzrk3x2pzdsZiNL/TVP3Amhp3CR8=
github.com/nats-io/nats.go v1.45.0 h1:/wGPbnYXDM0pLKFjZTX+2JOw9TQPoIgTFrUaH97giwA=
github.com/nats-io/nats.go v1.45.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nats.go v1.46.0 h1:iUcX+MLT0HHXskGkz+Sg20sXrPtJLsOojMDTDzOHSb8=
github.com/nats-io/nats.go v1.46.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
@@ -930,8 +930,8 @@ github.com/opencloud-eu/go-micro-plugins/v4/store/nats-js-kv v0.0.0-202505121527
github.com/opencloud-eu/go-micro-plugins/v4/store/nats-js-kv v0.0.0-20250512152754-23325793059a/go.mod h1:pjcozWijkNPbEtX5SIQaxEW/h8VAVZYTLx+70bmB3LY=
github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76 h1:vD/EdfDUrv4omSFjrinT8Mvf+8D7f9g4vgQ2oiDrVUI=
github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76/go.mod h1:pzatilMEHZFT3qV7C/X3MqOa3NlRQuYhlRhZTL+hN6Q=
github.com/opencloud-eu/reva/v2 v2.38.1-0.20250922152322-476bb1f0070a h1:LwqYIxoBH26sWdk2xHzgEdWlqVnQJ2lC8MuwBMjkuC0=
github.com/opencloud-eu/reva/v2 v2.38.1-0.20250922152322-476bb1f0070a/go.mod h1:jykpTTQ0QWw0EzLop+6neuKCPccE0rj2asOwzls283U=
github.com/opencloud-eu/reva/v2 v2.38.1-0.20250924125540-eaa2437c36b2 h1:e3B6KbWMjloKpqoTwTwvBLoCETRyyCDkQsqwRQMUdxc=
github.com/opencloud-eu/reva/v2 v2.38.1-0.20250924125540-eaa2437c36b2/go.mod h1:8mGCM9tLIPsC5aEKS022Z5u89u6jKuOl0znK0gNFReM=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040=
@@ -1343,8 +1343,8 @@ golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac/go.mod h1:hH+7mtFmImwwcMvScy
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/image v0.18.0/go.mod h1:4yyo5vMFQjVjUcVk4jEQcU9MGy/rulF5WvUILseCM2E=
golang.org/x/image v0.30.0 h1:jD5RhkmVAnjqaCUXfbGBrn3lpxbknfN9w2UhHHU+5B4=
golang.org/x/image v0.30.0/go.mod h1:SAEUTxCCMWSrJcCy/4HwavEsfZZJlYxeHLc6tTiAe/c=
golang.org/x/image v0.31.0 h1:mLChjE2MV6g1S7oqbXC0/UcKijjm5fnJLUYKIYrLESA=
golang.org/x/image v0.31.0/go.mod h1:R9ec5Lcp96v9FTF+ajwaH3uGxPH4fKfHHAVbUILxghA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@@ -1424,8 +1424,8 @@ golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE=
golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg=
golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I=
golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=

View File

@@ -28,7 +28,7 @@ dev-docker-multiarch:
docker buildx rm opencloudbuilder || true
docker buildx create --platform linux/arm64,linux/amd64 --name opencloudbuilder
docker buildx use opencloudbuilder
cd .. && docker buildx build --platform linux/arm64,linux/amd64 --output type=docker --file opencloud/docker/Dockerfile.multiarch --tag opencloudeu/opencloud:dev-multiarch .
docker buildx build --platform linux/arm64,linux/amd64 --output type=docker --file docker/Dockerfile.multiarch --tag opencloudeu/opencloud:dev-multiarch ../..
docker buildx rm opencloudbuilder
.PHONY: debug-docker

View File

@@ -7,11 +7,11 @@ ARG STRING
RUN apk add bash make git curl gcc musl-dev libc-dev binutils-gold inotify-tools vips-dev
WORKDIR /opencloud
RUN --mount=type=bind,target=/opencloud \
RUN --mount=type=bind,target=/opencloud,rw\
--mount=type=cache,target=/go/pkg/mod \
--mount=type=cache,target=/root/.cache \
GOOS="${TARGETOS:-linux}" GOARCH="${TARGETARCH:-amd64}" ; \
make -C opencloud release-linux-docker-${TARGETARCH} ENABLE_VIPS=true DIST=/dist
cd opencloud && make -C opencloud release-linux-docker-${TARGETARCH} ENABLE_VIPS=true DIST=/dist
FROM alpine:3.21
ARG VERSION

View File

@@ -7,7 +7,6 @@ import (
"net"
"net/http"
"net/rpc"
"os/signal"
"sort"
"strings"
"sync"
@@ -31,6 +30,7 @@ import (
authmachine "github.com/opencloud-eu/opencloud/services/auth-machine/pkg/command"
authservice "github.com/opencloud-eu/opencloud/services/auth-service/pkg/command"
clientlog "github.com/opencloud-eu/opencloud/services/clientlog/pkg/command"
collaboration "github.com/opencloud-eu/opencloud/services/collaboration/pkg/command"
eventhistory "github.com/opencloud-eu/opencloud/services/eventhistory/pkg/command"
frontend "github.com/opencloud-eu/opencloud/services/frontend/pkg/command"
gateway "github.com/opencloud-eu/opencloud/services/gateway/pkg/command"
@@ -329,6 +329,11 @@ func NewService(ctx context.Context, options ...Option) (*Service, error) {
cfg.Audit.Commons = cfg.Commons
return audit.Execute(cfg.Audit)
})
areg(opts.Config.Collaboration.Service.Name, func(ctx context.Context, cfg *occfg.Config) error {
cfg.Collaboration.Context = ctx
cfg.Collaboration.Commons = cfg.Commons
return collaboration.Execute(cfg.Collaboration)
})
areg(opts.Config.Policies.Service.Name, func(ctx context.Context, cfg *occfg.Config) error {
cfg.Policies.Context = ctx
cfg.Policies.Commons = cfg.Commons
@@ -360,12 +365,9 @@ func Start(ctx context.Context, o ...Option) error {
return err
}
// cancel the context when a signal is received.
var cancel context.CancelFunc
if ctx == nil {
ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...)
defer cancel()
}
// create a context that will be cancelled when too many backoff cycles on one of the services happens
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// tolerance controls backoff cycles from the supervisor.
tolerance := 5

View File

@@ -69,16 +69,26 @@ func Server(cfg *config.Config) *cli.Command {
return err
}
appUrls, err := helpers.GetAppURLs(cfg, logger)
if err != nil {
return err
}
// use the AppURLs helper (an atomic pointer) to fetch and store the app URLs
// this is required as the app URLs are fetched periodically in the background
// and read when handling requests
appURLs := helpers.NewAppURLs()
ticker := time.NewTicker(cfg.CS3Api.APPRegistrationInterval)
defer ticker.Stop()
go func() {
for ; true; <-ticker.C {
if err := helpers.RegisterAppProvider(ctx, cfg, logger, gatewaySelector, appUrls); err != nil {
// fetch and store the app URLs
v, err := helpers.GetAppURLs(cfg, logger)
if err != nil {
logger.Warn().Err(err).Msg("Failed to get app URLs")
// empty map to clear previous URLs
v = make(map[string]map[string]string)
}
appURLs.Store(v)
// register the app provider
if err := helpers.RegisterAppProvider(ctx, cfg, logger, gatewaySelector, appURLs); err != nil {
logger.Warn().Err(err).Msg("Failed to register app provider")
}
}
@@ -97,7 +107,7 @@ func Server(cfg *config.Config) *cli.Command {
// start GRPC server
grpcServer, teardown, err := grpc.Server(
grpc.AppURLs(appUrls),
grpc.AppURLs(appURLs),
grpc.Config(cfg),
grpc.Logger(logger),
grpc.TraceProvider(traceProvider),

View File

@@ -6,13 +6,75 @@ import (
"net/http"
"net/url"
"strings"
"sync/atomic"
"github.com/beevik/etree"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/collaboration/pkg/config"
"github.com/opencloud-eu/reva/v2/pkg/mime"
"github.com/pkg/errors"
)
// AppURLs holds the app URLs fetched from the WOPI app discovery endpoint.
// It is a type-safe wrapper around an atomic pointer to a map.
type AppURLs struct {
urls atomic.Pointer[map[string]map[string]string]
}
func NewAppURLs() *AppURLs {
a := &AppURLs{}
a.urls.Store(&map[string]map[string]string{})
return a
}
func (a *AppURLs) Store(urls map[string]map[string]string) {
a.urls.Store(&urls)
}
func (a *AppURLs) GetMimeTypes() []string {
currentURLs := a.urls.Load()
if currentURLs == nil {
return []string{}
}
mimeTypesMap := make(map[string]bool)
for _, extensions := range *currentURLs {
for ext := range extensions {
m := mime.Detect(false, ext)
// skip the default
if m == "application/octet-stream" {
continue
}
mimeTypesMap[m] = true
}
}
// Convert map to slice
mimeTypes := make([]string, 0, len(mimeTypesMap))
for mimeType := range mimeTypesMap {
mimeTypes = append(mimeTypes, mimeType)
}
return mimeTypes
}
// GetAppURLFor gets the appURL from the list of appURLs based on the
// action and file extension provided. If there is no match, an empty
// string will be returned.
func (a *AppURLs) GetAppURLFor(action, fileExt string) string {
currentURLs := a.urls.Load()
if currentURLs == nil {
return ""
}
if actionURL, ok := (*currentURLs)[action]; ok {
if actionExtensionURL, ok := actionURL[fileExt]; ok {
return actionExtensionURL
}
}
return ""
}
// GetAppURLs gets the edit and view urls for different file types from the
// target WOPI app (onlyoffice, collabora, etc) via their "/hosting/discovery"
// endpoint.
@@ -30,10 +92,6 @@ func GetAppURLs(cfg *config.Config, logger log.Logger) (map[string]map[string]st
httpResp, err := httpClient.Get(wopiAppUrl)
if err != nil {
logger.Error().
Err(err).
Str("WopiAppUrl", wopiAppUrl).
Msg("WopiDiscovery: failed to access wopi app url")
return nil, err
}
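For context, a minimal usage sketch of the new helper, assuming only the API introduced in this diff (NewAppURLs, Store, GetAppURLFor, GetMimeTypes); the URLs below are placeholders:

package main

import (
	"fmt"

	"github.com/opencloud-eu/opencloud/services/collaboration/pkg/helpers"
)

func main() {
	// The background refresher swaps the whole map atomically via Store.
	appURLs := helpers.NewAppURLs()
	appURLs.Store(map[string]map[string]string{
		"view": {".pdf": "https://wopi.example.test/view"},
		"edit": {".docx": "https://wopi.example.test/edit"},
	})

	// Request handlers always read a complete, consistent snapshot.
	fmt.Println(appURLs.GetAppURLFor("view", ".pdf")) // https://wopi.example.test/view
	fmt.Println(appURLs.GetMimeTypes())               // mime types derived from the stored extensions
}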

View File

@@ -12,6 +12,265 @@ import (
"github.com/opencloud-eu/opencloud/services/collaboration/pkg/helpers"
)
var _ = Describe("AppURLs", func() {
var appURLs *helpers.AppURLs
BeforeEach(func() {
appURLs = helpers.NewAppURLs()
})
Describe("NewAppURLs", func() {
It("should create a new AppURLs instance with empty map", func() {
Expect(appURLs).NotTo(BeNil())
Expect(appURLs.GetMimeTypes()).To(BeEmpty())
Expect(appURLs.GetAppURLFor("view", ".pdf")).To(BeEmpty())
})
})
Describe("Store and GetAppURLFor", func() {
It("should store and retrieve app URLs correctly", func() {
testURLs := map[string]map[string]string{
"view": {
".pdf": "https://example.com/view/pdf",
".docx": "https://example.com/view/docx",
".xlsx": "https://example.com/view/xlsx",
},
"edit": {
".docx": "https://example.com/edit/docx",
".xlsx": "https://example.com/edit/xlsx",
},
}
appURLs.Store(testURLs)
// Test successful lookups
Expect(appURLs.GetAppURLFor("view", ".pdf")).To(Equal("https://example.com/view/pdf"))
Expect(appURLs.GetAppURLFor("view", ".docx")).To(Equal("https://example.com/view/docx"))
Expect(appURLs.GetAppURLFor("edit", ".docx")).To(Equal("https://example.com/edit/docx"))
})
It("should return empty string for non-existent action", func() {
testURLs := map[string]map[string]string{
"view": {".pdf": "https://example.com/view/pdf"},
}
appURLs.Store(testURLs)
Expect(appURLs.GetAppURLFor("nonexistent", ".pdf")).To(BeEmpty())
})
It("should return empty string for non-existent extension", func() {
testURLs := map[string]map[string]string{
"view": {".pdf": "https://example.com/view/pdf"},
}
appURLs.Store(testURLs)
Expect(appURLs.GetAppURLFor("view", ".nonexistent")).To(BeEmpty())
})
It("should handle empty maps gracefully", func() {
emptyURLs := map[string]map[string]string{}
appURLs.Store(emptyURLs)
Expect(appURLs.GetAppURLFor("view", ".pdf")).To(BeEmpty())
})
It("should handle nil action maps gracefully", func() {
testURLs := map[string]map[string]string{
"view": nil,
}
appURLs.Store(testURLs)
Expect(appURLs.GetAppURLFor("view", ".pdf")).To(BeEmpty())
})
})
Describe("GetMimeTypes", func() {
It("should return empty slice for empty AppURLs", func() {
mimeTypes := appURLs.GetMimeTypes()
Expect(mimeTypes).To(BeEmpty())
})
It("should return correct mime types for known extensions", func() {
testURLs := map[string]map[string]string{
"view": {
".pdf": "https://example.com/view/pdf",
".docx": "https://example.com/view/docx",
".xlsx": "https://example.com/view/xlsx",
".pptx": "https://example.com/view/pptx",
},
"edit": {
".txt": "https://example.com/edit/txt",
".html": "https://example.com/edit/html",
},
}
appURLs.Store(testURLs)
mimeTypes := appURLs.GetMimeTypes()
// Should contain expected mime types (order doesn't matter)
Expect(mimeTypes).To(ContainElements(
"application/pdf",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"application/vnd.openxmlformats-officedocument.presentationml.presentation",
"text/plain",
"text/html",
))
// Should not contain application/octet-stream (filtered out)
Expect(mimeTypes).NotTo(ContainElement("application/octet-stream"))
})
It("should deduplicate mime types across actions", func() {
testURLs := map[string]map[string]string{
"view": {
".pdf": "https://example.com/view/pdf",
".txt": "https://example.com/view/txt",
},
"edit": {
".pdf": "https://example.com/edit/pdf", // Same extension as view
".txt": "https://example.com/edit/txt", // Same extension as view
},
}
appURLs.Store(testURLs)
mimeTypes := appURLs.GetMimeTypes()
// Should only have unique mime types
Expect(mimeTypes).To(ContainElements("application/pdf", "text/plain"))
Expect(len(mimeTypes)).To(Equal(2)) // No duplicates
})
It("should filter out application/octet-stream mime types", func() {
testURLs := map[string]map[string]string{
"view": {
".pdf": "https://example.com/view/pdf",
".unknown": "https://example.com/view/unknown", // This might return application/octet-stream
".fake-ext": "https://example.com/view/fake", // This might return application/octet-stream
},
}
appURLs.Store(testURLs)
mimeTypes := appURLs.GetMimeTypes()
// Should contain PDF but not octet-stream
Expect(mimeTypes).To(ContainElement("application/pdf"))
Expect(mimeTypes).NotTo(ContainElement("application/octet-stream"))
})
It("should handle empty extension maps", func() {
testURLs := map[string]map[string]string{
"view": {},
"edit": {},
}
appURLs.Store(testURLs)
mimeTypes := appURLs.GetMimeTypes()
Expect(mimeTypes).To(BeEmpty())
})
It("should handle nil extension maps", func() {
testURLs := map[string]map[string]string{
"view": nil,
"edit": nil,
}
appURLs.Store(testURLs)
mimeTypes := appURLs.GetMimeTypes()
Expect(mimeTypes).To(BeEmpty())
})
})
Describe("Concurrent Access", func() {
It("should handle concurrent reads and writes safely", func() {
// This is a basic smoke test for concurrent access
// In practice, you'd want more sophisticated race testing
initialURLs := map[string]map[string]string{
"view": {".pdf": "https://example.com/view/pdf"},
}
appURLs.Store(initialURLs)
done := make(chan bool, 10)
// Start multiple readers
for i := 0; i < 5; i++ {
go func() {
defer GinkgoRecover()
for j := 0; j < 100; j++ {
_ = appURLs.GetAppURLFor("view", ".pdf")
_ = appURLs.GetMimeTypes()
}
done <- true
}()
}
// Start multiple writers
for i := 0; i < 5; i++ {
go func(id int) {
defer GinkgoRecover()
for j := 0; j < 100; j++ {
newURLs := map[string]map[string]string{
"view": {".pdf": "https://example.com/updated/pdf"},
}
appURLs.Store(newURLs)
}
done <- true
}(i)
}
// Wait for all goroutines to complete
for i := 0; i < 10; i++ {
<-done
}
// Should still be functional after concurrent access
Expect(appURLs.GetAppURLFor("view", ".pdf")).NotTo(BeEmpty())
})
})
Describe("Real-world scenarios", func() {
It("should handle realistic WOPI discovery data", func() {
// Based on the test data from the discovery tests
realisticURLs := map[string]map[string]string{
"view": {
".pdf": "https://cloud.opencloud.test/hosting/wopi/word/view",
".djvu": "https://cloud.opencloud.test/hosting/wopi/word/view",
".docx": "https://cloud.opencloud.test/hosting/wopi/word/view",
".xls": "https://cloud.opencloud.test/hosting/wopi/cell/view",
".xlsb": "https://cloud.opencloud.test/hosting/wopi/cell/view",
},
"edit": {
".docx": "https://cloud.opencloud.test/hosting/wopi/word/edit",
},
}
appURLs.Store(realisticURLs)
// Test specific lookups
Expect(appURLs.GetAppURLFor("view", ".pdf")).To(Equal("https://cloud.opencloud.test/hosting/wopi/word/view"))
Expect(appURLs.GetAppURLFor("edit", ".docx")).To(Equal("https://cloud.opencloud.test/hosting/wopi/word/edit"))
Expect(appURLs.GetAppURLFor("edit", ".pdf")).To(BeEmpty()) // No edit for PDF
// Test mime types
mimeTypes := appURLs.GetMimeTypes()
Expect(mimeTypes).To(ContainElements(
"application/pdf",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"application/vnd.ms-excel",
))
})
})
})
var _ = Describe("Discovery", func() {
var (
discoveryContent1 string

View File

@@ -7,7 +7,6 @@ import (
registryv1beta1 "github.com/cs3org/go-cs3apis/cs3/app/registry/v1beta1"
gatewayv1beta1 "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
"github.com/opencloud-eu/reva/v2/pkg/mime"
"github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool"
"github.com/opencloud-eu/opencloud/pkg/log"
@@ -35,24 +34,9 @@ func RegisterAppProvider(
cfg *config.Config,
logger log.Logger,
gws pool.Selectable[gatewayv1beta1.GatewayAPIClient],
appUrls map[string]map[string]string,
appUrls *AppURLs,
) error {
mimeTypesMap := make(map[string]bool)
for _, extensions := range appUrls {
for ext := range extensions {
m := mime.Detect(false, ext)
// skip the default
if m == "application/octet-stream" {
continue
}
mimeTypesMap[m] = true
}
}
mimeTypes := make([]string, 0, len(mimeTypesMap))
for m := range mimeTypesMap {
mimeTypes = append(mimeTypes, m)
}
mimeTypes := appUrls.GetMimeTypes()
logger.Debug().
Str("AppName", cfg.App.Name).

View File

@@ -5,6 +5,7 @@ import (
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/collaboration/pkg/config"
"github.com/opencloud-eu/opencloud/services/collaboration/pkg/helpers"
microstore "go-micro.dev/v4/store"
"go.opentelemetry.io/otel/trace"
)
@@ -14,7 +15,7 @@ type Option func(o *Options)
// Options defines the available options for this package.
type Options struct {
AppURLs map[string]map[string]string
AppURLs *helpers.AppURLs
Name string
Logger log.Logger
Context context.Context
@@ -35,7 +36,7 @@ func newOptions(opts ...Option) Options {
}
// AppURLs provides app urls based on mimetypes.
func AppURLs(val map[string]map[string]string) Option {
func AppURLs(val *helpers.AppURLs) Option {
return func(o *Options) {
o.AppURLs = val
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/collaboration/pkg/config"
"github.com/opencloud-eu/opencloud/services/collaboration/pkg/helpers"
)
// Option defines a single option function.
@@ -16,7 +17,7 @@ type Option func(o *Options)
type Options struct {
Logger log.Logger
Config *config.Config
AppURLs map[string]map[string]string
AppURLs *helpers.AppURLs
GatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient]
Store microstore.Store
}
@@ -47,7 +48,7 @@ func Config(val *config.Config) Option {
}
// AppURLs provides a function to set the AppURLs option.
func AppURLs(val map[string]map[string]string) Option {
func AppURLs(val *helpers.AppURLs) Option {
return func(o *Options) {
o.AppURLs = val
}

View File

@@ -42,6 +42,10 @@ func NewHandler(opts ...Option) (*Service, func(), error) {
}
}
if options.AppURLs == nil {
return nil, teardown, errors.New("AppURLs option is required")
}
return &Service{
id: options.Config.GRPC.Namespace + "." + options.Config.Service.Name + "." + options.Config.App.Name,
appURLs: options.AppURLs,
@@ -55,7 +59,7 @@ func NewHandler(opts ...Option) (*Service, func(), error) {
// Service implements the OpenInApp interface
type Service struct {
id string
appURLs map[string]map[string]string
appURLs *helpers.AppURLs
logger log.Logger
config *config.Config
gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient]
@@ -173,18 +177,6 @@ func (s *Service) OpenInApp(
}, nil
}
// getAppUrlFor gets the appURL from the list of appURLs based on the
// action and file extension provided. If there is no match, an empty
// string will be returned.
func (s *Service) getAppUrlFor(action, fileExt string) string {
if actionURL, ok := s.appURLs[action]; ok {
if actionExtensionURL, ok := actionURL[fileExt]; ok {
return actionExtensionURL
}
}
return ""
}
// getAppUrl will get the appURL that should be used based on the extension
// and the provided view mode.
// "view" urls will be chosen first, then if the view mode is "read/write",
@@ -192,17 +184,17 @@ func (s *Service) getAppUrlFor(action, fileExt string) string {
// "read/write" view mode if no "edit" url is found.
func (s *Service) getAppUrl(fileExt string, viewMode appproviderv1beta1.ViewMode) string {
// prioritize view action if possible
appURL := s.getAppUrlFor("view", fileExt)
appURL := s.appURLs.GetAppURLFor("view", fileExt)
if strings.ToLower(s.config.App.Product) == "collabora" {
// collabora provides only one action per extension. usual options
// are "view" (checked above), "edit" or "view_comment" (this last one
// is exclusive of collabora)
if appURL == "" {
if editURL := s.getAppUrlFor("edit", fileExt); editURL != "" {
if editURL := s.appURLs.GetAppURLFor("edit", fileExt); editURL != "" {
return editURL
}
if commentURL := s.getAppUrlFor("view_comment", fileExt); commentURL != "" {
if commentURL := s.appURLs.GetAppURLFor("view_comment", fileExt); commentURL != "" {
return commentURL
}
}
@@ -210,7 +202,7 @@ func (s *Service) getAppUrl(fileExt string, viewMode appproviderv1beta1.ViewMode
// If not collabora, there might be an edit action for the extension.
// If read/write mode has been requested, prioritize edit action.
if viewMode == appproviderv1beta1.ViewMode_VIEW_MODE_READ_WRITE {
if editAppURL := s.getAppUrlFor("edit", fileExt); editAppURL != "" {
if editAppURL := s.appURLs.GetAppURLFor("edit", fileExt); editAppURL != "" {
appURL = editAppURL
}
}

View File

@@ -23,6 +23,7 @@ import (
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/collaboration/mocks"
"github.com/opencloud-eu/opencloud/services/collaboration/pkg/config"
"github.com/opencloud-eu/opencloud/services/collaboration/pkg/helpers"
service "github.com/opencloud-eu/opencloud/services/collaboration/pkg/service/grpc/v0"
)
@@ -80,22 +81,25 @@ var _ = Describe("Discovery", func() {
gatewaySelector := mocks.NewSelectable[gatewayv1beta1.GatewayAPIClient](GinkgoT())
gatewaySelector.On("Next").Return(gatewayClient, nil)
appURLs := helpers.NewAppURLs()
appURLs.Store(map[string]map[string]string{
"view": {
".pdf": "https://cloud.opencloud.test/hosting/wopi/word/view",
".djvu": "https://cloud.opencloud.test/hosting/wopi/word/view",
".docx": "https://cloud.opencloud.test/hosting/wopi/word/view",
".xls": "https://cloud.opencloud.test/hosting/wopi/cell/view",
".xlsb": "https://cloud.opencloud.test/hosting/wopi/cell/view",
},
"edit": {
".docx": "https://cloud.opencloud.test/hosting/wopi/word/edit",
".invalid": "://cloud.opencloud.test/hosting/wopi/cell/edit",
},
})
srv, srvTear, _ = service.NewHandler(
service.Logger(log.NopLogger()),
service.Config(cfg),
service.AppURLs(map[string]map[string]string{
"view": {
".pdf": "https://cloud.opencloud.test/hosting/wopi/word/view",
".djvu": "https://cloud.opencloud.test/hosting/wopi/word/view",
".docx": "https://cloud.opencloud.test/hosting/wopi/word/view",
".xls": "https://cloud.opencloud.test/hosting/wopi/cell/view",
".xlsb": "https://cloud.opencloud.test/hosting/wopi/cell/view",
},
"edit": {
".docx": "https://cloud.opencloud.test/hosting/wopi/word/edit",
".invalid": "://cloud.opencloud.test/hosting/wopi/cell/edit",
},
}),
service.AppURLs(appURLs),
service.GatewaySelector(gatewaySelector),
)
})

View File

@@ -49,6 +49,7 @@ type Config struct {
LDAPServerWriteEnabled bool `yaml:"ldap_server_write_enabled" env:"OC_LDAP_SERVER_WRITE_ENABLED;FRONTEND_LDAP_SERVER_WRITE_ENABLED" desc:"Allow creating, modifying and deleting LDAP users via the GRAPH API. This can only be set to 'true' when keeping default settings for the LDAP user and group attribute types (the 'OC_LDAP_USER_SCHEMA_* and 'OC_LDAP_GROUP_SCHEMA_* variables)." introductionVersion:"1.0.0"`
EditLoginAllowedDisabled bool `yaml:"edit_login_allowed_disabled" env:"FRONTEND_EDIT_LOGIN_ALLOWED_DISABLED" desc:"Used to set if login is allowed/forbidden for the user." introductionVersion:"3.4.0"`
FullTextSearch bool `yaml:"full_text_search" env:"FRONTEND_FULL_TEXT_SEARCH_ENABLED" desc:"Set to true to signal the web client that full-text search is enabled." introductionVersion:"1.0.0"`
CheckForUpdates bool `yaml:"check_for_updates" env:"FRONTEND_CHECK_FOR_UPDATES" desc:"Enable automatic checking for updates. Defaults to true." introductionVersion:"3.6.0"`
Middleware Middleware `yaml:"middleware"`

View File

@@ -88,6 +88,7 @@ func DefaultConfig() *config.Config {
DefaultLinkPermissions: 1,
SearchMinLength: 3,
Edition: "",
CheckForUpdates: true,
Checksums: config.Checksums{
SupportedTypes: []string{"sha1", "md5", "adler32"},
PreferredUploadType: "sha1",

View File

@@ -214,6 +214,7 @@ func FrontendConfigFromStruct(cfg *config.Config, logger log.Logger) (map[string
"productversion": version.GetString(),
"hostname": "",
},
"check_for_updates": cfg.CheckForUpdates,
"support_url_signing": true,
"support_sse": !cfg.DisableSSE,
},

View File

@@ -0,0 +1,142 @@
# SOME DESCRIPTIVE TITLE.
# Copyright (C) YEAR THE PACKAGE'S COPYRIGHT HOLDER
# This file is distributed under the same license as the PACKAGE package.
# FIRST AUTHOR <EMAIL@ADDRESS>, YEAR.
#
# Translators:
# LinkinWires <darkinsonic13@gmail.com>, 2025
#
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2025-09-26 00:01+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: LinkinWires <darkinsonic13@gmail.com>, 2025\n"
"Language-Team: Ukrainian (https://app.transifex.com/opencloud-eu/teams/204053/uk/)\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Language: uk\n"
"Plural-Forms: nplurals=4; plural=(n % 1 == 0 && n % 10 == 1 && n % 100 != 11 ? 0 : n % 1 == 0 && n % 10 >= 2 && n % 10 <= 4 && (n % 100 < 12 || n % 100 > 14) ? 1 : n % 1 == 0 && (n % 10 ==0 || (n % 10 >=5 && n % 10 <=9) || (n % 100 >=11 && n % 100 <=14 )) ? 2: 3);\n"
#. UnifiedRole Editor, Role DisplayName (resolves directly)
#. UnifiedRole EditorListGrants, Role DisplayName (resolves directly)
#. UnifiedRole SpaseEditor, Role DisplayName (resolves directly)
#. UnifiedRole FileEditor, Role DisplayName (resolves directly)
#. UnifiedRole FileEditorListGrants, Role DisplayName (resolves directly)
#: pkg/unifiedrole/roles.go:116 pkg/unifiedrole/roles.go:122
#: pkg/unifiedrole/roles.go:128 pkg/unifiedrole/roles.go:140
#: pkg/unifiedrole/roles.go:146
msgid "Can edit"
msgstr "Може редагувати"
#. UnifiedRole SpaseEditorWithoutVersions, Role DisplayName (resolves
#. directly)
#: pkg/unifiedrole/roles.go:134
msgid "Can edit without versions"
msgstr "Може редагувати (без версій)"
#. UnifiedRole Manager, Role DisplayName (resolves directly)
#: pkg/unifiedrole/roles.go:158
msgid "Can manage"
msgstr "Може керувати"
#. UnifiedRole EditorLite, Role DisplayName (resolves directly)
#: pkg/unifiedrole/roles.go:152
msgid "Can upload"
msgstr "Може вивантажувати"
#. UnifiedRole Viewer, Role DisplayName (resolves directly)
#. UnifiedRole Viewer, Role DisplayName (resolves directly)
#. UnifiedRole SpaseViewer, Role DisplayName (resolves directly)
#: pkg/unifiedrole/roles.go:98 pkg/unifiedrole/roles.go:104
#: pkg/unifiedrole/roles.go:110
msgid "Can view"
msgstr "Може переглядати"
#. UnifiedRole SecureViewer, Role DisplayName (resolves directly)
#: pkg/unifiedrole/roles.go:164
msgid "Can view (secure)"
msgstr "Може переглядати (обмежено)"
#. UnifiedRole FullDenial, Role DisplayName (resolves directly)
#: pkg/unifiedrole/roles.go:170
msgid "Cannot access"
msgstr "Не має доступу"
#. UnifiedRole FullDenial, Role Description (resolves directly)
#: pkg/unifiedrole/roles.go:167
msgid "Deny all access."
msgstr "Заборонити будь-який доступ."
#. default description for new spaces
#: pkg/service/v0/spacetemplates.go:32
msgid "Here you can add a description for this Space."
msgstr "Тут можна додати опис до цього простору."
#. UnifiedRole Viewer, Role Description (resolves directly)
#. UnifiedRole SpaceViewer, Role Description (resolves directly)
#: pkg/unifiedrole/roles.go:95 pkg/unifiedrole/roles.go:107
msgid "View and download."
msgstr "Може переглядати та завантажувати файли."
#. UnifiedRole SecureViewer, Role Description (resolves directly)
#: pkg/unifiedrole/roles.go:161
msgid "View only documents, images and PDFs. Watermarks will be applied."
msgstr ""
"Може переглядати лише документи, зображення та PDF-файли. Будуть застосовані"
" водяні знаки."
#. UnifiedRole FileEditor, Role Description (resolves directly)
#: pkg/unifiedrole/roles.go:137
msgid "View, download and edit."
msgstr "Може переглядати, завантажувати та редагувати файли."
#. UnifiedRole ViewerListGrants, Role Description (resolves directly)
#: pkg/unifiedrole/roles.go:101
msgid "View, download and show all invited people."
msgstr "Може переглядати, завантажувати та показувати усіх запрошених людей."
#. UnifiedRole EditorLite, Role Description (resolves directly)
#: pkg/unifiedrole/roles.go:149
msgid "View, download and upload."
msgstr "Може переглядати, завантажувати та вивантажувати файли."
#. UnifiedRole FileEditorListGrants, Role Description (resolves directly)
#: pkg/unifiedrole/roles.go:143
msgid "View, download, edit and show all invited people."
msgstr ""
"Може переглядати, завантажувати, редагувати та показувати усіх запрошених "
"людей."
#. UnifiedRole Editor, Role Description (resolves directly)
#. UnifiedRole SpaseEditorWithoutVersions, Role Description (resolves
#. directly)
#: pkg/unifiedrole/roles.go:113 pkg/unifiedrole/roles.go:131
msgid "View, download, upload, edit, add and delete."
msgstr ""
"Може переглядати, завантажувати, вивантажувати, редагувати, додавати та "
"видаляти файли."
#. UnifiedRole Manager, Role Description (resolves directly)
#: pkg/unifiedrole/roles.go:155
msgid "View, download, upload, edit, add, delete and manage members."
msgstr ""
"Може переглядати, завантажувати, вивантажувати, редагувати, додавати та "
"видаляти файли, а також може керувати учасниками."
#. UnifiedRoleListGrants Editor, Role Description (resolves directly)
#: pkg/unifiedrole/roles.go:119
msgid "View, download, upload, edit, add, delete and show all invited people."
msgstr ""
"Може переглядати, завантажувати, редагувати, додавати, видаляти та "
"показувати усіх запрошених людей."
#. UnifiedRole SpaseEditor, Role Description (resolves directly)
#: pkg/unifiedrole/roles.go:125
msgid "View, download, upload, edit, add, delete including the history."
msgstr ""
"Може переглядати, завантажувати, вивантажувати, редагувати, додавати та "
"видаляти файли, у тому числі їх історію."

View File

@@ -272,6 +272,12 @@ func DefaultPolicies() []config.Policy {
Endpoint: "/auth-app/tokens",
Service: "eu.opencloud.web.auth-app",
},
{
Endpoint: "/wopi",
Service: "eu.opencloud.web.collaboration.Collabora",
Unprotected: true,
SkipXAccessToken: true,
},
},
},
}

View File

@@ -0,0 +1,146 @@
# SOME DESCRIPTIVE TITLE.
# Copyright (C) YEAR THE PACKAGE'S COPYRIGHT HOLDER
# This file is distributed under the same license as the PACKAGE package.
# FIRST AUTHOR <EMAIL@ADDRESS>, YEAR.
#
# Translators:
# LinkinWires <darkinsonic13@gmail.com>, 2025
#
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2025-09-26 00:01+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: LinkinWires <darkinsonic13@gmail.com>, 2025\n"
"Language-Team: Ukrainian (https://app.transifex.com/opencloud-eu/teams/204053/uk/)\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Language: uk\n"
"Plural-Forms: nplurals=4; plural=(n % 1 == 0 && n % 10 == 1 && n % 100 != 11 ? 0 : n % 1 == 0 && n % 10 >= 2 && n % 10 <= 4 && (n % 100 < 12 || n % 100 > 14) ? 1 : n % 1 == 0 && (n % 10 ==0 || (n % 10 >=5 && n % 10 <=9) || (n % 100 >=11 && n % 100 <=14 )) ? 2: 3);\n"
#. name of the notification option 'Space Shared'
#: pkg/store/defaults/templates.go:20
msgid "Added as space member"
msgstr "Додавання до простору у якості учасника (-ці)"
#. translation for the 'daily' email interval option
#: pkg/store/defaults/templates.go:50
msgid "Daily"
msgstr "Щоденно"
#. name of the notification option 'Email Interval'
#: pkg/store/defaults/templates.go:44
msgid "Email sending interval"
msgstr "Інтервал надсилання електронних листів"
#. name of the notification option 'File Rejected'
#: pkg/store/defaults/templates.go:40
msgid "File rejected"
msgstr "Відхилення файлу"
#. translation for the 'instant' email interval option
#: pkg/store/defaults/templates.go:48
msgid "Instant"
msgstr "Миттєво"
#. translation for the 'never' email interval option
#: pkg/store/defaults/templates.go:54
msgid "Never"
msgstr "Ніколи"
#. description of the notification option 'Space Shared'
#: pkg/store/defaults/templates.go:22
msgid "Notify when I have been added as a member to a space"
msgstr "Сповіщати, коли мене додають як учасника (-цю) до простору"
#. description of the notification option 'Space Unshared'
#: pkg/store/defaults/templates.go:26
msgid "Notify when I have been removed as member from a space"
msgstr "Сповіщати, коли мене видаляють з учасників простору"
#. description of the notification option 'Share Received'
#: pkg/store/defaults/templates.go:10
msgid "Notify when I have received a share"
msgstr "Сповіщати, коли зі мною поділилися файлом або папкою"
#. description of the notification option 'File Rejected'
#: pkg/store/defaults/templates.go:42
msgid ""
"Notify when a file I uploaded was rejected because of a virus infection or "
"policy violation"
msgstr ""
"Сповіщати, коли завантажений мною файл було відхилено через знайдений вірус "
"або порушення політик"
#. description of the notification option 'Share Removed'
#: pkg/store/defaults/templates.go:14
msgid "Notify when a received share has been removed"
msgstr "Сповіщати, коли зі мною більше не діляться файлом або папкою"
#. description of the notification option 'Share Expired'
#: pkg/store/defaults/templates.go:18
msgid "Notify when a received share has expired"
msgstr "Сповіщати, коли термін доступу до файлу або папки сплинув"
#. description of the notification option 'Space Deleted'
#: pkg/store/defaults/templates.go:38
msgid "Notify when a space I am member of has been deleted"
msgstr "Сповіщати, коли простір, учасником (-цею) якого я є, був видалений"
#. description of the notification option 'Space Disabled'
#: pkg/store/defaults/templates.go:34
msgid "Notify when a space I am member of has been disabled"
msgstr "Сповіщати, коли простір, учасником (-цею) якого я є, був вимкнений"
#. description of the notification option 'Space Membership Expired'
#: pkg/store/defaults/templates.go:30
msgid "Notify when a space membership has expired"
msgstr "Сповіщати, коли термін доступу до простору сплинув"
#. name of the notification option 'Space Unshared'
#: pkg/store/defaults/templates.go:24
msgid "Removed as space member"
msgstr "Видалення з простору"
#. description of the notification option 'Email Interval'
#: pkg/store/defaults/templates.go:46
msgid "Selected value:"
msgstr "Обране значення:"
#. name of the notification option 'Share Expired'
#: pkg/store/defaults/templates.go:16
msgid "Share Expired"
msgstr "Сплинення доступу до файлу / папки"
#. name of the notification option 'Share Received'
#: pkg/store/defaults/templates.go:8
msgid "Share Received"
msgstr "Надання доступу"
#. name of the notification option 'Share Removed'
#: pkg/store/defaults/templates.go:12
msgid "Share Removed"
msgstr "Видалення доступу"
#. name of the notification option 'Space Deleted'
#: pkg/store/defaults/templates.go:36
msgid "Space deleted"
msgstr "Видалення простору"
#. name of the notification option 'Space Disabled'
#: pkg/store/defaults/templates.go:32
msgid "Space disabled"
msgstr "Вимкнення простору"
#. name of the notification option 'Space Membership Expired'
#: pkg/store/defaults/templates.go:28
msgid "Space membership expired"
msgstr "Сплинення доступу до простору"
#. translation for the 'weekly' email interval option
#: pkg/store/defaults/templates.go:52
msgid "Weekly"
msgstr "Щотижнево"

View File

@@ -21,6 +21,9 @@ _testmain.go
*.exe
# Git backup files
*.orig
# Emacs
*~
\#*\#

View File

@@ -23,7 +23,7 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io
go get github.com/nats-io/nats.go@latest
# To get a specific version:
go get github.com/nats-io/nats.go@v1.45.0
go get github.com/nats-io/nats.go@v1.46.0
# Note that the latest major version for NATS Server is v2:
go get github.com/nats-io/nats-server/v2@latest

View File

@@ -1,22 +1,22 @@
module github.com/nats-io/nats.go
go 1.23.0
go 1.24.0
require (
github.com/golang/protobuf v1.4.2
github.com/klauspost/compress v1.18.0
github.com/nats-io/jwt v1.2.2
github.com/nats-io/nats-server/v2 v2.11.2
github.com/nats-io/jwt/v2 v2.8.0
github.com/nats-io/nats-server/v2 v2.12.0
github.com/nats-io/nkeys v0.4.11
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
)
require (
github.com/google/go-tpm v0.9.3 // indirect
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op // indirect
github.com/google/go-tpm v0.9.5 // indirect
github.com/minio/highwayhash v1.0.3 // indirect
github.com/nats-io/jwt/v2 v2.7.4 // indirect
golang.org/x/crypto v0.37.0 // indirect
golang.org/x/sys v0.32.0 // indirect
golang.org/x/time v0.11.0 // indirect
golang.org/x/crypto v0.42.0 // indirect
golang.org/x/sys v0.36.0 // indirect
golang.org/x/time v0.13.0 // indirect
)

View File

@@ -12,36 +12,27 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-tpm v0.9.3 h1:+yx0/anQuGzi+ssRqeD6WpXjW2L/V0dItUayO0i9sRc=
github.com/google/go-tpm v0.9.3/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/google/go-tpm v0.9.5 h1:ocUmnDebX54dnW+MQWGQRbdaAcJELsa6PqZhJ48KwVU=
github.com/google/go-tpm v0.9.5/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI=
github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.11.2 h1:k5KBAuRpJW9qAF11Io2txNhR5m1KUmqVkalLAw2yLfk=
github.com/nats-io/nats-server/v2 v2.11.2/go.mod h1:6Z6Fd+JgckqzKig7DYwhgrE7bJ6fypPHnGPND+DqgMY=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g=
github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.12.0 h1:OIwe8jZUqJFrh+hhiyKu8snNib66qsx806OslqJuo74=
github.com/nats-io/nats-server/v2 v2.12.0/go.mod h1:nr8dhzqkP5E/lDwmn+A2CvQPMd1yDKXQI7iGg3lAvww=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE=
golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI=
golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0=
golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/time v0.13.0 h1:eUlYslOIt32DgYD6utsuUeHs4d7AsEYLuIAdg7FlYgI=
golang.org/x/time v0.13.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=

View File

@@ -314,6 +314,10 @@ func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg Consu
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
return nil, ErrStreamNotFound
}
if resp.Error.ErrorCode == JSErrCodeMaximumConsumersLimit {
return nil, ErrMaximumConsumersLimit
}
return nil, resp.Error
}

View File

@@ -325,6 +325,12 @@ type (
// associating metadata on the consumer. This feature requires
// nats-server v2.10.0 or later.
Metadata map[string]string `json:"metadata,omitempty"`
// NamePrefix is an optional custom prefix for the consumer name.
// If provided, ordered consumer names will be generated as:
// {NamePrefix}_{sequence_number} (e.g., "custom_1", "custom_2").
// If not provided, a unique ID (NUID) will be used as the prefix.
NamePrefix string `json:"-"`
}
// DeliverPolicy determines from which point to start delivering messages.
@@ -362,6 +368,11 @@ const (
// restricting when a consumer will receive messages based on the number of
// pending messages or acks.
PriorityPolicyOverflow
// PriorityPolicyPrioritized is the priority policy that allows for the
// server to deliver messages to clients based on their priority (instead
// of round-robin). Requires nats-server v2.12.0 or later.
PriorityPolicyPrioritized
)
func (p *PriorityPolicy) UnmarshalJSON(data []byte) error {
@@ -372,6 +383,8 @@ func (p *PriorityPolicy) UnmarshalJSON(data []byte) error {
*p = PriorityPolicyPinned
case jsonString("overflow"):
*p = PriorityPolicyOverflow
case jsonString("prioritized"):
*p = PriorityPolicyPrioritized
default:
return fmt.Errorf("nats: can not unmarshal %q", data)
}
@@ -386,6 +399,8 @@ func (p PriorityPolicy) MarshalJSON() ([]byte, error) {
return json.Marshal("pinned_client")
case PriorityPolicyOverflow:
return json.Marshal("overflow")
case PriorityPolicyPrioritized:
return json.Marshal("prioritized")
}
return nil, fmt.Errorf("nats: unknown priority policy %v", p)
}
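A minimal sketch of a consumer using the new policy, assuming js is an existing jetstream.JetStream instance, a stream named ORDERS exists, and the existing PriorityPolicy/PriorityGroups fields on ConsumerConfig; requires nats-server v2.12.0 or later:

cons, err := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
	Durable:        "workers",
	AckPolicy:      jetstream.AckExplicitPolicy,
	PriorityPolicy: jetstream.PriorityPolicyPrioritized, // server delivers by client priority instead of round-robin
	PriorityGroups: []string{"jobs"},
})
if err != nil {
	log.Fatal(err)
}
_ = cons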

View File

@@ -43,27 +43,28 @@ type (
)
const (
JSErrCodeBadRequest ErrorCode = 10003
JSErrCodeConsumerCreate ErrorCode = 10012
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeMaximumConsumersLimit ErrorCode = 10026
JSErrCodeMessageNotFound ErrorCode = 10037
JSErrCodeJetStreamNotEnabledForAccount ErrorCode = 10039
JSErrCodeJetStreamNotEnabled ErrorCode = 10076
JSErrCodeStreamNotFound ErrorCode = 10059
JSErrCodeStreamNameInUse ErrorCode = 10058
JSErrCodeStreamNotFound ErrorCode = 10059
JSErrCodeStreamWrongLastSequence ErrorCode = 10071
JSErrCodeJetStreamNotEnabled ErrorCode = 10076
JSErrCodeConsumerAlreadyExists ErrorCode = 10105
JSErrCodeConsumerCreate ErrorCode = 10012
JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerAlreadyExists ErrorCode = 10105
JSErrCodeConsumerExists ErrorCode = 10148
JSErrCodeDuplicateFilterSubjects ErrorCode = 10136
JSErrCodeOverlappingFilterSubjects ErrorCode = 10138
JSErrCodeConsumerEmptyFilter ErrorCode = 10139
JSErrCodeConsumerExists ErrorCode = 10148
JSErrCodeConsumerDoesNotExist ErrorCode = 10149
JSErrCodeMessageNotFound ErrorCode = 10037
JSErrCodeBadRequest ErrorCode = 10003
JSErrCodeStreamWrongLastSequence ErrorCode = 10071
)
var (
@@ -142,6 +143,10 @@ var (
// creating consumer (e.g. illegal update).
ErrConsumerCreate JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerCreate, Description: "could not create consumer", Code: 500}}
// ErrMaximumConsumersLimit is returned when user limit of allowed
// consumers for stream is reached
ErrMaximumConsumersLimit JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeMaximumConsumersLimit, Description: "maximum consumers limit reached", Code: 400}}
// ErrDuplicateFilterSubjects is returned when both FilterSubject and
// FilterSubjects are specified when creating consumer.
ErrDuplicateFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeDuplicateFilterSubjects, Description: "consumer cannot have both FilterSubject and FilterSubjects specified", Code: 500}}
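With error code 10026 now mapped to a typed error, callers can branch on it. A minimal sketch, assuming js is an existing jetstream.JetStream instance and a stream named ORDERS exists:

_, err := js.CreateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
	Durable:   "reporting",
	AckPolicy: jetstream.AckExplicitPolicy,
})
switch {
case errors.Is(err, jetstream.ErrMaximumConsumersLimit):
	// JS error code 10026: the stream/account consumer limit was reached.
	log.Println("consumer limit reached, not retrying")
case err != nil:
	log.Fatal(err)
}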

View File

@@ -864,11 +864,15 @@ func (js *jetStream) OrderedConsumer(ctx context.Context, stream string, cfg Ord
if err := validateStreamName(stream); err != nil {
return nil, err
}
namePrefix := cfg.NamePrefix
if namePrefix == "" {
namePrefix = nuid.Next()
}
oc := &orderedConsumer{
js: js,
cfg: &cfg,
stream: stream,
namePrefix: nuid.Next(),
namePrefix: namePrefix,
doReset: make(chan struct{}, 1),
}
consCfg := oc.getConsumerConfig()
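A minimal sketch of the new NamePrefix knob, assuming a reachable NATS server and an existing stream named ORDERS:

package main

import (
	"context"
	"log"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Drain()

	js, err := jetstream.New(nc)
	if err != nil {
		log.Fatal(err)
	}

	// Consumer names become "billing_1", "billing_2", ... instead of using a random NUID prefix.
	cons, err := js.OrderedConsumer(context.Background(), "ORDERS", jetstream.OrderedConsumerConfig{
		NamePrefix: "billing",
	})
	if err != nil {
		log.Fatal(err)
	}
	_ = cons
}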

View File

@@ -14,6 +14,7 @@
package jetstream
import (
"context"
"fmt"
"time"
)
@@ -347,6 +348,26 @@ func (min PullMinAckPending) configureMessages(opts *consumeOpts) error {
return nil
}
// PullPrioritized sets the priority used when sending pull requests for consumer with
// PriorityPolicyPrioritized. Lower values indicate higher priority (0 is the
// highest priority). Maximum priority value is 9.
//
// If provided, PullPriorityGroup must be set as well and the consumer has to
// have PriorityPolicy set to PriorityPolicyPrioritized.
//
// PullPrioritized implements both PullConsumeOpt and PullMessagesOpt, allowing
// it to configure Consumer.Consume and Consumer.Messages.
type PullPrioritized uint8
func (p PullPrioritized) configureConsume(opts *consumeOpts) error {
opts.Priority = uint8(p)
return nil
}
func (p PullPrioritized) configureMessages(opts *consumeOpts) error {
opts.Priority = uint8(p)
return nil
}
// PullPriorityGroup sets the priority group for a consumer.
// It has to match one of the priority groups set on the consumer.
//
@@ -468,6 +489,19 @@ func FetchMinAckPending(min int64) FetchOpt {
}
}
// FetchPrioritized sets the priority used when sending fetch requests for consumer with
// PriorityPolicyPrioritized. Lower values indicate higher priority (0 is the
// highest priority). Maximum priority value is 9.
//
// If provided, FetchPriorityGroup must be set as well and the consumer has to
// have PriorityPolicy set to PriorityPolicyPrioritized.
func FetchPrioritized(priority uint8) FetchOpt {
return func(req *pullRequest) error {
req.Priority = priority
return nil
}
}
// FetchPriorityGroup sets the priority group for a consumer.
// It has to match one of the priority groups set on the consumer.
func FetchPriorityGroup(group string) FetchOpt {
@@ -486,6 +520,7 @@ func FetchMaxWait(timeout time.Duration) FetchOpt {
return fmt.Errorf("%w: timeout value must be greater than 0", ErrInvalidOption)
}
req.Expires = timeout
req.maxWaitSet = true
return nil
}
}
@@ -508,6 +543,31 @@ func FetchHeartbeat(hb time.Duration) FetchOpt {
}
}
// FetchContext sets a context for the Fetch operation.
// The Fetch operation will be canceled if the context is canceled.
// If the context has a deadline, it will be used to set expiry on pull request.
func FetchContext(ctx context.Context) FetchOpt {
return func(req *pullRequest) error {
req.ctx = ctx
// If context has a deadline, use it to set expiry
if deadline, ok := ctx.Deadline(); ok {
remaining := time.Until(deadline)
if remaining <= 0 {
return fmt.Errorf("%w: context deadline already exceeded", ErrInvalidOption)
}
// Use 90% of remaining time for server (capped at 1s)
buffer := time.Duration(float64(remaining) * 0.1)
if buffer > time.Second {
buffer = time.Second
}
req.Expires = remaining - buffer
}
return nil
}
}
// WithDeletedDetails can be used to display the information about messages
// deleted from a stream on a stream info request
func WithDeletedDetails(deletedDetails bool) StreamInfoOpt {
@@ -648,3 +708,25 @@ func WithStallWait(ttl time.Duration) PublishOpt {
return nil
}
}
type nextOptFunc func(*nextOpts)
func (fn nextOptFunc) configureNext(opts *nextOpts) {
fn(opts)
}
// NextMaxWait sets a timeout for the Next operation.
// If the timeout is reached before a message is available, a timeout error is returned.
func NextMaxWait(timeout time.Duration) NextOpt {
return nextOptFunc(func(opts *nextOpts) {
opts.timeout = timeout
})
}
// NextContext sets a context for the Next operation.
// The Next operation will be canceled if the context is canceled.
func NextContext(ctx context.Context) NextOpt {
return nextOptFunc(func(opts *nextOpts) {
opts.ctx = ctx
})
}
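A minimal sketch of the new pull/next options, assuming cons is an existing jetstream.Consumer whose priority policy matches the options used:

// Bounded Next: returns a timeout error instead of blocking indefinitely.
iter, err := cons.Messages()
if err != nil {
	log.Fatal(err)
}
defer iter.Stop()

if msg, err := iter.Next(jetstream.NextMaxWait(5 * time.Second)); err == nil {
	_ = msg.Ack()
}

// Prioritized fetch: only valid together with FetchPriorityGroup on a
// consumer configured with PriorityPolicyPrioritized.
batch, err := cons.Fetch(10,
	jetstream.FetchPriorityGroup("jobs"),
	jetstream.FetchPrioritized(0), // 0 is the highest priority, 9 the lowest
)
if err != nil {
	log.Fatal(err)
}
for m := range batch.Messages() {
	_ = m.Ack()
}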

View File

@@ -256,7 +256,11 @@ type (
// removed by the TTL setting.
// It is required for per-key TTL to work and for watcher to notify
// about TTL expirations (both per key and per bucket)
LimitMarkerTTL time.Duration
LimitMarkerTTL time.Duration `json:"limit_marker_ttl,omitempty"`
// Metadata is a set of application-defined key-value pairs that can be
// used to store arbitrary metadata about the bucket.
Metadata map[string]string `json:"metadata,omitempty"`
}
// KeyLister is used to retrieve a list of key value store keys. It returns
@@ -316,6 +320,9 @@ type (
// LimitMarkerTTL is how long the bucket keeps markers when keys are
// removed by the TTL setting, 0 meaning markers are not supported.
LimitMarkerTTL() time.Duration
// Metadata returns the metadata associated with the bucket.
Metadata() map[string]string
}
// KeyWatcher is what is returned when doing a watch. It can be used to
@@ -667,6 +674,7 @@ func (js *jetStream) prepareKeyValueConfig(ctx context.Context, cfg KeyValueConf
Discard: DiscardNew,
AllowMsgTTL: allowMsgTTL,
SubjectDeleteMarkerTTL: subjectDeleteMarkerTTL,
Metadata: cfg.Metadata,
}
if cfg.Mirror != nil {
// Copy in case we need to make changes so we do not change caller's version.
@@ -813,6 +821,11 @@ func (s *KeyValueBucketStatus) LimitMarkerTTL() time.Duration {
return s.info.Config.SubjectDeleteMarkerTTL
}
// Metadata returns the metadata associated with the bucket.
func (s *KeyValueBucketStatus) Metadata() map[string]string {
return s.info.Config.Metadata
}
type kvLister struct {
kvs chan KeyValueStatus
kvNames chan string
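A minimal sketch of the new bucket metadata, assuming js is an existing jetstream.JetStream instance:

kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
	Bucket: "settings",
	// Stored on the backing stream and surfaced again via KeyValueStatus.Metadata().
	Metadata: map[string]string{"owner": "collaboration", "env": "dev"},
})
if err != nil {
	log.Fatal(err)
}

status, err := kv.Status(ctx)
if err != nil {
	log.Fatal(err)
}
fmt.Println(status.Metadata()) // map[env:dev owner:collaboration]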

View File

@@ -282,10 +282,21 @@ func (c *orderedConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, er
return sub, nil
}
func (s *orderedSubscription) Next() (Msg, error) {
func (s *orderedSubscription) Next(opts ...NextOpt) (Msg, error) {
for {
msg, err := s.consumer.currentSub.Next()
msg, err := s.consumer.currentSub.Next(opts...)
if err != nil {
// Check for errors which should be returned directly
// without resetting the consumer
if errors.Is(err, ErrInvalidOption) {
return nil, err
}
if errors.Is(err, nats.ErrTimeout) {
return nil, err
}
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return nil, err
}
if errors.Is(err, ErrMsgIteratorClosed) {
s.Stop()
return nil, err

View File

@@ -130,6 +130,8 @@ type (
// Domain is the domain the message was published to.
Domain string `json:"domain,omitempty"`
Value string `json:"val,omitempty"`
}
)

View File

@@ -14,6 +14,7 @@
package jetstream
import (
"context"
"encoding/json"
"errors"
"fmt"
@@ -34,8 +35,11 @@ type (
MessagesContext interface {
// Next retrieves next message on a stream. It will block until the next
// message is available. If the context is canceled, Next will return
// ErrMsgIteratorClosed error.
Next() (Msg, error)
// ErrMsgIteratorClosed error. An optional timeout or context can be
// provided using NextOpt options. If none are provided, Next will block
// indefinitely until a message is available, the iterator is closed, or a
// heartbeat error occurs.
Next(opts ...NextOpt) (Msg, error)
// Stop unsubscribes from the stream and cancels subscription. Calling
// Next after calling Stop will return ErrMsgIteratorClosed error.
@@ -92,15 +96,18 @@ type (
}
pullRequest struct {
Expires time.Duration `json:"expires,omitempty"`
Batch int `json:"batch,omitempty"`
MaxBytes int `json:"max_bytes,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
MinPending int64 `json:"min_pending,omitempty"`
MinAckPending int64 `json:"min_ack_pending,omitempty"`
PinID string `json:"id,omitempty"`
Group string `json:"group,omitempty"`
Expires time.Duration `json:"expires,omitempty"`
Batch int `json:"batch,omitempty"`
MaxBytes int `json:"max_bytes,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
MinPending int64 `json:"min_pending,omitempty"`
MinAckPending int64 `json:"min_ack_pending,omitempty"`
PinID string `json:"id,omitempty"`
Group string `json:"group,omitempty"`
Priority uint8 `json:"priority,omitempty"`
ctx context.Context `json:"-"`
maxWaitSet bool `json:"-"`
}
consumeOpts struct {
@@ -110,6 +117,7 @@ type (
LimitSize bool
MinPending int64
MinAckPending int64
Priority uint8
Group string
Heartbeat time.Duration
ErrHandler ConsumeErrHandler
@@ -167,6 +175,16 @@ type (
timer *time.Timer
sync.Mutex
}
// NextOpt is an option for configuring the behavior of MessagesContext.Next.
NextOpt interface {
configureNext(*nextOpts)
}
nextOpts struct {
timeout time.Duration
ctx context.Context
}
)
const (
@@ -314,6 +332,7 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (
Heartbeat: consumeOpts.Heartbeat,
MinPending: consumeOpts.MinPending,
MinAckPending: consumeOpts.MinAckPending,
Priority: consumeOpts.Priority,
Group: consumeOpts.Group,
PinID: p.getPinID(),
}, subject); err != nil {
@@ -353,6 +372,7 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (
Heartbeat: sub.consumeOpts.Heartbeat,
MinPending: sub.consumeOpts.MinPending,
MinAckPending: sub.consumeOpts.MinAckPending,
Priority: sub.consumeOpts.Priority,
Group: sub.consumeOpts.Group,
PinID: p.getPinID(),
}
@@ -383,6 +403,7 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (
Heartbeat: sub.consumeOpts.Heartbeat,
MinPending: sub.consumeOpts.MinPending,
MinAckPending: sub.consumeOpts.MinAckPending,
Priority: sub.consumeOpts.Priority,
Group: sub.consumeOpts.Group,
PinID: p.getPinID(),
}
@@ -468,6 +489,7 @@ func (s *pullSubscription) checkPending() {
Group: s.consumeOpts.Group,
MinPending: s.consumeOpts.MinPending,
MinAckPending: s.consumeOpts.MinAckPending,
Priority: s.consumeOpts.Priority,
}
s.pending.msgCount = s.consumeOpts.MaxMessages
@@ -569,7 +591,30 @@ var (
// Next retrieves next message on a stream. It will block until the next
// message is available. If the context is canceled, Next will return
// ErrMsgIteratorClosed error.
func (s *pullSubscription) Next() (Msg, error) {
func (s *pullSubscription) Next(opts ...NextOpt) (Msg, error) {
var nextOpts nextOpts
for _, opt := range opts {
opt.configureNext(&nextOpts)
}
if nextOpts.timeout > 0 && nextOpts.ctx != nil {
return nil, fmt.Errorf("%w: cannot specify both NextMaxWait and NextContext", ErrInvalidOption)
}
// Create timeout channel if needed
var timeoutCh <-chan time.Time
if nextOpts.timeout > 0 {
timer := time.NewTimer(nextOpts.timeout)
defer timer.Stop()
timeoutCh = timer.C
}
// Use context if provided
var ctxDone <-chan struct{}
if nextOpts.ctx != nil {
ctxDone = nextOpts.ctx.Done()
}
s.Lock()
defer s.Unlock()
drainMode := s.draining.Load() == 1
@@ -660,6 +705,10 @@ func (s *pullSubscription) Next() (Msg, error) {
}
isConnected = false
}
case <-timeoutCh:
return nil, nats.ErrTimeout
case <-ctxDone:
return nil, nextOpts.ctx.Err()
}
}
}
@@ -779,6 +828,11 @@ func (p *pullConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, error)
return nil, err
}
}
if req.ctx != nil && req.maxWaitSet {
return nil, fmt.Errorf("%w: cannot specify both FetchContext and FetchMaxWait", ErrInvalidOption)
}
// if heartbeat was not explicitly set, set it to 5 seconds for longer pulls
// and disable it for shorter pulls
if req.Heartbeat == unset {
@@ -808,6 +862,11 @@ func (p *pullConsumer) FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch,
return nil, err
}
}
if req.ctx != nil && req.maxWaitSet {
return nil, fmt.Errorf("%w: cannot specify both FetchContext and FetchMaxWait", ErrInvalidOption)
}
// if heartbeat was not explicitly set, set it to 5 seconds for longer pulls
// and disable it for shorter pulls
if req.Heartbeat == unset {
@@ -862,6 +921,13 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) {
var receivedMsgs, receivedBytes int
hbTimer := sub.scheduleHeartbeatCheck(req.Heartbeat)
// Use context if provided
var ctxDone <-chan struct{}
if req.ctx != nil {
ctxDone = req.ctx.Done()
}
go func(res *fetchResult) {
defer sub.subscription.Unsubscribe()
defer close(res.msgs)
@@ -922,6 +988,12 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) {
res.done = true
res.Unlock()
return
case <-ctxDone:
res.Lock()
res.err = req.ctx.Err()
res.done = true
res.Unlock()
return
}
}
}(res)

View File

@@ -349,11 +349,15 @@ func (s *stream) UpdatePushConsumer(ctx context.Context, cfg ConsumerConfig) (Pu
// messages from a stream. Ordered consumers are ephemeral in-memory
// pull consumers and are resilient to deletes and restarts.
func (s *stream) OrderedConsumer(ctx context.Context, cfg OrderedConsumerConfig) (Consumer, error) {
namePrefix := cfg.NamePrefix
if namePrefix == "" {
namePrefix = nuid.Next()
}
oc := &orderedConsumer{
js: s.js,
cfg: &cfg,
stream: s.name,
namePrefix: nuid.Next(),
namePrefix: namePrefix,
doReset: make(chan struct{}, 1),
}
consCfg := oc.getConsumerConfig()
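A sketch of the new NamePrefix knob for ordered consumers; the stream name and prefix are illustrative assumptions:

package main

import (
	"context"
	"log"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Drain()
	js, err := jetstream.New(nc)
	if err != nil {
		log.Fatal(err)
	}
	ctx := context.Background()

	stream, err := js.Stream(ctx, "EVENTS") // assumed existing stream
	if err != nil {
		log.Fatal(err)
	}

	// The ephemeral consumer names now start with "audit" instead of a random
	// NUID, which makes them easier to spot in monitoring output.
	cons, err := stream.OrderedConsumer(ctx, jetstream.OrderedConsumerConfig{
		NamePrefix:     "audit",
		FilterSubjects: []string{"events.>"},
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Println("ordered consumer:", cons.CachedInfo().Name)
}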
@@ -528,6 +532,7 @@ func (s *stream) getMsg(ctx context.Context, mreq *apiMsgGetRequest) (*RawStream
if err != nil {
return nil, err
}
var gmSubj string
// handle direct gets
@@ -598,6 +603,7 @@ func convertDirectGetMsgResponseToMsg(r *nats.Msg) (*RawStreamMsg, error) {
}
}
}
// Check for headers that give us the required information to
// reconstruct the message.
if len(r.Header) == 0 {

View File

@@ -201,6 +201,18 @@ type (
// Enables and sets a duration for adding server markers for delete, purge and max age limits.
// This feature requires nats-server v2.11.0 or later.
SubjectDeleteMarkerTTL time.Duration `json:"subject_delete_marker_ttl,omitempty"`
// AllowMsgCounter enables support for counter messages on the stream.
AllowMsgCounter bool `json:"allow_msg_counter"`
// AllowAtomicPublish allows atomic batch publishing into the stream.
AllowAtomicPublish bool `json:"allow_atomic,omitempty"`
// AllowMsgSchedules enables the scheduling of messages
AllowMsgSchedules bool `json:"allow_msg_schedules,omitempty"`
// PersistMode allows opting in to different persistence mode settings.
PersistMode PersistModeType `json:"persist_mode,omitempty"`
}
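A sketch of opting a stream into the new persistence mode; this assumes a nats-server version that understands these fields, and the stream name is illustrative:

package main

import (
	"context"
	"log"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Drain()
	js, err := jetstream.New(nc)
	if err != nil {
		log.Fatal(err)
	}

	// AsyncPersistMode acknowledges publishes before the data is flushed,
	// trading a small durability window for throughput.
	if _, err := js.CreateStream(context.Background(), jetstream.StreamConfig{
		Name:        "EVENTS",
		Subjects:    []string{"events.>"},
		PersistMode: jetstream.AsyncPersistMode,
	}); err != nil {
		log.Fatal(err)
	}
}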
// StreamSourceInfo shows information about an upstream stream
@@ -276,10 +288,25 @@ type (
// Name is the name of the cluster.
Name string `json:"name,omitempty"`
// RaftGroup is the name of the Raft group managing the asset (in
// clustered environments).
RaftGroup string `json:"raft_group,omitempty"`
// Leader is the server name of the RAFT leader.
Leader string `json:"leader,omitempty"`
// Replicas is the list of members of the RAFT cluster
// LeaderSince is the time that it was elected as leader in RFC3339
// format, absent when not the leader.
LeaderSince *time.Time `json:"leader_since,omitempty"`
// SystemAcc indicates if the traffic_account is the system account.
// When true, replication traffic goes over the system account.
SystemAcc bool `json:"system_account,omitempty"`
// TrafficAcc is the account over which the replication traffic flows.
TrafficAcc string `json:"traffic_account,omitempty"`
// Replicas is the list of members of the RAFT cluster.
Replicas []*PeerInfo `json:"replicas,omitempty"`
}
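A sketch of reading the new placement details from a stream's cluster info; the package and function names are illustrative:

package example

import (
	"context"
	"fmt"
	"time"

	"github.com/nats-io/nats.go/jetstream"
)

// printLeader logs the Raft placement details newly exposed on ClusterInfo.
func printLeader(ctx context.Context, stream jetstream.Stream) error {
	info, err := stream.Info(ctx)
	if err != nil {
		return err
	}
	if ci := info.Cluster; ci != nil && ci.LeaderSince != nil {
		fmt.Printf("leader %s (raft group %s, traffic account %q) since %s\n",
			ci.Leader, ci.RaftGroup, ci.TrafficAcc, ci.LeaderSince.Format(time.RFC3339))
	}
	return nil
}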
@@ -407,6 +434,9 @@ type (
// StoreCompression determines how messages are compressed.
StoreCompression uint8
// PersistModeType determines what persistence mode the stream uses.
PersistModeType int
)
const (
@@ -438,6 +468,16 @@ const (
workQueuePolicyString = "workqueue"
)
const (
// DefaultPersistMode specifies the default persist mode. Writes to the stream will immediately be flushed.
// The publish acknowledgement will be sent after the persisting completes.
DefaultPersistMode = PersistModeType(iota)
// AsyncPersistMode specifies writes to the stream will be flushed asynchronously.
// The publish acknowledgement may be sent before the persisting completes.
// This means writes could be lost if they weren't flushed prior to a hard kill of the server.
AsyncPersistMode
)
func (rp RetentionPolicy) String() string {
switch rp {
case LimitsPolicy:
@@ -512,6 +552,40 @@ func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error {
return nil
}
func (pm PersistModeType) String() string {
switch pm {
case DefaultPersistMode:
return "Default"
case AsyncPersistMode:
return "Async"
default:
return "Unknown Persist Mode"
}
}
func (pm PersistModeType) MarshalJSON() ([]byte, error) {
switch pm {
case DefaultPersistMode:
return json.Marshal("default")
case AsyncPersistMode:
return json.Marshal("async")
default:
return nil, fmt.Errorf("nats: can not marshal %v", pm)
}
}
func (pm *PersistModeType) UnmarshalJSON(data []byte) error {
switch strings.ToLower(string(data)) {
case jsonString("default"):
*pm = DefaultPersistMode
case jsonString("async"):
*pm = AsyncPersistMode
default:
return fmt.Errorf("nats: can not unmarshal %q", data)
}
return nil
}
const (
// FileStorage specifies on disk storage. It's the default.
FileStorage StorageType = iota

View File

@@ -2836,6 +2836,10 @@ func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
if sub.jsi == nil || sub.jsi.consumer == _EMPTY_ {
if sub.jsi != nil && sub.jsi.ordered {
sub.mu.Unlock()
return nil, ErrConsumerInfoOnOrderedReset
}
sub.mu.Unlock()
return nil, ErrTypeSubscription
}

View File

@@ -150,6 +150,9 @@ var (
// ErrConsumerLeadershipChanged is returned when pending requests are no longer valid after leadership has changed
ErrConsumerLeadershipChanged JetStreamError = &jsError{message: "Leadership Changed"}
// ErrConsumerInfoOnOrderedReset is returned when attempting to fetch consumer info for an ordered consumer that is currently being recreated.
ErrConsumerInfoOnOrderedReset JetStreamError = &jsError{message: "cannot fetch consumer info; ordered consumer is being reset"}
// ErrNoHeartbeat is returned when no heartbeat is received from server when sending requests with pull consumer.
ErrNoHeartbeat JetStreamError = &jsError{message: "no heartbeat received"}
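A sketch of handling the new sentinel in the legacy API; the helper name is illustrative:

package example

import (
	"errors"

	"github.com/nats-io/nats.go"
)

// consumerInfoOrNil tolerates the transient window in which an ordered push
// subscription is recreating its ephemeral consumer.
func consumerInfoOrNil(sub *nats.Subscription) (*nats.ConsumerInfo, error) {
	info, err := sub.ConsumerInfo()
	if errors.Is(err, nats.ErrConsumerInfoOnOrderedReset) {
		return nil, nil // being reset right now; retry later
	}
	return info, err
}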

View File

@@ -1093,9 +1093,13 @@ type StreamState struct {
// ClusterInfo shows information about the underlying set of servers
// that make up the stream or consumer.
type ClusterInfo struct {
Name string `json:"name,omitempty"`
Leader string `json:"leader,omitempty"`
Replicas []*PeerInfo `json:"replicas,omitempty"`
Name string `json:"name,omitempty"`
RaftGroup string `json:"raft_group,omitempty"`
Leader string `json:"leader,omitempty"`
LeaderSince *time.Time `json:"leader_since,omitempty"`
SystemAcc bool `json:"system_account,omitempty"`
TrafficAcc string `json:"traffic_account,omitempty"`
Replicas []*PeerInfo `json:"replicas,omitempty"`
}
// PeerInfo shows information about all the peers in the cluster that

View File

@@ -48,7 +48,7 @@ import (
// Default Constants
const (
Version = "1.45.0"
Version = "1.46.0"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60

View File

@@ -106,6 +106,7 @@ type CapabilitiesCore struct {
PollInterval int `json:"pollinterval" xml:"pollinterval" mapstructure:"poll_interval"`
WebdavRoot string `json:"webdav-root,omitempty" xml:"webdav-root,omitempty" mapstructure:"webdav_root"`
Status *Status `json:"status" xml:"status"`
CheckForUpdates ocsBool `json:"check-for-updates" xml:"check-for-updates" mapstructure:"check_for_updates"`
SupportURLSigning ocsBool `json:"support-url-signing" xml:"support-url-signing" mapstructure:"support_url_signing"`
SupportSSE ocsBool `json:"support-sse" xml:"support-sse" mapstructure:"support_sse"`
}

View File

@@ -21,6 +21,7 @@ package tree
import (
"context"
"crypto/sha256"
"fmt"
"io"
"io/fs"
@@ -393,6 +394,17 @@ func (t *Tree) findSpaceId(path string) (string, error) {
return "", fmt.Errorf("could not find space for path %s", path)
}
func (t *Tree) generateTempNodeId(path string) string {
if len(path) < 240 {
return strings.ReplaceAll(strings.TrimPrefix(path, "/"), "/", "-")
} else {
// The path is too long, use its sha256 hash (64 hex characters) instead
pathHash := fmt.Sprintf("%x", sha256.Sum256([]byte(path)))
t.log.Info().Str("path", path).Msg("path too long, using sha256 as lock: " + pathHash)
return pathHash
}
}
func (t *Tree) assimilate(item scanItem) error {
t.log.Debug().Str("path", item.Path).Bool("recurse", item.Recurse).Msg("assimilate")
var err error
@@ -532,7 +544,8 @@ func (t *Tree) assimilate(item scanItem) error {
assimilationNode := &assimilationNode{
spaceID: spaceID,
// Use the path as the node ID (which is used for calculating the lock file path) since we do not have an ID yet
nodeId: strings.ReplaceAll(strings.TrimPrefix(item.Path, "/"), "/", "-"),
// If the path is too long, its sha256 hash is used instead
nodeId: t.generateTempNodeId(item.Path),
}
unlock, err := t.lookup.MetadataBackend().Lock(assimilationNode)
if err != nil {

View File

@@ -6,7 +6,7 @@
// cancellation signals, and other request-scoped values across API boundaries
// and between processes.
// As of Go 1.7 this package is available in the standard library under the
// name [context], and migrating to it can be done automatically with [go fix].
// name [context].
//
// Incoming requests to a server should create a [Context], and outgoing
// calls to servers should accept a Context. The chain of function
@@ -38,8 +38,6 @@
//
// See https://go.dev/blog/context for example code for a server that uses
// Contexts.
//
// [go fix]: https://go.dev/cmd/go#hdr-Update_packages_to_use_new_APIs
package context
import (
@@ -51,36 +49,37 @@ import (
// API boundaries.
//
// Context's methods may be called by multiple goroutines simultaneously.
//
//go:fix inline
type Context = context.Context
// Canceled is the error returned by [Context.Err] when the context is canceled
// for some reason other than its deadline passing.
//
//go:fix inline
var Canceled = context.Canceled
// DeadlineExceeded is the error returned by [Context.Err] when the context is canceled
// due to its deadline passing.
//
//go:fix inline
var DeadlineExceeded = context.DeadlineExceeded
// Background returns a non-nil, empty Context. It is never canceled, has no
// values, and has no deadline. It is typically used by the main function,
// initialization, and tests, and as the top-level Context for incoming
// requests.
func Background() Context {
return background
}
//
//go:fix inline
func Background() Context { return context.Background() }
// TODO returns a non-nil, empty Context. Code should use context.TODO when
// it's unclear which Context to use or it is not yet available (because the
// surrounding function has not yet been extended to accept a Context
// parameter).
func TODO() Context {
return todo
}
var (
background = context.Background()
todo = context.TODO()
)
//
//go:fix inline
func TODO() Context { return context.TODO() }
// A CancelFunc tells an operation to abandon its work.
// A CancelFunc does not wait for the work to stop.
@@ -95,6 +94,8 @@ type CancelFunc = context.CancelFunc
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this [Context] complete.
//
//go:fix inline
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
return context.WithCancel(parent)
}
@@ -108,6 +109,8 @@ func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this [Context] complete.
//
//go:fix inline
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
return context.WithDeadline(parent, d)
}
@@ -122,6 +125,8 @@ func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
// defer cancel() // releases resources if slowOperation completes before timeout elapses
// return slowOperation(ctx)
// }
//
//go:fix inline
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return context.WithTimeout(parent, timeout)
}
@@ -139,6 +144,8 @@ func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
// interface{}, context keys often have concrete type
// struct{}. Alternatively, exported context key variables' static
// type should be a pointer or interface.
//
//go:fix inline
func WithValue(parent Context, key, val interface{}) Context {
return context.WithValue(parent, key, val)
}

View File

@@ -55,7 +55,7 @@ func configFromServer(h1 *http.Server, h2 *Server) http2Config {
PermitProhibitedCipherSuites: h2.PermitProhibitedCipherSuites,
CountError: h2.CountError,
}
fillNetHTTPServerConfig(&conf, h1)
fillNetHTTPConfig(&conf, h1.HTTP2)
setConfigDefaults(&conf, true)
return conf
}
@@ -81,7 +81,7 @@ func configFromTransport(h2 *Transport) http2Config {
}
if h2.t1 != nil {
fillNetHTTPTransportConfig(&conf, h2.t1)
fillNetHTTPConfig(&conf, h2.t1.HTTP2)
}
setConfigDefaults(&conf, false)
return conf
@@ -120,3 +120,45 @@ func adjustHTTP1MaxHeaderSize(n int64) int64 {
const typicalHeaders = 10 // conservative
return n + typicalHeaders*perFieldOverhead
}
func fillNetHTTPConfig(conf *http2Config, h2 *http.HTTP2Config) {
if h2 == nil {
return
}
if h2.MaxConcurrentStreams != 0 {
conf.MaxConcurrentStreams = uint32(h2.MaxConcurrentStreams)
}
if h2.MaxEncoderHeaderTableSize != 0 {
conf.MaxEncoderHeaderTableSize = uint32(h2.MaxEncoderHeaderTableSize)
}
if h2.MaxDecoderHeaderTableSize != 0 {
conf.MaxDecoderHeaderTableSize = uint32(h2.MaxDecoderHeaderTableSize)
}
if h2.MaxConcurrentStreams != 0 {
conf.MaxConcurrentStreams = uint32(h2.MaxConcurrentStreams)
}
if h2.MaxReadFrameSize != 0 {
conf.MaxReadFrameSize = uint32(h2.MaxReadFrameSize)
}
if h2.MaxReceiveBufferPerConnection != 0 {
conf.MaxUploadBufferPerConnection = int32(h2.MaxReceiveBufferPerConnection)
}
if h2.MaxReceiveBufferPerStream != 0 {
conf.MaxUploadBufferPerStream = int32(h2.MaxReceiveBufferPerStream)
}
if h2.SendPingTimeout != 0 {
conf.SendPingTimeout = h2.SendPingTimeout
}
if h2.PingTimeout != 0 {
conf.PingTimeout = h2.PingTimeout
}
if h2.WriteByteTimeout != 0 {
conf.WriteByteTimeout = h2.WriteByteTimeout
}
if h2.PermitProhibitedCipherSuites {
conf.PermitProhibitedCipherSuites = true
}
if h2.CountError != nil {
conf.CountError = h2.CountError
}
}
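For context, a sketch of how an application on Go 1.24+ feeds these values through the standard library, which the vendored package now reads via fillNetHTTPConfig when configured from an *http.Server; the address, limits and certificate paths are illustrative:

package main

import (
	"log"
	"net/http"
	"time"
)

func main() {
	srv := &http.Server{
		Addr: ":8443",
		// On Go 1.24+ the HTTP/2 knobs live on net/http; the x/net HTTP/2
		// implementation copies them into its internal config.
		HTTP2: &http.HTTP2Config{
			MaxConcurrentStreams: 256,
			PingTimeout:          15 * time.Second,
		},
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			_, _ = w.Write([]byte("ok"))
		}),
	}
	log.Fatal(srv.ListenAndServeTLS("cert.pem", "key.pem"))
}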

View File

@@ -1,61 +0,0 @@
// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.24
package http2
import "net/http"
// fillNetHTTPServerConfig sets fields in conf from srv.HTTP2.
func fillNetHTTPServerConfig(conf *http2Config, srv *http.Server) {
fillNetHTTPConfig(conf, srv.HTTP2)
}
// fillNetHTTPTransportConfig sets fields in conf from tr.HTTP2.
func fillNetHTTPTransportConfig(conf *http2Config, tr *http.Transport) {
fillNetHTTPConfig(conf, tr.HTTP2)
}
func fillNetHTTPConfig(conf *http2Config, h2 *http.HTTP2Config) {
if h2 == nil {
return
}
if h2.MaxConcurrentStreams != 0 {
conf.MaxConcurrentStreams = uint32(h2.MaxConcurrentStreams)
}
if h2.MaxEncoderHeaderTableSize != 0 {
conf.MaxEncoderHeaderTableSize = uint32(h2.MaxEncoderHeaderTableSize)
}
if h2.MaxDecoderHeaderTableSize != 0 {
conf.MaxDecoderHeaderTableSize = uint32(h2.MaxDecoderHeaderTableSize)
}
if h2.MaxConcurrentStreams != 0 {
conf.MaxConcurrentStreams = uint32(h2.MaxConcurrentStreams)
}
if h2.MaxReadFrameSize != 0 {
conf.MaxReadFrameSize = uint32(h2.MaxReadFrameSize)
}
if h2.MaxReceiveBufferPerConnection != 0 {
conf.MaxUploadBufferPerConnection = int32(h2.MaxReceiveBufferPerConnection)
}
if h2.MaxReceiveBufferPerStream != 0 {
conf.MaxUploadBufferPerStream = int32(h2.MaxReceiveBufferPerStream)
}
if h2.SendPingTimeout != 0 {
conf.SendPingTimeout = h2.SendPingTimeout
}
if h2.PingTimeout != 0 {
conf.PingTimeout = h2.PingTimeout
}
if h2.WriteByteTimeout != 0 {
conf.WriteByteTimeout = h2.WriteByteTimeout
}
if h2.PermitProhibitedCipherSuites {
conf.PermitProhibitedCipherSuites = true
}
if h2.CountError != nil {
conf.CountError = h2.CountError
}
}

View File

@@ -1,16 +0,0 @@
// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build !go1.24
package http2
import "net/http"
// Pre-Go 1.24 fallback.
// The Server.HTTP2 and Transport.HTTP2 config fields were added in Go 1.24.
func fillNetHTTPServerConfig(conf *http2Config, srv *http.Server) {}
func fillNetHTTPTransportConfig(conf *http2Config, tr *http.Transport) {}

View File

@@ -15,21 +15,32 @@ import (
"runtime"
"strconv"
"sync"
"sync/atomic"
)
var DebugGoroutines = os.Getenv("DEBUG_HTTP2_GOROUTINES") == "1"
// Setting DebugGoroutines to false during a test to disable goroutine debugging
// results in race detector complaints when a test leaves goroutines running before
// returning. Tests shouldn't do this, of course, but when they do it generally shows
// up as infrequent, hard-to-debug flakes. (See #66519.)
//
// Disable goroutine debugging during individual tests with an atomic bool.
// (Note that it's safe to enable/disable debugging mid-test, so the actual race condition
// here is harmless.)
var disableDebugGoroutines atomic.Bool
type goroutineLock uint64
func newGoroutineLock() goroutineLock {
if !DebugGoroutines {
if !DebugGoroutines || disableDebugGoroutines.Load() {
return 0
}
return goroutineLock(curGoroutineID())
}
func (g goroutineLock) check() {
if !DebugGoroutines {
if !DebugGoroutines || disableDebugGoroutines.Load() {
return
}
if curGoroutineID() != uint64(g) {
@@ -38,7 +49,7 @@ func (g goroutineLock) check() {
}
func (g goroutineLock) checkNotOn() {
if !DebugGoroutines {
if !DebugGoroutines || disableDebugGoroutines.Load() {
return
}
if curGoroutineID() == uint64(g) {

View File

@@ -15,7 +15,6 @@ package http2 // import "golang.org/x/net/http2"
import (
"bufio"
"context"
"crypto/tls"
"errors"
"fmt"
@@ -255,15 +254,13 @@ func (cw closeWaiter) Wait() {
// idle memory usage with many connections.
type bufferedWriter struct {
_ incomparable
group synctestGroupInterface // immutable
conn net.Conn // immutable
bw *bufio.Writer // non-nil when data is buffered
byteTimeout time.Duration // immutable, WriteByteTimeout
conn net.Conn // immutable
bw *bufio.Writer // non-nil when data is buffered
byteTimeout time.Duration // immutable, WriteByteTimeout
}
func newBufferedWriter(group synctestGroupInterface, conn net.Conn, timeout time.Duration) *bufferedWriter {
func newBufferedWriter(conn net.Conn, timeout time.Duration) *bufferedWriter {
return &bufferedWriter{
group: group,
conn: conn,
byteTimeout: timeout,
}
@@ -314,24 +311,18 @@ func (w *bufferedWriter) Flush() error {
type bufferedWriterTimeoutWriter bufferedWriter
func (w *bufferedWriterTimeoutWriter) Write(p []byte) (n int, err error) {
return writeWithByteTimeout(w.group, w.conn, w.byteTimeout, p)
return writeWithByteTimeout(w.conn, w.byteTimeout, p)
}
// writeWithByteTimeout writes to conn.
// If more than timeout passes without any bytes being written to the connection,
// the write fails.
func writeWithByteTimeout(group synctestGroupInterface, conn net.Conn, timeout time.Duration, p []byte) (n int, err error) {
func writeWithByteTimeout(conn net.Conn, timeout time.Duration, p []byte) (n int, err error) {
if timeout <= 0 {
return conn.Write(p)
}
for {
var now time.Time
if group == nil {
now = time.Now()
} else {
now = group.Now()
}
conn.SetWriteDeadline(now.Add(timeout))
conn.SetWriteDeadline(time.Now().Add(timeout))
nn, err := conn.Write(p[n:])
n += nn
if n == len(p) || nn == 0 || !errors.Is(err, os.ErrDeadlineExceeded) {
@@ -417,14 +408,3 @@ func (s *sorter) SortStrings(ss []string) {
// makes that struct also non-comparable, and generally doesn't add
// any size (as long as it's first).
type incomparable [0]func()
// synctestGroupInterface is the methods of synctestGroup used by Server and Transport.
// It's defined as an interface here to let us keep synctestGroup entirely test-only
// and not a part of non-test builds.
type synctestGroupInterface interface {
Join()
Now() time.Time
NewTimer(d time.Duration) timer
AfterFunc(d time.Duration, f func()) timer
ContextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc)
}

View File

@@ -176,39 +176,6 @@ type Server struct {
// so that we don't embed a Mutex in this struct, which will make the
// struct non-copyable, which might break some callers.
state *serverInternalState
// Synchronization group used for testing.
// Outside of tests, this is nil.
group synctestGroupInterface
}
func (s *Server) markNewGoroutine() {
if s.group != nil {
s.group.Join()
}
}
func (s *Server) now() time.Time {
if s.group != nil {
return s.group.Now()
}
return time.Now()
}
// newTimer creates a new time.Timer, or a synthetic timer in tests.
func (s *Server) newTimer(d time.Duration) timer {
if s.group != nil {
return s.group.NewTimer(d)
}
return timeTimer{time.NewTimer(d)}
}
// afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests.
func (s *Server) afterFunc(d time.Duration, f func()) timer {
if s.group != nil {
return s.group.AfterFunc(d, f)
}
return timeTimer{time.AfterFunc(d, f)}
}
type serverInternalState struct {
@@ -423,6 +390,9 @@ func (o *ServeConnOpts) handler() http.Handler {
//
// The opts parameter is optional. If nil, default values are used.
func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
if opts == nil {
opts = &ServeConnOpts{}
}
s.serveConn(c, opts, nil)
}
@@ -438,7 +408,7 @@ func (s *Server) serveConn(c net.Conn, opts *ServeConnOpts, newf func(*serverCon
conn: c,
baseCtx: baseCtx,
remoteAddrStr: c.RemoteAddr().String(),
bw: newBufferedWriter(s.group, c, conf.WriteByteTimeout),
bw: newBufferedWriter(c, conf.WriteByteTimeout),
handler: opts.handler(),
streams: make(map[uint32]*stream),
readFrameCh: make(chan readFrameResult),
@@ -638,11 +608,11 @@ type serverConn struct {
pingSent bool
sentPingData [8]byte
goAwayCode ErrCode
shutdownTimer timer // nil until used
idleTimer timer // nil if unused
shutdownTimer *time.Timer // nil until used
idleTimer *time.Timer // nil if unused
readIdleTimeout time.Duration
pingTimeout time.Duration
readIdleTimer timer // nil if unused
readIdleTimer *time.Timer // nil if unused
// Owned by the writeFrameAsync goroutine:
headerWriteBuf bytes.Buffer
@@ -687,12 +657,12 @@ type stream struct {
flow outflow // limits writing from Handler to client
inflow inflow // what the client is allowed to POST/etc to us
state streamState
resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
gotTrailerHeader bool // HEADER frame for trailers was seen
wroteHeaders bool // whether we wrote headers (not status 100)
readDeadline timer // nil if unused
writeDeadline timer // nil if unused
closeErr error // set before cw is closed
resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
gotTrailerHeader bool // HEADER frame for trailers was seen
wroteHeaders bool // whether we wrote headers (not status 100)
readDeadline *time.Timer // nil if unused
writeDeadline *time.Timer // nil if unused
closeErr error // set before cw is closed
trailer http.Header // accumulated trailers
reqTrailer http.Header // handler's Request.Trailer
@@ -848,7 +818,6 @@ type readFrameResult struct {
// consumer is done with the frame.
// It's run on its own goroutine.
func (sc *serverConn) readFrames() {
sc.srv.markNewGoroutine()
gate := make(chan struct{})
gateDone := func() { gate <- struct{}{} }
for {
@@ -881,7 +850,6 @@ type frameWriteResult struct {
// At most one goroutine can be running writeFrameAsync at a time per
// serverConn.
func (sc *serverConn) writeFrameAsync(wr FrameWriteRequest, wd *writeData) {
sc.srv.markNewGoroutine()
var err error
if wd == nil {
err = wr.write.writeFrame(sc)
@@ -965,22 +933,22 @@ func (sc *serverConn) serve(conf http2Config) {
sc.setConnState(http.StateIdle)
if sc.srv.IdleTimeout > 0 {
sc.idleTimer = sc.srv.afterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
defer sc.idleTimer.Stop()
}
if conf.SendPingTimeout > 0 {
sc.readIdleTimeout = conf.SendPingTimeout
sc.readIdleTimer = sc.srv.afterFunc(conf.SendPingTimeout, sc.onReadIdleTimer)
sc.readIdleTimer = time.AfterFunc(conf.SendPingTimeout, sc.onReadIdleTimer)
defer sc.readIdleTimer.Stop()
}
go sc.readFrames() // closed by defer sc.conn.Close above
settingsTimer := sc.srv.afterFunc(firstSettingsTimeout, sc.onSettingsTimer)
settingsTimer := time.AfterFunc(firstSettingsTimeout, sc.onSettingsTimer)
defer settingsTimer.Stop()
lastFrameTime := sc.srv.now()
lastFrameTime := time.Now()
loopNum := 0
for {
loopNum++
@@ -994,7 +962,7 @@ func (sc *serverConn) serve(conf http2Config) {
case res := <-sc.wroteFrameCh:
sc.wroteFrame(res)
case res := <-sc.readFrameCh:
lastFrameTime = sc.srv.now()
lastFrameTime = time.Now()
// Process any written frames before reading new frames from the client since a
// written frame could have triggered a new stream to be started.
if sc.writingFrameAsync {
@@ -1077,7 +1045,7 @@ func (sc *serverConn) handlePingTimer(lastFrameReadTime time.Time) {
}
pingAt := lastFrameReadTime.Add(sc.readIdleTimeout)
now := sc.srv.now()
now := time.Now()
if pingAt.After(now) {
// We received frames since arming the ping timer.
// Reset it for the next possible timeout.
@@ -1141,10 +1109,10 @@ func (sc *serverConn) readPreface() error {
errc <- nil
}
}()
timer := sc.srv.newTimer(prefaceTimeout) // TODO: configurable on *Server?
timer := time.NewTimer(prefaceTimeout) // TODO: configurable on *Server?
defer timer.Stop()
select {
case <-timer.C():
case <-timer.C:
return errPrefaceTimeout
case err := <-errc:
if err == nil {
@@ -1160,6 +1128,21 @@ var errChanPool = sync.Pool{
New: func() interface{} { return make(chan error, 1) },
}
func getErrChan() chan error {
if inTests {
// Channels cannot be reused across synctest tests.
return make(chan error, 1)
} else {
return errChanPool.Get().(chan error)
}
}
func putErrChan(ch chan error) {
if !inTests {
errChanPool.Put(ch)
}
}
var writeDataPool = sync.Pool{
New: func() interface{} { return new(writeData) },
}
@@ -1167,7 +1150,7 @@ var writeDataPool = sync.Pool{
// writeDataFromHandler writes DATA response frames from a handler on
// the given stream.
func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStream bool) error {
ch := errChanPool.Get().(chan error)
ch := getErrChan()
writeArg := writeDataPool.Get().(*writeData)
*writeArg = writeData{stream.id, data, endStream}
err := sc.writeFrameFromHandler(FrameWriteRequest{
@@ -1199,7 +1182,7 @@ func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStrea
return errStreamClosed
}
}
errChanPool.Put(ch)
putErrChan(ch)
if frameWriteDone {
writeDataPool.Put(writeArg)
}
@@ -1513,7 +1496,7 @@ func (sc *serverConn) goAway(code ErrCode) {
func (sc *serverConn) shutDownIn(d time.Duration) {
sc.serveG.check()
sc.shutdownTimer = sc.srv.afterFunc(d, sc.onShutdownTimer)
sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer)
}
func (sc *serverConn) resetStream(se StreamError) {
@@ -2118,7 +2101,7 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
// (in Go 1.8), though. That's a more sane option anyway.
if sc.hs.ReadTimeout > 0 {
sc.conn.SetReadDeadline(time.Time{})
st.readDeadline = sc.srv.afterFunc(sc.hs.ReadTimeout, st.onReadTimeout)
st.readDeadline = time.AfterFunc(sc.hs.ReadTimeout, st.onReadTimeout)
}
return sc.scheduleHandler(id, rw, req, handler)
@@ -2216,7 +2199,7 @@ func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream
st.flow.add(sc.initialStreamSendWindowSize)
st.inflow.init(sc.initialStreamRecvWindowSize)
if sc.hs.WriteTimeout > 0 {
st.writeDeadline = sc.srv.afterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
}
sc.streams[id] = st
@@ -2405,7 +2388,6 @@ func (sc *serverConn) handlerDone() {
// Run on its own goroutine.
func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) {
sc.srv.markNewGoroutine()
defer sc.sendServeMsg(handlerDoneMsg)
didPanic := true
defer func() {
@@ -2454,7 +2436,7 @@ func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) erro
// waiting for this frame to be written, so an http.Flush mid-handler
// writes out the correct value of keys, before a handler later potentially
// mutates it.
errc = errChanPool.Get().(chan error)
errc = getErrChan()
}
if err := sc.writeFrameFromHandler(FrameWriteRequest{
write: headerData,
@@ -2466,7 +2448,7 @@ func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) erro
if errc != nil {
select {
case err := <-errc:
errChanPool.Put(errc)
putErrChan(errc)
return err
case <-sc.doneServing:
return errClientDisconnected
@@ -2573,7 +2555,7 @@ func (b *requestBody) Read(p []byte) (n int, err error) {
if err == io.EOF {
b.sawEOF = true
}
if b.conn == nil && inTests {
if b.conn == nil {
return
}
b.conn.noteBodyReadFromHandler(b.stream, n, err)
@@ -2702,7 +2684,7 @@ func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
var date string
if _, ok := rws.snapHeader["Date"]; !ok {
// TODO(bradfitz): be faster here, like net/http? measure.
date = rws.conn.srv.now().UTC().Format(http.TimeFormat)
date = time.Now().UTC().Format(http.TimeFormat)
}
for _, v := range rws.snapHeader["Trailer"] {
@@ -2824,7 +2806,7 @@ func (rws *responseWriterState) promoteUndeclaredTrailers() {
func (w *responseWriter) SetReadDeadline(deadline time.Time) error {
st := w.rws.stream
if !deadline.IsZero() && deadline.Before(w.rws.conn.srv.now()) {
if !deadline.IsZero() && deadline.Before(time.Now()) {
// If we're setting a deadline in the past, reset the stream immediately
// so writes after SetWriteDeadline returns will fail.
st.onReadTimeout()
@@ -2840,9 +2822,9 @@ func (w *responseWriter) SetReadDeadline(deadline time.Time) error {
if deadline.IsZero() {
st.readDeadline = nil
} else if st.readDeadline == nil {
st.readDeadline = sc.srv.afterFunc(deadline.Sub(sc.srv.now()), st.onReadTimeout)
st.readDeadline = time.AfterFunc(deadline.Sub(time.Now()), st.onReadTimeout)
} else {
st.readDeadline.Reset(deadline.Sub(sc.srv.now()))
st.readDeadline.Reset(deadline.Sub(time.Now()))
}
})
return nil
@@ -2850,7 +2832,7 @@ func (w *responseWriter) SetReadDeadline(deadline time.Time) error {
func (w *responseWriter) SetWriteDeadline(deadline time.Time) error {
st := w.rws.stream
if !deadline.IsZero() && deadline.Before(w.rws.conn.srv.now()) {
if !deadline.IsZero() && deadline.Before(time.Now()) {
// If we're setting a deadline in the past, reset the stream immediately
// so writes after SetWriteDeadline returns will fail.
st.onWriteTimeout()
@@ -2866,9 +2848,9 @@ func (w *responseWriter) SetWriteDeadline(deadline time.Time) error {
if deadline.IsZero() {
st.writeDeadline = nil
} else if st.writeDeadline == nil {
st.writeDeadline = sc.srv.afterFunc(deadline.Sub(sc.srv.now()), st.onWriteTimeout)
st.writeDeadline = time.AfterFunc(deadline.Sub(time.Now()), st.onWriteTimeout)
} else {
st.writeDeadline.Reset(deadline.Sub(sc.srv.now()))
st.writeDeadline.Reset(deadline.Sub(time.Now()))
}
})
return nil
@@ -3147,7 +3129,7 @@ func (w *responseWriter) Push(target string, opts *http.PushOptions) error {
method: opts.Method,
url: u,
header: cloneHeader(opts.Header),
done: errChanPool.Get().(chan error),
done: getErrChan(),
}
select {
@@ -3164,7 +3146,7 @@ func (w *responseWriter) Push(target string, opts *http.PushOptions) error {
case <-st.cw:
return errStreamClosed
case err := <-msg.done:
errChanPool.Put(msg.done)
putErrChan(msg.done)
return err
}
}

View File

@@ -1,20 +0,0 @@
// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package http2
import "time"
// A timer is a time.Timer, as an interface which can be replaced in tests.
type timer = interface {
C() <-chan time.Time
Reset(d time.Duration) bool
Stop() bool
}
// timeTimer adapts a time.Timer to the timer interface.
type timeTimer struct {
*time.Timer
}
func (t timeTimer) C() <-chan time.Time { return t.Timer.C }

View File

@@ -193,50 +193,6 @@ type Transport struct {
type transportTestHooks struct {
newclientconn func(*ClientConn)
group synctestGroupInterface
}
func (t *Transport) markNewGoroutine() {
if t != nil && t.transportTestHooks != nil {
t.transportTestHooks.group.Join()
}
}
func (t *Transport) now() time.Time {
if t != nil && t.transportTestHooks != nil {
return t.transportTestHooks.group.Now()
}
return time.Now()
}
func (t *Transport) timeSince(when time.Time) time.Duration {
if t != nil && t.transportTestHooks != nil {
return t.now().Sub(when)
}
return time.Since(when)
}
// newTimer creates a new time.Timer, or a synthetic timer in tests.
func (t *Transport) newTimer(d time.Duration) timer {
if t.transportTestHooks != nil {
return t.transportTestHooks.group.NewTimer(d)
}
return timeTimer{time.NewTimer(d)}
}
// afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests.
func (t *Transport) afterFunc(d time.Duration, f func()) timer {
if t.transportTestHooks != nil {
return t.transportTestHooks.group.AfterFunc(d, f)
}
return timeTimer{time.AfterFunc(d, f)}
}
func (t *Transport) contextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) {
if t.transportTestHooks != nil {
return t.transportTestHooks.group.ContextWithTimeout(ctx, d)
}
return context.WithTimeout(ctx, d)
}
func (t *Transport) maxHeaderListSize() uint32 {
@@ -366,7 +322,7 @@ type ClientConn struct {
readerErr error // set before readerDone is closed
idleTimeout time.Duration // or 0 for never
idleTimer timer
idleTimer *time.Timer
mu sync.Mutex // guards following
cond *sync.Cond // hold mu; broadcast on flow/closed changes
@@ -534,14 +490,12 @@ func (cs *clientStream) closeReqBodyLocked() {
cs.reqBodyClosed = make(chan struct{})
reqBodyClosed := cs.reqBodyClosed
go func() {
cs.cc.t.markNewGoroutine()
cs.reqBody.Close()
close(reqBodyClosed)
}()
}
type stickyErrWriter struct {
group synctestGroupInterface
conn net.Conn
timeout time.Duration
err *error
@@ -551,7 +505,7 @@ func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
if *sew.err != nil {
return 0, *sew.err
}
n, err = writeWithByteTimeout(sew.group, sew.conn, sew.timeout, p)
n, err = writeWithByteTimeout(sew.conn, sew.timeout, p)
*sew.err = err
return n, err
}
@@ -650,9 +604,9 @@ func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Res
backoff := float64(uint(1) << (uint(retry) - 1))
backoff += backoff * (0.1 * mathrand.Float64())
d := time.Second * time.Duration(backoff)
tm := t.newTimer(d)
tm := time.NewTimer(d)
select {
case <-tm.C():
case <-tm.C:
t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
continue
case <-req.Context().Done():
@@ -699,6 +653,7 @@ var (
errClientConnUnusable = errors.New("http2: client conn not usable")
errClientConnNotEstablished = errors.New("http2: client conn could not be established")
errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
errClientConnForceClosed = errors.New("http2: client connection force closed via ClientConn.Close")
)
// shouldRetryRequest is called by RoundTrip when a request fails to get
@@ -838,14 +793,11 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
pingTimeout: conf.PingTimeout,
pings: make(map[[8]byte]chan struct{}),
reqHeaderMu: make(chan struct{}, 1),
lastActive: t.now(),
lastActive: time.Now(),
}
var group synctestGroupInterface
if t.transportTestHooks != nil {
t.markNewGoroutine()
t.transportTestHooks.newclientconn(cc)
c = cc.tconn
group = t.group
}
if VerboseLogs {
t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
@@ -857,7 +809,6 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
// TODO: adjust this writer size to account for frame size +
// MTU + crypto/tls record padding.
cc.bw = bufio.NewWriter(stickyErrWriter{
group: group,
conn: c,
timeout: conf.WriteByteTimeout,
err: &cc.werr,
@@ -906,7 +857,7 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
// Start the idle timer after the connection is fully initialized.
if d := t.idleConnTimeout(); d != 0 {
cc.idleTimeout = d
cc.idleTimer = t.afterFunc(d, cc.onIdleTimeout)
cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout)
}
go cc.readLoop()
@@ -917,7 +868,7 @@ func (cc *ClientConn) healthCheck() {
pingTimeout := cc.pingTimeout
// We don't need to periodically ping in the health check, because the readLoop of ClientConn will
// trigger the healthCheck again if there is no frame received.
ctx, cancel := cc.t.contextWithTimeout(context.Background(), pingTimeout)
ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
defer cancel()
cc.vlogf("http2: Transport sending health check")
err := cc.Ping(ctx)
@@ -1120,7 +1071,7 @@ func (cc *ClientConn) tooIdleLocked() bool {
// times are compared based on their wall time. We don't want
// to reuse a connection that's been sitting idle during
// VM/laptop suspend if monotonic time was also frozen.
return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && cc.t.timeSince(cc.lastIdle.Round(0)) > cc.idleTimeout
return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && time.Since(cc.lastIdle.Round(0)) > cc.idleTimeout
}
// onIdleTimeout is called from a time.AfterFunc goroutine. It will
@@ -1186,7 +1137,6 @@ func (cc *ClientConn) Shutdown(ctx context.Context) error {
done := make(chan struct{})
cancelled := false // guarded by cc.mu
go func() {
cc.t.markNewGoroutine()
cc.mu.Lock()
defer cc.mu.Unlock()
for {
@@ -1257,8 +1207,7 @@ func (cc *ClientConn) closeForError(err error) {
//
// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
func (cc *ClientConn) Close() error {
err := errors.New("http2: client connection force closed via ClientConn.Close")
cc.closeForError(err)
cc.closeForError(errClientConnForceClosed)
return nil
}
@@ -1427,7 +1376,6 @@ func (cc *ClientConn) roundTrip(req *http.Request, streamf func(*clientStream))
//
// It sends the request and performs post-request cleanup (closing Request.Body, etc.).
func (cs *clientStream) doRequest(req *http.Request, streamf func(*clientStream)) {
cs.cc.t.markNewGoroutine()
err := cs.writeRequest(req, streamf)
cs.cleanupWriteRequest(err)
}
@@ -1558,9 +1506,9 @@ func (cs *clientStream) writeRequest(req *http.Request, streamf func(*clientStre
var respHeaderTimer <-chan time.Time
var respHeaderRecv chan struct{}
if d := cc.responseHeaderTimeout(); d != 0 {
timer := cc.t.newTimer(d)
timer := time.NewTimer(d)
defer timer.Stop()
respHeaderTimer = timer.C()
respHeaderTimer = timer.C
respHeaderRecv = cs.respHeaderRecv
}
// Wait until the peer half-closes its end of the stream,
@@ -1753,7 +1701,7 @@ func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
// Return a fatal error which aborts the retry loop.
return errClientConnNotEstablished
}
cc.lastActive = cc.t.now()
cc.lastActive = time.Now()
if cc.closed || !cc.canTakeNewRequestLocked() {
return errClientConnUnusable
}
@@ -2092,10 +2040,10 @@ func (cc *ClientConn) forgetStreamID(id uint32) {
if len(cc.streams) != slen-1 {
panic("forgetting unknown stream id")
}
cc.lastActive = cc.t.now()
cc.lastActive = time.Now()
if len(cc.streams) == 0 && cc.idleTimer != nil {
cc.idleTimer.Reset(cc.idleTimeout)
cc.lastIdle = cc.t.now()
cc.lastIdle = time.Now()
}
// Wake up writeRequestBody via clientStream.awaitFlowControl and
// wake up RoundTrip if there is a pending request.
@@ -2121,7 +2069,6 @@ type clientConnReadLoop struct {
// readLoop runs in its own goroutine and reads and dispatches frames.
func (cc *ClientConn) readLoop() {
cc.t.markNewGoroutine()
rl := &clientConnReadLoop{cc: cc}
defer rl.cleanup()
cc.readerErr = rl.run()
@@ -2188,9 +2135,9 @@ func (rl *clientConnReadLoop) cleanup() {
if cc.idleTimeout > 0 && unusedWaitTime > cc.idleTimeout {
unusedWaitTime = cc.idleTimeout
}
idleTime := cc.t.now().Sub(cc.lastActive)
idleTime := time.Now().Sub(cc.lastActive)
if atomic.LoadUint32(&cc.atomicReused) == 0 && idleTime < unusedWaitTime && !cc.closedOnIdle {
cc.idleTimer = cc.t.afterFunc(unusedWaitTime-idleTime, func() {
cc.idleTimer = time.AfterFunc(unusedWaitTime-idleTime, func() {
cc.t.connPool().MarkDead(cc)
})
} else {
@@ -2250,9 +2197,9 @@ func (rl *clientConnReadLoop) run() error {
cc := rl.cc
gotSettings := false
readIdleTimeout := cc.readIdleTimeout
var t timer
var t *time.Timer
if readIdleTimeout != 0 {
t = cc.t.afterFunc(readIdleTimeout, cc.healthCheck)
t = time.AfterFunc(readIdleTimeout, cc.healthCheck)
}
for {
f, err := cc.fr.ReadFrame()
@@ -2998,7 +2945,6 @@ func (cc *ClientConn) Ping(ctx context.Context) error {
var pingError error
errc := make(chan struct{})
go func() {
cc.t.markNewGoroutine()
cc.wmu.Lock()
defer cc.wmu.Unlock()
if pingError = cc.fr.WritePing(false, p); pingError != nil {
@@ -3228,7 +3174,7 @@ func traceGotConn(req *http.Request, cc *ClientConn, reused bool) {
cc.mu.Lock()
ci.WasIdle = len(cc.streams) == 0 && reused
if ci.WasIdle && !cc.lastActive.IsZero() {
ci.IdleTime = cc.t.timeSince(cc.lastActive)
ci.IdleTime = time.Since(cc.lastActive)
}
cc.mu.Unlock()

12
vendor/modules.txt vendored
View File

@@ -1125,7 +1125,7 @@ github.com/nats-io/nats-server/v2/server/stree
github.com/nats-io/nats-server/v2/server/sysmem
github.com/nats-io/nats-server/v2/server/thw
github.com/nats-io/nats-server/v2/server/tpm
# github.com/nats-io/nats.go v1.45.0
# github.com/nats-io/nats.go v1.46.0
## explicit; go 1.23.0
github.com/nats-io/nats.go
github.com/nats-io/nats.go/encoders/builtin
@@ -1319,7 +1319,7 @@ github.com/open-policy-agent/opa/v1/version
# github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76
## explicit; go 1.18
github.com/opencloud-eu/libre-graph-api-go
# github.com/opencloud-eu/reva/v2 v2.38.1-0.20250922152322-476bb1f0070a
# github.com/opencloud-eu/reva/v2 v2.38.1-0.20250924125540-eaa2437c36b2
## explicit; go 1.24.1
github.com/opencloud-eu/reva/v2/cmd/revad/internal/grace
github.com/opencloud-eu/reva/v2/cmd/revad/runtime
@@ -2371,8 +2371,8 @@ golang.org/x/exp/slices
golang.org/x/exp/slog
golang.org/x/exp/slog/internal
golang.org/x/exp/slog/internal/buffer
# golang.org/x/image v0.30.0
## explicit; go 1.23.0
# golang.org/x/image v0.31.0
## explicit; go 1.24.0
golang.org/x/image/bmp
golang.org/x/image/ccitt
golang.org/x/image/font
@@ -2388,8 +2388,8 @@ golang.org/x/image/vector
golang.org/x/mod/internal/lazyregexp
golang.org/x/mod/module
golang.org/x/mod/semver
# golang.org/x/net v0.43.0
## explicit; go 1.23.0
# golang.org/x/net v0.44.0
## explicit; go 1.24.0
golang.org/x/net/bpf
golang.org/x/net/context
golang.org/x/net/context/ctxhttp