mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-05-18 21:46:19 -04:00
feat(activitylog): store activities per resource
Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
5
changelog/unreleased/activity-service.md
Normal file
5
changelog/unreleased/activity-service.md
Normal file
@@ -0,0 +1,5 @@
|
||||
Enhancement: Activitylog Service
|
||||
|
||||
Adds a new service `activitylog` which stores events (activities) per resource. This data can be retrieved by clients to show item activities
|
||||
|
||||
https://github.com/owncloud/ocis/pull/9327
|
||||
@@ -172,6 +172,10 @@ type Nats struct {
|
||||
}
|
||||
}
|
||||
|
||||
type Activitylog struct {
|
||||
ServiceAccount ServiceAccount `yaml:"service_account"`
|
||||
}
|
||||
|
||||
// ServiceAccount is the configuration for the used service account
|
||||
type ServiceAccount struct {
|
||||
ServiceAccountID string `yaml:"service_account_id"`
|
||||
@@ -221,6 +225,7 @@ type OcisConfig struct {
|
||||
Userlog Userlog
|
||||
AuthService AuthService `yaml:"auth_service"`
|
||||
Clientlog Clientlog
|
||||
Activitylog Activitylog
|
||||
}
|
||||
|
||||
func checkConfigPath(configPath string) error {
|
||||
@@ -429,6 +434,9 @@ func CreateConfig(insecure, forceOverwrite bool, configPath, adminPassword strin
|
||||
Settings: SettingsService{
|
||||
ServiceAccountIDs: []string{serviceAccount.ServiceAccountID},
|
||||
},
|
||||
Activitylog: Activitylog{
|
||||
ServiceAccount: serviceAccount,
|
||||
},
|
||||
}
|
||||
|
||||
if insecure {
|
||||
|
||||
@@ -7,10 +7,12 @@ import (
|
||||
|
||||
"github.com/cs3org/reva/v2/pkg/events"
|
||||
"github.com/cs3org/reva/v2/pkg/events/stream"
|
||||
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
|
||||
"github.com/cs3org/reva/v2/pkg/store"
|
||||
"github.com/oklog/run"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/handlers"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/registry"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/version"
|
||||
@@ -24,7 +26,7 @@ import (
|
||||
)
|
||||
|
||||
var _registeredEvents = []events.Unmarshaller{
|
||||
events.PostprocessingFinished{},
|
||||
events.UploadReady{},
|
||||
}
|
||||
|
||||
// Server is the entrypoint for the server command.
|
||||
@@ -40,6 +42,7 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
logger := logging.Configure(cfg.Service.Name, cfg.Log)
|
||||
tracerProvider, err := tracing.GetServiceTraceProvider(cfg.Tracing, cfg.Service.Name)
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Msg("Failed to initialize tracer")
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -58,6 +61,7 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
|
||||
evStream, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Msg("Failed to initialize event stream")
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -71,6 +75,23 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
store.Authentication(cfg.Store.AuthUsername, cfg.Store.AuthPassword),
|
||||
)
|
||||
|
||||
tm, err := pool.StringToTLSMode(cfg.GRPCClientTLS.Mode)
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Msg("Failed to parse tls mode")
|
||||
return err
|
||||
}
|
||||
gatewaySelector, err := pool.GatewaySelector(
|
||||
cfg.RevaGateway,
|
||||
pool.WithTLSCACert(cfg.GRPCClientTLS.CACert),
|
||||
pool.WithTLSMode(tm),
|
||||
pool.WithRegistry(registry.GetRegistry()),
|
||||
pool.WithTracerProvider(tracerProvider),
|
||||
)
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Msg("Failed to initialize gateway selector")
|
||||
return fmt.Errorf("could not get reva client selector: %s", err)
|
||||
}
|
||||
|
||||
{
|
||||
svc, err := service.New(
|
||||
service.Logger(logger),
|
||||
@@ -79,10 +100,11 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
service.Stream(evStream),
|
||||
service.RegisteredEvents(_registeredEvents),
|
||||
service.Store(evStore),
|
||||
service.GatewaySelector(gatewaySelector),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
logger.Info().Err(err).Str("transport", "http").Msg("Failed to initialize server")
|
||||
logger.Error().Err(err).Str("transport", "http").Msg("Failed to initialize server")
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -20,6 +20,11 @@ type Config struct {
|
||||
Events Events `yaml:"events"`
|
||||
Store Store `yaml:"store"`
|
||||
|
||||
RevaGateway string `yaml:"reva_gateway" env:"OCIS_REVA_GATEWAY" desc:"CS3 gateway used to look up user metadata" introductionVersion:"5.0"`
|
||||
GRPCClientTLS *shared.GRPCClientTLS `yaml:"grpc_client_tls"`
|
||||
|
||||
ServiceAccount ServiceAccount `yaml:"service_account"`
|
||||
|
||||
Context context.Context `yaml:"-"`
|
||||
}
|
||||
|
||||
@@ -45,3 +50,9 @@ type Store struct {
|
||||
AuthUsername string `yaml:"username" env:"OCIS_PERSISTENT_STORE_AUTH_USERNAME;ACTIVITYLOG_STORE_AUTH_USERNAME" desc:"The username to authenticate with the store. Only applies when store type 'nats-js-kv' is configured." introductionVersion:"5.0"`
|
||||
AuthPassword string `yaml:"password" env:"OCIS_PERSISTENT_STORE_AUTH_PASSWORD;ACTIVITYLOG_STORE_AUTH_PASSWORD" desc:"The password to authenticate with the store. Only applies when store type 'nats-js-kv' is configured." introductionVersion:"5.0"`
|
||||
}
|
||||
|
||||
// ServiceAccount is the configuration for the used service account
|
||||
type ServiceAccount struct {
|
||||
ServiceAccountID string `yaml:"service_account_id" env:"OCIS_SERVICE_ACCOUNT_ID;ACTIVITYLOG_SERVICE_ACCOUNT_ID" desc:"The ID of the service account the service should use. See the 'auth-service' service description for more details." introductionVersion:"5.0"`
|
||||
ServiceAccountSecret string `yaml:"service_account_secret" env:"OCIS_SERVICE_ACCOUNT_SECRET;ACTIVITYOG_SERVICE_ACCOUNT_SECRET" desc:"The service account secret." introductionVersion:"5.0"`
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package defaults
|
||||
|
||||
import (
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/shared"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/structs"
|
||||
"github.com/owncloud/ocis/v2/services/activitylog/pkg/config"
|
||||
)
|
||||
|
||||
@@ -32,9 +34,10 @@ func DefaultConfig() *config.Config {
|
||||
Store: config.Store{
|
||||
Store: "nats-js-kv",
|
||||
Nodes: []string{"127.0.0.1:9233"},
|
||||
Database: "postprocessing",
|
||||
Database: "activitylog",
|
||||
Table: "",
|
||||
},
|
||||
RevaGateway: shared.DefaultRevaConfig().Address,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -63,6 +66,10 @@ func EnsureDefaults(cfg *config.Config) {
|
||||
} else if cfg.Tracing == nil {
|
||||
cfg.Tracing = &config.Tracing{}
|
||||
}
|
||||
|
||||
if cfg.GRPCClientTLS == nil && cfg.Commons != nil {
|
||||
cfg.GRPCClientTLS = structs.CopyOrZeroValue(cfg.Commons.GRPCClientTLS)
|
||||
}
|
||||
}
|
||||
|
||||
// Sanitize sanitizes the config
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
|
||||
"github.com/cs3org/reva/v2/pkg/events"
|
||||
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/log"
|
||||
"github.com/owncloud/ocis/v2/services/activitylog/pkg/config"
|
||||
microstore "go-micro.dev/v4/store"
|
||||
@@ -19,6 +21,7 @@ type Options struct {
|
||||
Stream events.Stream
|
||||
RegisteredEvents []events.Unmarshaller
|
||||
Store microstore.Store
|
||||
GatewaySelector pool.Selectable[gateway.GatewayAPIClient]
|
||||
}
|
||||
|
||||
// Logger configures a logger for the activitylog service
|
||||
@@ -62,3 +65,10 @@ func Store(store microstore.Store) Option {
|
||||
o.Store = store
|
||||
}
|
||||
}
|
||||
|
||||
// GatewaySelector adds a grpc client selector for the gateway service
|
||||
func GatewaySelector(gatewaySelector pool.Selectable[gateway.GatewayAPIClient]) Option {
|
||||
return func(o *Options) {
|
||||
o.GatewaySelector = gatewaySelector
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,17 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
|
||||
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
|
||||
"github.com/cs3org/reva/v2/pkg/events"
|
||||
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
|
||||
"github.com/cs3org/reva/v2/pkg/storagespace"
|
||||
"github.com/cs3org/reva/v2/pkg/utils"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/log"
|
||||
"github.com/owncloud/ocis/v2/services/activitylog/pkg/config"
|
||||
microstore "go-micro.dev/v4/store"
|
||||
@@ -24,6 +30,7 @@ type ActivitylogService struct {
|
||||
log log.Logger
|
||||
events <-chan events.Event
|
||||
store microstore.Store
|
||||
gws pool.Selectable[gateway.GatewayAPIClient]
|
||||
}
|
||||
|
||||
// New is what you need to implement.
|
||||
@@ -51,6 +58,7 @@ func New(opts ...Option) (*ActivitylogService, error) {
|
||||
cfg: o.Config,
|
||||
events: ch,
|
||||
store: o.Store,
|
||||
gws: o.GatewaySelector,
|
||||
}
|
||||
|
||||
return s, nil
|
||||
@@ -59,10 +67,87 @@ func New(opts ...Option) (*ActivitylogService, error) {
|
||||
// Run runs the service
|
||||
func (a *ActivitylogService) Run() error {
|
||||
for e := range a.events {
|
||||
var err error
|
||||
switch ev := e.Event.(type) {
|
||||
case events.PostprocessingFinished:
|
||||
fmt.Println("PostprocessingFinished event received", ev)
|
||||
case events.UploadReady:
|
||||
err = a.addActivity(ev.FileRef, e.ID, utils.TSToTime(ev.Timestamp))
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
a.log.Error().Err(err).Interface("event", e).Msg("could not process event")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *ActivitylogService) addActivity(initRef *provider.Reference, eventID string, timestamp time.Time) error {
|
||||
gwc, err := a.gws.Next()
|
||||
if err != nil {
|
||||
return fmt.Errorf("cant get gateway client: %w", err)
|
||||
}
|
||||
|
||||
ctx, err := utils.GetServiceUserContext(a.cfg.ServiceAccount.ServiceAccountID, gwc, a.cfg.ServiceAccount.ServiceAccountSecret)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cant get service user context: %w", err)
|
||||
}
|
||||
|
||||
var info *provider.ResourceInfo
|
||||
depth, ref := 0, initRef
|
||||
for {
|
||||
if err := a.addActivityToReference(ref, eventID, depth, timestamp); err != nil {
|
||||
return fmt.Errorf("could not store activity: %w", err)
|
||||
}
|
||||
|
||||
if info != nil && utils.IsSpaceRoot(info) {
|
||||
return nil
|
||||
}
|
||||
|
||||
info, err = utils.GetResource(ctx, ref, gwc)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not get resource info: %w", err)
|
||||
}
|
||||
|
||||
depth++
|
||||
ref = &provider.Reference{ResourceId: info.GetParentId()}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *ActivitylogService) addActivityToReference(ref *provider.Reference, eventID string, depth int, timestamp time.Time) error {
|
||||
fileID, err := storagespace.FormatReference(ref)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return a.storeActivity(fileID, Activity{
|
||||
EventID: eventID,
|
||||
Depth: depth,
|
||||
Timestamp: timestamp,
|
||||
})
|
||||
}
|
||||
|
||||
func (a *ActivitylogService) storeActivity(resourceID string, activity Activity) error {
|
||||
records, err := a.store.Read(resourceID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var activities []Activity
|
||||
if len(records) > 0 {
|
||||
if err := json.Unmarshal(records[0].Value, &activities); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: max len check?
|
||||
activities = append(activities, activity)
|
||||
|
||||
b, err := json.Marshal(activities)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return a.store.Write(µstore.Record{
|
||||
Key: resourceID,
|
||||
Value: b,
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user