From 2b6e75ad7f518ef36df34449911bf5a6413e247f Mon Sep 17 00:00:00 2001 From: jkoberg Date: Wed, 5 Jun 2024 16:20:37 +0200 Subject: [PATCH] feat(activitylog): store activities per resource Signed-off-by: jkoberg --- changelog/unreleased/activity-service.md | 5 ++ ocis/pkg/init/init.go | 8 ++ services/activitylog/pkg/command/server.go | 26 +++++- services/activitylog/pkg/config/config.go | 11 +++ .../pkg/config/defaults/defaultconfig.go | 9 +- services/activitylog/pkg/service/options.go | 10 +++ services/activitylog/pkg/service/service.go | 89 ++++++++++++++++++- 7 files changed, 153 insertions(+), 5 deletions(-) create mode 100644 changelog/unreleased/activity-service.md diff --git a/changelog/unreleased/activity-service.md b/changelog/unreleased/activity-service.md new file mode 100644 index 0000000000..1058280a06 --- /dev/null +++ b/changelog/unreleased/activity-service.md @@ -0,0 +1,5 @@ +Enhancement: Activitylog Service + +Adds a new service `activitylog` which stores events (activities) per resource. This data can be retrieved by clients to show item activities + +https://github.com/owncloud/ocis/pull/9327 diff --git a/ocis/pkg/init/init.go b/ocis/pkg/init/init.go index 9b68605b19..f6adc4e2a6 100644 --- a/ocis/pkg/init/init.go +++ b/ocis/pkg/init/init.go @@ -172,6 +172,10 @@ type Nats struct { } } +type Activitylog struct { + ServiceAccount ServiceAccount `yaml:"service_account"` +} + // ServiceAccount is the configuration for the used service account type ServiceAccount struct { ServiceAccountID string `yaml:"service_account_id"` @@ -221,6 +225,7 @@ type OcisConfig struct { Userlog Userlog AuthService AuthService `yaml:"auth_service"` Clientlog Clientlog + Activitylog Activitylog } func checkConfigPath(configPath string) error { @@ -429,6 +434,9 @@ func CreateConfig(insecure, forceOverwrite bool, configPath, adminPassword strin Settings: SettingsService{ ServiceAccountIDs: []string{serviceAccount.ServiceAccountID}, }, + Activitylog: Activitylog{ + ServiceAccount: serviceAccount, + }, } if insecure { diff --git a/services/activitylog/pkg/command/server.go b/services/activitylog/pkg/command/server.go index 9f7e94e2de..21fb84944f 100644 --- a/services/activitylog/pkg/command/server.go +++ b/services/activitylog/pkg/command/server.go @@ -7,10 +7,12 @@ import ( "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/events/stream" + "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" "github.com/cs3org/reva/v2/pkg/store" "github.com/oklog/run" "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" "github.com/owncloud/ocis/v2/ocis-pkg/handlers" + "github.com/owncloud/ocis/v2/ocis-pkg/registry" "github.com/owncloud/ocis/v2/ocis-pkg/service/debug" "github.com/owncloud/ocis/v2/ocis-pkg/tracing" "github.com/owncloud/ocis/v2/ocis-pkg/version" @@ -24,7 +26,7 @@ import ( ) var _registeredEvents = []events.Unmarshaller{ - events.PostprocessingFinished{}, + events.UploadReady{}, } // Server is the entrypoint for the server command. @@ -40,6 +42,7 @@ func Server(cfg *config.Config) *cli.Command { logger := logging.Configure(cfg.Service.Name, cfg.Log) tracerProvider, err := tracing.GetServiceTraceProvider(cfg.Tracing, cfg.Service.Name) if err != nil { + logger.Error().Err(err).Msg("Failed to initialize tracer") return err } @@ -58,6 +61,7 @@ func Server(cfg *config.Config) *cli.Command { evStream, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events)) if err != nil { + logger.Error().Err(err).Msg("Failed to initialize event stream") return err } @@ -71,6 +75,23 @@ func Server(cfg *config.Config) *cli.Command { store.Authentication(cfg.Store.AuthUsername, cfg.Store.AuthPassword), ) + tm, err := pool.StringToTLSMode(cfg.GRPCClientTLS.Mode) + if err != nil { + logger.Error().Err(err).Msg("Failed to parse tls mode") + return err + } + gatewaySelector, err := pool.GatewaySelector( + cfg.RevaGateway, + pool.WithTLSCACert(cfg.GRPCClientTLS.CACert), + pool.WithTLSMode(tm), + pool.WithRegistry(registry.GetRegistry()), + pool.WithTracerProvider(tracerProvider), + ) + if err != nil { + logger.Error().Err(err).Msg("Failed to initialize gateway selector") + return fmt.Errorf("could not get reva client selector: %s", err) + } + { svc, err := service.New( service.Logger(logger), @@ -79,10 +100,11 @@ func Server(cfg *config.Config) *cli.Command { service.Stream(evStream), service.RegisteredEvents(_registeredEvents), service.Store(evStore), + service.GatewaySelector(gatewaySelector), ) if err != nil { - logger.Info().Err(err).Str("transport", "http").Msg("Failed to initialize server") + logger.Error().Err(err).Str("transport", "http").Msg("Failed to initialize server") return err } diff --git a/services/activitylog/pkg/config/config.go b/services/activitylog/pkg/config/config.go index b1df9abc90..7cd61a24c3 100644 --- a/services/activitylog/pkg/config/config.go +++ b/services/activitylog/pkg/config/config.go @@ -20,6 +20,11 @@ type Config struct { Events Events `yaml:"events"` Store Store `yaml:"store"` + RevaGateway string `yaml:"reva_gateway" env:"OCIS_REVA_GATEWAY" desc:"CS3 gateway used to look up user metadata" introductionVersion:"5.0"` + GRPCClientTLS *shared.GRPCClientTLS `yaml:"grpc_client_tls"` + + ServiceAccount ServiceAccount `yaml:"service_account"` + Context context.Context `yaml:"-"` } @@ -45,3 +50,9 @@ type Store struct { AuthUsername string `yaml:"username" env:"OCIS_PERSISTENT_STORE_AUTH_USERNAME;ACTIVITYLOG_STORE_AUTH_USERNAME" desc:"The username to authenticate with the store. Only applies when store type 'nats-js-kv' is configured." introductionVersion:"5.0"` AuthPassword string `yaml:"password" env:"OCIS_PERSISTENT_STORE_AUTH_PASSWORD;ACTIVITYLOG_STORE_AUTH_PASSWORD" desc:"The password to authenticate with the store. Only applies when store type 'nats-js-kv' is configured." introductionVersion:"5.0"` } + +// ServiceAccount is the configuration for the used service account +type ServiceAccount struct { + ServiceAccountID string `yaml:"service_account_id" env:"OCIS_SERVICE_ACCOUNT_ID;ACTIVITYLOG_SERVICE_ACCOUNT_ID" desc:"The ID of the service account the service should use. See the 'auth-service' service description for more details." introductionVersion:"5.0"` + ServiceAccountSecret string `yaml:"service_account_secret" env:"OCIS_SERVICE_ACCOUNT_SECRET;ACTIVITYOG_SERVICE_ACCOUNT_SECRET" desc:"The service account secret." introductionVersion:"5.0"` +} diff --git a/services/activitylog/pkg/config/defaults/defaultconfig.go b/services/activitylog/pkg/config/defaults/defaultconfig.go index f5d52d6a96..84edc80fd3 100644 --- a/services/activitylog/pkg/config/defaults/defaultconfig.go +++ b/services/activitylog/pkg/config/defaults/defaultconfig.go @@ -1,6 +1,8 @@ package defaults import ( + "github.com/owncloud/ocis/v2/ocis-pkg/shared" + "github.com/owncloud/ocis/v2/ocis-pkg/structs" "github.com/owncloud/ocis/v2/services/activitylog/pkg/config" ) @@ -32,9 +34,10 @@ func DefaultConfig() *config.Config { Store: config.Store{ Store: "nats-js-kv", Nodes: []string{"127.0.0.1:9233"}, - Database: "postprocessing", + Database: "activitylog", Table: "", }, + RevaGateway: shared.DefaultRevaConfig().Address, } } @@ -63,6 +66,10 @@ func EnsureDefaults(cfg *config.Config) { } else if cfg.Tracing == nil { cfg.Tracing = &config.Tracing{} } + + if cfg.GRPCClientTLS == nil && cfg.Commons != nil { + cfg.GRPCClientTLS = structs.CopyOrZeroValue(cfg.Commons.GRPCClientTLS) + } } // Sanitize sanitizes the config diff --git a/services/activitylog/pkg/service/options.go b/services/activitylog/pkg/service/options.go index 145ac5c656..cc96d16034 100644 --- a/services/activitylog/pkg/service/options.go +++ b/services/activitylog/pkg/service/options.go @@ -1,7 +1,9 @@ package service import ( + gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/services/activitylog/pkg/config" microstore "go-micro.dev/v4/store" @@ -19,6 +21,7 @@ type Options struct { Stream events.Stream RegisteredEvents []events.Unmarshaller Store microstore.Store + GatewaySelector pool.Selectable[gateway.GatewayAPIClient] } // Logger configures a logger for the activitylog service @@ -62,3 +65,10 @@ func Store(store microstore.Store) Option { o.Store = store } } + +// GatewaySelector adds a grpc client selector for the gateway service +func GatewaySelector(gatewaySelector pool.Selectable[gateway.GatewayAPIClient]) Option { + return func(o *Options) { + o.GatewaySelector = gatewaySelector + } +} diff --git a/services/activitylog/pkg/service/service.go b/services/activitylog/pkg/service/service.go index 185224fdaa..e278957abb 100644 --- a/services/activitylog/pkg/service/service.go +++ b/services/activitylog/pkg/service/service.go @@ -1,11 +1,17 @@ package service import ( + "encoding/json" "errors" "fmt" "time" + gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" + "github.com/cs3org/reva/v2/pkg/storagespace" + "github.com/cs3org/reva/v2/pkg/utils" "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/services/activitylog/pkg/config" microstore "go-micro.dev/v4/store" @@ -24,6 +30,7 @@ type ActivitylogService struct { log log.Logger events <-chan events.Event store microstore.Store + gws pool.Selectable[gateway.GatewayAPIClient] } // New is what you need to implement. @@ -51,6 +58,7 @@ func New(opts ...Option) (*ActivitylogService, error) { cfg: o.Config, events: ch, store: o.Store, + gws: o.GatewaySelector, } return s, nil @@ -59,10 +67,87 @@ func New(opts ...Option) (*ActivitylogService, error) { // Run runs the service func (a *ActivitylogService) Run() error { for e := range a.events { + var err error switch ev := e.Event.(type) { - case events.PostprocessingFinished: - fmt.Println("PostprocessingFinished event received", ev) + case events.UploadReady: + err = a.addActivity(ev.FileRef, e.ID, utils.TSToTime(ev.Timestamp)) + } + + if err != nil { + a.log.Error().Err(err).Interface("event", e).Msg("could not process event") } } return nil } + +func (a *ActivitylogService) addActivity(initRef *provider.Reference, eventID string, timestamp time.Time) error { + gwc, err := a.gws.Next() + if err != nil { + return fmt.Errorf("cant get gateway client: %w", err) + } + + ctx, err := utils.GetServiceUserContext(a.cfg.ServiceAccount.ServiceAccountID, gwc, a.cfg.ServiceAccount.ServiceAccountSecret) + if err != nil { + return fmt.Errorf("cant get service user context: %w", err) + } + + var info *provider.ResourceInfo + depth, ref := 0, initRef + for { + if err := a.addActivityToReference(ref, eventID, depth, timestamp); err != nil { + return fmt.Errorf("could not store activity: %w", err) + } + + if info != nil && utils.IsSpaceRoot(info) { + return nil + } + + info, err = utils.GetResource(ctx, ref, gwc) + if err != nil { + return fmt.Errorf("could not get resource info: %w", err) + } + + depth++ + ref = &provider.Reference{ResourceId: info.GetParentId()} + } +} + +func (a *ActivitylogService) addActivityToReference(ref *provider.Reference, eventID string, depth int, timestamp time.Time) error { + fileID, err := storagespace.FormatReference(ref) + if err != nil { + return err + } + + return a.storeActivity(fileID, Activity{ + EventID: eventID, + Depth: depth, + Timestamp: timestamp, + }) +} + +func (a *ActivitylogService) storeActivity(resourceID string, activity Activity) error { + records, err := a.store.Read(resourceID) + if err != nil { + return err + } + + var activities []Activity + if len(records) > 0 { + if err := json.Unmarshal(records[0].Value, &activities); err != nil { + return err + } + } + + // TODO: max len check? + activities = append(activities, activity) + + b, err := json.Marshal(activities) + if err != nil { + return err + } + + return a.store.Write(µstore.Record{ + Key: resourceID, + Value: b, + }) +}