From e21fe7a4fe8ba601b306d7653b8e62f8b4fc061f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Tue, 15 Jul 2025 09:31:32 +0200 Subject: [PATCH] Expose the MaxAckPending and AckWait settings for the search service --- services/search/pkg/config/defaults/defaultconfig.go | 3 +++ services/search/pkg/config/search.go | 5 +++++ services/search/pkg/search/events.go | 2 ++ 3 files changed, 10 insertions(+) diff --git a/services/search/pkg/config/defaults/defaultconfig.go b/services/search/pkg/config/defaults/defaultconfig.go index 21a5fe9a53..ba9aefaf1d 100644 --- a/services/search/pkg/config/defaults/defaultconfig.go +++ b/services/search/pkg/config/defaults/defaultconfig.go @@ -2,6 +2,7 @@ package defaults import ( "path/filepath" + "time" "github.com/opencloud-eu/opencloud/pkg/config/defaults" "github.com/opencloud-eu/opencloud/pkg/shared" @@ -53,6 +54,8 @@ func DefaultConfig() *config.Config { DebounceDuration: 1000, AsyncUploads: true, EnableTLS: false, + MaxAckPending: 1000, + AckWait: 1 * time.Minute, }, ContentExtractionSizeLimit: 20 * 1024 * 1024, // Limit content extraction to <20MB files by default } diff --git a/services/search/pkg/config/search.go b/services/search/pkg/config/search.go index 8f1637fd29..b08dea984d 100644 --- a/services/search/pkg/config/search.go +++ b/services/search/pkg/config/search.go @@ -1,5 +1,7 @@ package config +import "time" + // Events combines the configuration options for the event bus. type Events struct { Endpoint string `yaml:"endpoint" env:"OC_EVENTS_ENDPOINT;SEARCH_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture." introductionVersion:"1.0.0"` @@ -13,4 +15,7 @@ type Events struct { EnableTLS bool `yaml:"enable_tls" env:"OC_EVENTS_ENABLE_TLS;SEARCH_EVENTS_ENABLE_TLS" desc:"Enable TLS for the connection to the events broker. The events broker is the OpenCloud service which receives and delivers events between the services." introductionVersion:"1.0.0"` AuthUsername string `yaml:"username" env:"OC_EVENTS_AUTH_USERNAME;SEARCH_EVENTS_AUTH_USERNAME" desc:"The username to authenticate with the events broker. The events broker is the OpenCloud service which receives and delivers events between the services." introductionVersion:"1.0.0"` AuthPassword string `yaml:"password" env:"OC_EVENTS_AUTH_PASSWORD;SEARCH_EVENTS_AUTH_PASSWORD" desc:"The password to authenticate with the events broker. The events broker is the OpenCloud service which receives and delivers events between the services." introductionVersion:"1.0.0"` + + MaxAckPending int `yaml:"max_ack_pending" env:"SEARCH_EVENTS_MAX_ACK_PENDING" desc:"The maximum number of unacknowledged messages. This is used to limit the number of messages that can be in flight at the same time." introductionVersion:"%%NEXT%%"` + AckWait time.Duration `yaml:"ack_wait" env:"SEARCH_EVENTS_ACK_WAIT" desc:"The time to wait for an ack before the message is redelivered. This is used to ensure that messages are not lost if the consumer crashes." introductionVersion:"%%NEXT%%"` } diff --git a/services/search/pkg/search/events.go b/services/search/pkg/search/events.go index 481878a0e8..7ad43b6ea3 100644 --- a/services/search/pkg/search/events.go +++ b/services/search/pkg/search/events.go @@ -41,6 +41,8 @@ func HandleEvents(s Searcher, cfg *config.Config, logger log.Logger) error { TLSRootCACertificate: cfg.Events.TLSRootCACertificate, AuthUsername: cfg.Events.AuthUsername, AuthPassword: cfg.Events.AuthPassword, + MaxAckPending: cfg.Events.MaxAckPending, + AckWait: cfg.Events.AckWait, }) if err != nil { return err