Merge pull request #8947 from kobergj/VirusHandling

[full-ci] Rework Virus Handling
This commit is contained in:
kobergj
2024-04-26 10:10:38 +02:00
committed by GitHub
13 changed files with 108 additions and 22 deletions

2
go.mod
View File

@@ -15,7 +15,7 @@ require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/coreos/go-oidc/v3 v3.10.0
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781
github.com/cs3org/reva/v2 v2.19.2-0.20240422150349-51ab7655f858
github.com/cs3org/reva/v2 v2.19.2-0.20240426071117-7ed0671f9e0b
github.com/dhowden/tag v0.0.0-20230630033851-978a0926ee25
github.com/disintegration/imaging v1.6.2
github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e

4
go.sum
View File

@@ -1026,8 +1026,8 @@ github.com/crewjam/saml v0.4.14 h1:g9FBNx62osKusnFzs3QTN5L9CVA/Egfgm+stJShzw/c=
github.com/crewjam/saml v0.4.14/go.mod h1:UVSZCf18jJkk6GpWNVqcyQJMD5HsRugBPf4I1nl2mME=
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781 h1:BUdwkIlf8IS2FasrrPg8gGPHQPOrQ18MS1Oew2tmGtY=
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cs3org/reva/v2 v2.19.2-0.20240422150349-51ab7655f858 h1:cohKwOI/6UXXYhrjrZMYxw5GlM8wFS5445TZr/jmSzs=
github.com/cs3org/reva/v2 v2.19.2-0.20240422150349-51ab7655f858/go.mod h1:GRUrOp5HbFVwZTgR9bVrMZ/MvVy+Jhxw1PdMmhhKP9E=
github.com/cs3org/reva/v2 v2.19.2-0.20240426071117-7ed0671f9e0b h1:lYNsGv/E06cD+p5RLmxBVySV17/OVRBiOhL50FEUrGs=
github.com/cs3org/reva/v2 v2.19.2-0.20240426071117-7ed0671f9e0b/go.mod h1:GRUrOp5HbFVwZTgR9bVrMZ/MvVy+Jhxw1PdMmhhKP9E=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg=
github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4=

View File

@@ -146,7 +146,7 @@ func (av Antivirus) processEvent(e events.Event, s events.Publisher) error {
}
if av.c.DebugScanOutcome != "" {
av.l.Warn().Str("antivir, clamav", ">>>>>>> ANTIVIRUS_DEBUG_SCAN_OUTCOME IS SET NO ACTUAL VIRUS SCAN IS PERFORMED!")
av.l.Warn().Str("antivir, clamav", ">>>>>>> ANTIVIRUS_DEBUG_SCAN_OUTCOME IS SET NO ACTUAL VIRUS SCAN IS PERFORMED!").Send()
if err := events.Publish(ctx, s, events.PostprocessingStepFinished{
FinishedStep: events.PPStepAntivirus,
Outcome: events.PostprocessingOutcome(av.c.DebugScanOutcome),
@@ -158,7 +158,6 @@ func (av Antivirus) processEvent(e events.Event, s events.Publisher) error {
Description: "DEBUG: forced outcome",
Scandate: time.Now(),
ResourceID: ev.ResourceID,
ErrorMsg: "DEBUG: forced outcome",
},
}); err != nil {
av.l.Fatal().Err(err).Str("uploadid", ev.UploadID).Interface("resourceID", ev.ResourceID).Msg("cannot publish events - exiting")

View File

@@ -113,6 +113,10 @@ func (cl *ClientlogService) processEvent(event events.Event) {
default:
err = errors.New("unhandled event")
case events.UploadReady:
if e.Failed {
// we don't inform about failed uploads yet
return
}
fileEv("postprocessing-finished", e.FileRef)
case events.ItemTrashed:
evType = "item-trashed"

View File

@@ -22,6 +22,7 @@ type Postprocessing struct {
Status Status
Failures int
InitiatorID string
Finished bool
config config.Postprocessing
}

View File

@@ -150,8 +150,14 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
next = pp.Delay()
case events.UploadReady:
if ev.Failed {
// the upload failed - let's keep it around for a while
return nil
// the upload failed - let's keep it around for a while - but mark it as finished
pp, err = pps.getPP(pps.store, ev.UploadID)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
return fmt.Errorf("%w: cannot get upload", ErrEvent)
}
pp.Finished = true
return storePP(pps.store, pp)
}
// the storage provider thinks the upload is done - so no need to keep it any more
@@ -261,6 +267,11 @@ func (pps *PostprocessingService) resumePP(ctx context.Context, uploadID string)
return fmt.Errorf("cannot get upload: %w", err)
}
if pp.Finished {
// dont retry finished uploads
return nil
}
return events.Publish(ctx, pps.pub, pp.CurrentStep())
}

