diff --git a/changelog/unreleased/move-pman-to-ocis.md b/changelog/unreleased/move-pman-to-ocis.md new file mode 100644 index 000000000..4df47a489 --- /dev/null +++ b/changelog/unreleased/move-pman-to-ocis.md @@ -0,0 +1,7 @@ +Change: Move runtime code on refs/pman over to owncloud/ocis/ocis + +Tags: ocis, runtime + +Currently, the runtime is under the private account of an oCIS developer. For future-proofing we don't want oCIS mission critical components to depend on external repositories, so we're including refs/pman module as an oCIS package instead. + +https://github.com/owncloud/ocis/pull/1483 diff --git a/ocis/go.mod b/ocis/go.mod index bda423aeb..036c6840a 100644 --- a/ocis/go.mod +++ b/ocis/go.mod @@ -31,10 +31,13 @@ require ( github.com/owncloud/ocis/thumbnails v0.1.6 github.com/owncloud/ocis/web v0.0.0-00010101000000-000000000000 github.com/owncloud/ocis/webdav v0.0.0-00010101000000-000000000000 - github.com/refs/pman v0.0.0-20201214134707-9ce4dcebbbf8 github.com/restic/calens v0.2.0 + github.com/rs/zerolog v1.20.0 + github.com/spf13/cobra v1.0.0 github.com/spf13/viper v1.7.1 + github.com/stretchr/testify v1.7.0 go.opencensus.io v0.22.5 + golang.org/x/sys v0.0.0-20201101102859-da207088b7d1 ) replace ( diff --git a/ocis/go.sum b/ocis/go.sum index 55a388755..7186bb8c8 100644 --- a/ocis/go.sum +++ b/ocis/go.sum @@ -1064,6 +1064,7 @@ github.com/ovh/go-ovh v0.0.0-20181109152953-ba5adb4cf014/go.mod h1:joRatxRJaZBsY github.com/owncloud/flaex v0.0.0-20200411150708-dce59891a203/go.mod h1:jip86t4OVURJTf8CM/0e2qcji/Y4NG3l2lR8kex4JWw= github.com/owncloud/flaex v0.2.0 h1:3FLf8oyMgA6HLK7w4+VJ5N1oVA8G7MptLCVjfxxIaww= github.com/owncloud/flaex v0.2.0/go.mod h1:jip86t4OVURJTf8CM/0e2qcji/Y4NG3l2lR8kex4JWw= +github.com/owncloud/ocis v1.0.0 h1:gtQZZSEzbSRucGPl3Ag5BOelZuw2OGv2e6WLAJMvQTQ= github.com/owncloud/ocis-graph v0.0.0-20200318175820-9a5a6e029db7 h1:gT0GyIOoR7XtpZ7sIxVJSckcz/nueGB1Cm1xNaflXQ0= github.com/owncloud/ocis-graph v0.0.0-20200318175820-9a5a6e029db7/go.mod h1:IRm6BBJqyPhYI+3fm5bWkhgFL/yh63ASUznFqN4yXgs= github.com/owncloud/ocis-graph-explorer v0.0.0-20200210111049-017eeb40dc0c h1:8g3u2JwOMP/UE+0B+YjV7UWEQzyCPbnZzwIto6lNc0I= @@ -1159,6 +1160,8 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqn github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/refs/pman v0.0.0-20201214134707-9ce4dcebbbf8 h1:7Pz7RrqgDeEp/nTGK55clOWODZ4+2D7J5K0VFOiYHU0= github.com/refs/pman v0.0.0-20201214134707-9ce4dcebbbf8/go.mod h1:EHI7tmxO5Ct3xBLHU43j51o5/U6VQz0TM6ik8RPE570= +github.com/refs/pman v0.0.0-20210125101615-7406747552bc h1:bh42GGXCDC/sVwu/1bYm4BCHBeXeX1qY8WSokQYuykQ= +github.com/refs/pman v0.0.0-20210125101615-7406747552bc/go.mod h1:OQcXtpEk2nPkbM32jtWRIyZHTTFReKIFmTrJb06pVY4= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/restic/calens v0.1.0/go.mod h1:u67f5msOjCTDYNzOf/NoAUSdmXP03YXPCwIQLYADy5M= @@ -1279,6 +1282,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/studio-b12/gowebdav v0.0.0-20200303150724-9380631c29a1 h1:TPyHV/OgChqNcnYqCoCvIFjR9TU60gFXXBKnhOBzVEI= github.com/studio-b12/gowebdav v0.0.0-20200303150724-9380631c29a1/go.mod h1:gCcfDlA1Y7GqOaeEKw5l9dOGx1VLdc/HuQSlQAaZ30s= github.com/subosito/gotenv v1.1.1/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= diff --git a/ocis/pkg/config/config.go b/ocis/pkg/config/config.go index f37fe87c3..7130fec29 100644 --- a/ocis/pkg/config/config.go +++ b/ocis/pkg/config/config.go @@ -7,6 +7,7 @@ import ( accounts "github.com/owncloud/ocis/accounts/pkg/config" glauth "github.com/owncloud/ocis/glauth/pkg/config" konnectd "github.com/owncloud/ocis/konnectd/pkg/config" + pman "github.com/owncloud/ocis/ocis/pkg/runtime/config" ocs "github.com/owncloud/ocis/ocs/pkg/config" onlyoffice "github.com/owncloud/ocis/onlyoffice/pkg/config" proxy "github.com/owncloud/ocis/proxy/pkg/config" @@ -16,7 +17,6 @@ import ( thumbnails "github.com/owncloud/ocis/thumbnails/pkg/config" web "github.com/owncloud/ocis/web/pkg/config" webdav "github.com/owncloud/ocis/webdav/pkg/config" - pman "github.com/refs/pman/pkg/config" ) // Log defines the available logging configuration. diff --git a/ocis/pkg/runtime/README.md b/ocis/pkg/runtime/README.md new file mode 100644 index 000000000..8d8c7ce19 --- /dev/null +++ b/ocis/pkg/runtime/README.md @@ -0,0 +1,68 @@ +# ownCloud Infinite Scale: Runtime + +Pman is a slim utility library for supervising long-running processes. It can be [embedded](https://github.com/owncloud/OCIS/blob/ea2a2b328e7261ed72e65adf48359c0a44e14b40/OCIS/pkg/runtime/runtime.go#L84) or used as a cli command. + +When used as a CLI command it relays actions to a running runtime. + +## Usage + +Start a runtime + +```go +package main +import "github.com/owncloud/ocis/ocis/pkg/runtime/service" + +func main() { + service.Start() +} +``` +![start runtime](https://imgur.com/F67hgQk.gif) + +Start sending messages +![message runtime](https://imgur.com/O71RlsJ.gif) + +## Example + +```go +package main + +import ( + "fmt" + "github.com/owncloud/ocis/ocis/pkg/runtime/process" + "github.com/owncloud/ocis/ocis/pkg/runtime/service" + "github.com/rs/zerolog/log" + "os" + "os/signal" + "syscall" + "time" +) + +func main() { + s := service.NewService() + var c = make(chan os.Signal, 1) + var o int + + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) + if err := s.Start(process.NewProcEntry("ocs", nil, "ocs"), &o); err != nil { + os.Exit(1) + } + + time.AfterFunc(3*time.Second, func() { + var acc = "ocs" + fmt.Printf(fmt.Sprintf("shutting down service: %s", acc)) + if err := s.Controller.Kill(&acc); err != nil { + log.Fatal() + } + os.Exit(0) + }) + + for { + select { + case <-c: + return + } + } +} +``` + +Run the above example with `RUNTIME_KEEP_ALIVE=true` and with no `RUNTIME_KEEP_ALIVE` set to see its behavior. It requires an [OCIS binary](https://github.com/owncloud/ocis/releases) present in your `$PATH` for it to work. diff --git a/ocis/pkg/runtime/cmd/kill.go b/ocis/pkg/runtime/cmd/kill.go new file mode 100644 index 000000000..d3a5855b8 --- /dev/null +++ b/ocis/pkg/runtime/cmd/kill.go @@ -0,0 +1,35 @@ +package cmd + +import ( + "fmt" + "log" + "net" + "net/rpc" + + "github.com/owncloud/ocis/ocis/pkg/runtime/config" + "github.com/spf13/cobra" +) + +// Kill an extension. +func Kill(cfg *config.Config) *cobra.Command { + return &cobra.Command{ + Use: "kill", + Aliases: []string{"k"}, + Short: "Kill a running extensions.", + Args: cobra.MinimumNArgs(1), + Run: func(cmd *cobra.Command, args []string) { + client, err := rpc.DialHTTP("tcp", net.JoinHostPort(cfg.Hostname, cfg.Port)) + if err != nil { + log.Fatal("dialing:", err) + } + + var arg1 int + + if err := client.Call("Service.Kill", &args[0], &arg1); err != nil { + log.Fatal(err) + } + + fmt.Println(arg1) + }, + } +} diff --git a/ocis/pkg/runtime/cmd/list.go b/ocis/pkg/runtime/cmd/list.go new file mode 100644 index 000000000..1965acf26 --- /dev/null +++ b/ocis/pkg/runtime/cmd/list.go @@ -0,0 +1,34 @@ +package cmd + +import ( + "fmt" + "log" + "net" + "net/rpc" + + "github.com/owncloud/ocis/ocis/pkg/runtime/config" + "github.com/spf13/cobra" +) + +// List running extensions. +func List(cfg *config.Config) *cobra.Command { + return &cobra.Command{ + Use: "list", + Aliases: []string{"r"}, + Short: "List running extensions", + Run: func(cmd *cobra.Command, args []string) { + client, err := rpc.DialHTTP("tcp", net.JoinHostPort(cfg.Hostname, cfg.Port)) + if err != nil { + log.Fatal("dialing:", err) + } + + var arg1 string + + if err := client.Call("Service.List", struct{}{}, &arg1); err != nil { + log.Fatal(err) + } + + fmt.Println(arg1) + }, + } +} diff --git a/ocis/pkg/runtime/cmd/root.go b/ocis/pkg/runtime/cmd/root.go new file mode 100644 index 000000000..0c505b2ba --- /dev/null +++ b/ocis/pkg/runtime/cmd/root.go @@ -0,0 +1,30 @@ +package cmd + +import ( + "github.com/owncloud/ocis/ocis/pkg/runtime/config" + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +var ( + rootCmd = &cobra.Command{ + Use: "pman", + Short: "RPC Process Manager", + } +) + +// RootCmd returns a configured root command. +func RootCmd(cfg *config.Config) *cobra.Command { + rootCmd.PersistentFlags().StringVarP(&cfg.Hostname, "hostname", "n", "localhost", "host with a running OCIS runtime.") + rootCmd.PersistentFlags().StringVarP(&cfg.Port, "port", "p", "10666", "port to send messages to the rpc OCIS runtime.") + rootCmd.PersistentFlags().BoolVarP(&cfg.KeepAlive, "keep-alive", "k", false, "restart supervised processes that abruptly die.") + + viper.BindPFlag("hostname", rootCmd.PersistentFlags().Lookup("hostname")) + viper.BindPFlag("port", rootCmd.PersistentFlags().Lookup("port")) + + rootCmd.AddCommand(List(cfg)) + rootCmd.AddCommand(Run(cfg)) + rootCmd.AddCommand(Kill(cfg)) + + return rootCmd +} diff --git a/ocis/pkg/runtime/cmd/run.go b/ocis/pkg/runtime/cmd/run.go new file mode 100644 index 000000000..0312fc6b4 --- /dev/null +++ b/ocis/pkg/runtime/cmd/run.go @@ -0,0 +1,37 @@ +package cmd + +import ( + "fmt" + "log" + "net" + "net/rpc" + "os" + + "github.com/owncloud/ocis/ocis/pkg/runtime/config" + "github.com/owncloud/ocis/ocis/pkg/runtime/process" + "github.com/spf13/cobra" +) + +// Run an extension. +func Run(cfg *config.Config) *cobra.Command { + return &cobra.Command{ + Use: "run", + Short: "Run an extension.", + Args: cobra.MinimumNArgs(1), + Run: func(cmd *cobra.Command, args []string) { + client, err := rpc.DialHTTP("tcp", net.JoinHostPort(cfg.Hostname, cfg.Port)) + if err != nil { + log.Fatal("dialing:", err) + } + + proc := process.NewProcEntry(args[0], os.Environ(), []string{args[0]}...) + var res int + + if err := client.Call("Service.Start", proc, &res); err != nil { + log.Fatal(err) + } + + fmt.Println(res) + }, + } +} diff --git a/ocis/pkg/runtime/config/config.go b/ocis/pkg/runtime/config/config.go new file mode 100644 index 000000000..ab56542c1 --- /dev/null +++ b/ocis/pkg/runtime/config/config.go @@ -0,0 +1,28 @@ +package config + +// Config determines behavior across the tool. +type Config struct { + // Hostname where the runtime is running. When using PMAN in cli mode, it determines where the host runtime is. + // Default is localhost. + Hostname string + + // Port configures the port where a runtime is available. It defaults to 10666. + Port string + + // KeepAlive configures if restart attempts are made if the process supervised terminates. Default is false. + KeepAlive bool +} + +var ( + defaultHostname = "localhost" + defaultPort = "10666" +) + +// NewConfig returns a new config with a set of defaults. +func NewConfig() *Config { + return &Config{ + Hostname: defaultHostname, + Port: defaultPort, + KeepAlive: false, + } +} diff --git a/ocis/pkg/runtime/controller/controller.go b/ocis/pkg/runtime/controller/controller.go new file mode 100644 index 000000000..5c58c4e89 --- /dev/null +++ b/ocis/pkg/runtime/controller/controller.go @@ -0,0 +1,168 @@ +package controller + +import ( + "fmt" + "os" + "os/exec" + "sort" + "strconv" + "strings" + "sync" + "time" + + "github.com/owncloud/ocis/ocis/pkg/runtime/config" + "github.com/owncloud/ocis/ocis/pkg/runtime/process" + "github.com/owncloud/ocis/ocis/pkg/runtime/storage" + "github.com/owncloud/ocis/ocis/pkg/runtime/watcher" + "github.com/rs/zerolog" + + "github.com/olekukonko/tablewriter" +) + +// Controller supervises processes. +type Controller struct { + m *sync.RWMutex + options Options + log zerolog.Logger + Config *config.Config + + Store storage.Storage + + // Bin is the OCIS single binary name. + Bin string + + // BinPath is the OCIS single binary path withing the host machine. + // The Controller needs to know the binary location in order to spawn new extensions. + BinPath string + + // Terminated facilitates communication from Watcher <-> Controller. Writes to this + // channel WILL always attempt to restart the crashed process. + Terminated chan process.ProcEntry +} + +var ( + once = sync.Once{} +) + +// NewController initializes a new controller. +func NewController(o ...Option) Controller { + opts := &Options{} + + for _, f := range o { + f(opts) + } + + c := Controller{ + m: &sync.RWMutex{}, + options: *opts, + log: *opts.Log, + Bin: "ocis", + Terminated: make(chan process.ProcEntry), + Store: storage.NewMapStorage(), + + Config: opts.Config, + } + + if opts.Bin != "" { + c.Bin = opts.Bin + } + + // Get binary location from $PATH lookup. If not present, it uses arg[0] as entry point. + path, err := exec.LookPath(c.Bin) + if err != nil { + c.log.Debug().Msg("OCIS binary not present in PATH, using Args[0]") + path = os.Args[0] + } + c.BinPath = path + return c +} + +// Start and watches a process. +func (c *Controller) Start(pe process.ProcEntry) error { + if pid := c.Store.Load(pe.Extension); pid != 0 { + c.log.Debug().Msg(fmt.Sprintf("extension already running: %s", pe.Extension)) + return nil + } + + w := watcher.NewWatcher() + if err := pe.Start(c.BinPath); err != nil { + return err + } + + // store the spawned child process PID. + if err := c.Store.Store(pe); err != nil { + return err + } + + w.Follow(pe, c.Terminated, c.options.Config.KeepAlive) + + once.Do(func() { + j := janitor{ + time.Second, + c.Store, + } + + go j.run() + go detach(c) + }) + return nil +} + +// Kill a managed process. +// Should a process managed by the runtime be allowed to be killed if the runtime is configured not to? +func (c *Controller) Kill(pe process.ProcEntry) error { + // load stored PID + pid := c.Store.Load(pe.Extension) + + // find process in host by PID + p, err := os.FindProcess(pid) + if err != nil { + return err + } + + if err := c.Store.Delete(pe); err != nil { + return err + } + c.log.Info().Str("package", "watcher").Msgf("terminating %v", pe.Extension) + + // terminate child process + return p.Kill() +} + +// Shutdown a running runtime. +func (c *Controller) Shutdown(ch chan struct{}) error { + entries := c.Store.LoadAll() + for cmd, pid := range entries { + c.log.Info().Str("package", "watcher").Msgf("gracefully terminating %v", cmd) + p, _ := os.FindProcess(pid) + if err := p.Kill(); err != nil { + return err + } + } + + ch <- struct{}{} + return nil +} + +// List managed processes. +func (c *Controller) List() string { + tableString := &strings.Builder{} + table := tablewriter.NewWriter(tableString) + table.SetHeader([]string{"Extension", "PID"}) + + entries := c.Store.LoadAll() + + keys := make([]string, 0, len(entries)) + for k := range entries { + keys = append(keys, k) + } + + sort.Strings(keys) + + for _, v := range keys { + table.Append([]string{v, strconv.Itoa(entries[v])}) + } + + table.Render() + return tableString.String() +} diff --git a/ocis/pkg/runtime/controller/janitor.go b/ocis/pkg/runtime/controller/janitor.go new file mode 100644 index 000000000..dca359cd7 --- /dev/null +++ b/ocis/pkg/runtime/controller/janitor.go @@ -0,0 +1,49 @@ +package controller + +import ( + "os" + "os/signal" + "syscall" + "time" + + "github.com/owncloud/ocis/ocis/pkg/runtime/process" + "github.com/owncloud/ocis/ocis/pkg/runtime/storage" +) + +type janitor struct { + // interval at which db is cleared. + interval time.Duration + + store storage.Storage +} + +func (j *janitor) run() { + ticker := time.NewTicker(j.interval) + work := make(chan os.Signal, 1) + signal.Notify(work, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT) + + for { + select { + case <-work: + return + case <-ticker.C: + j.cleanup() + } + } +} + +// cleanup removes orphaned extension + pid that were killed via SIGKILL given the nature of is being un-catchable, +// the only way to update pman's database is by polling. +func (j *janitor) cleanup() { + for name, pid := range j.store.LoadAll() { + // On unix like systems (linux, freebsd, etc) os.FindProcess will never return an error + if p, err := os.FindProcess(pid); err == nil { + if err := p.Signal(syscall.Signal(0)); err != nil { + j.store.Delete(process.ProcEntry{ + Pid: pid, + Extension: name, + }) + } + } + } +} diff --git a/ocis/pkg/runtime/controller/option.go b/ocis/pkg/runtime/controller/option.go new file mode 100644 index 000000000..1f8bfb81c --- /dev/null +++ b/ocis/pkg/runtime/controller/option.go @@ -0,0 +1,36 @@ +package controller + +import ( + "github.com/owncloud/ocis/ocis/pkg/runtime/config" + "github.com/rs/zerolog" +) + +// Options are the configurable options for a Controller. +type Options struct { + Bin string + Restart bool + Config *config.Config + Log *zerolog.Logger +} + +// Option represents an option. +type Option func(o *Options) + +// NewOptions returns a new Options struct. +func NewOptions() Options { + return Options{} +} + +// WithConfig sets Controller config. +func WithConfig(cfg *config.Config) Option { + return func(o *Options) { + o.Config = cfg + } +} + +// WithLog sets Controller config. +func WithLog(l *zerolog.Logger) Option { + return func(o *Options) { + o.Log = l + } +} diff --git a/ocis/pkg/runtime/controller/util.go b/ocis/pkg/runtime/controller/util.go new file mode 100644 index 000000000..7e157cbfc --- /dev/null +++ b/ocis/pkg/runtime/controller/util.go @@ -0,0 +1,10 @@ +package controller + +// detach will try to restart processes on failures. +func detach(c *Controller) { + for proc := range c.Terminated { + if err := c.Start(proc); err != nil { + c.log.Err(err) + } + } +} diff --git a/ocis/pkg/runtime/log/log.go b/ocis/pkg/runtime/log/log.go new file mode 100644 index 000000000..5baf45f23 --- /dev/null +++ b/ocis/pkg/runtime/log/log.go @@ -0,0 +1,30 @@ +package log + +import ( + "os" + "time" + + "github.com/rs/zerolog" +) + +var ( + // Level sets a project wide log level + Level zerolog.Level = zerolog.InfoLevel +) + +// NewLogger configures a logger. +func NewLogger(options ...Option) zerolog.Logger { + zerolog.SetGlobalLevel(Level) + + o := NewOptions() + for _, f := range options { + f(o) + } + + logger := zerolog.New(os.Stdout).With().Timestamp().Logger() + if o.Pretty { + logger = logger.Output(zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: time.RFC3339}) + } + + return logger +} diff --git a/ocis/pkg/runtime/log/options.go b/ocis/pkg/runtime/log/options.go new file mode 100644 index 000000000..a0035c478 --- /dev/null +++ b/ocis/pkg/runtime/log/options.go @@ -0,0 +1,26 @@ +package log + +import "github.com/rs/zerolog" + +// Options are the configurable options for a Controller. +type Options struct { + Level zerolog.Level + Pretty bool +} + +// Option represents an option. +type Option func(o *Options) + +// NewOptions returns a new Options struct. +func NewOptions() *Options { + return &Options{ + Level: zerolog.DebugLevel, + } +} + +// WithPretty sets the pretty option. +func WithPretty(pretty bool) Option { + return func(o *Options) { + o.Pretty = pretty + } +} diff --git a/ocis/pkg/runtime/process/process.go b/ocis/pkg/runtime/process/process.go new file mode 100644 index 000000000..36766fb47 --- /dev/null +++ b/ocis/pkg/runtime/process/process.go @@ -0,0 +1,60 @@ +// +build !windows + +package process + +import ( + "os" + + sys "golang.org/x/sys/unix" +) + +// ProcEntry is an entry in the File db. +type ProcEntry struct { + Args []string + Env []string + Pid int + Extension string +} + +// NewProcEntry returns a new ProcEntry. +func NewProcEntry(extension string, env []string, args ...string) ProcEntry { + return ProcEntry{ + Extension: extension, + Args: args, + Env: env, + } +} + +// Start a process. +func (e *ProcEntry) Start(binPath string) error { + var argv = []string{binPath} + argv = append(argv, e.Args...) + + p, err := os.StartProcess(binPath, argv, &os.ProcAttr{ + Files: []*os.File{ + os.Stdin, + os.Stdout, + os.Stderr, + }, + Env: e.Env, + Sys: &sys.SysProcAttr{ + Setpgid: true, + }, + }) + if err != nil { + return err + } + e.Pid = p.Pid + + return nil +} + +// Kill the wrapped process. +func (e *ProcEntry) Kill() error { + p, err := os.FindProcess(e.Pid) + if err != nil { + return err + } + + return p.Kill() +} diff --git a/ocis/pkg/runtime/process/process_windows.go b/ocis/pkg/runtime/process/process_windows.go new file mode 100644 index 000000000..32b36c333 --- /dev/null +++ b/ocis/pkg/runtime/process/process_windows.go @@ -0,0 +1,56 @@ +// +build windows + +package process + +import ( + "os" +) + +// ProcEntry is an entry in the File db. +type ProcEntry struct { + Args []string + Env []string + Pid int + Extension string +} + +// NewProcEntry returns a new ProcEntry. +func NewProcEntry(extension string, env []string, args ...string) ProcEntry { + return ProcEntry{ + Extension: extension, + Args: args, + Env: env, + } +} + +// Start a process. +func (e *ProcEntry) Start(binPath string) error { + var argv = []string{binPath} + argv = append(argv, e.Args...) + + p, err := os.StartProcess(binPath, argv, &os.ProcAttr{ + Files: []*os.File{ + os.Stdin, + os.Stdout, + os.Stderr, + }, + Env: e.Env, + }) + if err != nil { + return err + } + + e.Pid = p.Pid + + return nil +} + +// Kill the wrapped process. +func (e *ProcEntry) Kill() error { + p, err := os.FindProcess(e.Pid) + if err != nil { + return err + } + + return p.Kill() +} diff --git a/ocis/pkg/runtime/runtime.go b/ocis/pkg/runtime/runtime.go index de5061a4a..a2849809b 100644 --- a/ocis/pkg/runtime/runtime.go +++ b/ocis/pkg/runtime/runtime.go @@ -15,8 +15,8 @@ import ( "github.com/micro/micro/v2/client/api" "github.com/micro/micro/v2/service/registry" - "github.com/refs/pman/pkg/process" - "github.com/refs/pman/pkg/service" + "github.com/owncloud/ocis/ocis/pkg/runtime/process" + "github.com/owncloud/ocis/ocis/pkg/runtime/service" ) var ( @@ -80,7 +80,9 @@ func New(cfg *config.Config) Runtime { // Start rpc runtime func (r *Runtime) Start() error { go r.Launch() - return service.Start() + return service.Start( + service.WithLogPretty(r.c.Log.Pretty), + ) } // Launch ocis default ocis extensions. diff --git a/ocis/pkg/runtime/service/option.go b/ocis/pkg/runtime/service/option.go new file mode 100644 index 000000000..5e8e88f53 --- /dev/null +++ b/ocis/pkg/runtime/service/option.go @@ -0,0 +1,28 @@ +package service + +// Log configures a structure logger. +type Log struct { + Pretty bool +} + +// Options are the configurable options for a Service. +type Options struct { + Log *Log +} + +// Option represents an option. +type Option func(o *Options) + +// NewOptions returns a new Options struct. +func NewOptions() *Options { + return &Options{ + Log: &Log{}, + } +} + +// WithLogPretty sets Controller config. +func WithLogPretty(pretty bool) Option { + return func(o *Options) { + o.Log.Pretty = pretty + } +} diff --git a/ocis/pkg/runtime/service/service.go b/ocis/pkg/runtime/service/service.go new file mode 100644 index 000000000..629761f85 --- /dev/null +++ b/ocis/pkg/runtime/service/service.go @@ -0,0 +1,164 @@ +package service + +import ( + "fmt" + "net" + "net/http" + "net/rpc" + "os" + "os/signal" + "strings" + "sync" + "syscall" + + "github.com/owncloud/ocis/ocis/pkg/runtime/config" + "github.com/owncloud/ocis/ocis/pkg/runtime/controller" + "github.com/owncloud/ocis/ocis/pkg/runtime/log" + "github.com/owncloud/ocis/ocis/pkg/runtime/process" + "github.com/rs/zerolog" + "github.com/spf13/viper" +) + +var ( + halt = make(chan os.Signal, 1) + done = make(chan struct{}, 1) +) + +// Service represents a RPC service. +type Service struct { + Controller controller.Controller + Log zerolog.Logger + wg *sync.WaitGroup + done bool +} + +// loadFromEnv would set cmd global variables. This is a workaround spf13/viper since pman used as a library does not +// parse flags. +func loadFromEnv() *config.Config { + cfg := config.NewConfig() + viper.AutomaticEnv() + + viper.BindEnv("keep-alive", "RUNTIME_KEEP_ALIVE") + viper.BindEnv("port", "RUNTIME_PORT") + + cfg.KeepAlive = viper.GetBool("keep-alive") + + if viper.GetString("port") != "" { + cfg.Port = viper.GetString("port") + } + + return cfg +} + +// NewService returns a configured service with a controller and a default logger. +// When used as a library, flags are not parsed, and in order to avoid introducing a global state with init functions +// calls are done explicitly to loadFromEnv(). +// Since this is the public constructor, options need to be added, at the moment only logging options +// are supported in order to match the running OwnCloud services structured log. +func NewService(options ...Option) *Service { + opts := NewOptions() + + for _, f := range options { + f(opts) + } + + cfg := loadFromEnv() + l := log.NewLogger( + log.WithPretty(opts.Log.Pretty), + ) + + return &Service{ + wg: &sync.WaitGroup{}, + Log: l, + Controller: controller.NewController( + controller.WithConfig(cfg), + controller.WithLog(&l), + ), + } +} + +// Start an rpc service. +func Start(o ...Option) error { + s := NewService(o...) + + if err := rpc.Register(s); err != nil { + s.Log.Fatal().Err(err) + } + rpc.HandleHTTP() + + signal.Notify(halt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGHUP) + + l, err := net.Listen("tcp", fmt.Sprintf("%v:%v", s.Controller.Config.Hostname, s.Controller.Config.Port)) + if err != nil { + s.Log.Fatal().Err(err) + } + + // handle panic within the Service scope. + defer func() { + if r := recover(); r != nil { + reason := strings.Builder{} + if _, err := net.Dial("localhost", s.Controller.Config.Port); err != nil { + reason.WriteString("runtime address already in use") + } + + fmt.Println(reason.String()) + } + }() + + go trap(s) + + return http.Serve(l, nil) +} + +// Start indicates the Service Controller to start a new supervised service as an OS thread. +func (s *Service) Start(args process.ProcEntry, reply *int) error { + if !s.done { + s.wg.Add(1) + s.Log.Info().Str("service", args.Extension).Msgf("%v", "started") + if err := s.Controller.Start(args); err != nil { + *reply = 1 + return err + } + + *reply = 0 + s.wg.Done() + } + + return nil +} + +// List running processes for the Service Controller. +func (s *Service) List(args struct{}, reply *string) error { + *reply = s.Controller.List() + return nil +} + +// Kill a supervised process by subcommand name. +func (s *Service) Kill(args *string, reply *int) error { + pe := process.ProcEntry{ + Extension: *args, + } + if err := s.Controller.Kill(pe); err != nil { + *reply = 1 + return err + } + + *reply = 0 + return nil +} + +// trap blocks on halt channel. When the runtime is interrupted it +// signals the controller to stop any supervised process. +func trap(s *Service) { + <-halt + s.done = true + s.wg.Wait() + s.Log.Debug(). + Str("service", "runtime service"). + Msgf("terminating with signal: %v", s) + if err := s.Controller.Shutdown(done); err != nil { + s.Log.Err(err) + } + close(done) + os.Exit(0) +} diff --git a/ocis/pkg/runtime/storage/map.go b/ocis/pkg/runtime/storage/map.go new file mode 100644 index 000000000..9dfa59337 --- /dev/null +++ b/ocis/pkg/runtime/storage/map.go @@ -0,0 +1,64 @@ +package storage + +import ( + "github.com/owncloud/ocis/ocis/pkg/runtime/process" + + "sync" +) + +// Map synchronizes access to extension+pid tuples. +type Map struct { + c *sync.Map +} + +// NewMapStorage initializes a new Storage. +func NewMapStorage() Storage { + return &Map{ + c: &sync.Map{}, + } +} + +// Store a value on the underlying data structure. +func (m *Map) Store(e process.ProcEntry) error { + m.c.Store(e.Extension, e.Pid) + return nil +} + +// Delete a value on the underlying data structure. +func (m *Map) Delete(e process.ProcEntry) error { + m.c.Delete(e.Extension) + return nil +} + +// Load a single pid. +func (m *Map) Load(name string) int { + var val int + m.c.Range(func(k, v interface{}) bool { + if k.(string) == name { + val = v.(int) + return false + } + return true + }) + return val +} + +// LoadAll values from the underlying data structure. +func (m *Map) LoadAll() Entries { + e := make(map[string]int) + m.c.Range(func(k, v interface{}) bool { + ks, ok := k.(string) + if !ok { + return false + } + + vs, ok := v.(int) + if !ok { + return false + } + + e[ks] = vs + return true + }) + return e +} diff --git a/ocis/pkg/runtime/storage/map_test.go b/ocis/pkg/runtime/storage/map_test.go new file mode 100644 index 000000000..8aaee1574 --- /dev/null +++ b/ocis/pkg/runtime/storage/map_test.go @@ -0,0 +1,43 @@ +package storage + +import ( + "fmt" + "math/rand" + "os" + "strconv" + "testing" + + "github.com/owncloud/ocis/ocis/pkg/runtime/process" + "github.com/stretchr/testify/assert" +) + +func TestMain(m *testing.M) { + loadStore() + os.Exit(m.Run()) +} + +var ( + store = NewMapStorage() +) + +func loadStore() { + for i := 0; i < 20; i++ { + store.Store(process.ProcEntry{ + Pid: rand.Int(), + Extension: fmt.Sprintf("extension-%s", strconv.Itoa(i)), + }) + } +} + +func TestLoadAll(t *testing.T) { + all := store.LoadAll() + assert.NotNil(t, all["extension-1"]) +} + +func TestDelete(t *testing.T) { + store.Delete(process.ProcEntry{ + Extension: "extension-1", + }) + all := store.LoadAll() + assert.Zero(t, all["extension-1"]) +} diff --git a/ocis/pkg/runtime/storage/storage.go b/ocis/pkg/runtime/storage/storage.go new file mode 100644 index 000000000..833084770 --- /dev/null +++ b/ocis/pkg/runtime/storage/storage.go @@ -0,0 +1,21 @@ +package storage + +import "github.com/owncloud/ocis/ocis/pkg/runtime/process" + +// Entries is a tuple of +type Entries map[string]int + +// Storage defines a basic persistence interface layer. +type Storage interface { + // Store a representation of a process. + Store(e process.ProcEntry) error + + // Delete a representation of a process. + Delete(e process.ProcEntry) error + + // Load a single entry. + Load(name string) int + + // LoadAll retrieves a set of entries of running processes on the host machine. + LoadAll() Entries +} diff --git a/ocis/pkg/runtime/watcher/watcher.go b/ocis/pkg/runtime/watcher/watcher.go new file mode 100644 index 000000000..cbe2d46c7 --- /dev/null +++ b/ocis/pkg/runtime/watcher/watcher.go @@ -0,0 +1,55 @@ +package watcher + +import ( + golog "log" + "os" + + "github.com/owncloud/ocis/ocis/pkg/runtime/log" + "github.com/owncloud/ocis/ocis/pkg/runtime/process" + "github.com/rs/zerolog" +) + +// Watcher watches a process and sends messages using channels. +type Watcher struct { + log zerolog.Logger +} + +// NewWatcher initializes a watcher. +func NewWatcher() Watcher { + return Watcher{ + log: log.NewLogger(log.WithPretty(true)), + } +} + +// Follow a process until it dies. If restart is enabled, a new fork of the original process will be automatically spawned. +func (w *Watcher) Follow(pe process.ProcEntry, followerChan chan process.ProcEntry, restart bool) { + state := make(chan *os.ProcessState, 1) + + w.log.Debug().Str("package", "watcher").Msgf("watching %v", pe.Extension) + go func() { + ps, err := watch(pe.Pid) + if err != nil { + golog.Fatal(err) + } + + state <- ps + }() + + go func() { + status := <-state + w.log.Info().Str("package", "watcher").Msgf("%v exited with: %v", pe.Extension, status) + if restart { + followerChan <- pe + } + }() +} + +// watch a process by its pid. This operation blocks. +func watch(pid int) (*os.ProcessState, error) { + p, err := os.FindProcess(pid) + if err != nil { + return nil, err + } + + return p.Wait() +}