mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-04-27 10:48:35 -04:00
@@ -6,7 +6,6 @@ import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/blevesearch/bleve"
|
||||
"github.com/blevesearch/bleve/analysis/analyzer/keyword"
|
||||
@@ -19,9 +18,9 @@ import (
|
||||
|
||||
// BleveDocument wraps the generated Record.Metadata and adds a property that is used to distinguish documents in the index.
|
||||
type BleveDocument struct {
|
||||
Metadata map[string]*proto.Field
|
||||
Database string `json:"database"`
|
||||
Table string `json:"table"`
|
||||
Metadata map[string]*proto.Field `json:"metadata"`
|
||||
Database string `json:"database"`
|
||||
Table string `json:"table"`
|
||||
}
|
||||
|
||||
// New returns a new instance of Service
|
||||
@@ -63,9 +62,9 @@ func New(opts ...Option) (s *Service, err error) {
|
||||
if s.index, err = bleve.New(indexDir, indexMapping); err != nil {
|
||||
return
|
||||
}
|
||||
// if err = s.indexRecords(recordsDir); err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
if err = s.indexRecords(recordsDir); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -80,7 +79,8 @@ type Service struct {
|
||||
// Read implements the StoreHandler interface.
|
||||
func (s *Service) Read(c context.Context, rreq *proto.ReadRequest, rres *proto.ReadResponse) error {
|
||||
if len(rreq.Key) != 0 {
|
||||
file := filepath.Join(s.Config.Datapath, "databases", rreq.Options.Database, rreq.Options.Table, rreq.Key)
|
||||
id := getID(rreq.Options.Database, rreq.Options.Table, rreq.Key)
|
||||
file := filepath.Join(s.Config.Datapath, "databases", id)
|
||||
|
||||
var data []byte
|
||||
rec := &proto.Record{}
|
||||
@@ -97,7 +97,7 @@ func (s *Service) Read(c context.Context, rreq *proto.ReadRequest, rres *proto.R
|
||||
return nil
|
||||
}
|
||||
|
||||
s.log.Info().Interface("requeest", rreq).Msg("read request")
|
||||
s.log.Info().Interface("request", rreq).Msg("read request")
|
||||
if rreq.Options.Where != nil {
|
||||
// build bleve query
|
||||
// execute search
|
||||
@@ -149,9 +149,8 @@ func (s *Service) Read(c context.Context, rreq *proto.ReadRequest, rres *proto.R
|
||||
|
||||
// Write implements the StoreHandler interface.
|
||||
func (s *Service) Write(c context.Context, wreq *proto.WriteRequest, wres *proto.WriteResponse) error {
|
||||
// TODO sanitize key. As it may contain invalid characters, such as slashes.
|
||||
// file: /var/tmp/ocis-store/databases/{database}/{table}/{record.key}.
|
||||
file := filepath.Join(s.Config.Datapath, "databases", wreq.Options.Database, wreq.Options.Table, wreq.Record.Key)
|
||||
id := getID(wreq.Options.Database, wreq.Options.Table, wreq.Record.Key)
|
||||
file := filepath.Join(s.Config.Datapath, "databases", id)
|
||||
|
||||
var bytes []byte
|
||||
bytes, err := protojson.Marshal(wreq.Record)
|
||||
@@ -173,8 +172,7 @@ func (s *Service) Write(c context.Context, wreq *proto.WriteRequest, wres *proto
|
||||
Database: wreq.Options.Database,
|
||||
Table: wreq.Options.Table,
|
||||
}
|
||||
// TODO sanitize input.
|
||||
if err := s.index.Index(strings.Join([]string{wreq.Options.Database, wreq.Options.Table, wreq.Record.Key}, "/"), doc); err != nil {
|
||||
if err := s.index.Index(id, doc); err != nil {
|
||||
s.log.Error().Err(err).Interface("document", doc).Msg("could not index record metadata")
|
||||
return err
|
||||
}
|
||||
@@ -184,7 +182,8 @@ func (s *Service) Write(c context.Context, wreq *proto.WriteRequest, wres *proto
|
||||
|
||||
// Delete implements the StoreHandler interface.
|
||||
func (s *Service) Delete(c context.Context, dreq *proto.DeleteRequest, dres *proto.DeleteResponse) error {
|
||||
file := filepath.Join(s.Config.Datapath, "databases", dreq.Options.Database, dreq.Options.Table, dreq.Key)
|
||||
id := getID(dreq.Options.Database, dreq.Options.Table, dreq.Key)
|
||||
file := filepath.Join(s.Config.Datapath, "databases", id)
|
||||
if err := os.Remove(file); err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return merrors.NotFound(s.id, "could not find record")
|
||||
@@ -192,6 +191,12 @@ func (s *Service) Delete(c context.Context, dreq *proto.DeleteRequest, dres *pro
|
||||
|
||||
return merrors.InternalServerError(s.id, "could not delete record")
|
||||
}
|
||||
|
||||
if err := s.index.Delete(id); err != nil {
|
||||
s.log.Error().Err(err).Str("id", id).Msg("could not remove record from index")
|
||||
return merrors.InternalServerError(s.id, "could not remove record from index")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -235,3 +240,93 @@ func (s *Service) Tables(ctx context.Context, in *proto.TablesRequest, out *prot
|
||||
out.Tables = tnames
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO sanitize key. As it may contain invalid characters, such as slashes.
|
||||
// file: /var/tmp/ocis-store/databases/{database}/{table}/{record.key}.
|
||||
func getID(database string, table string, key string) string {
|
||||
// TODO sanitize input.
|
||||
return filepath.Join(database, table, key)
|
||||
}
|
||||
|
||||
func (s Service) indexRecords(recordsDir string) (err error) {
|
||||
|
||||
// TODO use filepath.Walk to clean up code
|
||||
rh, err := os.Open(recordsDir)
|
||||
if err != nil {
|
||||
return merrors.InternalServerError(s.id, "could not open database directory")
|
||||
}
|
||||
defer rh.Close()
|
||||
|
||||
dbs, err := rh.Readdirnames(0)
|
||||
if err != nil {
|
||||
return merrors.InternalServerError(s.id, "could not read databases directory")
|
||||
}
|
||||
|
||||
for i := range dbs {
|
||||
tp := filepath.Join(s.Config.Datapath, "databases", dbs[i])
|
||||
th, err := os.Open(tp)
|
||||
if err != nil {
|
||||
s.log.Error().Err(err).Str("database", dbs[i]).Msg("could not open database directory")
|
||||
continue
|
||||
}
|
||||
defer th.Close()
|
||||
|
||||
tables, err := th.Readdirnames(0)
|
||||
if err != nil {
|
||||
s.log.Error().Err(err).Str("database", dbs[i]).Msg("could not read database directory")
|
||||
continue
|
||||
}
|
||||
|
||||
for j := range tables {
|
||||
|
||||
tp := filepath.Join(s.Config.Datapath, "databases", dbs[i], tables[j])
|
||||
kh, err := os.Open(tp)
|
||||
if err != nil {
|
||||
s.log.Error().Err(err).Str("database", dbs[i]).Str("table", tables[i]).Msg("could not open table directory")
|
||||
continue
|
||||
}
|
||||
defer kh.Close()
|
||||
|
||||
keys, err := kh.Readdirnames(0)
|
||||
if err != nil {
|
||||
s.log.Error().Err(err).Str("database", dbs[i]).Str("table", tables[i]).Msg("could not read table directory")
|
||||
continue
|
||||
}
|
||||
|
||||
for k := range keys {
|
||||
|
||||
id := getID(dbs[i], tables[j], keys[k])
|
||||
kp := filepath.Join(s.Config.Datapath, "databases", id)
|
||||
|
||||
// read record
|
||||
var data []byte
|
||||
rec := &proto.Record{}
|
||||
data, err = ioutil.ReadFile(kp)
|
||||
if err != nil {
|
||||
s.log.Error().Err(err).Str("id", id).Msg("could not read record")
|
||||
continue
|
||||
}
|
||||
|
||||
if err = protojson.Unmarshal(data, rec); err != nil {
|
||||
s.log.Error().Err(err).Str("id", id).Msg("could not unmarshal record")
|
||||
continue
|
||||
}
|
||||
|
||||
// index record
|
||||
doc := BleveDocument{
|
||||
Metadata: rec.Metadata,
|
||||
Database: dbs[i],
|
||||
Table: tables[j],
|
||||
}
|
||||
if err := s.index.Index(id, doc); err != nil {
|
||||
s.log.Error().Err(err).Interface("document", doc).Str("id", id).Msg("could not index record metadata")
|
||||
continue
|
||||
}
|
||||
|
||||
s.log.Debug().Str("id", id).Msg("indexed record")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user