View File

@@ -191,6 +191,7 @@ func ListUploadSessions(cfg *config.Config) *cli.Command {
if c.Bool("json") {
for _, u := range uploads {
ref := u.Reference()
sr, sd := u.ScanData()
s := struct {
ID string `json:"id"`
Space string `json:"space"`
@@ -201,6 +202,8 @@ func ListUploadSessions(cfg *config.Config) *cli.Command {
SpaceOwner *userpb.UserId `json:"spaceowner,omitempty"`
Expires time.Time `json:"expires"`
Processing bool `json:"processing"`
ScanDate time.Time `json:"virus_scan_date"`
ScanResult string `json:"virus_scan_result"`
}{
Space: ref.GetResourceId().GetSpaceId(),
ID: u.ID(),
@@ -211,6 +214,8 @@ func ListUploadSessions(cfg *config.Config) *cli.Command {
SpaceOwner: u.SpaceOwner(),
Expires: u.Expires(),
Processing: u.IsProcessing(),
ScanDate: sd,
ScanResult: sr,
}
j, err := json.Marshal(s)
if err != nil {
@@ -236,10 +241,11 @@ func ListUploadSessions(cfg *config.Config) *cli.Command {
// start a table
table = tw.NewWriter(os.Stdout)
table.SetHeader([]string{"Space", "Upload Id", "Name", "Offset", "Size", "Executant", "Owner", "Expires", "Processing"})
table.SetHeader([]string{"Space", "Upload Id", "Name", "Offset", "Size", "Executant", "Owner", "Expires", "Processing", "Scan Date", "Scan Result"})
table.SetAutoFormatHeaders(false)
for _, u := range uploads {
sr, sd := u.ScanData()
table.Append([]string{
u.Reference().ResourceId.GetSpaceId(),
u.ID(),
@@ -250,6 +256,8 @@ func ListUploadSessions(cfg *config.Config) *cli.Command {
u.SpaceOwner().GetOpaqueId(),
u.Expires().Format(time.RFC3339),
strconv.FormatBool(u.IsProcessing()),
sd.Format(time.RFC3339),
sr,
})
if c.Bool("restart") {

View File

@@ -20,6 +20,7 @@ package gateway
import (
"context"
"slices"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
@@ -158,6 +159,9 @@ func (s *svc) updateShare(ctx context.Context, req *collaboration.UpdateShareReq
}
func (s *svc) updateSpaceShare(ctx context.Context, req *collaboration.UpdateShareRequest) (*collaboration.UpdateShareResponse, error) {
if req.GetShare().GetGrantee() == nil {
return &collaboration.UpdateShareResponse{Status: status.NewInvalid(ctx, "updating requires a received grantee object")}, nil
}
// If the share is a denial we call denyGrant instead.
var st *rpc.Status
var err error
@@ -169,32 +173,47 @@ func (s *svc) updateSpaceShare(ctx context.Context, req *collaboration.UpdateSha
}
utils.AppendPlainToOpaque(opaque, "spacetype", utils.ReadPlainFromOpaque(req.Opaque, "spacetype"))
creator := ctxpkg.ContextMustGetUser(ctx)
grant := &provider.Grant{
Grantee: req.GetShare().GetGrantee(),
Permissions: req.GetShare().GetPermissions().GetPermissions(),
Expiration: req.GetShare().GetExpiration(),
Creator: creator.GetId(),
}
if grants.PermissionsEqual(req.Share.GetPermissions().GetPermissions(), &provider.ResourcePermissions{}) {
st, err = s.denyGrant(ctx, req.GetShare().GetResourceId(), req.GetShare().GetGrantee(), opaque)
if err != nil {
return nil, errors.Wrap(err, "gateway: error denying grant in storage")
}
} else {
if !grant.Permissions.RemoveGrant {
listGrantRes, err := s.listGrants(ctx, req.GetShare().GetResourceId())
if err != nil {
return nil, errors.Wrap(err, "gateway: error getting grant to remove from storage")
}
existsGrant := s.getGranteeGrant(listGrantRes.GetGrants(), req.GetShare().GetGrantee())
if !slices.Contains(req.GetUpdateMask().GetPaths(), "permissions") {
req.Share.Permissions = &collaboration.SharePermissions{Permissions: existsGrant.GetPermissions()}
}
if !slices.Contains(req.GetUpdateMask().GetPaths(), "expiration") {
req.Share.Expiration = existsGrant.GetExpiration()
}
grant := &provider.Grant{
Grantee: req.GetShare().GetGrantee(),
Permissions: req.GetShare().GetPermissions().GetPermissions(),
Expiration: req.GetShare().GetExpiration(),
Creator: ctxpkg.ContextMustGetUser(ctx).GetId(),
}
if grant.GetPermissions() == nil {
return &collaboration.UpdateShareResponse{Status: status.NewInvalid(ctx, "updating requires a received permission object")}, nil
}
if !grant.GetPermissions().GetRemoveGrant() {
// this request might remove Manager Permissions so we need to
// check if there is at least one manager remaining of the
// resource.
listGrantRes, err := s.listGrants(ctx, req.GetShare().GetResourceId())
if err != nil {
return nil, errors.Wrap(err, "gateway: error getting grant to remove from storage")
}
if !isSpaceManagerRemaining(listGrantRes.GetGrants(), grant.GetGrantee()) {
return &collaboration.UpdateShareResponse{
Status: status.NewPermissionDenied(ctx, errtypes.PermissionDenied(""), "can't remove the last manager"),
}, nil
}
}
st, err = s.updateGrant(ctx, req.GetShare().GetResourceId(), grant, opaque)
if err != nil {
@@ -523,6 +542,15 @@ func (s *svc) listGrants(ctx context.Context, id *provider.ResourceId) (*provide
return grantRes, nil
}
func (s *svc) getGranteeGrant(grants []*provider.Grant, grantee *provider.Grantee) *provider.Grant {
for _, g := range grants {
if isEqualGrantee(g.Grantee, grantee) {
return g
}
}
return nil
}
func (s *svc) addShare(ctx context.Context, req *collaboration.CreateShareRequest) (*collaboration.CreateShareResponse, error) {
c, err := pool.GetUserShareProviderClient(s.c.UserShareProviderEndpoint)
if err != nil {

View File

@@ -41,6 +41,7 @@ import (
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/fieldmaskpb"
)
func (h *Handler) getGrantee(ctx context.Context, name string) (provider.Grantee, error) {
@@ -108,8 +109,10 @@ func (h *Handler) addSpaceMember(w http.ResponseWriter, r *http.Request, info *p
// The viewer role doesn't have the ListGrants permission so we set it here.
permissions.ListGrants = true
fieldmask := []string{}
expireDate := r.PostFormValue("expireDate")
var expirationTs *types.Timestamp
fieldmask = append(fieldmask, "expiration")
if expireDate != "" {
expiration, err := time.Parse(_iso8601, expireDate)
if err != nil {
@@ -125,6 +128,7 @@ func (h *Handler) addSpaceMember(w http.ResponseWriter, r *http.Request, info *p
Seconds: uint64(expiration.UnixNano() / int64(time.Second)),
Nanos: uint32(expiration.UnixNano() % int64(time.Second)),
}
fieldmask = append(fieldmask, "expiration")
}
ref := provider.Reference{ResourceId: info.GetId()}
@@ -154,6 +158,9 @@ func (h *Handler) addSpaceMember(w http.ResponseWriter, r *http.Request, info *p
// we have to send the update request to the gateway to give it a chance to invalidate its cache
// TODO the gateway no longer should cache stuff because invalidation is to expensive. The decomposedfs already has a better cache.
if granteeExists(lgRes.Grants, grantee) {
if permissions != nil {
fieldmask = append(fieldmask, "permissions")
}
updateShareReq := &collaborationv1beta1.UpdateShareRequest{
// TODO: change CS3 APIs
Opaque: &types.Opaque{
@@ -169,6 +176,9 @@ func (h *Handler) addSpaceMember(w http.ResponseWriter, r *http.Request, info *p
Grantee: &grantee,
Expiration: expirationTs,
},
UpdateMask: &fieldmaskpb.FieldMask{
Paths: fieldmask,
},
}
updateShareReq.Opaque = utils.AppendPlainToOpaque(updateShareReq.Opaque, "spacetype", info.GetSpace().GetSpaceType())
updateShareRes, err := client.UpdateShare(ctx, updateShareReq)

View File

@@ -76,6 +76,9 @@ type UploadSession interface {
// Purge allows completely removing an upload. Should emit a PostprocessingFinished event with a Delete outcome
Purge(ctx context.Context) error
// ScanData returns the scan data for the UploadSession
ScanData() (string, time.Time)
}
// UploadSessionFilter can be used to filter upload sessions

View File

@@ -496,6 +496,11 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
continue
}
sublog = log.With().Str("spaceid", session.SpaceID()).Str("nodeid", session.NodeID()).Logger()
session.SetScanData(res.Description, res.Scandate)
if err := session.Persist(ctx); err != nil {
sublog.Error().Err(err).Msg("Failed to persist scan results")
}
}
if err := n.SetScanData(ctx, res.Description, res.Scandate); err != nil {

View File

@@ -298,7 +298,8 @@ func (s *OcisSession) MTime() time.Time {
// IsProcessing returns true if all bytes have been received. The session then has entered postprocessing state.
func (s *OcisSession) IsProcessing() bool {
return s.info.Size == s.info.Offset
// We might need a more sophisticated way to determine processing status soon
return s.info.Size == s.info.Offset && s.info.MetaData["scanResult"] == ""
}
// binPath returns the path to the file storing the binary data.
@@ -311,6 +312,22 @@ func (s *OcisSession) InitiatorID() string {
return s.info.MetaData["initiatorid"]
}
// SetScanData sets virus scan data to the upload session
func (s *OcisSession) SetScanData(result string, date time.Time) {
s.info.MetaData["scanResult"] = result
s.info.MetaData["scanDate"] = date.Format(time.RFC3339)
}
// ScanData returns the virus scan data
func (s *OcisSession) ScanData() (string, time.Time) {
date := s.info.MetaData["scanDate"]
if date == "" {
return "", time.Time{}
}
d, _ := time.Parse(time.RFC3339, date)
return s.info.MetaData["scanResult"], d
}
// sessionPath returns the path to the .info file storing the file's info.
func sessionPath(root, id string) string {
return filepath.Join(root, "uploads", id+".info")

2
vendor/modules.txt vendored
View File

@@ -370,7 +370,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1
github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1
github.com/cs3org/go-cs3apis/cs3/tx/v1beta1
github.com/cs3org/go-cs3apis/cs3/types/v1beta1
# github.com/cs3org/reva/v2 v2.19.2-0.20240422150349-51ab7655f858
# github.com/cs3org/reva/v2 v2.19.2-0.20240426071117-7ed0671f9e0b
## explicit; go 1.21
github.com/cs3org/reva/v2/cmd/revad/internal/grace
github.com/cs3org/reva/v2/cmd/revad/runtime