Merge pull request #1483 from owncloud/ocis-1485

This commit is contained in:
Alex Unger
2021-01-26 11:13:53 +01:00
committed by GitHub
25 changed files with 1064 additions and 5 deletions

View File

@@ -0,0 +1,7 @@
Change: Move runtime code on refs/pman over to owncloud/ocis/ocis
Tags: ocis, runtime
Currently, the runtime is under the private account of an oCIS developer. For future-proofing we don't want oCIS mission critical components to depend on external repositories, so we're including refs/pman module as an oCIS package instead.
https://github.com/owncloud/ocis/pull/1483

View File

@@ -31,10 +31,13 @@ require (
github.com/owncloud/ocis/thumbnails v0.1.6
github.com/owncloud/ocis/web v0.0.0-00010101000000-000000000000
github.com/owncloud/ocis/webdav v0.0.0-00010101000000-000000000000
github.com/refs/pman v0.0.0-20201214134707-9ce4dcebbbf8
github.com/restic/calens v0.2.0
github.com/rs/zerolog v1.20.0
github.com/spf13/cobra v1.0.0
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.7.0
go.opencensus.io v0.22.5
golang.org/x/sys v0.0.0-20201101102859-da207088b7d1
)
replace (

View File

@@ -1064,6 +1064,7 @@ github.com/ovh/go-ovh v0.0.0-20181109152953-ba5adb4cf014/go.mod h1:joRatxRJaZBsY
github.com/owncloud/flaex v0.0.0-20200411150708-dce59891a203/go.mod h1:jip86t4OVURJTf8CM/0e2qcji/Y4NG3l2lR8kex4JWw=
github.com/owncloud/flaex v0.2.0 h1:3FLf8oyMgA6HLK7w4+VJ5N1oVA8G7MptLCVjfxxIaww=
github.com/owncloud/flaex v0.2.0/go.mod h1:jip86t4OVURJTf8CM/0e2qcji/Y4NG3l2lR8kex4JWw=
github.com/owncloud/ocis v1.0.0 h1:gtQZZSEzbSRucGPl3Ag5BOelZuw2OGv2e6WLAJMvQTQ=
github.com/owncloud/ocis-graph v0.0.0-20200318175820-9a5a6e029db7 h1:gT0GyIOoR7XtpZ7sIxVJSckcz/nueGB1Cm1xNaflXQ0=
github.com/owncloud/ocis-graph v0.0.0-20200318175820-9a5a6e029db7/go.mod h1:IRm6BBJqyPhYI+3fm5bWkhgFL/yh63ASUznFqN4yXgs=
github.com/owncloud/ocis-graph-explorer v0.0.0-20200210111049-017eeb40dc0c h1:8g3u2JwOMP/UE+0B+YjV7UWEQzyCPbnZzwIto6lNc0I=
@@ -1159,6 +1160,8 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqn
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/refs/pman v0.0.0-20201214134707-9ce4dcebbbf8 h1:7Pz7RrqgDeEp/nTGK55clOWODZ4+2D7J5K0VFOiYHU0=
github.com/refs/pman v0.0.0-20201214134707-9ce4dcebbbf8/go.mod h1:EHI7tmxO5Ct3xBLHU43j51o5/U6VQz0TM6ik8RPE570=
github.com/refs/pman v0.0.0-20210125101615-7406747552bc h1:bh42GGXCDC/sVwu/1bYm4BCHBeXeX1qY8WSokQYuykQ=
github.com/refs/pman v0.0.0-20210125101615-7406747552bc/go.mod h1:OQcXtpEk2nPkbM32jtWRIyZHTTFReKIFmTrJb06pVY4=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/restic/calens v0.1.0/go.mod h1:u67f5msOjCTDYNzOf/NoAUSdmXP03YXPCwIQLYADy5M=
@@ -1279,6 +1282,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/studio-b12/gowebdav v0.0.0-20200303150724-9380631c29a1 h1:TPyHV/OgChqNcnYqCoCvIFjR9TU60gFXXBKnhOBzVEI=
github.com/studio-b12/gowebdav v0.0.0-20200303150724-9380631c29a1/go.mod h1:gCcfDlA1Y7GqOaeEKw5l9dOGx1VLdc/HuQSlQAaZ30s=
github.com/subosito/gotenv v1.1.1/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=

View File

@@ -7,6 +7,7 @@ import (
accounts "github.com/owncloud/ocis/accounts/pkg/config"
glauth "github.com/owncloud/ocis/glauth/pkg/config"
konnectd "github.com/owncloud/ocis/konnectd/pkg/config"
pman "github.com/owncloud/ocis/ocis/pkg/runtime/config"
ocs "github.com/owncloud/ocis/ocs/pkg/config"
onlyoffice "github.com/owncloud/ocis/onlyoffice/pkg/config"
proxy "github.com/owncloud/ocis/proxy/pkg/config"
@@ -16,7 +17,6 @@ import (
thumbnails "github.com/owncloud/ocis/thumbnails/pkg/config"
web "github.com/owncloud/ocis/web/pkg/config"
webdav "github.com/owncloud/ocis/webdav/pkg/config"
pman "github.com/refs/pman/pkg/config"
)
// Log defines the available logging configuration.

View File

@@ -0,0 +1,68 @@
# ownCloud Infinite Scale: Runtime
Pman is a slim utility library for supervising long-running processes. It can be [embedded](https://github.com/owncloud/OCIS/blob/ea2a2b328e7261ed72e65adf48359c0a44e14b40/OCIS/pkg/runtime/runtime.go#L84) or used as a cli command.
When used as a CLI command it relays actions to a running runtime.
## Usage
Start a runtime
```go
package main
import "github.com/owncloud/ocis/ocis/pkg/runtime/service"
func main() {
service.Start()
}
```
![start runtime](https://imgur.com/F67hgQk.gif)
Start sending messages
![message runtime](https://imgur.com/O71RlsJ.gif)
## Example
```go
package main
import (
"fmt"
"github.com/owncloud/ocis/ocis/pkg/runtime/process"
"github.com/owncloud/ocis/ocis/pkg/runtime/service"
"github.com/rs/zerolog/log"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
s := service.NewService()
var c = make(chan os.Signal, 1)
var o int
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
if err := s.Start(process.NewProcEntry("ocs", nil, "ocs"), &o); err != nil {
os.Exit(1)
}
time.AfterFunc(3*time.Second, func() {
var acc = "ocs"
fmt.Printf(fmt.Sprintf("shutting down service: %s", acc))
if err := s.Controller.Kill(&acc); err != nil {
log.Fatal()
}
os.Exit(0)
})
for {
select {
case <-c:
return
}
}
}
```
Run the above example with `RUNTIME_KEEP_ALIVE=true` and with no `RUNTIME_KEEP_ALIVE` set to see its behavior. It requires an [OCIS binary](https://github.com/owncloud/ocis/releases) present in your `$PATH` for it to work.

View File

@@ -0,0 +1,35 @@
package cmd
import (
"fmt"
"log"
"net"
"net/rpc"
"github.com/owncloud/ocis/ocis/pkg/runtime/config"
"github.com/spf13/cobra"
)
// Kill an extension.
func Kill(cfg *config.Config) *cobra.Command {
return &cobra.Command{
Use: "kill",
Aliases: []string{"k"},
Short: "Kill a running extensions.",
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
client, err := rpc.DialHTTP("tcp", net.JoinHostPort(cfg.Hostname, cfg.Port))
if err != nil {
log.Fatal("dialing:", err)
}
var arg1 int
if err := client.Call("Service.Kill", &args[0], &arg1); err != nil {
log.Fatal(err)
}
fmt.Println(arg1)
},
}
}

View File

@@ -0,0 +1,34 @@
package cmd
import (
"fmt"
"log"
"net"
"net/rpc"
"github.com/owncloud/ocis/ocis/pkg/runtime/config"
"github.com/spf13/cobra"
)
// List running extensions.
func List(cfg *config.Config) *cobra.Command {
return &cobra.Command{
Use: "list",
Aliases: []string{"r"},
Short: "List running extensions",
Run: func(cmd *cobra.Command, args []string) {
client, err := rpc.DialHTTP("tcp", net.JoinHostPort(cfg.Hostname, cfg.Port))
if err != nil {
log.Fatal("dialing:", err)
}
var arg1 string
if err := client.Call("Service.List", struct{}{}, &arg1); err != nil {
log.Fatal(err)
}
fmt.Println(arg1)
},
}
}

View File

@@ -0,0 +1,30 @@
package cmd
import (
"github.com/owncloud/ocis/ocis/pkg/runtime/config"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
var (
rootCmd = &cobra.Command{
Use: "pman",
Short: "RPC Process Manager",
}
)
// RootCmd returns a configured root command.
func RootCmd(cfg *config.Config) *cobra.Command {
rootCmd.PersistentFlags().StringVarP(&cfg.Hostname, "hostname", "n", "localhost", "host with a running OCIS runtime.")
rootCmd.PersistentFlags().StringVarP(&cfg.Port, "port", "p", "10666", "port to send messages to the rpc OCIS runtime.")
rootCmd.PersistentFlags().BoolVarP(&cfg.KeepAlive, "keep-alive", "k", false, "restart supervised processes that abruptly die.")
viper.BindPFlag("hostname", rootCmd.PersistentFlags().Lookup("hostname"))
viper.BindPFlag("port", rootCmd.PersistentFlags().Lookup("port"))
rootCmd.AddCommand(List(cfg))
rootCmd.AddCommand(Run(cfg))
rootCmd.AddCommand(Kill(cfg))
return rootCmd
}

View File

@@ -0,0 +1,37 @@
package cmd
import (
"fmt"
"log"
"net"
"net/rpc"
"os"
"github.com/owncloud/ocis/ocis/pkg/runtime/config"
"github.com/owncloud/ocis/ocis/pkg/runtime/process"
"github.com/spf13/cobra"
)
// Run an extension.
func Run(cfg *config.Config) *cobra.Command {
return &cobra.Command{
Use: "run",
Short: "Run an extension.",
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
client, err := rpc.DialHTTP("tcp", net.JoinHostPort(cfg.Hostname, cfg.Port))
if err != nil {
log.Fatal("dialing:", err)
}
proc := process.NewProcEntry(args[0], os.Environ(), []string{args[0]}...)
var res int
if err := client.Call("Service.Start", proc, &res); err != nil {
log.Fatal(err)
}
fmt.Println(res)
},
}
}

View File

@@ -0,0 +1,28 @@
package config
// Config determines behavior across the tool.
type Config struct {
// Hostname where the runtime is running. When using PMAN in cli mode, it determines where the host runtime is.
// Default is localhost.
Hostname string
// Port configures the port where a runtime is available. It defaults to 10666.
Port string
// KeepAlive configures if restart attempts are made if the process supervised terminates. Default is false.
KeepAlive bool
}
var (
defaultHostname = "localhost"
defaultPort = "10666"
)
// NewConfig returns a new config with a set of defaults.
func NewConfig() *Config {
return &Config{
Hostname: defaultHostname,
Port: defaultPort,
KeepAlive: false,
}
}

View File

@@ -0,0 +1,168 @@
package controller
import (
"fmt"
"os"
"os/exec"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/owncloud/ocis/ocis/pkg/runtime/config"
"github.com/owncloud/ocis/ocis/pkg/runtime/process"
"github.com/owncloud/ocis/ocis/pkg/runtime/storage"
"github.com/owncloud/ocis/ocis/pkg/runtime/watcher"
"github.com/rs/zerolog"
"github.com/olekukonko/tablewriter"
)
// Controller supervises processes.
type Controller struct {
m *sync.RWMutex
options Options
log zerolog.Logger
Config *config.Config
Store storage.Storage
// Bin is the OCIS single binary name.
Bin string
// BinPath is the OCIS single binary path withing the host machine.
// The Controller needs to know the binary location in order to spawn new extensions.
BinPath string
// Terminated facilitates communication from Watcher <-> Controller. Writes to this
// channel WILL always attempt to restart the crashed process.
Terminated chan process.ProcEntry
}
var (
once = sync.Once{}
)
// NewController initializes a new controller.
func NewController(o ...Option) Controller {
opts := &Options{}
for _, f := range o {
f(opts)
}
c := Controller{
m: &sync.RWMutex{},
options: *opts,
log: *opts.Log,
Bin: "ocis",
Terminated: make(chan process.ProcEntry),
Store: storage.NewMapStorage(),
Config: opts.Config,
}
if opts.Bin != "" {
c.Bin = opts.Bin
}
// Get binary location from $PATH lookup. If not present, it uses arg[0] as entry point.
path, err := exec.LookPath(c.Bin)
if err != nil {
c.log.Debug().Msg("OCIS binary not present in PATH, using Args[0]")
path = os.Args[0]
}
c.BinPath = path
return c
}
// Start and watches a process.
func (c *Controller) Start(pe process.ProcEntry) error {
if pid := c.Store.Load(pe.Extension); pid != 0 {
c.log.Debug().Msg(fmt.Sprintf("extension already running: %s", pe.Extension))
return nil
}
w := watcher.NewWatcher()
if err := pe.Start(c.BinPath); err != nil {
return err
}
// store the spawned child process PID.
if err := c.Store.Store(pe); err != nil {
return err
}
w.Follow(pe, c.Terminated, c.options.Config.KeepAlive)
once.Do(func() {
j := janitor{
time.Second,
c.Store,
}
go j.run()
go detach(c)
})
return nil
}
// Kill a managed process.
// Should a process managed by the runtime be allowed to be killed if the runtime is configured not to?
func (c *Controller) Kill(pe process.ProcEntry) error {
// load stored PID
pid := c.Store.Load(pe.Extension)
// find process in host by PID
p, err := os.FindProcess(pid)
if err != nil {
return err
}
if err := c.Store.Delete(pe); err != nil {
return err
}
c.log.Info().Str("package", "watcher").Msgf("terminating %v", pe.Extension)
// terminate child process
return p.Kill()
}
// Shutdown a running runtime.
func (c *Controller) Shutdown(ch chan struct{}) error {
entries := c.Store.LoadAll()
for cmd, pid := range entries {
c.log.Info().Str("package", "watcher").Msgf("gracefully terminating %v", cmd)
p, _ := os.FindProcess(pid)
if err := p.Kill(); err != nil {
return err
}
}
ch <- struct{}{}
return nil
}
// List managed processes.
func (c *Controller) List() string {
tableString := &strings.Builder{}
table := tablewriter.NewWriter(tableString)
table.SetHeader([]string{"Extension", "PID"})
entries := c.Store.LoadAll()
keys := make([]string, 0, len(entries))
for k := range entries {
keys = append(keys, k)
}
sort.Strings(keys)
for _, v := range keys {
table.Append([]string{v, strconv.Itoa(entries[v])})
}
table.Render()
return tableString.String()
}

View File

@@ -0,0 +1,49 @@
package controller
import (
"os"
"os/signal"
"syscall"
"time"
"github.com/owncloud/ocis/ocis/pkg/runtime/process"
"github.com/owncloud/ocis/ocis/pkg/runtime/storage"
)
type janitor struct {
// interval at which db is cleared.
interval time.Duration
store storage.Storage
}
func (j *janitor) run() {
ticker := time.NewTicker(j.interval)
work := make(chan os.Signal, 1)
signal.Notify(work, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT)
for {
select {
case <-work:
return
case <-ticker.C:
j.cleanup()
}
}
}
// cleanup removes orphaned extension + pid that were killed via SIGKILL given the nature of is being un-catchable,
// the only way to update pman's database is by polling.
func (j *janitor) cleanup() {
for name, pid := range j.store.LoadAll() {
// On unix like systems (linux, freebsd, etc) os.FindProcess will never return an error
if p, err := os.FindProcess(pid); err == nil {
if err := p.Signal(syscall.Signal(0)); err != nil {
j.store.Delete(process.ProcEntry{
Pid: pid,
Extension: name,
})
}
}
}
}

View File

@@ -0,0 +1,36 @@
package controller
import (
"github.com/owncloud/ocis/ocis/pkg/runtime/config"
"github.com/rs/zerolog"
)
// Options are the configurable options for a Controller.
type Options struct {
Bin string
Restart bool
Config *config.Config
Log *zerolog.Logger
}
// Option represents an option.
type Option func(o *Options)
// NewOptions returns a new Options struct.
func NewOptions() Options {
return Options{}
}
// WithConfig sets Controller config.
func WithConfig(cfg *config.Config) Option {
return func(o *Options) {
o.Config = cfg
}
}
// WithLog sets Controller config.
func WithLog(l *zerolog.Logger) Option {
return func(o *Options) {
o.Log = l
}
}

View File

@@ -0,0 +1,10 @@
package controller
// detach will try to restart processes on failures.
func detach(c *Controller) {
for proc := range c.Terminated {
if err := c.Start(proc); err != nil {
c.log.Err(err)
}
}
}

View File

@@ -0,0 +1,30 @@
package log
import (
"os"
"time"
"github.com/rs/zerolog"
)
var (
// Level sets a project wide log level
Level zerolog.Level = zerolog.InfoLevel
)
// NewLogger configures a logger.
func NewLogger(options ...Option) zerolog.Logger {
zerolog.SetGlobalLevel(Level)
o := NewOptions()
for _, f := range options {
f(o)
}
logger := zerolog.New(os.Stdout).With().Timestamp().Logger()
if o.Pretty {
logger = logger.Output(zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: time.RFC3339})
}
return logger
}

View File

@@ -0,0 +1,26 @@
package log
import "github.com/rs/zerolog"
// Options are the configurable options for a Controller.
type Options struct {
Level zerolog.Level
Pretty bool
}
// Option represents an option.
type Option func(o *Options)
// NewOptions returns a new Options struct.
func NewOptions() *Options {
return &Options{
Level: zerolog.DebugLevel,
}
}
// WithPretty sets the pretty option.
func WithPretty(pretty bool) Option {
return func(o *Options) {
o.Pretty = pretty
}
}

View File

@@ -0,0 +1,60 @@
// +build !windows
package process
import (
"os"
sys "golang.org/x/sys/unix"
)
// ProcEntry is an entry in the File db.
type ProcEntry struct {
Args []string
Env []string
Pid int
Extension string
}
// NewProcEntry returns a new ProcEntry.
func NewProcEntry(extension string, env []string, args ...string) ProcEntry {
return ProcEntry{
Extension: extension,
Args: args,
Env: env,
}
}
// Start a process.
func (e *ProcEntry) Start(binPath string) error {
var argv = []string{binPath}
argv = append(argv, e.Args...)
p, err := os.StartProcess(binPath, argv, &os.ProcAttr{
Files: []*os.File{
os.Stdin,
os.Stdout,
os.Stderr,
},
Env: e.Env,
Sys: &sys.SysProcAttr{
Setpgid: true,
},
})
if err != nil {
return err
}
e.Pid = p.Pid
return nil
}
// Kill the wrapped process.
func (e *ProcEntry) Kill() error {
p, err := os.FindProcess(e.Pid)
if err != nil {
return err
}
return p.Kill()
}

View File

@@ -0,0 +1,56 @@
// +build windows
package process
import (
"os"
)
// ProcEntry is an entry in the File db.
type ProcEntry struct {
Args []string
Env []string
Pid int
Extension string
}
// NewProcEntry returns a new ProcEntry.
func NewProcEntry(extension string, env []string, args ...string) ProcEntry {
return ProcEntry{
Extension: extension,
Args: args,
Env: env,
}
}
// Start a process.
func (e *ProcEntry) Start(binPath string) error {
var argv = []string{binPath}
argv = append(argv, e.Args...)
p, err := os.StartProcess(binPath, argv, &os.ProcAttr{
Files: []*os.File{
os.Stdin,
os.Stdout,
os.Stderr,
},
Env: e.Env,
})
if err != nil {
return err
}
e.Pid = p.Pid
return nil
}
// Kill the wrapped process.
func (e *ProcEntry) Kill() error {
p, err := os.FindProcess(e.Pid)
if err != nil {
return err
}
return p.Kill()
}

View File

@@ -15,8 +15,8 @@ import (
"github.com/micro/micro/v2/client/api"
"github.com/micro/micro/v2/service/registry"
"github.com/refs/pman/pkg/process"
"github.com/refs/pman/pkg/service"
"github.com/owncloud/ocis/ocis/pkg/runtime/process"
"github.com/owncloud/ocis/ocis/pkg/runtime/service"
)
var (
@@ -80,7 +80,9 @@ func New(cfg *config.Config) Runtime {
// Start rpc runtime
func (r *Runtime) Start() error {
go r.Launch()
return service.Start()
return service.Start(
service.WithLogPretty(r.c.Log.Pretty),
)
}
// Launch ocis default ocis extensions.

View File

@@ -0,0 +1,28 @@
package service
// Log configures a structure logger.
type Log struct {
Pretty bool
}
// Options are the configurable options for a Service.
type Options struct {
Log *Log
}
// Option represents an option.
type Option func(o *Options)
// NewOptions returns a new Options struct.
func NewOptions() *Options {
return &Options{
Log: &Log{},
}
}
// WithLogPretty sets Controller config.
func WithLogPretty(pretty bool) Option {
return func(o *Options) {
o.Log.Pretty = pretty
}
}

View File

@@ -0,0 +1,164 @@
package service
import (
"fmt"
"net"
"net/http"
"net/rpc"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"github.com/owncloud/ocis/ocis/pkg/runtime/config"
"github.com/owncloud/ocis/ocis/pkg/runtime/controller"
"github.com/owncloud/ocis/ocis/pkg/runtime/log"
"github.com/owncloud/ocis/ocis/pkg/runtime/process"
"github.com/rs/zerolog"
"github.com/spf13/viper"
)
var (
halt = make(chan os.Signal, 1)
done = make(chan struct{}, 1)
)
// Service represents a RPC service.
type Service struct {
Controller controller.Controller
Log zerolog.Logger
wg *sync.WaitGroup
done bool
}
// loadFromEnv would set cmd global variables. This is a workaround spf13/viper since pman used as a library does not
// parse flags.
func loadFromEnv() *config.Config {
cfg := config.NewConfig()
viper.AutomaticEnv()
viper.BindEnv("keep-alive", "RUNTIME_KEEP_ALIVE")
viper.BindEnv("port", "RUNTIME_PORT")
cfg.KeepAlive = viper.GetBool("keep-alive")
if viper.GetString("port") != "" {
cfg.Port = viper.GetString("port")
}
return cfg
}
// NewService returns a configured service with a controller and a default logger.
// When used as a library, flags are not parsed, and in order to avoid introducing a global state with init functions
// calls are done explicitly to loadFromEnv().
// Since this is the public constructor, options need to be added, at the moment only logging options
// are supported in order to match the running OwnCloud services structured log.
func NewService(options ...Option) *Service {
opts := NewOptions()
for _, f := range options {
f(opts)
}
cfg := loadFromEnv()
l := log.NewLogger(
log.WithPretty(opts.Log.Pretty),
)
return &Service{
wg: &sync.WaitGroup{},
Log: l,
Controller: controller.NewController(
controller.WithConfig(cfg),
controller.WithLog(&l),
),
}
}
// Start an rpc service.
func Start(o ...Option) error {
s := NewService(o...)
if err := rpc.Register(s); err != nil {
s.Log.Fatal().Err(err)
}
rpc.HandleHTTP()
signal.Notify(halt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGHUP)
l, err := net.Listen("tcp", fmt.Sprintf("%v:%v", s.Controller.Config.Hostname, s.Controller.Config.Port))
if err != nil {
s.Log.Fatal().Err(err)
}
// handle panic within the Service scope.
defer func() {
if r := recover(); r != nil {
reason := strings.Builder{}
if _, err := net.Dial("localhost", s.Controller.Config.Port); err != nil {
reason.WriteString("runtime address already in use")
}
fmt.Println(reason.String())
}
}()
go trap(s)
return http.Serve(l, nil)
}
// Start indicates the Service Controller to start a new supervised service as an OS thread.
func (s *Service) Start(args process.ProcEntry, reply *int) error {
if !s.done {
s.wg.Add(1)
s.Log.Info().Str("service", args.Extension).Msgf("%v", "started")
if err := s.Controller.Start(args); err != nil {
*reply = 1
return err
}
*reply = 0
s.wg.Done()
}
return nil
}
// List running processes for the Service Controller.
func (s *Service) List(args struct{}, reply *string) error {
*reply = s.Controller.List()
return nil
}
// Kill a supervised process by subcommand name.
func (s *Service) Kill(args *string, reply *int) error {
pe := process.ProcEntry{
Extension: *args,
}
if err := s.Controller.Kill(pe); err != nil {
*reply = 1
return err
}
*reply = 0
return nil
}
// trap blocks on halt channel. When the runtime is interrupted it
// signals the controller to stop any supervised process.
func trap(s *Service) {
<-halt
s.done = true
s.wg.Wait()
s.Log.Debug().
Str("service", "runtime service").
Msgf("terminating with signal: %v", s)
if err := s.Controller.Shutdown(done); err != nil {
s.Log.Err(err)
}
close(done)
os.Exit(0)
}

View File

@@ -0,0 +1,64 @@
package storage
import (
"github.com/owncloud/ocis/ocis/pkg/runtime/process"
"sync"
)
// Map synchronizes access to extension+pid tuples.
type Map struct {
c *sync.Map
}
// NewMapStorage initializes a new Storage.
func NewMapStorage() Storage {
return &Map{
c: &sync.Map{},
}
}
// Store a value on the underlying data structure.
func (m *Map) Store(e process.ProcEntry) error {
m.c.Store(e.Extension, e.Pid)
return nil
}
// Delete a value on the underlying data structure.
func (m *Map) Delete(e process.ProcEntry) error {
m.c.Delete(e.Extension)
return nil
}
// Load a single pid.
func (m *Map) Load(name string) int {
var val int
m.c.Range(func(k, v interface{}) bool {
if k.(string) == name {
val = v.(int)
return false
}
return true
})
return val
}
// LoadAll values from the underlying data structure.
func (m *Map) LoadAll() Entries {
e := make(map[string]int)
m.c.Range(func(k, v interface{}) bool {
ks, ok := k.(string)
if !ok {
return false
}
vs, ok := v.(int)
if !ok {
return false
}
e[ks] = vs
return true
})
return e
}

View File

@@ -0,0 +1,43 @@
package storage
import (
"fmt"
"math/rand"
"os"
"strconv"
"testing"
"github.com/owncloud/ocis/ocis/pkg/runtime/process"
"github.com/stretchr/testify/assert"
)
func TestMain(m *testing.M) {
loadStore()
os.Exit(m.Run())
}
var (
store = NewMapStorage()
)
func loadStore() {
for i := 0; i < 20; i++ {
store.Store(process.ProcEntry{
Pid: rand.Int(),
Extension: fmt.Sprintf("extension-%s", strconv.Itoa(i)),
})
}
}
func TestLoadAll(t *testing.T) {
all := store.LoadAll()
assert.NotNil(t, all["extension-1"])
}
func TestDelete(t *testing.T) {
store.Delete(process.ProcEntry{
Extension: "extension-1",
})
all := store.LoadAll()
assert.Zero(t, all["extension-1"])
}

View File

@@ -0,0 +1,21 @@
package storage
import "github.com/owncloud/ocis/ocis/pkg/runtime/process"
// Entries is a tuple of <extension:pid>
type Entries map[string]int
// Storage defines a basic persistence interface layer.
type Storage interface {
// Store a representation of a process.
Store(e process.ProcEntry) error
// Delete a representation of a process.
Delete(e process.ProcEntry) error
// Load a single entry.
Load(name string) int
// LoadAll retrieves a set of entries of running processes on the host machine.
LoadAll() Entries
}

View File

@@ -0,0 +1,55 @@
package watcher
import (
golog "log"
"os"
"github.com/owncloud/ocis/ocis/pkg/runtime/log"
"github.com/owncloud/ocis/ocis/pkg/runtime/process"
"github.com/rs/zerolog"
)
// Watcher watches a process and sends messages using channels.
type Watcher struct {
log zerolog.Logger
}
// NewWatcher initializes a watcher.
func NewWatcher() Watcher {
return Watcher{
log: log.NewLogger(log.WithPretty(true)),
}
}
// Follow a process until it dies. If restart is enabled, a new fork of the original process will be automatically spawned.
func (w *Watcher) Follow(pe process.ProcEntry, followerChan chan process.ProcEntry, restart bool) {
state := make(chan *os.ProcessState, 1)
w.log.Debug().Str("package", "watcher").Msgf("watching %v", pe.Extension)
go func() {
ps, err := watch(pe.Pid)
if err != nil {
golog.Fatal(err)
}
state <- ps
}()
go func() {
status := <-state
w.log.Info().Str("package", "watcher").Msgf("%v exited with: %v", pe.Extension, status)
if restart {
followerChan <- pe
}
}()
}
// watch a process by its pid. This operation blocks.
func watch(pid int) (*os.ProcessState, error) {
p, err := os.FindProcess(pid)
if err != nil {
return nil, err
}
return p.Wait()
}