Initial commit

This commit is contained in:
Alexey Khit
2022-08-18 09:19:00 +03:00
commit 3e77835583
65 changed files with 6372 additions and 0 deletions
+4
View File
@@ -0,0 +1,4 @@
**Project layout**
- https://github.com/golang-standards/project-layout
- https://github.com/micro/micro
+119
View File
@@ -0,0 +1,119 @@
package api
import (
"encoding/json"
"github.com/AlexxIT/go2rtc/cmd/app"
"github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/gorilla/websocket"
"github.com/rs/zerolog"
"net"
"net/http"
)
func Init() {
var cfg struct {
Mod struct {
Listen string `yaml:"listen"`
BasePath string `yaml:"base_path"`
StaticDir string `yaml:"static_dir"`
} `yaml:"api"`
}
// default config
cfg.Mod.Listen = ":3000"
cfg.Mod.StaticDir = "www"
// load config from YAML
app.LoadConfig(&cfg)
if cfg.Mod.Listen == "" {
return
}
basePath = cfg.Mod.BasePath
log = app.GetLogger("api")
if cfg.Mod.StaticDir != "" {
fileServer = http.FileServer(http.Dir(cfg.Mod.StaticDir))
HandleFunc("/", fileServerHandlder)
}
HandleFunc("/api/stack", stackHandler)
HandleFunc("/api/stats", statsHandler)
HandleFunc("/api/ws", apiWS)
// ensure we can listen without errors
listener, err := net.Listen("tcp", cfg.Mod.Listen)
if err != nil {
log.Fatal().Err(err).Msg("[api] listen")
}
log.Info().Str("addr", cfg.Mod.Listen).Msg("[api] listen")
go func() {
s := http.Server{}
if err = s.Serve(listener); err != nil {
log.Fatal().Err(err).Msg("[api] Serve")
}
}()
}
func HandleFunc(pattern string, handler http.HandlerFunc) {
http.HandleFunc(basePath+pattern, handler)
}
func HandleWS(msgType string, handler WSHandler) {
wsHandlers[msgType] = handler
}
var basePath string
var fileServer http.Handler
var log zerolog.Logger
var wsHandlers = make(map[string]WSHandler)
func fileServerHandlder(w http.ResponseWriter, r *http.Request) {
if basePath != "" {
r.URL.Path = r.URL.Path[len(basePath):]
}
fileServer.ServeHTTP(w, r)
}
func statsHandler(w http.ResponseWriter, _ *http.Request) {
v := map[string]interface{}{
"streams": streams.Streams,
}
data, err := json.Marshal(v)
if err != nil {
log.Error().Err(err).Msg("[api.stats] marshal")
}
if _, err = w.Write(data); err != nil {
log.Error().Err(err).Msg("[api.stats] write")
}
}
func apiWS(w http.ResponseWriter, r *http.Request) {
ctx := new(Context)
if err := ctx.Upgrade(w, r); err != nil {
log.Error().Err(err).Msg("[api.ws] upgrade")
return
}
defer ctx.Close()
for {
msg := new(streamer.Message)
if err := ctx.Conn.ReadJSON(msg); err != nil {
if websocket.IsUnexpectedCloseError(
err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure,
) {
log.Error().Err(err).Msg("[api.ws] readJSON")
}
return
}
handler := wsHandlers[msg.Type]
if handler != nil {
handler(ctx, msg)
}
}
}
+52
View File
@@ -0,0 +1,52 @@
package api
import (
"bytes"
"fmt"
"net/http"
"runtime"
)
var stackSkip = [][]byte{
// debug.go
[]byte("github.com/AlexxIT/go2rtc/cmd/debug.handler"),
// cmd.go
[]byte("github.com/AlexxIT/go2rtc/cmd.Run"),
[]byte("created by os/signal.Notify"),
// api.go
[]byte("created by github.com/AlexxIT/go2rtc/cmd/api.Init"),
[]byte("created by net/http.(*connReader).startBackgroundRead"),
[]byte("created by net/http.(*Server).Serve"),
[]byte("created by github.com/AlexxIT/go2rtc/cmd/rtsp.Init"),
}
func stackHandler(w http.ResponseWriter, r *http.Request) {
sep := []byte("\n\n")
buf := make([]byte, 65535)
i := 0
n := runtime.Stack(buf, true)
skipped := 0
for _, item := range bytes.Split(buf[:n], sep) {
for _, skip := range stackSkip {
if bytes.Contains(item, skip) {
item = nil
skipped++
break
}
}
if item != nil {
i += copy(buf[i:], item)
i += copy(buf[i:], sep)
}
}
i += copy(buf[i:], fmt.Sprintf(
"Total: %d, Skipped: %d", runtime.NumGoroutine(), skipped),
)
if _, err := w.Write(buf[:i]); err != nil {
panic(err)
}
}
+67
View File
@@ -0,0 +1,67 @@
package api
import (
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/gorilla/websocket"
"net/http"
"sync"
)
type WSHandler func(ctx *Context, msg *streamer.Message)
var apiWsUp = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 512000,
}
type Context struct {
Conn *websocket.Conn
Request *http.Request
Consumer interface{} // TODO: rewrite
onClose []func()
mu sync.Mutex
}
func (ctx *Context) Upgrade(w http.ResponseWriter, r *http.Request) (err error) {
ctx.Conn, err = apiWsUp.Upgrade(w, r, nil)
ctx.Request = r
return
}
func (ctx *Context) Close() {
for _, f := range ctx.onClose {
f()
}
_ = ctx.Conn.Close()
}
func (ctx *Context) Write(msg interface{}) {
ctx.mu.Lock()
defer ctx.mu.Unlock()
var err error
switch msg := msg.(type) {
case *streamer.Message:
err = ctx.Conn.WriteJSON(msg)
case []byte:
err = ctx.Conn.WriteMessage(websocket.BinaryMessage, msg)
default:
return
}
if err != nil {
//panic(err) // TODO: fix panic
}
}
func (ctx *Context) Error(err error) {
ctx.Write(&streamer.Message{
Type: "error", Value: err.Error(),
})
}
func (ctx *Context) OnClose(f func()) {
ctx.onClose = append(ctx.onClose, f)
}
+68
View File
@@ -0,0 +1,68 @@
package app
import (
"github.com/rs/zerolog"
"gopkg.in/yaml.v3"
"io"
"os"
"runtime"
)
func Init() {
data, _ = os.ReadFile("go2rtc.yaml")
var cfg struct {
Mod map[string]string `yaml:"log"`
}
LoadConfig(&cfg)
var writer io.Writer = os.Stdout
// styles
format := cfg.Mod["format"]
if format != "json" {
writer = zerolog.ConsoleWriter{
Out: writer, TimeFormat: "15:04:05.000",
NoColor: writer != os.Stdout || format == "text",
}
}
zerolog.TimeFieldFormat = zerolog.TimeFormatUnixMs
lvl, err := zerolog.ParseLevel(cfg.Mod["level"])
if err != nil || lvl == zerolog.NoLevel {
lvl = zerolog.InfoLevel
}
log = zerolog.New(writer).With().Timestamp().Logger().Level(lvl)
modules = cfg.Mod
log.Info().Msgf("go2rtc %s/%s", runtime.GOOS, runtime.GOARCH)
}
func LoadConfig(v interface{}) {
if data != nil {
_ = yaml.Unmarshal(data, v)
}
}
func GetLogger(module string) zerolog.Logger {
lvl, err := zerolog.ParseLevel(modules[module])
if err != nil {
return log
}
return log.Level(lvl)
}
// internal
// data - config content
var data []byte
// log - main logger
var log zerolog.Logger
// modules log levels
var modules map[string]string
+41
View File
@@ -0,0 +1,41 @@
package cmd
import (
"github.com/AlexxIT/go2rtc/cmd/api"
"github.com/AlexxIT/go2rtc/cmd/app"
"github.com/AlexxIT/go2rtc/cmd/exec"
"github.com/AlexxIT/go2rtc/cmd/ffmpeg"
"github.com/AlexxIT/go2rtc/cmd/hass"
"github.com/AlexxIT/go2rtc/cmd/mse"
"github.com/AlexxIT/go2rtc/cmd/ngrok"
"github.com/AlexxIT/go2rtc/cmd/rtmp"
"github.com/AlexxIT/go2rtc/cmd/rtsp"
"github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/cmd/webrtc"
"os"
"os/signal"
)
func Run() {
app.Init() // init config and logs
streams.Init() // load streams list
rtsp.Init() // add support RTSP client and RTSP server
rtmp.Init() // add support RTMP client
exec.Init() // add support exec scheme (depends on RTSP server)
ffmpeg.Init() // add support ffmpeg scheme (depends on exec scheme)
hass.Init() // add support hass scheme
api.Init() // init HTTP API server
webrtc.Init()
mse.Init()
ngrok.Init()
c := make(chan os.Signal)
signal.Notify(c)
<-c
println("exit OK")
}
+85
View File
@@ -0,0 +1,85 @@
package exec
import (
"crypto/md5"
"encoding/hex"
"errors"
"github.com/AlexxIT/go2rtc/cmd/app"
"github.com/AlexxIT/go2rtc/cmd/rtsp"
"github.com/AlexxIT/go2rtc/cmd/streams"
pkg "github.com/AlexxIT/go2rtc/pkg/rtsp"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/rs/zerolog"
"os"
"os/exec"
"strings"
"time"
)
func Init() {
// depends on RTSP server
if rtsp.Port == "" {
return
}
rtsp.OnProducer = func(prod streamer.Producer) bool {
if conn := prod.(*pkg.Conn); conn != nil {
if waiter := waiters[conn.URL.Path]; waiter != nil {
waiter <- prod
return true
}
}
return false
}
streams.HandleFunc("exec", Handle)
log = app.GetLogger("exec")
// TODO: add sync.Mutex
waiters = map[string]chan streamer.Producer{}
}
func Handle(url string) (streamer.Producer, error) {
sum := md5.Sum([]byte(url))
path := "/" + hex.EncodeToString(sum[:])
url = strings.Replace(
url, "{output}", "rtsp://localhost:"+rtsp.Port+path, 1,
)
// remove `exec:`
args := strings.Split(url[5:], " ")
cmd := exec.Command(args[0], args[1:]...)
if log.Trace().Enabled() {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
}
ch := make(chan streamer.Producer)
waiters[path] = ch
defer delete(waiters, path)
log.Debug().Str("url", url).Msg("[exec] run")
if err := cmd.Start(); err != nil {
log.Error().Err(err).Str("url", url).Msg("[exec]")
return nil, err
}
select {
case <-time.After(time.Second * 10):
_ = cmd.Process.Kill()
log.Error().Str("url", url).Msg("[exec] timeout")
return nil, errors.New("timeout")
case prod := <-ch:
return prod, nil
}
}
// internal
var log zerolog.Logger
var waiters map[string]chan streamer.Producer
+6
View File
@@ -0,0 +1,6 @@
## Useful links
- https://superuser.com/questions/564402/explanation-of-x264-tune
- https://stackoverflow.com/questions/33624016/why-sliced-thread-affect-so-much-on-realtime-encoding-using-ffmpeg-x264
- https://codec.fandom.com/ru/wiki/X264_-_%D0%BE%D0%BF%D0%B8%D1%81%D0%B0%D0%BD%D0%B8%D0%B5_%D0%BA%D0%BB%D1%8E%D1%87%D0%B5%D0%B9_%D0%BA%D0%BE%D0%B4%D0%B8%D1%80%D0%BE%D0%B2%D0%B0%D0%BD%D0%B8%D1%8F
- https://html5test.com/
+112
View File
@@ -0,0 +1,112 @@
package ffmpeg
import (
"github.com/AlexxIT/go2rtc/cmd/app"
"github.com/AlexxIT/go2rtc/cmd/exec"
"github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"net/url"
"strings"
)
func Init() {
var cfg struct {
Mod map[string]string `yaml:"ffmpeg"`
}
// defaults
cfg.Mod = map[string]string{
"bin": "ffmpeg",
// inputs
"link": "-hide_banner -i {input}",
"rtsp": "-hide_banner -fflags nobuffer -flags low_delay -rtsp_transport tcp -i {input}",
"file": "-hide_banner -re -stream_loop -1 -i {input}",
// output
"out": "-rtsp_transport tcp -f rtsp {output}",
// `-g 30` - group of picture, GOP, keyframe interval
// `-preset superfast` - we can't use ultrafast because it doesn't support `-profile main -level 4.1`
// `-tune zerolatency` - for minimal latency
// `-profile main -level 4.1` - most used streaming profile
"h264": "-codec:v libx264 -g 30 -preset superfast -tune zerolatency -profile main -level 4.1",
"h264/ultra": "-codec:v libx264 -g 30 -preset ultrafast -tune zerolatency",
"h264/high": "-codec:v libx264 -g 30 -preset superfast -tune zerolatency",
"h265": "-codec:v libx265 -g 30 -preset ultrafast -tune zerolatency",
"opus": "-codec:a libopus -ar 48000 -ac 2",
"pcmu": "-codec:a pcm_mulaw -ar 8000 -ac 1",
"pcmu/16000": "-codec:a pcm_mulaw -ar 16000 -ac 1",
"pcmu/48000": "-codec:a pcm_mulaw -ar 48000 -ac 1",
"pcma": "-codec:a pcm_alaw -ar 8000 -ac 1",
"pcma/16000": "-codec:a pcm_alaw -ar 16000 -ac 1",
"pcma/48000": "-codec:a pcm_alaw -ar 48000 -ac 1",
"aac/16000": "-codec:a aac -ar 16000 -ac 1",
}
app.LoadConfig(&cfg)
tpl := cfg.Mod
streams.HandleFunc("ffmpeg", func(s string) (streamer.Producer, error) {
s = s[7:] // remove `ffmpeg:`
var query url.Values
if i := strings.IndexByte(s, '#'); i > 0 {
query, _ = url.ParseQuery(s[i+1:])
s = s[:i]
}
var template string
switch {
case strings.HasPrefix(s, "rtsp"):
template = tpl["rtsp"]
case strings.Contains(s, "://"):
template = tpl["link"]
default:
template = tpl["file"]
}
s = "exec:" + tpl["bin"] + " " +
strings.Replace(template, "{input}", s, 1)
if query != nil {
for _, raw := range query["raw"] {
s += " " + raw
}
// TODO: multiple codecs via -map
// s += fmt.Sprintf(" -map 0:v:0 -c:v:%d copy", i)
for _, video := range query["video"] {
if video == "copy" {
s += " -codec:v copy"
} else {
s += " " + tpl[video]
}
}
for _, audio := range query["audio"] {
if audio == "copy" {
s += " -codec:v copy"
} else {
s += " " + tpl[audio]
}
}
if query["video"] == nil {
s += " -vn"
}
if query["audio"] == nil {
s += " -an"
}
} else {
s += " -c copy"
}
s += " " + tpl["out"]
return exec.Handle(s)
})
}
+71
View File
@@ -0,0 +1,71 @@
package hass
import (
"encoding/json"
"fmt"
"github.com/AlexxIT/go2rtc/cmd/app"
"github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"os"
"path"
)
func Init() {
var conf struct {
Mod struct {
Config string `yaml:"config"`
} `yaml:"hass"`
}
app.LoadConfig(&conf)
filename := path.Join(conf.Mod.Config, ".storage/core.config_entries")
data, err := os.ReadFile(filename)
if err != nil {
return
}
ent := new(entries)
if err = json.Unmarshal(data, ent); err != nil {
return
}
urls := map[string]string{}
for _, entrie := range ent.Data.Entries {
switch entrie.Domain {
case "generic":
if entrie.Options.StreamSource != "" {
urls[entrie.Title] = entrie.Options.StreamSource
}
}
}
streams.HandleFunc("hass", func(url string) (streamer.Producer, error) {
if hurl := urls[url[5:]]; hurl != "" {
return streams.GetProducer(hurl)
}
return nil, fmt.Errorf("can't get url: %s", url)
})
}
type entries struct {
Data struct {
Entries []struct {
Title string `json:"title"`
Domain string `json:"domain"`
Data struct {
ClientID string `json:"iOSPairingId"`
ClientPrivate string `json:"iOSDeviceLTSK"`
ClientPublic string `json:"iOSDeviceLTPK"`
DeviceID string `json:"AccessoryPairingID"`
DevicePublic string `json:"AccessoryLTPK"`
DeviceHost string `json:"AccessoryIP"`
DevicePort uint16 `json:"AccessoryPort"`
} `json:"data"`
Options struct {
StreamSource string `json:"stream_source"`
}
} `json:"entries"`
} `json:"data"`
}
+42
View File
@@ -0,0 +1,42 @@
package mse
import (
"github.com/AlexxIT/go2rtc/cmd/api"
"github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/pkg/mse"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/rs/zerolog/log"
)
func Init() {
api.HandleWS("mse", handler)
}
func handler(ctx *api.Context, msg *streamer.Message) {
url := ctx.Request.URL.Query().Get("url")
stream := streams.Get(url)
if stream == nil {
return
}
cons := new(mse.Consumer)
cons.UserAgent = ctx.Request.UserAgent()
cons.RemoteAddr = ctx.Request.RemoteAddr
cons.Listen(func(msg interface{}) {
switch msg.(type) {
case *streamer.Message, []byte:
ctx.Write(msg)
}
})
if err := stream.AddConsumer(cons); err != nil {
log.Warn().Err(err).Msg("[api.mse] Add consumer")
ctx.Error(err)
return
}
ctx.OnClose(func() {
stream.RemoveConsumer(cons)
})
cons.Init()
}
+83
View File
@@ -0,0 +1,83 @@
package ngrok
import (
"fmt"
"github.com/AlexxIT/go2rtc/cmd/app"
"github.com/AlexxIT/go2rtc/cmd/webrtc"
"github.com/AlexxIT/go2rtc/pkg/ngrok"
"github.com/rs/zerolog"
"net"
"strings"
)
func Init() {
var cfg struct {
Log struct {
Level string `yaml:"ngrok"`
} `yaml:"log"`
Mod struct {
Cmd string `yaml:"command"`
} `yaml:"ngrok"`
}
app.LoadConfig(&cfg)
if cfg.Mod.Cmd == "" {
return
}
log = app.GetLogger(cfg.Log.Level)
ngr, err := ngrok.NewNgrok(cfg.Mod.Cmd)
if err != nil {
log.Error().Err(err).Msg("[ngrok] start")
}
ngr.Listen(func(msg interface{}) {
if msg := msg.(*ngrok.Message); msg != nil {
if strings.HasPrefix(msg.Line, "ERROR:") {
log.Warn().Msg("[ngrok] " + msg.Line)
} else {
log.Debug().Msg("[ngrok] " + msg.Line)
}
// Addr: "//localhost:8555", URL: "tcp://1.tcp.eu.ngrok.io:12345"
if msg.Addr == "//localhost:"+webrtc.Port && strings.HasPrefix(msg.URL, "tcp://") {
// don't know if really necessary use IP
address, err := ConvertHostToIP(msg.URL[6:])
if err != nil {
log.Warn().Err(err).Msg("[ngrok] add candidate")
return
}
webrtc.AddCandidate(address)
}
}
})
go func() {
if err = ngr.Serve(); err != nil {
log.Error().Err(err).Msg("[ngrok] run")
}
}()
}
var log zerolog.Logger
func ConvertHostToIP(address string) (string, error) {
host, port, err := net.SplitHostPort(address)
if err != nil {
return "", err
}
ip, err := net.LookupIP(host)
if err != nil {
return "", err
}
if len(ip) == 0 {
return "", fmt.Errorf("can't resolve: %s", host)
}
return ip[0].String() + ":" + port, nil
}
+19
View File
@@ -0,0 +1,19 @@
package rtmp
import (
"github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/pkg/rtmp"
"github.com/AlexxIT/go2rtc/pkg/streamer"
)
func Init() {
streams.HandleFunc("rtmp", handle)
}
func handle(url string) (streamer.Producer, error) {
conn := rtmp.NewClient(url)
if err := conn.Dial(); err != nil {
return nil, err
}
return conn, nil
}
+201
View File
@@ -0,0 +1,201 @@
package rtsp
import (
"github.com/AlexxIT/go2rtc/cmd/app"
"github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/pkg/rtsp"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/rs/zerolog"
"net"
)
func Init() {
var conf struct {
Mod struct {
Listen string `yaml:"listen"`
} `yaml:"rtsp"`
}
// default config
conf.Mod.Listen = ":554"
app.LoadConfig(&conf)
log = app.GetLogger("rtsp")
// RTSP client support
streams.HandleFunc("rtsp", rtspHandler)
streams.HandleFunc("rtsps", rtspHandler)
// RTSP server support
address := conf.Mod.Listen
if address != "" {
_, Port, _ = net.SplitHostPort(address)
go worker(address)
}
}
var Port string
var OnProducer func(conn streamer.Producer) bool // TODO: maybe rewrite...
// internal
var log zerolog.Logger
func rtspHandler(url string) (streamer.Producer, error) {
conn, err := rtsp.NewClient(url)
if err != nil {
return nil, err
}
if log.Trace().Enabled() {
conn.Listen(func(msg interface{}) {
switch msg := msg.(type) {
case *tcp.Request:
log.Trace().Msgf("[rtsp] client request:\n%s", msg)
case *tcp.Response:
log.Trace().Msgf("[rtsp] client response:\n%s", msg)
}
})
}
if err = conn.Dial(); err != nil {
return nil, err
}
if err = conn.Describe(); err != nil {
return nil, err
}
return conn, nil
}
func worker(address string) {
srv, err := tcp.NewServer(address)
if err != nil {
log.Error().Err(err).Msg("[rtsp] listen")
return
}
log.Info().Str("addr", address).Msg("[rtsp] listen")
srv.Listen(func(msg interface{}) {
switch msg.(type) {
case net.Conn:
var name string
var onDisconnect func()
trace := log.Trace().Enabled()
conn := rtsp.NewServer(msg.(net.Conn))
conn.Listen(func(msg interface{}) {
if trace {
switch msg := msg.(type) {
case *tcp.Request:
log.Trace().Msgf("[rtsp] server request:\n%s", msg)
case *tcp.Response:
log.Trace().Msgf("[rtsp] server response:\n%s", msg)
}
}
switch msg {
case rtsp.MethodDescribe:
name = conn.URL.Path[1:]
log.Debug().Str("stream", name).Msg("[rtsp] new consumer")
stream := streams.Get(name) // TODO: rewrite
if stream == nil {
return
}
initMedias(conn)
if err = stream.AddConsumer(conn); err != nil {
log.Warn().Err(err).Str("stream", name).Msg("[rtsp]")
return
}
onDisconnect = func() {
stream.RemoveConsumer(conn)
}
case rtsp.MethodAnnounce:
if OnProducer != nil {
if OnProducer(conn) {
return
}
}
name = conn.URL.Path[1:]
log.Debug().Str("stream", name).Msg("[rtsp] new producer")
str := streams.Get(conn.URL.Path[1:])
if str == nil {
return
}
str.AddProducer(conn)
onDisconnect = func() {
str.RemoveProducer(conn)
}
case streamer.StatePlaying:
log.Debug().Str("stream", name).Msg("[rtsp] start")
}
})
if err = conn.Accept(); err != nil {
log.Warn().Err(err).Msg("[rtsp] accept")
return
}
if err = conn.Handle(); err != nil {
//log.Warn().Err(err).Msg("[rtsp] handle server")
}
if onDisconnect != nil {
onDisconnect()
}
log.Debug().Str("stream", name).Msg("[rtsp] disconnect")
}
})
srv.Serve()
}
func initMedias(conn *rtsp.Conn) {
// set media candidates from query list
for key, value := range conn.URL.Query() {
switch key {
case streamer.KindVideo, streamer.KindAudio:
for _, value := range value {
media := &streamer.Media{
Kind: key, Direction: streamer.DirectionRecvonly,
}
switch value {
case "", "copy": // pass empty codecs list
default:
codec := streamer.NewCodec(value)
media.Codecs = append(media.Codecs, codec)
}
conn.Medias = append(conn.Medias, media)
}
}
}
// set default media candidates if query is empty
if conn.Medias == nil {
conn.Medias = []*streamer.Media{
{Kind: streamer.KindVideo, Direction: streamer.DirectionRecvonly},
{Kind: streamer.KindAudio, Direction: streamer.DirectionRecvonly},
}
}
}
+27
View File
@@ -0,0 +1,27 @@
package streams
import (
"fmt"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"strings"
)
type Handler func(url string) (streamer.Producer, error)
var handlers map[string]Handler
func HandleFunc(scheme string, handler Handler) {
if handlers == nil {
handlers = make(map[string]Handler)
}
handlers[scheme] = handler
}
func GetProducer(url string) (streamer.Producer, error) {
i := strings.IndexByte(url, ':')
handler := handlers[url[:i]]
if handler == nil {
return nil, fmt.Errorf("unsupported scheme: %s", url)
}
return handler(url)
}
+89
View File
@@ -0,0 +1,89 @@
package streams
import (
"github.com/AlexxIT/go2rtc/pkg/streamer"
"strings"
)
type state byte
const (
stateNone state = iota
stateMedias
stateTracks
stateStart
)
type Producer struct {
streamer.Element
url string
element streamer.Producer
tracks []*streamer.Track
state state
}
func (p *Producer) GetMedias() []*streamer.Media {
if p.state == stateNone {
i := strings.IndexByte(p.url, ':')
handler := handlers[p.url[:i]]
if handler == nil {
log.Warn().Str("url", p.url).Msg("[streams] unsupported scheme")
return nil
}
log.Debug().Str("url", p.url).Msg("[streams] probe producer")
var err error
p.element, err = handler(p.url)
if err != nil {
log.Error().Err(err).Str("url", p.url).Msg("[streams] probe producer")
return nil
}
p.state = stateMedias
}
return p.element.GetMedias()
}
func (p *Producer) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track {
if p.state == stateMedias {
p.state = stateTracks
}
track := p.element.GetTrack(media, codec)
for _, t := range p.tracks {
if track == t {
return track
}
}
p.tracks = append(p.tracks, track)
return track
}
// internals
func (p *Producer) start() {
if p.state != stateTracks {
return
}
log.Debug().Str("url", p.url).Msg("[streams] start producer")
p.state = stateStart
go p.element.Start()
}
func (p *Producer) stop() {
log.Debug().Str("url", p.url).Msg("[streams] stop producer")
_ = p.element.Stop()
p.element = nil
p.tracks = nil
p.state = stateNone
}
+164
View File
@@ -0,0 +1,164 @@
package streams
import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/streamer"
)
type Consumer struct {
element streamer.Consumer
tracks []*streamer.Track
}
type Stream struct {
producers []*Producer
consumers []*Consumer
}
func newStream(source interface{}) *Stream {
s := new(Stream)
switch source := source.(type) {
case string:
prod := &Producer{url: source}
s.producers = append(s.producers, prod)
case []interface{}:
for _, source := range source {
prod := &Producer{url: source.(string)}
s.producers = append(s.producers, prod)
}
case map[string]interface{}:
return newStream(source["url"])
default:
panic("wrong source type")
}
return s
}
func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
ic := len(s.consumers)
consumer := &Consumer{element: cons}
// Step 1. Get consumer medias
for icc, consMedia := range cons.GetMedias() {
log.Trace().Stringer("media", consMedia).
Msgf("[streams] consumer:%d:%d candidate", ic, icc)
producers:
for ip, prod := range s.producers {
// Step 2. Get producer medias (not tracks yet)
for ipc, prodMedia := range prod.GetMedias() {
log.Trace().Stringer("media", prodMedia).
Msgf("[streams] producer:%d:%d candidate", ip, ipc)
// Step 3. Match consumer/producer codecs list
prodCodec := prodMedia.MatchMedia(consMedia)
if prodCodec != nil {
log.Trace().Stringer("codec", prodCodec).
Msgf("[streams] match producer:%d:%d => consumer:%d:%d", ip, ipc, ic, icc)
// Step 4. Get producer track
prodTrack := prod.GetTrack(prodMedia, prodCodec)
// Step 5. Add track to consumer and get new track
consTrack := consumer.element.AddTrack(consMedia, prodTrack)
consumer.tracks = append(consumer.tracks, consTrack)
break producers
}
}
}
}
// can't match tracks for consumer
if len(consumer.tracks) == 0 {
return nil
}
s.consumers = append(s.consumers, consumer)
for _, prod := range s.producers {
prod.start()
}
return nil
}
func (s *Stream) RemoveConsumer(cons streamer.Consumer) {
for i, consumer := range s.consumers {
if consumer.element == cons {
// remove consumer pads from all producers
for _, track := range consumer.tracks {
track.Unbind()
}
// remove consumer from slice
s.removeConsumer(i)
break
}
}
for _, producer := range s.producers {
var sink bool
for _, track := range producer.tracks {
if len(track.Sink) > 0 {
sink = true
}
}
if !sink {
producer.stop()
}
}
}
func (s *Stream) AddProducer(prod streamer.Producer) {
panic("not implemented")
}
func (s *Stream) RemoveProducer(prod streamer.Producer) {
panic("not implemented")
}
func (s *Stream) MarshalJSON() ([]byte, error) {
var v []interface{}
for _, prod := range s.producers {
if prod.element != nil {
v = append(v, prod.element)
}
}
for _, cons := range s.consumers {
// cons.element always not nil
v = append(v, cons.element)
}
if len(v) == 0 {
v = nil
}
return json.Marshal(v)
}
func (s *Stream) removeConsumer(i int) {
switch {
case len(s.consumers) == 1: // only one element
s.consumers = nil
case i == 0: // first element
s.consumers = s.consumers[1:]
case i == len(s.consumers)-1: // last element
s.consumers = s.consumers[:i]
default: // middle element
s.consumers = append(s.consumers[:i], s.consumers[i+1:]...)
}
}
func (s *Stream) removeProducer(i int) {
switch {
case len(s.producers) == 1: // only one element
s.producers = nil
case i == 0: // first element
s.producers = s.producers[1:]
case i == len(s.producers)-1: // last element
s.producers = s.producers[:i]
default: // middle element
s.producers = append(s.producers[:i], s.producers[i+1:]...)
}
}
+134
View File
@@ -0,0 +1,134 @@
package streams
import (
"github.com/AlexxIT/go2rtc/pkg/fake"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/stretchr/testify/assert"
"testing"
"time"
)
// Google Chrome 104.0.5112.79
const chrome = `v=0
o=- 0 0 IN IP4 0.0.0.0
s=-
t=0 0
m=audio 9 UDP/TLS/RTP/SAVPF 111 63 103 104 9 0 8 110 112 113 126
a=sendrecv
a=rtpmap:111 opus/48000/2
a=rtpmap:63 red/48000/2
a=rtpmap:103 ISAC/16000
a=rtpmap:104 ISAC/32000
a=rtpmap:9 G722/8000
a=rtpmap:0 PCMU/8000
a=rtpmap:8 PCMA/8000
a=rtpmap:110 telephone-event/48000
a=rtpmap:112 telephone-event/32000
a=rtpmap:113 telephone-event/16000
a=rtpmap:126 telephone-event/8000
m=video 9 UDP/TLS/RTP/SAVPF 96 97 98 99 100 101 102 122 127 121 125 107 108 109 124 120 123 119 35 36 37 38 39 40 41 42 114 115 116 117 118 43
a=recvonly
a=rtpmap:96 VP8/90000
a=rtpmap:97 rtx/90000
a=rtpmap:98 VP9/90000
a=rtpmap:99 rtx/90000
a=rtpmap:100 VP9/90000
a=rtpmap:101 rtx/90000
a=rtpmap:102 VP9/90000
a=rtpmap:122 rtx/90000
a=rtpmap:127 H264/90000
a=fmtp:127 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f
a=rtpmap:121 rtx/90000
a=rtpmap:125 H264/90000
a=fmtp:125 level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f
a=rtpmap:107 rtx/90000
a=rtpmap:108 H264/90000
a=fmtp:108 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f
a=rtpmap:109 rtx/90000
a=rtpmap:124 H264/90000
a=fmtp:124 level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42e01f
a=rtpmap:120 rtx/90000
a=rtpmap:123 H264/90000
a=fmtp:123 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=4d001f
a=rtpmap:119 rtx/90000
a=rtpmap:35 H264/90000
a=fmtp:35 level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=4d001f
a=rtpmap:36 rtx/90000
a=rtpmap:37 H264/90000
a=fmtp:37 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=f4001f
a=rtpmap:38 rtx/90000
a=rtpmap:39 H264/90000
a=fmtp:39 level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=f4001f
a=rtpmap:40 rtx/90000
a=rtpmap:41 AV1/90000
a=rtpmap:42 rtx/90000
a=rtpmap:114 H264/90000
a=fmtp:114 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=64001f
a=rtpmap:115 rtx/90000
a=rtpmap:116 red/90000
a=rtpmap:117 rtx/90000
a=rtpmap:118 ulpfec/90000
a=rtpmap:43 flexfec-03/90000
`
const dahuaSimple = `v=0
o=- 0 0 IN IP4 0.0.0.0
s=-
t=0 0
m=video 0 RTP/AVP 96
a=control:trackID=0
a=rtpmap:96 H264/90000
a=fmtp:96 packetization-mode=1;profile-level-id=42401E;sprop-parameter-sets=Z0JAHqaAoD2QAA==,aM48gAA=
a=recvonly
m=audio 0 RTP/AVP 97
a=control:trackID=1
a=rtpmap:97 MPEG4-GENERIC/16000
a=fmtp:97 streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1408
a=recvonly
m=audio 0 RTP/AVP 8
a=control:trackID=5
a=rtpmap:8 PCMA/8000
a=sendonly
`
const ffmpegPCMU48000 = `v=0
o=- 0 0 IN IP4 127.0.0.1
s=-
t=0 0
m=audio 0 RTP/AVP 96
b=AS:384
a=rtpmap:96 PCMU/48000/1
a=control:streamid=0
`
func TestRouting(t *testing.T) {
prod := &fake.Producer{}
prod.Medias, _ = streamer.UnmarshalRTSPSDP([]byte(dahuaSimple))
assert.Len(t, prod.Medias, 3)
HandleFunc("fake", func(url string) (streamer.Producer, error) {
return prod, nil
})
cons := &fake.Consumer{}
cons.Medias, _ = streamer.UnmarshalSDP([]byte(chrome))
assert.Len(t, cons.Medias, 3)
// setup stream with one producer
stream := newStream("fake:")
// main check:
err := stream.AddConsumer(cons)
assert.Nil(t, err)
assert.Len(t, prod.Tracks, 2)
assert.Len(t, cons.Tracks, 2)
time.Sleep(time.Second)
assert.Greater(t, prod.SendPackets,0)
assert.Greater(t, cons.RecvPackets,0)
assert.Greater(t, prod.RecvPackets,0)
assert.Greater(t, cons.SendPackets,0)
}
+28
View File
@@ -0,0 +1,28 @@
package streams
import (
"github.com/AlexxIT/go2rtc/cmd/app"
"github.com/rs/zerolog"
)
var Streams = map[string]*Stream{}
func Init() {
var cfg struct {
Mod map[string]interface{} `yaml:"streams"`
}
app.LoadConfig(&cfg)
log = app.GetLogger("streams")
for name, item := range cfg.Mod {
Streams[name] = newStream(item)
}
}
func Get(name string) *Stream {
return Streams[name]
}
var log zerolog.Logger
+265
View File
@@ -0,0 +1,265 @@
package webrtc
import (
"github.com/AlexxIT/go2rtc/cmd/api"
"github.com/AlexxIT/go2rtc/cmd/app"
"github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/AlexxIT/go2rtc/pkg/webrtc"
pion "github.com/pion/webrtc/v3"
"github.com/rs/zerolog"
"io/ioutil"
"net"
"net/http"
"strings"
)
func Init() {
var cfg struct {
Mod struct {
Listen string `yaml:"listen"`
Candidates []string `yaml:"candidates"`
IceServers []pion.ICEServer `yaml:"ice_servers"`
} `yaml:"webrtc"`
}
cfg.Mod.IceServers = []pion.ICEServer{
{URLs: []string{"stun:stun.l.google.com:19302"}},
}
app.LoadConfig(&cfg)
log = app.GetLogger("webrtc")
address := cfg.Mod.Listen
pionAPI, err := webrtc.NewAPI(address)
if pionAPI == nil {
log.Error().Err(err).Msg("[webrtc] Init API")
return
}
if err != nil {
log.Warn().Err(err).Msg("[webrtc] Listen")
} else if address != "" {
log.Info().Str("addr", address).Msg("[webrtc] Listen")
_, Port, _ = net.SplitHostPort(address)
}
pionConf := pion.Configuration{
ICEServers: cfg.Mod.IceServers,
SDPSemantics: pion.SDPSemanticsUnifiedPlanWithFallback,
}
NewPConn = func() (*pion.PeerConnection, error) {
return pionAPI.NewPeerConnection(pionConf)
}
candidates = cfg.Mod.Candidates
api.HandleFunc("/api/webrtc", apiHandler)
api.HandleFunc("/api/webrtc/camera", cameraHandler)
api.HandleWS(webrtc.MsgTypeOffer, offerHandler)
api.HandleWS(webrtc.MsgTypeCandidate, candidateHandler)
}
func AddCandidate(address string) {
log.Info().Str("addr", address).Msg("[webrtc] new candidate")
candidates = append(candidates, address)
}
var Port string
var log zerolog.Logger
var candidates []string
var NewPConn func() (*pion.PeerConnection, error)
func apiHandler(w http.ResponseWriter, r *http.Request) {
url := r.URL.Query().Get("url")
stream := streams.Get(url)
if stream == nil {
return
}
// get offer
offer, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Error().Err(err).Msg("[webrtc] read offer")
return
}
// create new webrtc instance
cons := new(webrtc.Conn)
cons.Conn, err = NewPConn()
if err != nil {
log.Error().Err(err).Msg("[webrtc] new conn")
return
}
cons.UserAgent = r.UserAgent()
cons.Listen(func(msg interface{}) {
if msg == streamer.StateNull {
stream.RemoveConsumer(cons)
}
})
if err = stream.AddConsumer(cons); err != nil {
log.Warn().Err(err).Msg("[api.webrtc] add consumer")
return
}
cons.Init()
// exchange sdp with waiting all candidates
answer, err := cons.ExchangeSDP(string(offer), true)
// send SDP to client
if _, err = w.Write([]byte(answer)); err != nil {
log.Error().Err(err).Msg("[api.webrtc] send answer")
}
}
func cameraHandler(w http.ResponseWriter, r *http.Request) {
url := r.URL.Query().Get("url")
stream := streams.Get(url)
if stream == nil {
return
}
// get offer
offer, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Error().Err(err).Msg("[webrtc] read offer")
return
}
// create new webrtc instance
conn := new(webrtc.Conn)
conn.Conn, err = NewPConn()
if err != nil {
log.Error().Err(err).Msg("[webrtc] new conn")
return
}
conn.UserAgent = r.UserAgent()
conn.Listen(func(msg interface{}) {
switch msg.(type) {
case pion.PeerConnectionState:
if msg == pion.PeerConnectionStateDisconnected {
stream.RemoveConsumer(conn)
}
case streamer.Track:
//stream.AddProducer(conn)
}
})
conn.Init()
// exchange sdp with waiting all candidates
answer, err := conn.ExchangeSDP(string(offer), true)
// send SDP to client
if _, err = w.Write([]byte(answer)); err != nil {
log.Error().Err(err).Msg("[api.webrtc] send answer")
}
}
func offerHandler(ctx *api.Context, msg *streamer.Message) {
name := ctx.Request.URL.Query().Get("url")
stream := streams.Get(name)
if stream == nil {
return
}
log.Debug().Str("stream", name).Msg("[webrtc] new consumer")
var err error
// create new webrtc instance
conn := new(webrtc.Conn)
conn.Conn, err = NewPConn()
if err != nil {
log.Error().Err(err).Msg("[webrtc] new conn")
return
}
conn.UserAgent = ctx.Request.UserAgent()
conn.Listen(func(msg interface{}) {
switch msg := msg.(type) {
case streamer.EventType:
if msg == streamer.StateNull {
stream.RemoveConsumer(conn)
}
case *streamer.Message:
// subscribe on webrtc server candidates
log.Trace().Str("candidate", msg.Value.(string)).Msg("[webrtc] local")
ctx.Write(msg)
}
})
// 1. SetOffer, so we can get remote client codecs
offer := msg.Value.(string)
log.Trace().Msgf("[webrtc] offer:\n%s", offer)
if err = conn.SetOffer(offer); err != nil {
log.Warn().Err(err).Msg("[api.webrtc] set offer")
ctx.Error(err)
return
}
// 2. AddConsumer, so we get new tracks
if err = stream.AddConsumer(conn); err != nil {
log.Warn().Err(err).Msg("[api.webrtc] add consumer")
ctx.Error(err)
return
}
conn.Init()
// exchange sdp without waiting all candidates
//answer, err := conn.ExchangeSDP(offer, false)
answer, err := conn.GetAnswer()
log.Trace().Msgf("[webrtc] answer\n%s", answer)
if err != nil {
log.Error().Err(err).Msg("[webrtc] get answer")
ctx.Error(err)
return
}
ctx.Write(&streamer.Message{
Type: webrtc.MsgTypeAnswer, Value: answer,
})
for _, address := range candidates {
if strings.HasPrefix(address, "stun:") {
ip, err := webrtc.GetPublicIP()
if err != nil {
log.Warn().Err(err).Msg("[webrtc] public IP")
continue
}
address = ip.String() + address[4:]
}
cand, err := webrtc.NewCandidate(address)
if err != nil {
log.Warn().Err(err).Msg("[webrtc] candidate")
continue
}
conn.Fire(&streamer.Message{
Type: webrtc.MsgTypeCandidate, Value: cand,
})
}
ctx.Consumer = conn
}
func candidateHandler(ctx *api.Context, msg *streamer.Message) {
if ctx.Consumer == nil {
return
}
if conn := ctx.Consumer.(*webrtc.Conn); conn != nil {
log.Trace().Str("candidate", msg.Value.(string)).Msg("[webrtc] Remote")
conn.Push(msg)
}
}