Add killsignal and killtimeout to exec/rtsp

This commit is contained in:
Alex X
2024-06-16 19:03:57 +03:00
parent a56d335380
commit da5f060741
3 changed files with 70 additions and 78 deletions
+31 -22
View File
@@ -1,10 +1,12 @@
package exec
import (
"bufio"
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"io"
"net/url"
"os"
"os/exec"
@@ -49,8 +51,10 @@ func Init() {
}
func execHandle(rawURL string) (core.Producer, error) {
rawURL, rawQuery, _ := strings.Cut(rawURL, "#")
query := streams.ParseQuery(rawQuery)
var path string
var query url.Values
// RTSP flow should have `{output}` inside URL
// pipe flow may have `#{params}` inside URL
@@ -62,9 +66,6 @@ func execHandle(rawURL string) (core.Producer, error) {
sum := md5.Sum([]byte(rawURL))
path = "/" + hex.EncodeToString(sum[:])
rawURL = rawURL[:i] + "rtsp://127.0.0.1:" + rtsp.Port + path + rawURL[i+8:]
} else if i = strings.IndexByte(rawURL, '#'); i > 0 {
query = streams.ParseQuery(rawURL[i+1:])
rawURL = rawURL[:i]
}
args := shell.QuoteSplit(rawURL[5:]) // remove `exec:`
@@ -74,23 +75,34 @@ func execHandle(rawURL string) (core.Producer, error) {
debug: log.Debug().Enabled(),
}
if path == "" {
return handlePipe(rawURL, cmd, query)
}
return handleRTSP(rawURL, cmd, path)
}
func handlePipe(source string, cmd *exec.Cmd, query url.Values) (core.Producer, error) {
if query.Get("backchannel") == "1" {
return stdin.NewClient(cmd)
}
r, err := PipeCloser(cmd, query)
cl := &closer{cmd: cmd, query: query}
if path == "" {
return handlePipe(rawURL, cmd, cl)
}
return handleRTSP(rawURL, cmd, cl, path)
}
func handlePipe(source string, cmd *exec.Cmd, cl io.Closer) (core.Producer, error) {
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
rc := struct {
io.Reader
io.Closer
}{
// add buffer for pipe reader to reduce syscall
bufio.NewReaderSize(stdout, core.BufferSize),
cl,
}
log.Debug().Strs("args", cmd.Args).Msg("[exec] run pipe")
ts := time.Now()
@@ -99,9 +111,9 @@ func handlePipe(source string, cmd *exec.Cmd, query url.Values) (core.Producer,
return nil, err
}
prod, err := magic.Open(r)
prod, err := magic.Open(rc)
if err != nil {
_ = r.Close()
_ = rc.Close()
return nil, fmt.Errorf("exec/pipe: %w\n%s", err, cmd.Stderr)
}
@@ -115,7 +127,7 @@ func handlePipe(source string, cmd *exec.Cmd, query url.Values) (core.Producer,
return prod, nil
}
func handleRTSP(source string, cmd *exec.Cmd, path string) (core.Producer, error) {
func handleRTSP(source string, cmd *exec.Cmd, cl io.Closer, path string) (core.Producer, error) {
if log.Trace().Enabled() {
cmd.Stdout = os.Stdout
}
@@ -147,9 +159,9 @@ func handleRTSP(source string, cmd *exec.Cmd, path string) (core.Producer, error
}()
select {
case <-time.After(time.Second * 60):
_ = cmd.Process.Kill()
case <-time.After(time.Minute):
log.Error().Str("source", source).Msg("[exec] timeout")
_ = cl.Close()
return nil, errors.New("exec: timeout")
case <-done:
// limit message size
@@ -157,10 +169,7 @@ func handleRTSP(source string, cmd *exec.Cmd, path string) (core.Producer, error
case prod := <-waiter:
log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run rtsp")
setRemoteInfo(prod, source, cmd.Args)
prod.OnClose = func() error {
log.Debug().Msgf("[exec] kill rtsp")
return errors.Join(cmd.Process.Kill(), cmd.Wait())
}
prod.OnClose = cl.Close
return prod, nil
}
}