Files
opencloud/services/storage-users/pkg/command/uploads.go
Jörn Friedrich Dreyer 032c218789 replace .Value.String() with .GetString(...)
Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
2025-12-15 16:40:27 +01:00

307 lines
8.6 KiB
Go

package command
import (
"context"
"encoding/json"
"fmt"
"os"
"strconv"
"strings"
"time"
"github.com/olekukonko/tablewriter"
"github.com/olekukonko/tablewriter/tw"
"github.com/spf13/cobra"
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
"github.com/opencloud-eu/opencloud/services/storage-users/pkg/config"
"github.com/opencloud-eu/opencloud/services/storage-users/pkg/config/parser"
"github.com/opencloud-eu/opencloud/services/storage-users/pkg/event"
"github.com/opencloud-eu/opencloud/services/storage-users/pkg/revaconfig"
"github.com/opencloud-eu/reva/v2/pkg/events"
"github.com/opencloud-eu/reva/v2/pkg/storage"
"github.com/opencloud-eu/reva/v2/pkg/storage/fs/registry"
"github.com/opencloud-eu/reva/v2/pkg/utils"
)
// Session contains the information of an upload session
type Session struct {
ID string `json:"id"`
Space string `json:"space"`
Filename string `json:"filename"`
Offset int64 `json:"offset"`
Size int64 `json:"size"`
Executant userpb.UserId `json:"executant"`
SpaceOwner *userpb.UserId `json:"spaceowner,omitempty"`
Expires time.Time `json:"expires"`
Processing bool `json:"processing"`
ScanDate time.Time `json:"virus_scan_date"`
ScanResult string `json:"virus_scan_result"`
}
// Uploads is the entry point for the uploads command
func Uploads(cfg *config.Config) *cobra.Command {
uploadsCmd := &cobra.Command{
Use: "uploads",
Short: "manage unfinished uploads",
}
uploadsCmd.AddCommand([]*cobra.Command{
ListUploadSessions(cfg),
}...)
return uploadsCmd
}
// ListUploadSessions prints a list of upload sessiens
func ListUploadSessions(cfg *config.Config) *cobra.Command {
listUploadSessionsCmd := &cobra.Command{
Use: "sessions",
Short: "Print a list of upload sessions",
PreRunE: func(cmd *cobra.Command, args []string) error {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
RunE: func(cmd *cobra.Command, args []string) error {
var err error
f, ok := registry.NewFuncs[cfg.Driver]
if !ok {
fmt.Fprintf(os.Stderr, "Unknown filesystem driver '%s'\n", cfg.Driver)
os.Exit(1)
}
drivers := revaconfig.StorageProviderDrivers(cfg)
var fsStream events.Stream
if cfg.Driver == "posix" {
// We need to init the posix driver with 'scanfs' disabled
drivers["posix"] = revaconfig.Posix(cfg, false, false)
// Also posix refuses to start without an events stream
fsStream, err = event.NewStream(cfg)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create event stream for posix driver: %v\n", err)
os.Exit(1)
}
}
fs, err := f(drivers[cfg.Driver].(map[string]interface{}), fsStream, nil)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to initialize filesystem driver '%s'\n", cfg.Driver)
return err
}
managingFS, ok := fs.(storage.UploadSessionLister)
if !ok {
fmt.Fprintf(os.Stderr, "'%s' storage does not support listing upload sessions\n", cfg.Driver)
os.Exit(1)
}
restart, _ := cmd.Flags().GetBool("restart")
resume, _ := cmd.Flags().GetBool("resume")
clean, _ := cmd.Flags().GetBool("clean")
renderJson, _ := cmd.Flags().GetBool("json")
var stream events.Stream
if restart || resume {
stream, err = event.NewStream(cfg)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create event stream: %v\n", err)
os.Exit(1)
}
}
filter := buildFilter(cmd)
uploads, err := managingFS.ListUploadSessions(cmd.Context(), filter)
if err != nil {
return err
}
var (
table *tablewriter.Table
raw []Session
)
if !renderJson {
fmt.Println(buildInfo(filter))
table = tablewriter.NewTable(os.Stdout, tablewriter.WithHeaderAutoFormat(tw.Off))
table.Header([]string{"Space", "Upload Id", "Name", "Offset", "Size", "Executant", "Owner", "Expires", "Processing", "Scan Date", "Scan Result"})
}
for _, u := range uploads {
ref := u.Reference()
sr, sd := u.ScanData()
session := Session{
Space: ref.GetResourceId().GetSpaceId(),
ID: u.ID(),
Filename: u.Filename(),
Offset: u.Offset(),
Size: u.Size(),
Executant: u.Executant(),
SpaceOwner: u.SpaceOwner(),
Expires: u.Expires(),
Processing: u.IsProcessing(),
ScanDate: sd,
ScanResult: sr,
}
if renderJson {
raw = append(raw, session)
} else {
table.Append([]string{
session.Space,
session.ID,
session.Filename,
strconv.FormatInt(session.Offset, 10),
strconv.FormatInt(session.Size, 10),
session.Executant.OpaqueId,
session.SpaceOwner.GetOpaqueId(),
session.Expires.Format(time.RFC3339),
strconv.FormatBool(session.Processing),
session.ScanDate.Format(time.RFC3339),
session.ScanResult,
})
}
switch {
case restart:
if err := events.Publish(context.Background(), stream, events.RestartPostprocessing{
UploadID: u.ID(),
Timestamp: utils.TSNow(),
}); err != nil {
fmt.Fprintf(os.Stderr, "Failed to send restart event for upload session '%s'\n", u.ID())
// if publishing fails there is no need to try publishing other events - they will fail too.
os.Exit(1)
}
case resume:
if err := events.Publish(context.Background(), stream, events.ResumePostprocessing{
UploadID: u.ID(),
Timestamp: utils.TSNow(),
}); err != nil {
fmt.Fprintf(os.Stderr, "Failed to send resume event for upload session '%s'\n", u.ID())
// if publishing fails there is no need to try publishing other events - they will fail too.
os.Exit(1)
}
case clean:
if err := u.Purge(cmd.Context()); err != nil {
fmt.Fprintf(os.Stderr, "Failed to clean upload session '%s'\n", u.ID())
}
}
}
if !renderJson {
table.Render()
return nil
}
j, err := json.Marshal(raw)
if err != nil {
fmt.Println(err)
return err
}
fmt.Println(string(j))
return nil
},
}
listUploadSessionsCmd.Flags().String("id", "", "filter sessions by upload session id")
listUploadSessionsCmd.Flags().Bool("processing", false, "filter sessions by processing status")
listUploadSessionsCmd.Flags().Bool("expired", false, "filter sessions by expired status")
listUploadSessionsCmd.Flags().Bool("has-virus", false, "filter sessions by virus scan result")
listUploadSessionsCmd.Flags().Bool("json", false, "output as json")
listUploadSessionsCmd.Flags().Bool("restart", false, "send restart event for all listed sessions. Only one of resume/restart/clean can be set.")
listUploadSessionsCmd.Flags().Bool("resume", false, "send resume event for all listed sessions. Only one of resume/restart/clean can be set.")
listUploadSessionsCmd.Flags().Bool("clean", false, "remove uploads for all listed sessions. Only one of resume/restart/clean can be set.")
return listUploadSessionsCmd
}
func buildFilter(cmd *cobra.Command) storage.UploadSessionFilter {
filter := storage.UploadSessionFilter{}
if cmd.Flag("processing").Changed {
processingValue, _ := cmd.Flags().GetBool("processing")
filter.Processing = &processingValue
}
if cmd.Flag("expired").Changed {
expiredValue, _ := cmd.Flags().GetBool("expired")
filter.Expired = &expiredValue
}
if cmd.Flag("has-virus").Changed {
infectedValue, _ := cmd.Flags().GetBool("has-virus")
filter.HasVirus = &infectedValue
}
if cmd.Flag("id").Changed {
idValue, _ := cmd.Flags().GetString("id")
if idValue != "" {
filter.ID = &idValue
}
}
return filter
}
func buildInfo(filter storage.UploadSessionFilter) string {
var b strings.Builder
if filter.Processing != nil {
if !*filter.Processing {
b.WriteString("Not ")
}
if b.Len() == 0 {
b.WriteString("Processing")
} else {
b.WriteString("processing")
}
}
if filter.Expired != nil {
if b.Len() != 0 {
b.WriteString(", ")
}
if !*filter.Expired {
if b.Len() == 0 {
b.WriteString("Not ")
} else {
b.WriteString("not ")
}
}
if b.Len() == 0 {
b.WriteString("Expired")
} else {
b.WriteString("expired")
}
}
if filter.HasVirus != nil {
if b.Len() != 0 {
b.WriteString(", ")
}
if !*filter.HasVirus {
if b.Len() == 0 {
b.WriteString("Not ")
} else {
b.WriteString("not ")
}
}
if b.Len() == 0 {
b.WriteString("Virusinfected")
} else {
b.WriteString("virusinfected")
}
}
if b.Len() == 0 {
b.WriteString("Session")
} else {
b.WriteString(" session")
}
if filter.ID != nil {
b.WriteString(" with id '" + *filter.ID + "'")
} else {
// to make `session` plural
b.WriteString("s")
}
b.WriteString(":")
return b.String()
}