Compare commits

...

2 Commits

Author SHA1 Message Date
fschade
2883a6245d enhancement: add timeoutListener config 2025-11-04 16:45:04 +01:00
Jörn Friedrich Dreyer
936a84096e introduce read http timeout listener
Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
2025-11-04 14:39:55 +01:00
7 changed files with 118 additions and 30 deletions

View File

@@ -62,13 +62,14 @@ type Runtime struct {
type Config struct {
*shared.Commons `yaml:"shared"`
Tracing *shared.Tracing `yaml:"tracing"`
Log *shared.Log `yaml:"log"`
Cache *shared.Cache `yaml:"cache"`
GRPCClientTLS *shared.GRPCClientTLS `yaml:"grpc_client_tls"`
GRPCServiceTLS *shared.GRPCServiceTLS `yaml:"grpc_service_tls"`
HTTPServiceTLS shared.HTTPServiceTLS `yaml:"http_service_tls"`
Reva *shared.Reva `yaml:"reva"`
Tracing *shared.Tracing `yaml:"tracing"`
Log *shared.Log `yaml:"log"`
Cache *shared.Cache `yaml:"cache"`
GRPCClientTLS *shared.GRPCClientTLS `yaml:"grpc_client_tls"`
GRPCServiceTLS *shared.GRPCServiceTLS `yaml:"grpc_service_tls"`
HTTPServiceTLS shared.HTTPServiceTLS `yaml:"http_service_tls"`
HTTPServiceTimeout shared.HTTPServiceTimeout `yaml:"http_service_timeout"`
Reva *shared.Reva `yaml:"reva"`
Mode Mode // DEPRECATED
File string

View File

@@ -2,6 +2,7 @@ package parser
import (
"errors"
"time"
"github.com/opencloud-eu/opencloud/pkg/config"
"github.com/opencloud-eu/opencloud/pkg/config/envdecode"
@@ -61,6 +62,8 @@ func EnsureDefaults(cfg *config.Config) {
if cfg.Reva == nil {
cfg.Reva = &shared.Reva{}
}
cfg.HTTPServiceTimeout.Read = 60 * time.Second
}
// EnsureCommons copies applicable parts of the OpenCloud config into the commons part
@@ -83,6 +86,7 @@ func EnsureCommons(cfg *config.Config) {
}
cfg.Commons.HTTPServiceTLS = cfg.HTTPServiceTLS
cfg.Commons.HTTPServiceTimeout = cfg.HTTPServiceTimeout
cfg.Commons.TokenManager = structs.CopyOrZeroValue(cfg.TokenManager)

View File

@@ -4,10 +4,11 @@ import (
"context"
"net/http"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/pkg/shared"
"github.com/urfave/cli/v2"
"go.opentelemetry.io/otel/trace"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/pkg/shared"
)
// Option defines a single option function.
@@ -17,6 +18,7 @@ type Option func(o *Options)
type Options struct {
Logger log.Logger
TLSConfig shared.HTTPServiceTLS
TimeoutConfig shared.HTTPServiceTimeout
Namespace string
Name string
Version string
@@ -96,6 +98,13 @@ func TLSConfig(config shared.HTTPServiceTLS) Option {
}
}
// TimeoutConfig provides a function to set the TimeOutConfig option.
func TimeoutConfig(config shared.HTTPServiceTimeout) Option {
return func(o *Options) {
o.TimeoutConfig = config
}
}
// TraceProvider provides a function to set the TraceProvider option.
func TraceProvider(tp trace.TracerProvider) Option {
return func(o *Options) {

View File

@@ -3,16 +3,18 @@ package http
import (
"crypto/tls"
"fmt"
"net"
"strings"
"github.com/opencloud-eu/opencloud/pkg/broker"
"github.com/opencloud-eu/opencloud/pkg/registry"
netx "github.com/opencloud-eu/opencloud/pkg/x/net"
mhttps "github.com/go-micro/plugins/v4/server/http"
mtracer "github.com/go-micro/plugins/v4/wrapper/trace/opentelemetry"
occrypto "github.com/opencloud-eu/opencloud/pkg/crypto"
"go-micro.dev/v4"
"go-micro.dev/v4/server"
occrypto "github.com/opencloud-eu/opencloud/pkg/crypto"
)
// Service simply wraps the go-micro web service.
@@ -24,7 +26,9 @@ type Service struct {
func NewService(opts ...Option) (Service, error) {
noopBroker := broker.NoOp{}
sopts := newOptions(opts...)
var mServer server.Server
var listener net.Listener
var err error
if sopts.TLSConfig.Enabled {
var cert tls.Certificate
var err error
@@ -50,11 +54,27 @@ func NewService(opts ...Option) (Service, error) {
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
}
mServer = mhttps.NewServer(server.TLSConfig(tlsConfig))
// Create TLS listener
listener, err = tls.Listen("tcp", sopts.Address, tlsConfig)
if err != nil {
return Service{}, fmt.Errorf("error starting TLS listener: %w", err)
}
} else {
mServer = mhttps.NewServer()
// Create Non-TLS listener
listener, err = net.Listen("tcp", sopts.Address)
if err != nil {
return Service{}, fmt.Errorf("error starting TCP listener: %w", err)
}
}
mServer := mhttps.NewServer(
// Wrap listener with timeoutListener to set a read timeout
mhttps.Listener(netx.TimeoutListener{
Listener: listener,
ReadTimeout: sopts.TimeoutConfig.Read,
}),
)
wopts := []micro.Option{
micro.Server(mServer),
micro.Broker(noopBroker),

View File

@@ -55,6 +55,10 @@ type HTTPServiceTLS struct {
Key string `yaml:"key" env:"OC_HTTP_TLS_KEY" desc:"Path/File name for the TLS certificate key (in PEM format) for the server certificate to use for the http services." introductionVersion:"1.0.0"`
}
type HTTPServiceTimeout struct {
Read time.Duration `yaml:"duration" env:"OC_HTTP_TIMEOUT_READ" desc:"The duration after which a read operation will time out." introductionVersion:"%%NEXT%%"`
}
type Cache struct {
Store string `yaml:"store" env:"OC_CACHE_STORE" desc:"The type of the cache store. Supported values are: 'memory', 'redis-sentinel', 'nats-js-kv', 'noop'. See the text description for details." introductionVersion:"1.0.0"`
Nodes []string `yaml:"nodes" env:"OC_CACHE_STORE_NODES" desc:"A comma separated list of nodes to access the configured store. This has no effect when 'memory' store is configured. Note that the behaviour how nodes are used is dependent on the library of the configured store." introductionVersion:"1.0.0"`
@@ -69,21 +73,22 @@ type Cache struct {
// Commons holds configuration that are common to all extensions. Each extension can then decide whether
// to overwrite its values.
type Commons struct {
Log *Log `yaml:"log"`
Tracing *Tracing `yaml:"tracing"`
Cache *Cache `yaml:"cache"`
GRPCClientTLS *GRPCClientTLS `yaml:"grpc_client_tls"`
GRPCServiceTLS *GRPCServiceTLS `yaml:"grpc_service_tls"`
HTTPServiceTLS HTTPServiceTLS `yaml:"http_service_tls"`
OpenCloudURL string `yaml:"opencloud_url" env:"OC_URL" desc:"URL, where OpenCloud is reachable for users." introductionVersion:"1.0.0"`
TokenManager *TokenManager `mask:"struct" yaml:"token_manager"`
Reva *Reva `yaml:"reva"`
MachineAuthAPIKey string `mask:"password" yaml:"machine_auth_api_key" env:"OC_MACHINE_AUTH_API_KEY" desc:"Machine auth API key used to validate internal requests necessary for the access to resources from other services." introductionVersion:"1.0.0"`
TransferSecret string `mask:"password" yaml:"transfer_secret,omitempty" env:"REVA_TRANSFER_SECRET" desc:"The secret used for signing the requests towards the data gateway for up- and downloads." introductionVersion:"1.0.0"`
SystemUserID string `yaml:"system_user_id" env:"OC_SYSTEM_USER_ID" desc:"ID of the OpenCloud storage-system system user. Admins need to set the ID for the storage-system system user in this config option which is then used to reference the user. Any reasonable long string is possible, preferably this would be an UUIDv4 format." introductionVersion:"1.0.0"`
SystemUserAPIKey string `mask:"password" yaml:"system_user_api_key" env:"SYSTEM_USER_API_KEY" desc:"API key for all system users." introductionVersion:"1.0.0"`
AdminUserID string `yaml:"admin_user_id" env:"OC_ADMIN_USER_ID" desc:"ID of a user, that should receive admin privileges. Consider that the UUID can be encoded in some LDAP deployment configurations like in .ldif files. These need to be decoded beforehand." introductionVersion:"1.0.0"`
MultiTenantEnabled bool `yaml:"multi_tenant_enabled" env:"OC_MULTI_TENANT_ENABLED" desc:"Set this to true to enable multi-tenant support." introductionVersion:"%%NEXT%%"`
Log *Log `yaml:"log"`
Tracing *Tracing `yaml:"tracing"`
Cache *Cache `yaml:"cache"`
GRPCClientTLS *GRPCClientTLS `yaml:"grpc_client_tls"`
GRPCServiceTLS *GRPCServiceTLS `yaml:"grpc_service_tls"`
HTTPServiceTLS HTTPServiceTLS `yaml:"http_service_tls"`
HTTPServiceTimeout HTTPServiceTimeout `yaml:"http_service_timeout"`
OpenCloudURL string `yaml:"opencloud_url" env:"OC_URL" desc:"URL, where OpenCloud is reachable for users." introductionVersion:"1.0.0"`
TokenManager *TokenManager `mask:"struct" yaml:"token_manager"`
Reva *Reva `yaml:"reva"`
MachineAuthAPIKey string `mask:"password" yaml:"machine_auth_api_key" env:"OC_MACHINE_AUTH_API_KEY" desc:"Machine auth API key used to validate internal requests necessary for the access to resources from other services." introductionVersion:"1.0.0"`
TransferSecret string `mask:"password" yaml:"transfer_secret,omitempty" env:"REVA_TRANSFER_SECRET" desc:"The secret used for signing the requests towards the data gateway for up- and downloads." introductionVersion:"1.0.0"`
SystemUserID string `yaml:"system_user_id" env:"OC_SYSTEM_USER_ID" desc:"ID of the OpenCloud storage-system system user. Admins need to set the ID for the storage-system system user in this config option which is then used to reference the user. Any reasonable long string is possible, preferably this would be an UUIDv4 format." introductionVersion:"1.0.0"`
SystemUserAPIKey string `mask:"password" yaml:"system_user_api_key" env:"SYSTEM_USER_API_KEY" desc:"API key for all system users." introductionVersion:"1.0.0"`
AdminUserID string `yaml:"admin_user_id" env:"OC_ADMIN_USER_ID" desc:"ID of a user, that should receive admin privileges. Consider that the UUID can be encoded in some LDAP deployment configurations like in .ldif files. These need to be decoded beforehand." introductionVersion:"1.0.0"`
MultiTenantEnabled bool `yaml:"multi_tenant_enabled" env:"OC_MULTI_TENANT_ENABLED" desc:"Set this to true to enable multi-tenant support." introductionVersion:"%%NEXT%%"`
// NOTE: you will not fing GRPCMaxReceivedMessageSize size being used in the code. The envvar is actually extracted in revas `pool` package: https://github.com/cs3org/reva/blob/edge/pkg/rgrpc/todo/pool/connection.go
// It is mentioned here again so it is documented

47
pkg/x/net/listener.go Normal file
View File

@@ -0,0 +1,47 @@
package net
import (
"io"
gonet "net"
"time"
)
type TimeoutListener struct {
gonet.Listener
ReadTimeout time.Duration
}
func (l TimeoutListener) Accept() (gonet.Conn, error) {
c, err := l.Listener.Accept()
if err != nil {
return nil, err
}
return &TimeoutConn{Conn: c, readTimeout: l.ReadTimeout}, nil
}
type TimeoutConn struct {
gonet.Conn
readTimeout time.Duration
bodyDone bool
}
// Read implements a read with a sliding timeout window.
func (c *TimeoutConn) Read(b []byte) (int, error) {
if c.readTimeout > 0 && !c.bodyDone {
if err := c.SetReadDeadline(time.Now().Add(c.readTimeout)); err != nil {
return 0, err
}
}
n, err := c.Conn.Read(b)
if n > 0 && c.readTimeout > 0 && !c.bodyDone {
// reset deadline after every successful read
_ = c.SetReadDeadline(time.Now().Add(c.readTimeout))
}
if err == io.EOF {
c.bodyDone = true
}
return n, err
}

View File

@@ -4,11 +4,12 @@ import (
"fmt"
"os"
"go-micro.dev/v4"
pkgcrypto "github.com/opencloud-eu/opencloud/pkg/crypto"
"github.com/opencloud-eu/opencloud/pkg/service/http"
"github.com/opencloud-eu/opencloud/pkg/shared"
"github.com/opencloud-eu/opencloud/pkg/version"
"go-micro.dev/v4"
)
// Server initializes the http service and server.
@@ -40,6 +41,7 @@ func Server(opts ...Option) (http.Service, error) {
Cert: options.Config.HTTP.TLSCert,
Key: options.Config.HTTP.TLSKey,
}),
http.TimeoutConfig(options.Config.Commons.HTTPServiceTimeout),
http.Logger(options.Logger),
http.Address(options.Config.HTTP.Addr),
http.Namespace(options.Config.HTTP.Namespace),