Big rewrite for WebRTC processing

This commit is contained in:
Alexey Khit
2023-02-25 17:10:45 +03:00
parent ad3c5440fe
commit 218eea6806
10 changed files with 625 additions and 253 deletions
+3 -3
View File
@@ -56,7 +56,7 @@ func GetCandidates() (candidates []string) {
return
}
func asyncCandidates(tr *api.Transport, cons *webrtc.Server) {
func asyncCandidates(tr *api.Transport, cons *webrtc.Conn) {
tr.WithContext(func(ctx map[any]any) {
if candidates, ok := ctx["candidate"].([]string); ok {
// process candidates that receive before this moment
@@ -117,9 +117,9 @@ func candidateHandler(tr *api.Transport, msg *api.Message) error {
candidate := msg.String()
log.Trace().Str("candidate", candidate).Msg("[webrtc] remote")
if cons, ok := ctx["webrtc"].(*webrtc.Server); ok {
if cons, ok := ctx["webrtc"].(*webrtc.Conn); ok {
// if webrtc.Server already initialized - process candidate
cons.AddCandidate(candidate)
_ = cons.AddCandidate(candidate)
} else {
// or collect candidate and process it later
list, _ := ctx["candidate"].([]string)
+154
View File
@@ -0,0 +1,154 @@
package webrtc
import (
"errors"
"github.com/AlexxIT/go2rtc/cmd/api"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/AlexxIT/go2rtc/pkg/webrtc"
"github.com/gorilla/websocket"
pion "github.com/pion/webrtc/v3"
"io"
"net/http"
"strings"
"time"
)
func streamsHandler(url string) (streamer.Producer, error) {
url = url[7:]
if i := strings.Index(url, "://"); i > 0 {
switch url[:i] {
case "ws", "wss":
return asyncClient(url)
case "http", "https":
return syncClient(url)
}
}
return nil, errors.New("unsupported url: " + url)
}
func asyncClient(url string) (streamer.Producer, error) {
// 1. Connect to signalign server
ws, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
_ = ws.Close()
}
}()
// 2. Create PeerConnection
pc, err := newPeerConnection()
if err != nil {
log.Error().Err(err).Caller().Send()
return nil, err
}
prod := webrtc.NewConn(pc)
prod.Listen(func(msg any) {
switch msg := msg.(type) {
case pion.PeerConnectionState:
_ = ws.Close()
case *pion.ICECandidate:
if msg != nil {
s := msg.ToJSON().Candidate
log.Trace().Str("candidate", s).Msg("[webrtc] local")
_ = ws.WriteJSON(&api.Message{Type: "webrtc/candidate", Value: s})
}
}
})
// 3. Create offer
offer, err := prod.CreateOffer()
if err != nil {
return nil, err
}
// 4. Send offer
msg := &api.Message{Type: "webrtc/offer", Value: offer}
if err = ws.WriteJSON(msg); err != nil {
return nil, err
}
// 5. Get answer
if err = ws.ReadJSON(msg); err != nil {
return nil, err
}
if msg.Type != "webrtc/answer" {
return nil, errors.New("wrong answer: " + msg.Type)
}
answer := msg.String()
if err = prod.SetAnswer(answer); err != nil {
return nil, err
}
// 6. Continue to receiving candidates
go func() {
for {
// receive data from remote
msg := new(api.Message)
if err = ws.ReadJSON(msg); err != nil {
if cerr, ok := err.(*websocket.CloseError); ok {
log.Trace().Err(err).Caller().Msgf("[webrtc] ws code=%d", cerr)
}
break
}
switch msg.Type {
case "webrtc/candidate":
if msg.Value != nil {
_ = prod.AddCandidate(msg.String())
}
}
}
_ = ws.Close()
}()
return prod, nil
}
// syncClient - support WebRTC-HTTP Egress Protocol (WHEP)
func syncClient(url string) (streamer.Producer, error) {
// 2. Create PeerConnection
pc, err := newPeerConnection()
if err != nil {
log.Error().Err(err).Caller().Send()
return nil, err
}
prod := webrtc.NewConn(pc)
// 3. Create offer
offer, err := prod.CreateCompleteOffer()
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", url, strings.NewReader(offer))
req.Header.Set("Content-Type", MimeSDP)
if err != nil {
return nil, err
}
client := http.Client{Timeout: time.Second * 5000}
res, err := client.Do(req)
if err != nil {
return nil, err
}
answer, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
}
if err = prod.SetAnswer(string(answer)); err != nil {
return nil, err
}
return prod, nil
}
+196
View File
@@ -0,0 +1,196 @@
package webrtc
import (
"encoding/json"
"github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/pkg/webrtc"
pion "github.com/pion/webrtc/v3"
"io"
"net/http"
"strconv"
"strings"
"time"
)
const MimeSDP = "application/sdp"
var sessions = map[string]*webrtc.Conn{}
func syncHandler(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "POST":
query := r.URL.Query()
if query.Get("src") != "" {
// WHEP or JSON SDP or raw SDP exchange
outputWebRTC(w, r)
} else if query.Get("dst") != "" {
// WHIP SDP exchange
inputWebRTC(w, r)
} else {
http.Error(w, "", http.StatusBadRequest)
}
case "PATCH":
// TODO: WHEP/WHIP
http.Error(w, "", http.StatusMethodNotAllowed)
case "DELETE":
if id := r.URL.Query().Get("id"); id != "" {
if conn, ok := sessions[id]; ok {
delete(sessions, id)
_ = conn.Close()
} else {
http.Error(w, "", http.StatusNotFound)
}
} else {
http.Error(w, "", http.StatusBadRequest)
}
default:
http.Error(w, "", http.StatusMethodNotAllowed)
}
}
// outputWebRTC support API depending on Content-Type:
// 1. application/json - receive {"type":"offer","sdp":"v=0\r\n..."} and response {"type":"answer","sdp":"v=0\r\n..."}
// 2. application/sdp - receive/response SDP via WebRTC-HTTP Egress Protocol (WHEP)
// 3. other - receive/response raw SDP
func outputWebRTC(w http.ResponseWriter, r *http.Request) {
url := r.URL.Query().Get("src")
stream := streams.Get(url)
if stream == nil {
return
}
mediaType := r.Header.Get("Content-Type")
if mediaType != "" {
mediaType, _, _ = strings.Cut(mediaType, ";")
mediaType = strings.ToLower(strings.TrimSpace(mediaType))
}
var offer string
switch mediaType {
case "application/json":
var desc pion.SessionDescription
if err := json.NewDecoder(r.Body).Decode(&desc); err != nil {
log.Error().Err(err).Caller().Send()
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
offer = desc.SDP
default:
body, err := io.ReadAll(r.Body)
if err != nil {
log.Error().Err(err).Caller().Send()
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
offer = string(body)
}
answer, err := ExchangeSDP(stream, offer, r.UserAgent())
if err != nil {
log.Error().Err(err).Caller().Send()
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
switch mediaType {
case "application/json":
w.Header().Set("Content-Type", mediaType)
v := pion.SessionDescription{
Type: pion.SDPTypeAnswer, SDP: answer,
}
err = json.NewEncoder(w).Encode(v)
case MimeSDP:
w.Header().Set("Content-Type", mediaType)
w.WriteHeader(http.StatusCreated)
_, err = w.Write([]byte(answer))
default:
_, err = w.Write([]byte(answer))
}
if err != nil {
log.Error().Err(err).Caller().Send()
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
func inputWebRTC(w http.ResponseWriter, r *http.Request) {
dst := r.URL.Query().Get("dst")
stream := streams.Get(dst)
if stream == nil {
stream = streams.New(dst, nil)
}
// 1. Get offer
offer, err := io.ReadAll(r.Body)
if err != nil {
log.Error().Err(err).Caller().Send()
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
log.Trace().Msgf("[webrtc] WHIP offer\n%s", offer)
pc, err := newPeerConnection()
if err != nil {
log.Error().Err(err).Caller().Send()
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// create new webrtc instance
conn := webrtc.NewConn(pc)
conn.UserAgent = r.UserAgent()
if err = conn.SetOffer(string(offer)); err != nil {
log.Warn().Err(err).Caller().Send()
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
answer, err := conn.GetCompleteAnswer()
if err == nil {
answer, err = syncCanditates(answer)
}
if err != nil {
log.Warn().Err(err).Caller().Send()
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
log.Trace().Msgf("[webrtc] WHIP answer\n%s", answer)
id := strconv.FormatInt(time.Now().UnixNano(), 36)
sessions[id] = conn
conn.Listen(func(msg interface{}) {
switch msg := msg.(type) {
case pion.PeerConnectionState:
if msg == pion.PeerConnectionStateClosed {
stream.RemoveProducer(conn)
if _, ok := sessions[id]; ok {
delete(sessions, id)
}
}
}
})
stream.AddProducer(conn)
w.Header().Set("Content-Type", MimeSDP)
w.Header().Set("Location", "webrtc?id="+id)
w.WriteHeader(http.StatusCreated)
if _, err = w.Write([]byte(answer)); err != nil {
log.Warn().Err(err).Caller().Send()
return
}
}
+7 -65
View File
@@ -1,7 +1,6 @@
package webrtc
import (
"encoding/json"
"errors"
"github.com/AlexxIT/go2rtc/cmd/api"
"github.com/AlexxIT/go2rtc/cmd/app"
@@ -9,10 +8,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/webrtc"
pion "github.com/pion/webrtc/v3"
"github.com/rs/zerolog"
"io"
"mime"
"net"
"net/http"
)
func Init() {
@@ -62,11 +58,16 @@ func Init() {
AddCandidate(candidate)
}
// async WebRTC server (two API versions)
api.HandleWS("webrtc", asyncHandler)
api.HandleWS("webrtc/offer", asyncHandler)
api.HandleWS("webrtc/candidate", candidateHandler)
// sync WebRTC server (two API versions)
api.HandleFunc("api/webrtc", syncHandler)
// WebRTC client
streams.HandleFunc("webrtc", streamsHandler)
}
var Port string
@@ -90,7 +91,7 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error {
return err
}
cons := webrtc.NewServer(pc)
cons := webrtc.NewConn(pc)
cons.UserAgent = tr.Request.UserAgent()
cons.Listen(func(msg any) {
switch msg := msg.(type) {
@@ -153,65 +154,6 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error {
return nil
}
func syncHandler(w http.ResponseWriter, r *http.Request) {
url := r.URL.Query().Get("src")
stream := streams.Get(url)
if stream == nil {
return
}
ct := r.Header.Get("Content-Type")
if ct != "" {
ct, _, _ = mime.ParseMediaType(ct)
}
// V2 - json/object exchange, V1 - raw SDP exchange
apiV2 := ct == "application/json"
var offer string
if apiV2 {
var desc pion.SessionDescription
if err := json.NewDecoder(r.Body).Decode(&desc); err != nil {
log.Error().Err(err).Caller().Send()
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
offer = desc.SDP
} else {
body, err := io.ReadAll(r.Body)
if err != nil {
log.Error().Err(err).Caller().Send()
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
offer = string(body)
}
answer, err := ExchangeSDP(stream, offer, r.UserAgent())
if err != nil {
log.Error().Err(err).Caller().Send()
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// send SDP to client
if apiV2 {
w.Header().Set("Content-Type", ct)
v := pion.SessionDescription{
Type: pion.SDPTypeAnswer, SDP: answer,
}
if err = json.NewEncoder(w).Encode(v); err != nil {
log.Error().Err(err).Caller().Send()
http.Error(w, err.Error(), http.StatusInternalServerError)
}
} else {
if _, err = w.Write([]byte(answer)); err != nil {
log.Error().Err(err).Caller().Send()
}
}
}
func ExchangeSDP(stream *streams.Stream, offer string, userAgent string) (answer string, err error) {
pc, err := newPeerConnection()
if err != nil {
@@ -220,7 +162,7 @@ func ExchangeSDP(stream *streams.Stream, offer string, userAgent string) (answer
}
// create new webrtc instance
conn := webrtc.NewServer(pc)
conn := webrtc.NewConn(pc)
conn.UserAgent = userAgent
conn.Listen(func(msg interface{}) {
switch msg := msg.(type) {