diff --git a/changelog/unreleased/postprocessing-bulk-restart.md b/changelog/unreleased/postprocessing-bulk-restart.md new file mode 100644 index 0000000000..d2816f4f5a --- /dev/null +++ b/changelog/unreleased/postprocessing-bulk-restart.md @@ -0,0 +1,5 @@ +Enhancement: Allow restarting multiple uploads with one command + +Allows to restart all commands in a specific state. + +https://github.com/owncloud/ocis/pull/8287 diff --git a/services/postprocessing/README.md b/services/postprocessing/README.md index 282278d04c..9e08ba2180 100644 --- a/services/postprocessing/README.md +++ b/services/postprocessing/README.md @@ -90,3 +90,11 @@ ocis storage-users uploads list ```bash ocis postprocessing restart -u ``` + +Instead of starting one specific upload, a system admin can also restart all uploads that are currently in a specific step. +Examples: +``` +ocis postprocessing restart # Restarts all uploads where postprocessing is finished, but upload is not finished +ocis postprocessing restart -s "finished" # Equivalent to the above +ocis postprocessing restart -s "virusscan" # Restart all uploads currently in virusscan step +``` diff --git a/services/postprocessing/pkg/command/postprocessing.go b/services/postprocessing/pkg/command/postprocessing.go index 99c853a0c9..ee15588963 100644 --- a/services/postprocessing/pkg/command/postprocessing.go +++ b/services/postprocessing/pkg/command/postprocessing.go @@ -2,7 +2,6 @@ package command import ( "context" - "time" "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/events/stream" @@ -20,10 +19,15 @@ func RestartPostprocessing(cfg *config.Config) *cli.Command { Usage: "restart postprocessing for an uploadID", Flags: []cli.Flag{ &cli.StringFlag{ - Name: "upload-id", - Aliases: []string{"u"}, - Required: true, - Usage: "the uploadid to restart", + Name: "upload-id", + Aliases: []string{"u"}, + Usage: "the uploadid to restart. Ignored if unset.", + }, + &cli.StringFlag{ + Name: "step", + Aliases: []string{"s"}, + Usage: "restarts all uploads in the given postprocessing step. Ignored if upload-id is set.", + Value: "finished", // Calling `ocis postprocessing restart` without any arguments will restart all uploads that are finished but failed to move the uploed from the upload area to the blobstore. }, }, Before: func(c *cli.Context) error { @@ -35,24 +39,18 @@ func RestartPostprocessing(cfg *config.Config) *cli.Command { return err } + uid, step := c.String("upload-id"), "" + if uid == "" { + step = c.String("step") + } + ev := events.ResumePostprocessing{ - UploadID: c.String("upload-id"), + UploadID: uid, + Step: events.Postprocessingstep(step), Timestamp: utils.TSNow(), } - if err := events.Publish(context.Background(), stream, ev); err != nil { - return err - } - - // go-micro nats implementation uses async publishing, - // therefore we need to manually wait. - // - // FIXME: upstream pr - // - // https://github.com/go-micro/plugins/blob/3e77393890683be4bacfb613bc5751867d584692/v4/events/natsjs/nats.go#L115 - time.Sleep(5 * time.Second) - - return nil + return events.Publish(context.Background(), stream, ev) }, } } diff --git a/services/postprocessing/pkg/postprocessing/postprocessing.go b/services/postprocessing/pkg/postprocessing/postprocessing.go index b2764f43f9..e3c9ca6bff 100644 --- a/services/postprocessing/pkg/postprocessing/postprocessing.go +++ b/services/postprocessing/pkg/postprocessing/postprocessing.go @@ -65,14 +65,14 @@ func (pp *Postprocessing) NextStep(ev events.PostprocessingStepFinished) interfa // CurrentStep returns the current postprocessing step func (pp *Postprocessing) CurrentStep() interface{} { - if pp.Status.Outcome != "" { + if pp.Status.CurrentStep == events.PPStepFinished { return pp.finished(pp.Status.Outcome) } return pp.step(pp.Status.CurrentStep) } // Delay will sleep the configured time then continue -func (pp *Postprocessing) Delay(ev events.StartPostprocessingStep) interface{} { +func (pp *Postprocessing) Delay() interface{} { time.Sleep(pp.config.Delayprocessing) return pp.next(events.PPStepDelay) } @@ -106,6 +106,7 @@ func (pp *Postprocessing) step(next events.Postprocessingstep) events.StartPostp } func (pp *Postprocessing) finished(outcome events.PostprocessingOutcome) events.PostprocessingFinished { + pp.Status.CurrentStep = events.PPStepFinished pp.Status.Outcome = outcome return events.PostprocessingFinished{ UploadID: pp.ID, diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index 50355001b1..f2a3c122e2 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -8,6 +8,7 @@ import ( "time" "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/utils" "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config" "github.com/owncloud/ocis/v2/services/postprocessing/pkg/postprocessing" @@ -142,29 +143,20 @@ func (pps *PostprocessingService) processEvent(e events.Event) error { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") return fmt.Errorf("%w: cannot get upload", errEvent) } - next = pp.Delay(ev) + next = pp.Delay() case events.UploadReady: + if ev.Failed { + // the upload failed - let's keep it around for a while + return nil + } + // the storage provider thinks the upload is done - so no need to keep it any more if err := pps.store.Delete(ev.UploadID); err != nil { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot delete upload") return fmt.Errorf("%w: cannot delete upload", errEvent) } case events.ResumePostprocessing: - pp, err = pps.getPP(pps.store, ev.UploadID) - if err != nil { - if err == store.ErrNotFound { - if err := events.Publish(ctx, pps.pub, events.RestartPostprocessing{ - UploadID: ev.UploadID, - Timestamp: ev.Timestamp, - }); err != nil { - pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot publish RestartPostprocessing event") - } - return fmt.Errorf("%w: cannot publish RestartPostprocessing event", errEvent) - } - pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") - return fmt.Errorf("%w: cannot get upload", errEvent) - } - next = pp.CurrentStep() + return pps.handleResumePPEvent(ctx, ev) } if pp != nil { @@ -182,30 +174,6 @@ func (pps *PostprocessingService) processEvent(e events.Event) error { return nil } -func getSteps(c config.Postprocessing) []events.Postprocessingstep { - // NOTE: improved version only allows configuring order of postprocessing steps - // But we aim for a system where postprocessing steps can be configured per space, ideally by the spaceadmin itself - // We need to iterate over configuring PP service when we see fit - var steps []events.Postprocessingstep - for _, s := range c.Steps { - steps = append(steps, events.Postprocessingstep(s)) - } - - return steps -} - -func storePP(sto store.Store, pp *postprocessing.Postprocessing) error { - b, err := json.Marshal(pp) - if err != nil { - return err - } - - return sto.Write(&store.Record{ - Key: pp.ID, - Value: b, - }) -} - func (pps *PostprocessingService) getPP(sto store.Store, uploadID string) (*postprocessing.Postprocessing, error) { recs, err := sto.Read(uploadID) if err != nil { @@ -224,3 +192,95 @@ func (pps *PostprocessingService) getPP(sto store.Store, uploadID string) (*post return pp, nil } + +func getSteps(c config.Postprocessing) []events.Postprocessingstep { + // NOTE: improved version only allows configuring order of postprocessing steps + // But we aim for a system where postprocessing steps can be configured per space, ideally by the spaceadmin itself + // We need to iterate over configuring PP service when we see fit + steps := make([]events.Postprocessingstep, 0, len(c.Steps)) + for _, s := range c.Steps { + steps = append(steps, events.Postprocessingstep(s)) + } + + return steps +} + +func storePP(sto store.Store, pp *postprocessing.Postprocessing) error { + b, err := json.Marshal(pp) + if err != nil { + return err + } + + return sto.Write(&store.Record{ + Key: pp.ID, + Value: b, + }) +} + +func (pps *PostprocessingService) handleResumePPEvent(ctx context.Context, ev events.ResumePostprocessing) error { + ids := []string{ev.UploadID} + if ev.Step != "" { + ids = pps.findUploadsByStep(ev.Step) + } + + for _, id := range ids { + if err := pps.resumePP(ctx, id); err != nil { + pps.log.Error().Str("uploadID", id).Err(err).Msg("cannot resume upload") + return fmt.Errorf("%w: cannot resume upload", errEvent) + } + } + return nil +} + +func (pps *PostprocessingService) resumePP(ctx context.Context, uploadID string) error { + pp, err := pps.getPP(pps.store, uploadID) + if err != nil { + if err == store.ErrNotFound { + if err := events.Publish(ctx, pps.pub, events.RestartPostprocessing{ + UploadID: uploadID, + Timestamp: utils.TSNow(), + }); err != nil { + return err + } + return nil + } + return fmt.Errorf("%w: cannot get upload", errEvent) + } + + return events.Publish(ctx, pps.pub, pp.CurrentStep()) +} + +func (pps *PostprocessingService) findUploadsByStep(step events.Postprocessingstep) []string { + var ids []string + + keys, err := pps.store.List() + if err != nil { + pps.log.Error().Err(err).Msg("cannot list uploads") + } + + for _, k := range keys { + rec, err := pps.store.Read(k) + if err != nil { + pps.log.Error().Err(err).Msg("cannot read upload") + continue + } + + if len(rec) != 1 { + pps.log.Error().Err(err).Msg("expected only one result") + continue + } + + pp := &postprocessing.Postprocessing{} + err = json.Unmarshal(rec[0].Value, pp) + if err != nil { + pps.log.Error().Err(err).Msg("cannot unmarshal upload") + continue + } + + if pp.Status.CurrentStep == step { + ids = append(ids, pp.ID) + } + } + + return ids +}