feat: Add signal related params to exec

This commit is contained in:
dadav
2024-01-10 20:20:00 +01:00
parent 4b62a6e34f
commit 152719441e
6 changed files with 358 additions and 156 deletions
+20 -7
View File
@@ -20,6 +20,12 @@ import (
"github.com/rs/zerolog"
)
type Params struct {
KillSignal os.Signal
Command string
KillTimeout time.Duration
}
func Init() {
rtsp.HandleFunc(func(conn *pkg.Conn) bool {
waitersMu.Lock()
@@ -47,7 +53,12 @@ func Init() {
func execHandle(url string) (core.Producer, error) {
var path string
args := shell.QuoteSplit(url[5:]) // remove `exec:`
params, err := parseParams(url)
if err != nil {
return nil, err
}
args := shell.QuoteSplit(params.Command[5:]) // remove `exec:`
for i, arg := range args {
if arg == "{output}" {
if rtsp.Port == "" {
@@ -67,14 +78,14 @@ func execHandle(url string) (core.Producer, error) {
}
if path == "" {
return handlePipe(url, cmd)
return handlePipe(url, cmd, params)
}
return handleRTSP(url, path, cmd)
}
func handlePipe(url string, cmd *exec.Cmd) (core.Producer, error) {
r, err := PipeCloser(cmd)
func handlePipe(_ string, cmd *exec.Cmd, params *Params) (core.Producer, error) {
r, err := PipeCloser(cmd, params)
if err != nil {
return nil, err
}
@@ -144,6 +155,8 @@ func handleRTSP(url, path string, cmd *exec.Cmd) (core.Producer, error) {
// internal
var log zerolog.Logger
var waiters = map[string]chan core.Producer{}
var waitersMu sync.Mutex
var (
log zerolog.Logger
waiters = map[string]chan core.Producer{}
waitersMu sync.Mutex
)
+34
View File
@@ -0,0 +1,34 @@
//go:build !linux
package exec
import (
"fmt"
"net/url"
"runtime"
"strings"
"github.com/AlexxIT/go2rtc/internal/streams"
)
func parseParams(s string) (*Params, error) {
args := &Params{
Command: s,
}
var query url.Values
if i := strings.IndexByte(s, '#'); i > 0 {
query = streams.ParseQuery(s[i+1:])
args.Command = s[:i]
}
if _, ok := query["killsignal"]; ok {
return nil, fmt.Errorf("killsignal is not supported this %s", runtime.GOOS)
}
if _, ok := query["killtimeout"]; ok {
return nil, fmt.Errorf("killtimeout is not supported in %s", runtime.GOOS)
}
return args, nil
}
+88
View File
@@ -0,0 +1,88 @@
package exec
import (
"fmt"
"net/url"
"os"
"strconv"
"strings"
"syscall"
"time"
"github.com/AlexxIT/go2rtc/internal/streams"
)
func parseParams(s string) (*Params, error) {
args := &Params{
KillSignal: syscall.SIGKILL,
KillTimeout: 5 * time.Second,
Command: s,
}
var query url.Values
if i := strings.IndexByte(s, '#'); i > 0 {
query = streams.ParseQuery(s[i+1:])
args.Command = s[:i]
}
if val, ok := query["killsignal"]; ok {
if sig, err := parseSignal(val[0]); err == nil {
args.KillSignal = sig
} else {
return nil, fmt.Errorf("could not parse killsignal param (%s)", val[0])
}
}
if val, ok := query["killtimeout"]; ok {
if i, err := strconv.Atoi(val[0]); err == nil {
args.KillTimeout = time.Duration(i) * time.Second
} else {
return nil, fmt.Errorf("could not convert killtimeout param (%s) to int", val[0])
}
}
return args, nil
}
func parseSignal(signalString string) (os.Signal, error) {
signalMap := map[string]os.Signal{
"sighup": syscall.SIGHUP,
"sigint": syscall.SIGINT,
"sigquit": syscall.SIGQUIT,
"sigill": syscall.SIGILL,
"sigtrap": syscall.SIGTRAP,
"sigabrt": syscall.SIGABRT,
"sigbus": syscall.SIGBUS,
"sigfpe": syscall.SIGFPE,
"sigkill": syscall.SIGKILL,
"sigusr1": syscall.SIGUSR1,
"sigsegv": syscall.SIGSEGV,
"sigusr2": syscall.SIGUSR2,
"sigpipe": syscall.SIGPIPE,
"sigalrm": syscall.SIGALRM,
"sigterm": syscall.SIGTERM,
"sigchld": syscall.SIGCHLD,
"sigcont": syscall.SIGCONT,
"sigstop": syscall.SIGSTOP,
"sigtstp": syscall.SIGTSTP,
"sigttin": syscall.SIGTTIN,
"sigttou": syscall.SIGTTOU,
"sigurg": syscall.SIGURG,
"sigxcpu": syscall.SIGXCPU,
"sigxfsz": syscall.SIGXFSZ,
"sigvtalrm": syscall.SIGVTALRM,
"sigprof": syscall.SIGPROF,
"sigwinch": syscall.SIGWINCH,
"sigio": syscall.SIGIO,
"sigpoll": syscall.SIGPOLL,
"sigpwr": syscall.SIGPWR,
"sigsys": syscall.SIGSYS,
}
signalValue, ok := signalMap[strings.ToLower(signalString)]
if !ok {
return nil, fmt.Errorf("invalid signal: %s", signalString)
}
return signalValue, nil
}
+11 -4
View File
@@ -1,3 +1,5 @@
//go:build !linux
package exec
import (
@@ -9,22 +11,27 @@ import (
)
// PipeCloser - return StdoutPipe that Kill cmd on Close call
func PipeCloser(cmd *exec.Cmd) (io.ReadCloser, error) {
func PipeCloser(cmd *exec.Cmd, params *Params) (io.ReadCloser, error) {
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
// add buffer for pipe reader to reduce syscall
return pipeCloser{bufio.NewReaderSize(stdout, core.BufferSize), stdout, cmd}, nil
return pipeCloser{bufio.NewReaderSize(stdout, core.BufferSize), stdout, cmd, params}, nil
}
type pipeCloser struct {
io.Reader
io.Closer
cmd *exec.Cmd
cmd *exec.Cmd
params *Params
}
func (p pipeCloser) Close() error {
return core.Any(p.Closer.Close(), p.cmd.Process.Kill(), p.cmd.Wait())
finished := make(chan bool)
err := core.Any(p.Closer.Close(), p.cmd.Process.Kill(), p.cmd.Wait())
finished <- true
return err
}
+48
View File
@@ -0,0 +1,48 @@
package exec
import (
"bufio"
"io"
"os/exec"
"syscall"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
)
// PipeCloser - return StdoutPipe that Kill cmd on Close call
func PipeCloser(cmd *exec.Cmd, params *Params) (io.ReadCloser, error) {
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
// add buffer for pipe reader to reduce syscall
return pipeCloser{bufio.NewReaderSize(stdout, core.BufferSize), stdout, cmd, params}, nil
}
type pipeCloser struct {
io.Reader
io.Closer
cmd *exec.Cmd
params *Params
}
func (p pipeCloser) Close() error {
finished := make(chan bool)
if p.params.KillSignal != syscall.SIGKILL {
go func() {
select {
case <-time.After(p.params.KillTimeout):
p.cmd.Process.Kill()
break
case <-finished:
break
}
}()
}
err := core.Any(p.Closer.Close(), p.cmd.Process.Signal(p.params.KillSignal), p.cmd.Wait())
finished <- true
return err
}