diff --git a/pkg/service/v0/service.go b/pkg/service/v0/service.go index 5ebd11c9b8..913196a24e 100644 --- a/pkg/service/v0/service.go +++ b/pkg/service/v0/service.go @@ -6,7 +6,6 @@ import ( "io/ioutil" "os" "path/filepath" - "strings" "github.com/blevesearch/bleve" "github.com/blevesearch/bleve/analysis/analyzer/keyword" @@ -19,9 +18,9 @@ import ( // BleveDocument wraps the generated Record.Metadata and adds a property that is used to distinguish documents in the index. type BleveDocument struct { - Metadata map[string]*proto.Field - Database string `json:"database"` - Table string `json:"table"` + Metadata map[string]*proto.Field `json:"metadata"` + Database string `json:"database"` + Table string `json:"table"` } // New returns a new instance of Service @@ -63,9 +62,9 @@ func New(opts ...Option) (s *Service, err error) { if s.index, err = bleve.New(indexDir, indexMapping); err != nil { return } - // if err = s.indexRecords(recordsDir); err != nil { - // return nil, err - // } + if err = s.indexRecords(recordsDir); err != nil { + return nil, err + } return } @@ -80,7 +79,8 @@ type Service struct { // Read implements the StoreHandler interface. func (s *Service) Read(c context.Context, rreq *proto.ReadRequest, rres *proto.ReadResponse) error { if len(rreq.Key) != 0 { - file := filepath.Join(s.Config.Datapath, "databases", rreq.Options.Database, rreq.Options.Table, rreq.Key) + id := getID(rreq.Options.Database, rreq.Options.Table, rreq.Key) + file := filepath.Join(s.Config.Datapath, "databases", id) var data []byte rec := &proto.Record{} @@ -97,7 +97,7 @@ func (s *Service) Read(c context.Context, rreq *proto.ReadRequest, rres *proto.R return nil } - s.log.Info().Interface("requeest", rreq).Msg("read request") + s.log.Info().Interface("request", rreq).Msg("read request") if rreq.Options.Where != nil { // build bleve query // execute search @@ -149,9 +149,8 @@ func (s *Service) Read(c context.Context, rreq *proto.ReadRequest, rres *proto.R // Write implements the StoreHandler interface. func (s *Service) Write(c context.Context, wreq *proto.WriteRequest, wres *proto.WriteResponse) error { - // TODO sanitize key. As it may contain invalid characters, such as slashes. - // file: /var/tmp/ocis-store/databases/{database}/{table}/{record.key}. - file := filepath.Join(s.Config.Datapath, "databases", wreq.Options.Database, wreq.Options.Table, wreq.Record.Key) + id := getID(wreq.Options.Database, wreq.Options.Table, wreq.Record.Key) + file := filepath.Join(s.Config.Datapath, "databases", id) var bytes []byte bytes, err := protojson.Marshal(wreq.Record) @@ -173,8 +172,7 @@ func (s *Service) Write(c context.Context, wreq *proto.WriteRequest, wres *proto Database: wreq.Options.Database, Table: wreq.Options.Table, } - // TODO sanitize input. - if err := s.index.Index(strings.Join([]string{wreq.Options.Database, wreq.Options.Table, wreq.Record.Key}, "/"), doc); err != nil { + if err := s.index.Index(id, doc); err != nil { s.log.Error().Err(err).Interface("document", doc).Msg("could not index record metadata") return err } @@ -184,7 +182,8 @@ func (s *Service) Write(c context.Context, wreq *proto.WriteRequest, wres *proto // Delete implements the StoreHandler interface. func (s *Service) Delete(c context.Context, dreq *proto.DeleteRequest, dres *proto.DeleteResponse) error { - file := filepath.Join(s.Config.Datapath, "databases", dreq.Options.Database, dreq.Options.Table, dreq.Key) + id := getID(dreq.Options.Database, dreq.Options.Table, dreq.Key) + file := filepath.Join(s.Config.Datapath, "databases", id) if err := os.Remove(file); err != nil { if os.IsNotExist(err) { return merrors.NotFound(s.id, "could not find record") @@ -192,6 +191,12 @@ func (s *Service) Delete(c context.Context, dreq *proto.DeleteRequest, dres *pro return merrors.InternalServerError(s.id, "could not delete record") } + + if err := s.index.Delete(id); err != nil { + s.log.Error().Err(err).Str("id", id).Msg("could not remove record from index") + return merrors.InternalServerError(s.id, "could not remove record from index") + } + return nil } @@ -235,3 +240,93 @@ func (s *Service) Tables(ctx context.Context, in *proto.TablesRequest, out *prot out.Tables = tnames return nil } + +// TODO sanitize key. As it may contain invalid characters, such as slashes. +// file: /var/tmp/ocis-store/databases/{database}/{table}/{record.key}. +func getID(database string, table string, key string) string { + // TODO sanitize input. + return filepath.Join(database, table, key) +} + +func (s Service) indexRecords(recordsDir string) (err error) { + + // TODO use filepath.Walk to clean up code + rh, err := os.Open(recordsDir) + if err != nil { + return merrors.InternalServerError(s.id, "could not open database directory") + } + defer rh.Close() + + dbs, err := rh.Readdirnames(0) + if err != nil { + return merrors.InternalServerError(s.id, "could not read databases directory") + } + + for i := range dbs { + tp := filepath.Join(s.Config.Datapath, "databases", dbs[i]) + th, err := os.Open(tp) + if err != nil { + s.log.Error().Err(err).Str("database", dbs[i]).Msg("could not open database directory") + continue + } + defer th.Close() + + tables, err := th.Readdirnames(0) + if err != nil { + s.log.Error().Err(err).Str("database", dbs[i]).Msg("could not read database directory") + continue + } + + for j := range tables { + + tp := filepath.Join(s.Config.Datapath, "databases", dbs[i], tables[j]) + kh, err := os.Open(tp) + if err != nil { + s.log.Error().Err(err).Str("database", dbs[i]).Str("table", tables[i]).Msg("could not open table directory") + continue + } + defer kh.Close() + + keys, err := kh.Readdirnames(0) + if err != nil { + s.log.Error().Err(err).Str("database", dbs[i]).Str("table", tables[i]).Msg("could not read table directory") + continue + } + + for k := range keys { + + id := getID(dbs[i], tables[j], keys[k]) + kp := filepath.Join(s.Config.Datapath, "databases", id) + + // read record + var data []byte + rec := &proto.Record{} + data, err = ioutil.ReadFile(kp) + if err != nil { + s.log.Error().Err(err).Str("id", id).Msg("could not read record") + continue + } + + if err = protojson.Unmarshal(data, rec); err != nil { + s.log.Error().Err(err).Str("id", id).Msg("could not unmarshal record") + continue + } + + // index record + doc := BleveDocument{ + Metadata: rec.Metadata, + Database: dbs[i], + Table: tables[j], + } + if err := s.index.Index(id, doc); err != nil { + s.log.Error().Err(err).Interface("document", doc).Str("id", id).Msg("could not index record metadata") + continue + } + + s.log.Debug().Str("id", id).Msg("indexed record") + } + } + } + + return +}