Extract storage layer + cs3impl

This commit is contained in:
Ilja Neumann
2020-09-28 16:19:53 +02:00
committed by Benedikt Kulmann
parent deab8e3f23
commit f6732fd14e
7 changed files with 333 additions and 112 deletions

View File

@@ -2,9 +2,7 @@ package service
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"regexp"
@@ -63,7 +61,7 @@ func (s Service) indexAccount(id string) error {
a := &proto.BleveAccount{
BleveType: "account",
}
if err := s.loadAccount(id, &a.Account); err != nil {
if err := s.repo.LoadAccount(context.Background(), id, &a.Account); err != nil {
s.log.Error().Err(err).Str("account", id).Msg("could not load account")
return err
}
@@ -79,37 +77,6 @@ func (s Service) indexAccount(id string) error {
// login eq \"teddy\" and password eq \"F&1!b90t111!\"
var authQuery = regexp.MustCompile(`^login eq '(.*)' and password eq '(.*)'$`) // TODO how is ' escaped in the password?
func (s Service) loadAccount(id string, a *proto.Account) (err error) {
path := filepath.Join(s.Config.Server.AccountsDataPath, "accounts", id)
var data []byte
if data, err = ioutil.ReadFile(path); err != nil {
return merrors.NotFound(s.id, "could not read account: %v", err.Error())
}
if err = json.Unmarshal(data, a); err != nil {
return merrors.InternalServerError(s.id, "could not unmarshal account: %v", err.Error())
}
return
}
func (s Service) writeAccount(a *proto.Account) (err error) {
// leave only the group id
s.deflateMemberOf(a)
var bytes []byte
if bytes, err = json.Marshal(a); err != nil {
return merrors.InternalServerError(s.id, "could not marshal account: %v", err.Error())
}
path := filepath.Join(s.Config.Server.AccountsDataPath, "accounts", a.Id)
if err = ioutil.WriteFile(path, bytes, 0600); err != nil {
return merrors.InternalServerError(s.id, "could not write account: %v", err.Error())
}
return
}
func (s Service) expandMemberOf(a *proto.Account) {
if a == nil {
return
@@ -118,7 +85,7 @@ func (s Service) expandMemberOf(a *proto.Account) {
for i := range a.MemberOf {
g := &proto.Group{}
// TODO resolve by name, when a create or update is issued they may not have an id? fall back to searching the group id in the index?
if err := s.loadGroup(a.MemberOf[i].Id, g); err == nil {
if err := s.repo.LoadGroup(context.Background(), a.MemberOf[i].Id, g); err == nil {
g.Members = nil // always hide members when expanding
expanded = append(expanded, g)
} else {
@@ -129,23 +96,6 @@ func (s Service) expandMemberOf(a *proto.Account) {
a.MemberOf = expanded
}
// deflateMemberOf replaces the groups of a user with an instance that only contains the id
func (s Service) deflateMemberOf(a *proto.Account) {
if a == nil {
return
}
deflated := []*proto.Group{}
for i := range a.MemberOf {
if a.MemberOf[i].Id != "" {
deflated = append(deflated, &proto.Group{Id: a.MemberOf[i].Id})
} else {
// TODO fetch and use an id when group only has a name but no id
s.log.Error().Str("id", a.Id).Interface("group", a.MemberOf[i]).Msg("resolving groups by name is not implemented yet")
}
}
a.MemberOf = deflated
}
func (s Service) passwordIsValid(hash string, pwd string) (ok bool) {
defer func() {
if r := recover(); r != nil {
@@ -233,7 +183,7 @@ func (s Service) ListAccounts(ctx context.Context, in *proto.ListAccountsRequest
for _, hit := range searchResult.Hits {
a := &proto.Account{}
if err = s.loadAccount(hit.ID, a); err != nil {
if err = s.repo.LoadAccount(ctx, hit.ID, a); err != nil {
s.log.Error().Err(err).Str("account", hit.ID).Msg("could not load account, skipping")
continue
}
@@ -281,7 +231,7 @@ func (s Service) GetAccount(ctx context.Context, in *proto.GetAccountRequest, ou
return merrors.InternalServerError(s.id, "could not clean up account id: %v", err.Error())
}
if err = s.loadAccount(id, out); err != nil {
if err = s.repo.LoadAccount(ctx, id, out); err != nil {
s.log.Error().Err(err).Str("id", id).Msg("could not load account")
return
}
@@ -346,7 +296,7 @@ func (s Service) CreateAccount(ctx context.Context, in *proto.CreateAccountReque
// TODO groups should be ignored during create, use groups.AddMember? return error?
// write and index account - note: don't do anything else in between!
if err = s.writeAccount(acc); err != nil {
if err = s.repo.WriteAccount(ctx, acc); err != nil {
s.log.Error().Err(err).Str("id", id).Msg("could not persist new account")
s.debugLogAccount(acc).Msg("could not persist new account")
return
@@ -407,7 +357,7 @@ func (s Service) UpdateAccount(ctx context.Context, in *proto.UpdateAccountReque
path := filepath.Join(s.Config.Server.AccountsDataPath, "accounts", id)
if err = s.loadAccount(id, out); err != nil {
if err = s.repo.LoadAccount(ctx, id, out); err != nil {
s.log.Error().Err(err).Str("id", id).Msg("could not load account")
return
}
@@ -461,7 +411,7 @@ func (s Service) UpdateAccount(ctx context.Context, in *proto.UpdateAccountReque
out.ExternalUserStateChangeDateTime = tsnow
}
if err = s.writeAccount(out); err != nil {
if err = s.repo.WriteAccount(ctx, out); err != nil {
s.log.Error().Err(err).Str("id", out.Id).Msg("could not persist updated account")
return
}
@@ -513,7 +463,7 @@ func (s Service) DeleteAccount(ctx context.Context, in *proto.DeleteAccountReque
path := filepath.Join(s.Config.Server.AccountsDataPath, "accounts", id)
a := &proto.Account{}
if err = s.loadAccount(id, a); err != nil {
if err = s.repo.LoadAccount(ctx, id, a); err != nil {
s.log.Error().Err(err).Str("id", id).Msg("could not load account")
return
}

View File

@@ -2,8 +2,6 @@ package service
import (
"context"
"encoding/json"
"io/ioutil"
"os"
"path/filepath"
"sync"
@@ -46,7 +44,7 @@ func (s Service) indexGroup(id string) error {
g := &proto.BleveGroup{
BleveType: "group",
}
if err := s.loadGroup(id, &g.Group); err != nil {
if err := s.repo.LoadGroup(context.Background(), id, &g.Group); err != nil {
s.log.Error().Err(err).Str("group", id).Msg("could not load group")
return err
}
@@ -58,43 +56,6 @@ func (s Service) indexGroup(id string) error {
return nil
}
func (s Service) loadGroup(id string, g *proto.Group) (err error) {
path := filepath.Join(s.Config.Server.AccountsDataPath, "groups", id)
groupLock.Lock()
defer groupLock.Unlock()
var data []byte
if data, err = ioutil.ReadFile(path); err != nil {
return merrors.NotFound(s.id, "could not read group: %v", err.Error())
}
if err = json.Unmarshal(data, g); err != nil {
return merrors.InternalServerError(s.id, "could not unmarshal group: %v", err.Error())
}
return
}
func (s Service) writeGroup(g *proto.Group) (err error) {
// leave only the member id
s.deflateMembers(g)
var bytes []byte
if bytes, err = json.Marshal(g); err != nil {
return merrors.InternalServerError(s.id, "could not marshal group: %v", err.Error())
}
path := filepath.Join(s.Config.Server.AccountsDataPath, "groups", g.Id)
groupLock.Lock()
defer groupLock.Unlock()
if err = ioutil.WriteFile(path, bytes, 0600); err != nil {
return merrors.InternalServerError(s.id, "could not write group: %v", err.Error())
}
return
}
func (s Service) expandMembers(g *proto.Group) {
if g == nil {
return
@@ -103,7 +64,7 @@ func (s Service) expandMembers(g *proto.Group) {
for i := range g.Members {
// TODO resolve by name, when a create or update is issued they may not have an id? fall back to searching the group id in the index?
a := &proto.Account{}
if err := s.loadAccount(g.Members[i].Id, a); err == nil {
if err := s.repo.LoadAccount(context.Background(), g.Members[i].Id, a); err == nil {
expanded = append(expanded, a)
} else {
// log errors but continue execution for now
@@ -173,7 +134,7 @@ func (s Service) ListGroups(c context.Context, in *proto.ListGroupsRequest, out
for _, hit := range searchResult.Hits {
g := &proto.Group{}
if err = s.loadGroup(hit.ID, g); err != nil {
if err = s.repo.LoadGroup(c, hit.ID, g); err != nil {
s.log.Error().Err(err).Str("group", hit.ID).Msg("could not load group, skipping")
continue
}
@@ -196,7 +157,7 @@ func (s Service) GetGroup(c context.Context, in *proto.GetGroupRequest, out *pro
return merrors.InternalServerError(s.id, "could not clean up group id: %v", err.Error())
}
if err = s.loadGroup(id, out); err != nil {
if err = s.repo.LoadGroup(c, id, out); err != nil {
s.log.Error().Err(err).Str("id", id).Msg("could not load group")
return
}
@@ -226,7 +187,7 @@ func (s Service) CreateGroup(c context.Context, in *proto.CreateGroupRequest, ou
// extract member id
s.deflateMembers(in.Group)
if err = s.writeGroup(in.Group); err != nil {
if err = s.repo.WriteGroup(c, in.Group); err != nil {
s.log.Error().Err(err).Interface("group", in.Group).Msg("could not persist new group")
return
}
@@ -252,7 +213,7 @@ func (s Service) DeleteGroup(c context.Context, in *proto.DeleteGroupRequest, ou
path := filepath.Join(s.Config.Server.AccountsDataPath, "groups", id)
g := &proto.Group{}
if err = s.loadGroup(id, g); err != nil {
if err = s.repo.LoadGroup(c, id, g); err != nil {
s.log.Error().Err(err).Str("id", id).Msg("could not load account")
return
}
@@ -297,13 +258,13 @@ func (s Service) AddMember(c context.Context, in *proto.AddMemberRequest, out *p
// load structs
a := &proto.Account{}
if err = s.loadAccount(accountID, a); err != nil {
if err = s.repo.LoadAccount(c, accountID, a); err != nil {
s.log.Error().Err(err).Str("id", accountID).Msg("could not load account")
return
}
g := &proto.Group{}
if err = s.loadGroup(groupID, g); err != nil {
if err = s.repo.LoadGroup(c, groupID, g); err != nil {
s.log.Error().Err(err).Str("id", groupID).Msg("could not load group")
return
}
@@ -331,11 +292,11 @@ func (s Service) AddMember(c context.Context, in *proto.AddMemberRequest, out *p
a.MemberOf = append(a.MemberOf, g)
}
if err = s.writeAccount(a); err != nil {
if err = s.repo.WriteAccount(c, a); err != nil {
s.log.Error().Err(err).Interface("account", a).Msg("could not persist account")
return
}
if err = s.writeGroup(g); err != nil {
if err = s.repo.WriteGroup(c, g); err != nil {
s.log.Error().Err(err).Interface("group", g).Msg("could not persist group")
return
}
@@ -362,13 +323,13 @@ func (s Service) RemoveMember(c context.Context, in *proto.RemoveMemberRequest,
// load structs
a := &proto.Account{}
if err = s.loadAccount(accountID, a); err != nil {
if err = s.repo.LoadAccount(c, accountID, a); err != nil {
s.log.Error().Err(err).Str("id", accountID).Msg("could not load account")
return
}
g := &proto.Group{}
if err = s.loadGroup(groupID, g); err != nil {
if err = s.repo.LoadGroup(c, groupID, g); err != nil {
s.log.Error().Err(err).Str("id", groupID).Msg("could not load group")
return
}
@@ -391,11 +352,11 @@ func (s Service) RemoveMember(c context.Context, in *proto.RemoveMemberRequest,
}
a.MemberOf = newGroups
if err = s.writeAccount(a); err != nil {
if err = s.repo.WriteAccount(c, a); err != nil {
s.log.Error().Err(err).Interface("account", a).Msg("could not persist account")
return
}
if err = s.writeGroup(g); err != nil {
if err = s.repo.WriteGroup(c, g); err != nil {
s.log.Error().Err(err).Interface("group", g).Msg("could not persist group")
return
}
@@ -416,7 +377,7 @@ func (s Service) ListMembers(c context.Context, in *proto.ListMembersRequest, ou
}
g := &proto.Group{}
if err = s.loadGroup(groupID, g); err != nil {
if err = s.repo.LoadGroup(c, groupID, g); err != nil {
s.log.Error().Err(err).Str("id", groupID).Msg("could not load group")
return
}

View File

@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/owncloud/ocis/accounts/pkg/storage"
"io/ioutil"
"os"
"path/filepath"
@@ -405,6 +406,7 @@ type Service struct {
index bleve.Index
RoleService settings.RoleService
RoleManager *roles.Manager
repo storage.DiskRepo
}
func cleanupID(id string) (string, error) {

144
accounts/pkg/storage/cs3.go Normal file
View File

@@ -0,0 +1,144 @@
package storage
import (
"bytes"
"context"
"encoding/json"
"fmt"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
v1beta11 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/pkg/token"
"github.com/cs3org/reva/pkg/token/manager/jwt"
merrors "github.com/micro/go-micro/v2/errors"
"github.com/owncloud/ocis/accounts/pkg/proto/v0"
"google.golang.org/grpc/metadata"
"net/http"
)
type CS3Repo struct {
serviceID string
dataPath string
rootPath string
tm token.Manager
storageClient provider.ProviderAPIClient
}
func NewCS3Repo(secret string) (Repo, error) {
tokenManager, err := jwt.New(map[string]interface{}{
"secret": "Pive-Fumkiu4",
})
if err != nil {
return nil, err
}
client, err := pool.GetStorageProviderServiceClient("localhost:9185")
if err != nil {
return nil, err
}
return CS3Repo{tm: tokenManager, storageClient: client}, nil
}
func (r CS3Repo) WriteAccount(ctx context.Context, a *proto.Account) (err error) {
t, err := r.authenticate(ctx)
if err != nil {
return err
}
ctx = metadata.AppendToOutgoingContext(ctx, token.TokenHeader, t)
if err := r.makeRootDirIfNotExist(ctx); err != nil {
return err
}
var by []byte
if by, err = json.Marshal(a); err != nil {
return merrors.InternalServerError(r.serviceID, "could not marshal account: %v", err.Error())
}
ureq, err := http.NewRequest("PUT", fmt.Sprintf("http://localhost:9187/data/accounts/%s", a.Id), bytes.NewReader(by))
if err != nil {
return err
}
ureq.Header.Add("x-access-token", t)
cl := http.Client{
Transport: http.DefaultTransport,
}
if _, err := cl.Do(ureq); err != nil {
return err
}
return nil
}
func (r CS3Repo) LoadAccount(ctx context.Context, id string, a *proto.Account) (err error) {
t, err := r.authenticate(ctx)
if err != nil {
return err
}
ctx = metadata.AppendToOutgoingContext(ctx, token.TokenHeader, t)
ureq, err := http.NewRequest("GET", fmt.Sprintf("http://localhost:9187/data/accounts/%s", id), nil)
if err != nil {
return err
}
ureq.Header.Add("x-access-token", t)
cl := http.Client{
Transport: http.DefaultTransport,
}
if _, err = cl.Do(ureq); err != nil {
return err
}
return nil
}
func (r CS3Repo) WriteGroup(ctx context.Context, g *proto.Group) (err error) {
panic("implement me")
}
func (r CS3Repo) LoadGroup(ctx context.Context, id string, g *proto.Group) (err error) {
panic("implement me")
}
func (r CS3Repo) authenticate(ctx context.Context) (token string, err error) {
return r.tm.MintToken(ctx, &user.User{
Id: &user.UserId{},
Groups: []string{},
})
}
func (r CS3Repo) makeRootDirIfNotExist(ctx context.Context) error {
var rootPathRef = &provider.Reference{
Spec: &provider.Reference_Path{Path: "/meta/accounts"},
}
resp, err := r.storageClient.Stat(ctx, &provider.StatRequest{
Ref: rootPathRef,
})
if err != nil {
return err
}
if resp.Status.Code == v1beta11.Code_CODE_NOT_FOUND {
_, err := r.storageClient.CreateContainer(ctx, &provider.CreateContainerRequest{
Ref: rootPathRef,
})
if err != nil {
return err
}
}
return nil
}

View File

@@ -0,0 +1,18 @@
package storage
/*
func TestFoo(t *testing.T) {
r, err := NewCS3Repo("fooo")
assert.NoError(t, err)
err = r.WriteAccount(context.Background(), &proto.Account{
Id: "fefef-egegweg-gegeg",
AccountEnabled: true,
DisplayName: "Mike Jones",
Mail: "mike@example.com",
})
assert.NoError(t, err)
}
*/

View File

@@ -0,0 +1,133 @@
package storage
import (
"context"
"encoding/json"
merrors "github.com/micro/go-micro/v2/errors"
"github.com/owncloud/ocis/accounts/pkg/proto/v0"
"github.com/rs/zerolog"
"io/ioutil"
"path/filepath"
"sync"
)
var groupLock sync.Mutex
type DiskRepo struct {
serviceID string
dataPath string
log zerolog.Logger
}
func New(serviceID string, dataPath string, log zerolog.Logger) DiskRepo {
return DiskRepo{
serviceID: serviceID,
dataPath: dataPath,
log: log.With().Str("id", serviceID).Logger(),
}
}
// WriteAccount to the storage
func (r DiskRepo) WriteAccount(ctx context.Context, a *proto.Account) (err error) {
// leave only the group id
r.deflateMemberOf(a)
var bytes []byte
if bytes, err = json.Marshal(a); err != nil {
return merrors.InternalServerError(r.serviceID, "could not marshal account: %v", err.Error())
}
path := filepath.Join(r.dataPath, "accounts", a.Id)
if err = ioutil.WriteFile(path, bytes, 0600); err != nil {
return merrors.InternalServerError(r.serviceID, "could not write account: %v", err.Error())
}
return
}
// LoadAccount from the storage
func (r DiskRepo) LoadAccount(ctx context.Context, id string, a *proto.Account) (err error) {
path := filepath.Join(r.dataPath, "accounts", id)
var data []byte
if data, err = ioutil.ReadFile(path); err != nil {
return merrors.NotFound(r.serviceID, "could not read account: %v", err.Error())
}
if err = json.Unmarshal(data, a); err != nil {
return merrors.InternalServerError(r.serviceID, "could not unmarshal account: %v", err.Error())
}
return
}
// WriteGroup persists a given group to the storage
func (r DiskRepo) WriteGroup(ctx context.Context, g *proto.Group) (err error) {
// leave only the member id
r.deflateMembers(g)
var bytes []byte
if bytes, err = json.Marshal(g); err != nil {
return merrors.InternalServerError(r.serviceID, "could not marshal group: %v", err.Error())
}
path := filepath.Join(r.dataPath, "groups", g.Id)
groupLock.Lock()
defer groupLock.Unlock()
if err = ioutil.WriteFile(path, bytes, 0600); err != nil {
return merrors.InternalServerError(r.serviceID, "could not write group: %v", err.Error())
}
return
}
// LoadGroup from the storage
func (r DiskRepo) LoadGroup(ctx context.Context, id string, g *proto.Group) (err error) {
path := filepath.Join(r.dataPath, "groups", id)
groupLock.Lock()
defer groupLock.Unlock()
var data []byte
if data, err = ioutil.ReadFile(path); err != nil {
return merrors.NotFound(r.serviceID, "could not read group: %v", err.Error())
}
if err = json.Unmarshal(data, g); err != nil {
return merrors.InternalServerError(r.serviceID, "could not unmarshal group: %v", err.Error())
}
return
}
// deflateMemberOf replaces the groups of a user with an instance that only contains the id
func (r DiskRepo) deflateMemberOf(a *proto.Account) {
if a == nil {
return
}
deflated := []*proto.Group{}
for i := range a.MemberOf {
if a.MemberOf[i].Id != "" {
deflated = append(deflated, &proto.Group{Id: a.MemberOf[i].Id})
} else {
// TODO fetch and use an id when group only has a name but no id
r.log.Error().Str("id", a.Id).Interface("group", a.MemberOf[i]).Msg("resolving groups by name is not implemented yet")
}
}
a.MemberOf = deflated
}
// deflateMembers replaces the users of a group with an instance that only contains the id
func (r DiskRepo) deflateMembers(g *proto.Group) {
if g == nil {
return
}
deflated := []*proto.Account{}
for i := range g.Members {
if g.Members[i].Id != "" {
deflated = append(deflated, &proto.Account{Id: g.Members[i].Id})
} else {
// TODO fetch and use an id when group only has a name but no id
r.log.Error().Str("id", g.Id).Interface("account", g.Members[i]).Msg("resolving members by name is not implemented yet")
}
}
g.Members = deflated
}

View File

@@ -0,0 +1,13 @@
package storage
import (
"context"
"github.com/owncloud/ocis/accounts/pkg/proto/v0"
)
type Repo interface {
WriteAccount(ctx context.Context, a *proto.Account) (err error)
LoadAccount(ctx context.Context, id string, a *proto.Account) (err error)
WriteGroup(ctx context.Context, g *proto.Group) (err error)
LoadGroup(ctx context.Context, id string, g *proto.Group) (err error)
}