Merge remote-tracking branch 'upstream/master' into refactr-syscall-more-generic

This commit is contained in:
Sergey Krashevich
2024-05-07 14:45:48 +03:00
32 changed files with 721 additions and 320 deletions
+5 -1
View File
@@ -11,7 +11,7 @@ import (
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/app"
"github.com/gorilla/websocket"
"github.com/rs/zerolog/log"
"github.com/rs/zerolog"
)
func Init() {
@@ -23,11 +23,15 @@ func Init() {
app.LoadConfig(&cfg)
log = app.GetLogger("api")
initWS(cfg.Mod.Origin)
api.HandleFunc("api/ws", apiWS)
}
var log zerolog.Logger
// Message - struct for data exchange in Web API
type Message struct {
Type string `json:"type"`
+1 -1
View File
@@ -15,7 +15,7 @@ import (
"github.com/rs/zerolog/log"
)
var Version = "1.8.5"
var Version = "1.9.1"
var UserAgent = "go2rtc/" + Version
var ConfigPath string
+57 -30
View File
@@ -5,6 +5,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"io"
"net/url"
"os"
"os/exec"
@@ -19,6 +20,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/magic"
pkg "github.com/AlexxIT/go2rtc/pkg/rtsp"
"github.com/AlexxIT/go2rtc/pkg/shell"
"github.com/AlexxIT/go2rtc/pkg/stdin"
"github.com/rs/zerolog"
)
@@ -48,37 +50,41 @@ func Init() {
func execHandle(rawURL string) (core.Producer, error) {
var path string
var query url.Values
rawURL, rawQuery, _ := strings.Cut(rawURL, "#")
args := shell.QuoteSplit(rawURL[5:]) // remove `exec:`
for i, arg := range args {
if arg == "{output}" {
if rtsp.Port == "" {
return nil, errors.New("rtsp module disabled")
}
sum := md5.Sum([]byte(rawURL))
path = "/" + hex.EncodeToString(sum[:])
args[i] = "rtsp://127.0.0.1:" + rtsp.Port + path
break
// RTSP flow should have `{output}` inside URL
// pipe flow may have `#{params}` inside URL
if i := strings.Index(rawURL, "{output}"); i > 0 {
if rtsp.Port == "" {
return nil, errors.New("exec: rtsp module disabled")
}
sum := md5.Sum([]byte(rawURL))
path = "/" + hex.EncodeToString(sum[:])
rawURL = rawURL[:i] + "rtsp://127.0.0.1:" + rtsp.Port + path + rawURL[i+8:]
} else if i = strings.IndexByte(rawURL, '#'); i > 0 {
query = streams.ParseQuery(rawURL[i+1:])
rawURL = rawURL[:i]
}
args := shell.QuoteSplit(rawURL[5:]) // remove `exec:`
cmd := exec.Command(args[0], args[1:]...)
if log.Debug().Enabled() {
cmd.Stderr = os.Stderr
}
if path == "" {
query := streams.ParseQuery(rawQuery)
return handlePipe(rawURL, cmd, query)
}
return handleRTSP(rawURL, path, cmd)
return handleRTSP(rawURL, cmd, path)
}
func handlePipe(_ string, cmd *exec.Cmd, query url.Values) (core.Producer, error) {
if query.Get("backchannel") == "1" {
return stdin.NewClient(cmd)
}
r, err := PipeCloser(cmd, query)
if err != nil {
return nil, err
@@ -96,15 +102,23 @@ func handlePipe(_ string, cmd *exec.Cmd, query url.Values) (core.Producer, error
return prod, err
}
func handleRTSP(url, path string, cmd *exec.Cmd) (core.Producer, error) {
func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) {
stderr := limitBuffer{buf: make([]byte, 512)}
if cmd.Stderr != nil {
cmd.Stderr = io.MultiWriter(cmd.Stderr, &stderr)
} else {
cmd.Stderr = &stderr
}
if log.Trace().Enabled() {
cmd.Stdout = os.Stdout
}
ch := make(chan core.Producer)
waiter := make(chan core.Producer)
waitersMu.Lock()
waiters[path] = ch
waiters[path] = waiter
waitersMu.Unlock()
defer func() {
@@ -122,16 +136,9 @@ func handleRTSP(url, path string, cmd *exec.Cmd) (core.Producer, error) {
return nil, err
}
chErr := make(chan error)
done := make(chan error, 1)
go func() {
err := cmd.Wait()
// unblocking write to channel
select {
case chErr <- err:
default:
log.Trace().Str("url", url).Msg("[exec] close")
}
done <- cmd.Wait()
}()
select {
@@ -139,9 +146,10 @@ func handleRTSP(url, path string, cmd *exec.Cmd) (core.Producer, error) {
_ = cmd.Process.Kill()
log.Error().Str("url", url).Msg("[exec] timeout")
return nil, errors.New("timeout")
case err := <-chErr:
return nil, fmt.Errorf("exec: %s", err)
case prod := <-ch:
case <-done:
// limit message size
return nil, errors.New("exec: " + stderr.String())
case prod := <-waiter:
log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run")
return prod, nil
}
@@ -154,3 +162,22 @@ var (
waiters = map[string]chan core.Producer{}
waitersMu sync.Mutex
)
type limitBuffer struct {
buf []byte
n int
}
func (l *limitBuffer) String() string {
if l.n == len(l.buf) {
return string(l.buf) + "..."
}
return string(l.buf[:l.n])
}
func (l *limitBuffer) Write(p []byte) (int, error) {
if l.n < cap(l.buf) {
l.n += copy(l.buf[l.n:], p)
}
return len(p), nil
}
+7 -1
View File
@@ -7,6 +7,7 @@ import (
"github.com/AlexxIT/go2rtc/internal/app"
"github.com/AlexxIT/go2rtc/internal/ffmpeg/device"
"github.com/AlexxIT/go2rtc/internal/ffmpeg/hardware"
"github.com/AlexxIT/go2rtc/internal/ffmpeg/virtual"
"github.com/AlexxIT/go2rtc/internal/rtsp"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/ffmpeg"
@@ -145,7 +146,7 @@ func parseArgs(s string) *ffmpeg.Args {
}
var query url.Values
if i := strings.IndexByte(s, '#'); i > 0 {
if i := strings.IndexByte(s, '#'); i >= 0 {
query = streams.ParseQuery(s[i+1:])
args.Video = len(query["video"])
args.Audio = len(query["audio"])
@@ -193,6 +194,11 @@ func parseArgs(s string) *ffmpeg.Args {
if err != nil {
return nil
}
} else if strings.HasPrefix(s, "virtual?") {
var err error
if args.Input, err = virtual.GetInput(s[8:]); err != nil {
return nil
}
} else {
args.Input = inputTemplate("file", s, query)
}
+109 -56
View File
@@ -7,29 +7,48 @@ import (
)
func TestParseArgsFile(t *testing.T) {
// [FILE] all tracks will be copied without transcoding codecs
args := parseArgs("/media/bbb.mp4")
require.Equal(t, `ffmpeg -hide_banner -re -i /media/bbb.mp4 -c copy -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`, args.String())
// [FILE] video will be transcoded to H264, audio will be skipped
args = parseArgs("/media/bbb.mp4#video=h264")
require.Equal(t, `ffmpeg -hide_banner -re -i /media/bbb.mp4 -c:v libx264 -g 50 -profile:v high -level:v 4.1 -preset:v superfast -tune:v zerolatency -pix_fmt:v yuv420p -an -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`, args.String())
// [FILE] video will be copied, audio will be transcoded to pcmu
args = parseArgs("/media/bbb.mp4#video=copy#audio=pcmu")
require.Equal(t, `ffmpeg -hide_banner -re -i /media/bbb.mp4 -c:v copy -c:a pcm_mulaw -ar:a 8000 -ac:a 1 -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`, args.String())
// [FILE] video will be transcoded to H265 and rotate 270º, audio will be skipped
args = parseArgs("/media/bbb.mp4#video=h265#rotate=-90")
require.Equal(t, `ffmpeg -hide_banner -re -i /media/bbb.mp4 -c:v libx265 -g 50 -profile:v main -level:v 5.1 -preset:v superfast -tune:v zerolatency -an -vf "transpose=2" -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`, args.String())
// [FILE] video will be output for MJPEG to pipe, audio will be skipped
args = parseArgs("/media/bbb.mp4#video=mjpeg")
require.Equal(t, `ffmpeg -hide_banner -re -i /media/bbb.mp4 -c:v mjpeg -an -f mjpeg -`, args.String())
// https://github.com/AlexxIT/go2rtc/issues/509
args = parseArgs("ffmpeg:test.mp4#raw=-ss 00:00:20")
require.Equal(t, `ffmpeg -hide_banner -re -i ffmpeg:test.mp4 -ss 00:00:20 -c copy -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`, args.String())
tests := []struct {
name string
source string
expect string
}{
{
name: "[FILE] all tracks will be copied without transcoding codecs",
source: "/media/bbb.mp4",
expect: `ffmpeg -hide_banner -re -i /media/bbb.mp4 -c copy -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`,
},
{
name: "[FILE] video will be transcoded to H264, audio will be skipped",
source: "/media/bbb.mp4#video=h264",
expect: `ffmpeg -hide_banner -re -i /media/bbb.mp4 -c:v libx264 -g 50 -profile:v high -level:v 4.1 -preset:v superfast -tune:v zerolatency -pix_fmt:v yuv420p -an -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`,
},
{
name: "[FILE] video will be copied, audio will be transcoded to pcmu",
source: "/media/bbb.mp4#video=copy#audio=pcmu",
expect: `ffmpeg -hide_banner -re -i /media/bbb.mp4 -c:v copy -c:a pcm_mulaw -ar:a 8000 -ac:a 1 -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`,
},
{
name: "[FILE] video will be transcoded to H265 and rotate 270º, audio will be skipped",
source: "/media/bbb.mp4#video=h265#rotate=-90",
expect: `ffmpeg -hide_banner -re -i /media/bbb.mp4 -c:v libx265 -g 50 -profile:v main -level:v 5.1 -preset:v superfast -tune:v zerolatency -an -vf "transpose=2" -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`,
},
{
name: "[FILE] video will be output for MJPEG to pipe, audio will be skipped",
source: "/media/bbb.mp4#video=mjpeg",
expect: `ffmpeg -hide_banner -re -i /media/bbb.mp4 -c:v mjpeg -an -f mjpeg -`,
},
{
name: "https://github.com/AlexxIT/go2rtc/issues/509",
source: "ffmpeg:test.mp4#raw=-ss 00:00:20",
expect: `ffmpeg -hide_banner -re -i ffmpeg:test.mp4 -ss 00:00:20 -c copy -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
args := parseArgs(test.source)
require.Equal(t, test.expect, args.String())
})
}
}
func TestParseArgsDevice(t *testing.T) {
@@ -87,7 +106,7 @@ func TestParseArgsAudio(t *testing.T) {
// [AUDIO] audio will be transcoded to OPUS, video will be skipped
args = parseArgs("rtsp:///example.com#audio=opus")
require.Equal(t, `ffmpeg -hide_banner -allowed_media_types audio -fflags nobuffer -flags low_delay -timeout 5000000 -user_agent go2rtc/ffmpeg -rtsp_flags prefer_tcp -i rtsp:///example.com -c:a libopus -application:a lowdelay -frame_duration 20 -min_comp 0 -vn -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`, args.String())
require.Equal(t, `ffmpeg -hide_banner -allowed_media_types audio -fflags nobuffer -flags low_delay -timeout 5000000 -user_agent go2rtc/ffmpeg -rtsp_flags prefer_tcp -i rtsp:///example.com -c:a libopus -application:a lowdelay -min_comp 0 -vn -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`, args.String())
// [AUDIO] audio will be transcoded to PCMU, video will be skipped
args = parseArgs("rtsp:///example.com#audio=pcmu")
@@ -115,28 +134,46 @@ func TestParseArgsAudio(t *testing.T) {
}
func TestParseArgsHwVaapi(t *testing.T) {
// [HTTP-MJPEG] video will be transcoded to H264
args := parseArgs("http:///example.com#video=h264#hardware=vaapi")
require.Equal(t, `ffmpeg -hide_banner -hwaccel vaapi -hwaccel_output_format vaapi -hwaccel_flags allow_profile_mismatch -fflags nobuffer -flags low_delay -i http:///example.com -c:v h264_vaapi -g 50 -bf 0 -profile:v high -level:v 4.1 -sei:v 0 -an -vf "format=vaapi|nv12,hwupload,scale_vaapi=out_range=tv" -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`, args.String())
// [RTSP] video with rotation, should be transcoded, so select H264
args = parseArgs("rtsp://example.com#video=h264#rotate=180#hardware=vaapi")
require.Equal(t, `ffmpeg -hide_banner -hwaccel vaapi -hwaccel_output_format vaapi -hwaccel_flags allow_profile_mismatch -allowed_media_types video -fflags nobuffer -flags low_delay -timeout 5000000 -user_agent go2rtc/ffmpeg -rtsp_flags prefer_tcp -i rtsp://example.com -c:v h264_vaapi -g 50 -bf 0 -profile:v high -level:v 4.1 -sei:v 0 -an -vf "format=vaapi|nv12,hwupload,transpose_vaapi=4,scale_vaapi=out_range=tv" -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`, args.String())
// [RTSP] video with resize to 1280x720, should be transcoded, so select H265
args = parseArgs("rtsp://example.com#video=h265#width=1280#height=720#hardware=vaapi")
require.Equal(t, `ffmpeg -hide_banner -hwaccel vaapi -hwaccel_output_format vaapi -hwaccel_flags allow_profile_mismatch -allowed_media_types video -fflags nobuffer -flags low_delay -timeout 5000000 -user_agent go2rtc/ffmpeg -rtsp_flags prefer_tcp -i rtsp://example.com -c:v hevc_vaapi -g 50 -bf 0 -profile:v high -level:v 5.1 -sei:v 0 -an -vf "format=vaapi|nv12,hwupload,scale_vaapi=1280:720:out_range=tv" -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`, args.String())
// [FILE] video will be output for MJPEG to pipe, audio will be skipped
args = parseArgs("/media/bbb.mp4#video=mjpeg#hardware=vaapi")
require.Equal(t, `ffmpeg -hide_banner -hwaccel vaapi -hwaccel_output_format vaapi -hwaccel_flags allow_profile_mismatch -re -i /media/bbb.mp4 -c:v mjpeg_vaapi -an -vf "format=vaapi|nv12,hwupload,scale_vaapi=out_range=tv" -f mjpeg -`, args.String())
// [DEVICE] MJPEG video with size 1920x1080 will be transcoded to H265
args = parseArgs("device?video=0&video_size=1920x1080#video=h265#hardware=vaapi")
require.Equal(t, `ffmpeg -hide_banner -hwaccel vaapi -hwaccel_output_format vaapi -hwaccel_flags allow_profile_mismatch -f dshow -video_size 1920x1080 -i video="0" -c:v hevc_vaapi -g 50 -bf 0 -profile:v high -level:v 5.1 -sei:v 0 -an -vf "format=vaapi|nv12,hwupload,scale_vaapi=out_range=tv" -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`, args.String())
tests := []struct {
name string
source string
expect string
}{
{
name: "[HTTP-MJPEG] video will be transcoded to H264",
source: "http:///example.com#video=h264#hardware=vaapi",
expect: `ffmpeg -hide_banner -hwaccel vaapi -hwaccel_output_format vaapi -hwaccel_flags allow_profile_mismatch -fflags nobuffer -flags low_delay -i http:///example.com -c:v h264_vaapi -g 50 -bf 0 -profile:v high -level:v 4.1 -sei:v 0 -an -vf "format=vaapi|nv12,hwupload,scale_vaapi=out_color_matrix=bt709:out_range=tv:format=nv12" -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`,
},
{
name: "[RTSP] video with rotation, should be transcoded, so select H264",
source: "rtsp://example.com#video=h264#rotate=180#hardware=vaapi",
expect: `ffmpeg -hide_banner -hwaccel vaapi -hwaccel_output_format vaapi -hwaccel_flags allow_profile_mismatch -allowed_media_types video -fflags nobuffer -flags low_delay -timeout 5000000 -user_agent go2rtc/ffmpeg -rtsp_flags prefer_tcp -i rtsp://example.com -c:v h264_vaapi -g 50 -bf 0 -profile:v high -level:v 4.1 -sei:v 0 -an -vf "format=vaapi|nv12,hwupload,transpose_vaapi=4,scale_vaapi=out_color_matrix=bt709:out_range=tv:format=nv12" -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`,
},
{
name: "[RTSP] video with resize to 1280x720, should be transcoded, so select H265",
source: "rtsp://example.com#video=h265#width=1280#height=720#hardware=vaapi",
expect: `ffmpeg -hide_banner -hwaccel vaapi -hwaccel_output_format vaapi -hwaccel_flags allow_profile_mismatch -allowed_media_types video -fflags nobuffer -flags low_delay -timeout 5000000 -user_agent go2rtc/ffmpeg -rtsp_flags prefer_tcp -i rtsp://example.com -c:v hevc_vaapi -g 50 -bf 0 -profile:v main -level:v 5.1 -sei:v 0 -an -vf "format=vaapi|nv12,hwupload,scale_vaapi=1280:720" -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`,
},
{
name: "[FILE] video will be output for MJPEG to pipe, audio will be skipped",
source: "/media/bbb.mp4#video=mjpeg#hardware=vaapi",
expect: `ffmpeg -hide_banner -hwaccel vaapi -hwaccel_output_format vaapi -hwaccel_flags allow_profile_mismatch -re -i /media/bbb.mp4 -c:v mjpeg_vaapi -an -vf "format=vaapi|nv12,hwupload" -f mjpeg -`,
},
{
name: "[DEVICE] MJPEG video with size 1920x1080 will be transcoded to H265",
source: "device?video=0&video_size=1920x1080#video=h265#hardware=vaapi",
expect: `ffmpeg -hide_banner -hwaccel vaapi -hwaccel_output_format vaapi -hwaccel_flags allow_profile_mismatch -f dshow -video_size 1920x1080 -i "video=0" -c:v hevc_vaapi -g 50 -bf 0 -profile:v main -level:v 5.1 -sei:v 0 -an -vf "format=vaapi|nv12,hwupload" -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
args := parseArgs(test.source)
require.Equal(t, test.expect, args.String())
})
}
}
func TestParseArgsHwV4l2m2m(t *testing.T) {
func _TestParseArgsHwV4l2m2m(t *testing.T) {
// [HTTP-MJPEG] video will be transcoded to H264
args := parseArgs("http:///example.com#video=h264#hardware=v4l2m2m")
require.Equal(t, `ffmpeg -hide_banner -fflags nobuffer -flags low_delay -i http:///example.com -c:v h264_v4l2m2m -g 50 -bf 0 -an -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`, args.String())
@@ -166,7 +203,7 @@ func TestParseArgsHwRKMPP(t *testing.T) {
require.Equal(t, `ffmpeg -hide_banner -fflags nobuffer -flags low_delay -i http://example.com -c:v h264_rkmpp_encoder -g 50 -bf 0 -profile:v high -level:v 4.1 -height 320 -an -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`, args.String())
}
func TestParseArgsHwCuda(t *testing.T) {
func _TestParseArgsHwCuda(t *testing.T) {
// [HTTP-MJPEG] video will be transcoded to H264
args := parseArgs("http:///example.com#video=h264#hardware=cuda")
require.Equal(t, `ffmpeg -hide_banner -hwaccel cuda -hwaccel_output_format cuda -fflags nobuffer -flags low_delay -i http:///example.com -c:v h264_nvenc -g 50 -bf 0 -profile:v high -level:v auto -preset:v p2 -tune:v ll -an -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`, args.String())
@@ -184,7 +221,7 @@ func TestParseArgsHwCuda(t *testing.T) {
require.Equal(t, `ffmpeg -hide_banner -hwaccel cuda -hwaccel_output_format cuda -f dshow -video_size 1920x1080 -i video="0" -c:v hevc_nvenc -g 50 -bf 0 -profile:v high -level:v auto -an -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`, args.String())
}
func TestParseArgsHwDxva2(t *testing.T) {
func _TestParseArgsHwDxva2(t *testing.T) {
// [HTTP-MJPEG] video will be transcoded to H264
args := parseArgs("http:///example.com#video=h264#hardware=dxva2")
require.Equal(t, `ffmpeg -hide_banner -hwaccel dxva2 -hwaccel_output_format dxva2_vld -fflags nobuffer -flags low_delay -i http:///example.com -c:v h264_qsv -g 50 -bf 0 -profile:v high -level:v 4.1 -async_depth:v 1 -an -vf "hwmap=derive_device=qsv,format=qsv" -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`, args.String())
@@ -206,7 +243,7 @@ func TestParseArgsHwDxva2(t *testing.T) {
require.Equal(t, `ffmpeg -hide_banner -hwaccel dxva2 -hwaccel_output_format dxva2_vld -f dshow -video_size 1920x1080 -i video="0" -c:v hevc_qsv -g 50 -bf 0 -profile:v high -level:v 5.1 -async_depth:v 1 -an -vf "hwmap=derive_device=qsv,format=qsv" -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`, args.String())
}
func TestParseArgsHwVideotoolbox(t *testing.T) {
func _TestParseArgsHwVideotoolbox(t *testing.T) {
// [HTTP-MJPEG] video will be transcoded to H264
args := parseArgs("http:///example.com#video=h264#hardware=videotoolbox")
require.Equal(t, `ffmpeg -hide_banner -hwaccel videotoolbox -hwaccel_output_format videotoolbox_vld -fflags nobuffer -flags low_delay -i http:///example.com -c:v h264_videotoolbox -g 50 -bf 0 -profile:v high -level:v 4.1 -an -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`, args.String())
@@ -226,16 +263,32 @@ func TestParseArgsHwVideotoolbox(t *testing.T) {
func TestDeckLink(t *testing.T) {
args := parseArgs(`DeckLink SDI (2)#video=h264#hardware=vaapi#input=-format_code Hp29 -f decklink -i "{input}"`)
require.Equal(t, `ffmpeg -hide_banner -hwaccel vaapi -hwaccel_output_format vaapi -hwaccel_flags allow_profile_mismatch -format_code Hp29 -f decklink -i "DeckLink SDI (2)" -c:v h264_vaapi -g 50 -bf 0 -profile:v high -level:v 4.1 -sei:v 0 -an -vf "format=vaapi|nv12,hwupload,scale_vaapi=out_range=tv" -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`, args.String())
require.Equal(t, `ffmpeg -hide_banner -hwaccel vaapi -hwaccel_output_format vaapi -hwaccel_flags allow_profile_mismatch -format_code Hp29 -f decklink -i "DeckLink SDI (2)" -c:v h264_vaapi -g 50 -bf 0 -profile:v high -level:v 4.1 -sei:v 0 -an -vf "format=vaapi|nv12,hwupload,scale_vaapi=out_color_matrix=bt709:out_range=tv:format=nv12" -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`, args.String())
}
func TestDrawText(t *testing.T) {
args := parseArgs("http:///example.com#video=h264#drawtext=fontsize=12")
require.Equal(t, `ffmpeg -hide_banner -fflags nobuffer -flags low_delay -i http:///example.com -c:v libx264 -g 50 -profile:v high -level:v 4.1 -preset:v superfast -tune:v zerolatency -pix_fmt:v yuv420p -an -vf "drawtext=fontsize=12:text='%{localtime\:%Y-%m-%d %X}'" -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`, args.String())
args = parseArgs("http:///example.com#video=h264#width=640#drawtext=fontsize=12")
require.Equal(t, `ffmpeg -hide_banner -fflags nobuffer -flags low_delay -i http:///example.com -c:v libx264 -g 50 -profile:v high -level:v 4.1 -preset:v superfast -tune:v zerolatency -pix_fmt:v yuv420p -an -vf "scale=640:-1,drawtext=fontsize=12:text='%{localtime\:%Y-%m-%d %X}'" -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`, args.String())
args = parseArgs("http:///example.com#video=h264#width=640#drawtext=fontsize=12#hardware=vaapi")
require.Equal(t, `ffmpeg -hide_banner -hwaccel vaapi -hwaccel_output_format nv12 -hwaccel_flags allow_profile_mismatch -fflags nobuffer -flags low_delay -i http:///example.com -c:v h264_vaapi -g 50 -bf 0 -profile:v high -level:v 4.1 -sei:v 0 -an -vf "scale=640:-1:out_color_matrix=bt709:out_range=tv,drawtext=fontsize=12:text='%{localtime\:%Y-%m-%d %X}',hwupload" -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`, args.String())
tests := []struct {
name string
source string
expect string
}{
{
source: "http:///example.com#video=h264#drawtext=fontsize=12",
expect: `ffmpeg -hide_banner -fflags nobuffer -flags low_delay -i http:///example.com -c:v libx264 -g 50 -profile:v high -level:v 4.1 -preset:v superfast -tune:v zerolatency -pix_fmt:v yuv420p -an -vf "drawtext=fontsize=12:text='%{localtime\:%Y-%m-%d %X}'" -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`,
},
{
source: "http:///example.com#video=h264#width=640#drawtext=fontsize=12",
expect: `ffmpeg -hide_banner -fflags nobuffer -flags low_delay -i http:///example.com -c:v libx264 -g 50 -profile:v high -level:v 4.1 -preset:v superfast -tune:v zerolatency -pix_fmt:v yuv420p -an -vf "scale=640:-1,drawtext=fontsize=12:text='%{localtime\:%Y-%m-%d %X}'" -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`,
},
{
source: "http:///example.com#video=h264#width=640#drawtext=fontsize=12#hardware=vaapi",
expect: `ffmpeg -hide_banner -hwaccel vaapi -hwaccel_output_format nv12 -hwaccel_flags allow_profile_mismatch -fflags nobuffer -flags low_delay -i http:///example.com -c:v h264_vaapi -g 50 -bf 0 -profile:v high -level:v 4.1 -sei:v 0 -an -vf "scale=640:-1,drawtext=fontsize=12:text='%{localtime\:%Y-%m-%d %X}',hwupload" -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}`,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
args := parseArgs(test.source)
require.Equal(t, test.expect, args.String())
})
}
}
+1 -1
View File
@@ -19,5 +19,5 @@ func TestParseQuery(t *testing.T) {
query, err = url.ParseQuery("hw=vaapi")
require.Nil(t, err)
args = parseQuery(query)
require.Equal(t, `ffmpeg -hide_banner -hwaccel vaapi -hwaccel_output_format vaapi -i - -c:v mjpeg_vaapi -vf "format=vaapi|nv12,hwupload" -f mjpeg -`, args.String())
require.Equal(t, `ffmpeg -hide_banner -hwaccel vaapi -hwaccel_output_format vaapi -hwaccel_flags allow_profile_mismatch -i - -c:v mjpeg_vaapi -vf "format=vaapi|nv12,hwupload" -f mjpeg -`, args.String())
}
+59
View File
@@ -0,0 +1,59 @@
package virtual
import (
"net/url"
)
func GetInput(src string) (string, error) {
query, err := url.ParseQuery(src)
if err != nil {
return "", err
}
// set defaults (using Add instead of Set)
query.Add("source", "testsrc")
query.Add("size", "1920x1080")
query.Add("decimals", "2")
// https://ffmpeg.org/ffmpeg-filters.html
source := query.Get("source")
input := "-re -f lavfi -i " + source
sep := "=" // first separator
for key, values := range query {
value := values[0]
// https://ffmpeg.org/ffmpeg-utils.html#video-size-syntax
switch key {
case "color", "rate", "duration", "sar":
case "size":
switch value {
case "720":
value = "1280x720"
case "1080":
value = "1920x1080"
case "2K":
value = "2560x1440"
case "4K":
value = "3840x2160"
case "8K":
value = "7680x4230" // https://reolink.com/blog/8k-resolution/
}
case "decimals":
if source != "testsrc" {
continue
}
default:
continue
}
input += sep + key + "=" + value
sep = ":" // next separator
}
if s := query.Get("format"); s != "" {
input += ",format=" + s
}
return input, nil
}
+3 -1
View File
@@ -57,6 +57,8 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
return
}
log.Debug().Msgf("[mjpeg] transcoding time=%s", time.Since(ts))
case core.CodecJPEG:
b = mjpeg.FixJPEG(b)
}
h := w.Header()
@@ -163,7 +165,7 @@ func handlerWS(tr *ws.Transport, _ *ws.Message) error {
cons.UserAgent = tr.Request.UserAgent()
if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Caller().Send()
log.Debug().Err(err).Msg("[mjpeg] add consumer")
return err
}
+1 -1
View File
@@ -239,7 +239,7 @@ func tcpHandler(conn *rtsp.Conn) {
if closer != nil {
if err := conn.Handle(); err != nil {
log.Debug().Msgf("[rtsp] handle=%s", err)
log.Debug().Err(err).Msg("[rtsp] handle")
}
closer()
+34 -23
View File
@@ -3,18 +3,17 @@ package streams
import (
"errors"
"strings"
"sync/atomic"
"github.com/AlexxIT/go2rtc/pkg/core"
)
func (s *Stream) AddConsumer(cons core.Consumer) (err error) {
// support for multiple simultaneous requests from different consumers
consN := atomic.AddInt32(&s.requests, 1) - 1
// support for multiple simultaneous pending from different consumers
consN := s.pending.Add(1) - 1
var prodErrors []error
var prodErrors = make([]error, len(s.producers))
var prodMedias []*core.Media
var prods []*Producer // matched producers for consumer
var prodStarts []*Producer
// Step 1. Get consumer medias
consMedias := cons.GetMedias()
@@ -23,15 +22,20 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) {
producers:
for prodN, prod := range s.producers {
if prodErrors[prodN] != nil {
log.Trace().Msgf("[streams] skip cons=%d prod=%d", consN, prodN)
continue
}
if err = prod.Dial(); err != nil {
log.Trace().Err(err).Msgf("[streams] skip prod=%s", prod.url)
prodErrors = append(prodErrors, err)
log.Trace().Err(err).Msgf("[streams] dial cons=%d prod=%d", consN, prodN)
prodErrors[prodN] = err
continue
}
// Step 2. Get producer medias (not tracks yet)
for _, prodMedia := range prod.GetMedias() {
log.Trace().Msgf("[streams] check prod=%d media=%s", prodN, prodMedia)
log.Trace().Msgf("[streams] check cons=%d prod=%d media=%s", consN, prodN, prodMedia)
prodMedias = append(prodMedias, prodMedia)
// Step 3. Match consumer/producer codecs list
@@ -44,11 +48,12 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) {
switch prodMedia.Direction {
case core.DirectionRecvonly:
log.Trace().Msgf("[streams] match prod=%d => cons=%d", prodN, consN)
log.Trace().Msgf("[streams] match cons=%d <= prod=%d", consN, prodN)
// Step 4. Get recvonly track from producer
if track, err = prod.GetTrack(prodMedia, prodCodec); err != nil {
log.Info().Err(err).Msg("[streams] can't get track")
prodErrors[prodN] = err
continue
}
// Step 5. Add track to consumer
@@ -68,11 +73,12 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) {
// Step 5. Add track to producer
if err = prod.AddTrack(prodMedia, prodCodec, track); err != nil {
log.Info().Err(err).Msg("[streams] can't add track")
prodErrors[prodN] = err
continue
}
}
prods = append(prods, prod)
prodStarts = append(prodStarts, prod)
if !consMedia.MatchAll() {
break producers
@@ -82,11 +88,11 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) {
}
// stop producers if they don't have readers
if atomic.AddInt32(&s.requests, -1) == 0 {
if s.pending.Add(-1) == 0 {
s.stopProducers()
}
if len(prods) == 0 {
if len(prodStarts) == 0 {
return formatError(consMedias, prodMedias, prodErrors)
}
@@ -95,7 +101,7 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) {
s.mu.Unlock()
// there may be duplicates, but that's not a problem
for _, prod := range prods {
for _, prod := range prodStarts {
prod.start()
}
@@ -103,6 +109,20 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) {
}
func formatError(consMedias, prodMedias []*core.Media, prodErrors []error) error {
// 1. Return errors if any not nil
var text string
for _, err := range prodErrors {
if err != nil {
text = appendString(text, err.Error())
}
}
if len(text) != 0 {
return errors.New("streams: " + text)
}
// 2. Return "codecs not matched"
if prodMedias != nil {
var prod, cons string
@@ -125,16 +145,7 @@ func formatError(consMedias, prodMedias []*core.Media, prodErrors []error) error
return errors.New("streams: codecs not matched: " + prod + " => " + cons)
}
if prodErrors != nil {
var text string
for _, err := range prodErrors {
text = appendString(text, err.Error())
}
return errors.New("streams: " + text)
}
// 3. Return unknown error
return errors.New("streams: unknown error")
}
+2 -2
View File
@@ -245,10 +245,10 @@ func (p *Producer) stop() {
switch p.state {
case stateExternal:
log.Debug().Msgf("[streams] can't stop external producer")
log.Trace().Msgf("[streams] skip stop external producer")
return
case stateNone:
log.Debug().Msgf("[streams] can't stop none producer")
log.Trace().Msgf("[streams] skip stop none producer")
return
case stateStart:
p.workerID++
+2 -1
View File
@@ -3,6 +3,7 @@ package streams
import (
"encoding/json"
"sync"
"sync/atomic"
"github.com/AlexxIT/go2rtc/pkg/core"
)
@@ -11,7 +12,7 @@ type Stream struct {
producers []*Producer
consumers []core.Consumer
mu sync.Mutex
requests int32
pending atomic.Int32
}
func NewStream(source any) *Stream {
+1 -1
View File
@@ -10,7 +10,7 @@ import (
func TestRecursion(t *testing.T) {
// create stream with some source
stream1 := New("from_yaml", "does not matter")
stream1 := New("from_yaml", "does_not_matter")
require.Len(t, streams, 1)
// ask another unnamed stream that links go2rtc
+7 -3
View File
@@ -7,6 +7,7 @@ import (
"net/http"
"net/url"
"strings"
"sync"
"time"
"github.com/AlexxIT/go2rtc/internal/api/ws"
@@ -82,6 +83,7 @@ func go2rtcClient(url string) (core.Producer, error) {
// waiter will wait PC error or WS error or nil (connection OK)
var connState core.Waiter
var connMu sync.Mutex
prod := webrtc.NewConn(pc)
prod.Desc = "WebRTC/WebSocket async"
@@ -91,7 +93,9 @@ func go2rtcClient(url string) (core.Producer, error) {
case *pion.ICECandidate:
s := msg.ToJSON().Candidate
log.Trace().Str("candidate", s).Msg("[webrtc] local ")
connMu.Lock()
_ = conn.WriteJSON(&ws.Message{Type: "webrtc/candidate", Value: s})
connMu.Unlock()
case pion.PeerConnectionState:
switch msg {
@@ -118,9 +122,9 @@ func go2rtcClient(url string) (core.Producer, error) {
// 4. Send offer
msg := &ws.Message{Type: "webrtc/offer", Value: offer}
if err = conn.WriteJSON(msg); err != nil {
return nil, err
}
connMu.Lock()
_ = conn.WriteJSON(msg)
connMu.Unlock()
// 5. Get answer
if err = conn.ReadJSON(msg); err != nil {