enhancement: add timeoutListener config

This commit is contained in:
fschade
2025-11-04 16:45:04 +01:00
parent 936a84096e
commit 2883a6245d
8 changed files with 103 additions and 66 deletions

View File

@@ -62,13 +62,14 @@ type Runtime struct {
type Config struct {
*shared.Commons `yaml:"shared"`
Tracing *shared.Tracing `yaml:"tracing"`
Log *shared.Log `yaml:"log"`
Cache *shared.Cache `yaml:"cache"`
GRPCClientTLS *shared.GRPCClientTLS `yaml:"grpc_client_tls"`
GRPCServiceTLS *shared.GRPCServiceTLS `yaml:"grpc_service_tls"`
HTTPServiceTLS shared.HTTPServiceTLS `yaml:"http_service_tls"`
Reva *shared.Reva `yaml:"reva"`
Tracing *shared.Tracing `yaml:"tracing"`
Log *shared.Log `yaml:"log"`
Cache *shared.Cache `yaml:"cache"`
GRPCClientTLS *shared.GRPCClientTLS `yaml:"grpc_client_tls"`
GRPCServiceTLS *shared.GRPCServiceTLS `yaml:"grpc_service_tls"`
HTTPServiceTLS shared.HTTPServiceTLS `yaml:"http_service_tls"`
HTTPServiceTimeout shared.HTTPServiceTimeout `yaml:"http_service_timeout"`
Reva *shared.Reva `yaml:"reva"`
Mode Mode // DEPRECATED
File string

View File

@@ -2,6 +2,7 @@ package parser
import (
"errors"
"time"
"github.com/opencloud-eu/opencloud/pkg/config"
"github.com/opencloud-eu/opencloud/pkg/config/envdecode"
@@ -61,6 +62,8 @@ func EnsureDefaults(cfg *config.Config) {
if cfg.Reva == nil {
cfg.Reva = &shared.Reva{}
}
cfg.HTTPServiceTimeout.Read = 60 * time.Second
}
// EnsureCommons copies applicable parts of the OpenCloud config into the commons part
@@ -83,6 +86,7 @@ func EnsureCommons(cfg *config.Config) {
}
cfg.Commons.HTTPServiceTLS = cfg.HTTPServiceTLS
cfg.Commons.HTTPServiceTimeout = cfg.HTTPServiceTimeout
cfg.Commons.TokenManager = structs.CopyOrZeroValue(cfg.TokenManager)

View File

@@ -4,10 +4,11 @@ import (
"context"
"net/http"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/pkg/shared"
"github.com/urfave/cli/v2"
"go.opentelemetry.io/otel/trace"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/pkg/shared"
)
// Option defines a single option function.
@@ -17,6 +18,7 @@ type Option func(o *Options)
type Options struct {
Logger log.Logger
TLSConfig shared.HTTPServiceTLS
TimeoutConfig shared.HTTPServiceTimeout
Namespace string
Name string
Version string
@@ -96,6 +98,13 @@ func TLSConfig(config shared.HTTPServiceTLS) Option {
}
}
// TimeoutConfig provides a function to set the TimeOutConfig option.
func TimeoutConfig(config shared.HTTPServiceTimeout) Option {
return func(o *Options) {
o.TimeoutConfig = config
}
}
// TraceProvider provides a function to set the TraceProvider option.
func TraceProvider(tp trace.TracerProvider) Option {
return func(o *Options) {

View File

@@ -5,15 +5,16 @@ import (
"fmt"
"net"
"strings"
"time"
"github.com/opencloud-eu/opencloud/pkg/broker"
"github.com/opencloud-eu/opencloud/pkg/registry"
netx "github.com/opencloud-eu/opencloud/pkg/x/net"
mhttps "github.com/go-micro/plugins/v4/server/http"
mtracer "github.com/go-micro/plugins/v4/wrapper/trace/opentelemetry"
occrypto "github.com/opencloud-eu/opencloud/pkg/crypto"
"go-micro.dev/v4"
occrypto "github.com/opencloud-eu/opencloud/pkg/crypto"
)
// Service simply wraps the go-micro web service.
@@ -66,13 +67,13 @@ func NewService(opts ...Option) (Service, error) {
}
}
// Wrap listener with timeoutListener to set a read timeout
tl := timeoutListener{
Listener: listener,
readTimeout: time.Duration(3) * time.Second,
}
mServer := mhttps.NewServer(mhttps.Listener(tl))
mServer := mhttps.NewServer(
// Wrap listener with timeoutListener to set a read timeout
mhttps.Listener(netx.TimeoutListener{
Listener: listener,
ReadTimeout: sopts.TimeoutConfig.Read,
}),
)
wopts := []micro.Option{
micro.Server(mServer),

View File

@@ -1,32 +0,0 @@
package http
import (
"net"
"time"
)
type timeoutListener struct {
net.Listener
readTimeout time.Duration
}
func (tl timeoutListener) Accept() (net.Conn, error) {
c, err := tl.Listener.Accept()
if err != nil {
return nil, err
}
return &timeoutConn{Conn: c, readTimeout: tl.readTimeout}, nil
}
type timeoutConn struct {
net.Conn
readTimeout time.Duration
}
// Read implements a read with sliding timeout window.
func (c *timeoutConn) Read(b []byte) (int, error) {
if c.readTimeout > 0 {
_ = c.SetReadDeadline(time.Now().Add(c.readTimeout))
}
return c.Conn.Read(b)
}

View File

@@ -55,6 +55,10 @@ type HTTPServiceTLS struct {
Key string `yaml:"key" env:"OC_HTTP_TLS_KEY" desc:"Path/File name for the TLS certificate key (in PEM format) for the server certificate to use for the http services." introductionVersion:"1.0.0"`
}
type HTTPServiceTimeout struct {
Read time.Duration `yaml:"duration" env:"OC_HTTP_TIMEOUT_READ" desc:"The duration after which a read operation will time out." introductionVersion:"%%NEXT%%"`
}
type Cache struct {
Store string `yaml:"store" env:"OC_CACHE_STORE" desc:"The type of the cache store. Supported values are: 'memory', 'redis-sentinel', 'nats-js-kv', 'noop'. See the text description for details." introductionVersion:"1.0.0"`
Nodes []string `yaml:"nodes" env:"OC_CACHE_STORE_NODES" desc:"A comma separated list of nodes to access the configured store. This has no effect when 'memory' store is configured. Note that the behaviour how nodes are used is dependent on the library of the configured store." introductionVersion:"1.0.0"`
@@ -69,21 +73,22 @@ type Cache struct {
// Commons holds configuration that are common to all extensions. Each extension can then decide whether
// to overwrite its values.
type Commons struct {
Log *Log `yaml:"log"`
Tracing *Tracing `yaml:"tracing"`
Cache *Cache `yaml:"cache"`
GRPCClientTLS *GRPCClientTLS `yaml:"grpc_client_tls"`
GRPCServiceTLS *GRPCServiceTLS `yaml:"grpc_service_tls"`
HTTPServiceTLS HTTPServiceTLS `yaml:"http_service_tls"`
OpenCloudURL string `yaml:"opencloud_url" env:"OC_URL" desc:"URL, where OpenCloud is reachable for users." introductionVersion:"1.0.0"`
TokenManager *TokenManager `mask:"struct" yaml:"token_manager"`
Reva *Reva `yaml:"reva"`
MachineAuthAPIKey string `mask:"password" yaml:"machine_auth_api_key" env:"OC_MACHINE_AUTH_API_KEY" desc:"Machine auth API key used to validate internal requests necessary for the access to resources from other services." introductionVersion:"1.0.0"`
TransferSecret string `mask:"password" yaml:"transfer_secret,omitempty" env:"REVA_TRANSFER_SECRET" desc:"The secret used for signing the requests towards the data gateway for up- and downloads." introductionVersion:"1.0.0"`
SystemUserID string `yaml:"system_user_id" env:"OC_SYSTEM_USER_ID" desc:"ID of the OpenCloud storage-system system user. Admins need to set the ID for the storage-system system user in this config option which is then used to reference the user. Any reasonable long string is possible, preferably this would be an UUIDv4 format." introductionVersion:"1.0.0"`
SystemUserAPIKey string `mask:"password" yaml:"system_user_api_key" env:"SYSTEM_USER_API_KEY" desc:"API key for all system users." introductionVersion:"1.0.0"`
AdminUserID string `yaml:"admin_user_id" env:"OC_ADMIN_USER_ID" desc:"ID of a user, that should receive admin privileges. Consider that the UUID can be encoded in some LDAP deployment configurations like in .ldif files. These need to be decoded beforehand." introductionVersion:"1.0.0"`
MultiTenantEnabled bool `yaml:"multi_tenant_enabled" env:"OC_MULTI_TENANT_ENABLED" desc:"Set this to true to enable multi-tenant support." introductionVersion:"%%NEXT%%"`
Log *Log `yaml:"log"`
Tracing *Tracing `yaml:"tracing"`
Cache *Cache `yaml:"cache"`
GRPCClientTLS *GRPCClientTLS `yaml:"grpc_client_tls"`
GRPCServiceTLS *GRPCServiceTLS `yaml:"grpc_service_tls"`
HTTPServiceTLS HTTPServiceTLS `yaml:"http_service_tls"`
HTTPServiceTimeout HTTPServiceTimeout `yaml:"http_service_timeout"`
OpenCloudURL string `yaml:"opencloud_url" env:"OC_URL" desc:"URL, where OpenCloud is reachable for users." introductionVersion:"1.0.0"`
TokenManager *TokenManager `mask:"struct" yaml:"token_manager"`
Reva *Reva `yaml:"reva"`
MachineAuthAPIKey string `mask:"password" yaml:"machine_auth_api_key" env:"OC_MACHINE_AUTH_API_KEY" desc:"Machine auth API key used to validate internal requests necessary for the access to resources from other services." introductionVersion:"1.0.0"`
TransferSecret string `mask:"password" yaml:"transfer_secret,omitempty" env:"REVA_TRANSFER_SECRET" desc:"The secret used for signing the requests towards the data gateway for up- and downloads." introductionVersion:"1.0.0"`
SystemUserID string `yaml:"system_user_id" env:"OC_SYSTEM_USER_ID" desc:"ID of the OpenCloud storage-system system user. Admins need to set the ID for the storage-system system user in this config option which is then used to reference the user. Any reasonable long string is possible, preferably this would be an UUIDv4 format." introductionVersion:"1.0.0"`
SystemUserAPIKey string `mask:"password" yaml:"system_user_api_key" env:"SYSTEM_USER_API_KEY" desc:"API key for all system users." introductionVersion:"1.0.0"`
AdminUserID string `yaml:"admin_user_id" env:"OC_ADMIN_USER_ID" desc:"ID of a user, that should receive admin privileges. Consider that the UUID can be encoded in some LDAP deployment configurations like in .ldif files. These need to be decoded beforehand." introductionVersion:"1.0.0"`
MultiTenantEnabled bool `yaml:"multi_tenant_enabled" env:"OC_MULTI_TENANT_ENABLED" desc:"Set this to true to enable multi-tenant support." introductionVersion:"%%NEXT%%"`
// NOTE: you will not fing GRPCMaxReceivedMessageSize size being used in the code. The envvar is actually extracted in revas `pool` package: https://github.com/cs3org/reva/blob/edge/pkg/rgrpc/todo/pool/connection.go
// It is mentioned here again so it is documented

47
pkg/x/net/listener.go Normal file
View File

@@ -0,0 +1,47 @@
package net
import (
"io"
gonet "net"
"time"
)
type TimeoutListener struct {
gonet.Listener
ReadTimeout time.Duration
}
func (l TimeoutListener) Accept() (gonet.Conn, error) {
c, err := l.Listener.Accept()
if err != nil {
return nil, err
}
return &TimeoutConn{Conn: c, readTimeout: l.ReadTimeout}, nil
}
type TimeoutConn struct {
gonet.Conn
readTimeout time.Duration
bodyDone bool
}
// Read implements a read with a sliding timeout window.
func (c *TimeoutConn) Read(b []byte) (int, error) {
if c.readTimeout > 0 && !c.bodyDone {
if err := c.SetReadDeadline(time.Now().Add(c.readTimeout)); err != nil {
return 0, err
}
}
n, err := c.Conn.Read(b)
if n > 0 && c.readTimeout > 0 && !c.bodyDone {
// reset deadline after every successful read
_ = c.SetReadDeadline(time.Now().Add(c.readTimeout))
}
if err == io.EOF {
c.bodyDone = true
}
return n, err
}

View File

@@ -4,11 +4,12 @@ import (
"fmt"
"os"
"go-micro.dev/v4"
pkgcrypto "github.com/opencloud-eu/opencloud/pkg/crypto"
"github.com/opencloud-eu/opencloud/pkg/service/http"
"github.com/opencloud-eu/opencloud/pkg/shared"
"github.com/opencloud-eu/opencloud/pkg/version"
"go-micro.dev/v4"
)
// Server initializes the http service and server.
@@ -40,6 +41,7 @@ func Server(opts ...Option) (http.Service, error) {
Cert: options.Config.HTTP.TLSCert,
Key: options.Config.HTTP.TLSKey,
}),
http.TimeoutConfig(options.Config.Commons.HTTPServiceTimeout),
http.Logger(options.Logger),
http.Address(options.Config.HTTP.Addr),
http.Namespace(options.Config.HTTP.Namespace),