BIG core logic rewrite

This commit is contained in:
Alexey Khit
2023-03-17 06:48:02 +03:00
parent 2146ea470b
commit 12a7b96289
107 changed files with 3000 additions and 3024 deletions
-15
View File
@@ -1,15 +0,0 @@
package streams
import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/streamer"
)
type Consumer struct {
element streamer.Consumer
tracks []*streamer.Track
}
func (c *Consumer) MarshalJSON() ([]byte, error) {
return json.Marshal(c.element)
}
+3 -3
View File
@@ -2,12 +2,12 @@ package streams
import (
"fmt"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/AlexxIT/go2rtc/pkg/core"
"strings"
"sync"
)
type Handler func(url string) (streamer.Producer, error)
type Handler func(url string) (core.Producer, error)
var handlers = map[string]Handler{}
var handlersMu sync.Mutex
@@ -32,7 +32,7 @@ func HasProducer(url string) bool {
return getHandler(url) != nil
}
func GetProducer(url string) (streamer.Producer, error) {
func GetProducer(url string) (core.Producer, error) {
handler := getHandler(url)
if handler == nil {
return nil, fmt.Errorf("unsupported scheme: %s", url)
+25 -26
View File
@@ -2,14 +2,14 @@ package streams
import (
"errors"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/AlexxIT/go2rtc/pkg/core"
)
func (s *Stream) Play(source string) error {
s.mu.Lock()
for _, producer := range s.producers {
if producer.state == stateInternal && producer.element != nil {
_ = producer.element.Stop()
if producer.state == stateInternal && producer.conn != nil {
_ = producer.conn.Stop()
}
}
s.mu.Unlock()
@@ -18,14 +18,14 @@ func (s *Stream) Play(source string) error {
return nil
}
var src streamer.Producer
var src core.Producer
for _, producer := range s.producers {
if producer.element == nil {
if producer.conn == nil {
continue
}
cons, ok := producer.element.(streamer.Consumer)
cons, ok := producer.conn.(core.Consumer)
if !ok {
continue
}
@@ -59,7 +59,7 @@ func (s *Stream) Play(source string) error {
}
// check if client support consumer interface
cons, ok := dst.(streamer.Consumer)
cons, ok := dst.(core.Consumer)
if !ok {
_ = dst.Stop()
continue
@@ -98,50 +98,49 @@ func (s *Stream) Play(source string) error {
return errors.New("can't find consumer")
}
func (s *Stream) AddInternalProducer(prod streamer.Producer) {
producer := &Producer{element: prod, state: stateInternal}
func (s *Stream) AddInternalProducer(conn core.Producer) {
producer := &Producer{conn: conn, state: stateInternal}
s.mu.Lock()
s.producers = append(s.producers, producer)
s.mu.Unlock()
}
func (s *Stream) AddInternalConsumer(cons streamer.Consumer) {
consumer := &Consumer{element: cons}
func (s *Stream) AddInternalConsumer(conn core.Consumer) {
s.mu.Lock()
s.consumers = append(s.consumers, consumer)
s.consumers = append(s.consumers, conn)
s.mu.Unlock()
}
func (s *Stream) RemoveInternalConsumer(cons streamer.Consumer) {
func (s *Stream) RemoveInternalConsumer(conn core.Consumer) {
s.mu.Lock()
for i, consumer := range s.consumers {
if consumer.element == cons {
s.removeConsumer(i)
if consumer == conn {
s.consumers = append(s.consumers[:i], s.consumers[i+1:]...)
break
}
}
s.mu.Unlock()
}
func matchMedia(prod streamer.Producer, cons streamer.Consumer) bool {
func matchMedia(prod core.Producer, cons core.Consumer) bool {
for _, consMedia := range cons.GetMedias() {
for _, prodMedia := range prod.GetMedias() {
// codec negotiation
prodCodec := prodMedia.MatchMedia(consMedia)
if prodMedia.Direction != core.DirectionRecvonly {
continue
}
prodCodec, consCodec := prodMedia.MatchMedia(consMedia)
if prodCodec == nil {
continue
}
// setup producer track
prodTrack := prod.GetTrack(prodMedia, prodCodec)
if prodTrack == nil {
return false
track, err := prod.GetTrack(prodMedia, prodCodec)
if err != nil {
continue
}
// setup consumer track
consTrack := cons.AddTrack(consMedia, prodTrack)
if consTrack == nil {
return false
if err = cons.AddTrack(consMedia, consCodec, track); err != nil {
continue
}
return true
+117 -83
View File
@@ -2,7 +2,8 @@ package streams
import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"errors"
"github.com/AlexxIT/go2rtc/pkg/core"
"strings"
"sync"
"time"
@@ -20,20 +21,95 @@ const (
)
type Producer struct {
streamer.Element
core.Listener
url string
template string
element streamer.Producer
conn core.Producer
receivers []*core.Receiver
senders []*core.Receiver
lastErr error
tracks []*streamer.Track
state state
mu sync.Mutex
workerID int
}
func (p *Producer) Dial() error {
p.mu.Lock()
defer p.mu.Unlock()
if p.state == stateNone {
conn, err := GetProducer(p.url)
if err != nil {
return err
}
p.conn = conn
p.state = stateMedias
}
return nil
}
func (p *Producer) GetMedias() []*core.Media {
p.mu.Lock()
defer p.mu.Unlock()
return p.conn.GetMedias()
}
func (p *Producer) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.state == stateNone {
return nil, errors.New("get track from none state")
}
for _, track := range p.receivers {
if track.Codec == codec {
return track, nil
}
}
track, err := p.conn.GetTrack(media, codec)
if err != nil {
return nil, err
}
p.receivers = append(p.receivers, track)
if p.state == stateMedias {
p.state = stateTracks
}
return track, nil
}
func (p *Producer) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
p.mu.Lock()
defer p.mu.Unlock()
if p.state == stateNone {
return errors.New("add track from none state")
}
if err := p.conn.(core.Consumer).AddTrack(media, codec, track); err != nil {
return err
}
p.senders = append(p.senders, track)
if p.state == stateMedias {
p.state = stateTracks
}
return nil
}
func (p *Producer) SetSource(s string) {
if p.template == "" {
p.template = p.url
@@ -41,64 +117,12 @@ func (p *Producer) SetSource(s string) {
p.url = strings.Replace(p.template, "{input}", s, 1)
}
func (p *Producer) GetMedias() []*streamer.Media {
p.mu.Lock()
defer p.mu.Unlock()
if p.state == stateNone {
log.Debug().Msgf("[streams] probe producer url=%s", p.url)
p.element, p.lastErr = GetProducer(p.url)
if p.lastErr != nil || p.element == nil {
log.Error().Err(p.lastErr).Str("url", p.url).Caller().Send()
return nil
}
p.state = stateMedias
}
// if element in reconnect state
if p.element == nil {
return nil
}
return p.element.GetMedias()
}
func (p *Producer) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track {
p.mu.Lock()
defer p.mu.Unlock()
if p.state == stateNone {
return nil
}
for _, track := range p.tracks {
if track.Codec == codec {
return track
}
}
track := p.element.GetTrack(media, codec)
if track == nil {
return nil
}
p.tracks = append(p.tracks, track)
if p.state == stateMedias {
p.state = stateTracks
}
return track
}
func (p *Producer) MarshalJSON() ([]byte, error) {
if p.element != nil {
return json.Marshal(p.element)
if p.conn != nil {
return json.Marshal(p.conn)
}
info := streamer.Info{URL: p.url}
info := core.Info{URL: p.url}
return json.Marshal(info)
}
@@ -117,11 +141,11 @@ func (p *Producer) start() {
p.state = stateStart
p.workerID++
go p.worker(p.element, p.workerID)
go p.worker(p.conn, p.workerID)
}
func (p *Producer) worker(element streamer.Producer, workerID int) {
if err := element.Start(); err != nil {
func (p *Producer) worker(conn core.Producer, workerID int) {
if err := conn.Start(); err != nil {
p.mu.Lock()
closed := p.workerID != workerID
p.mu.Unlock()
@@ -147,9 +171,8 @@ func (p *Producer) reconnect(workerID int) {
log.Debug().Msgf("[streams] reconnect to url=%s", p.url)
p.element, p.lastErr = GetProducer(p.url)
if p.lastErr != nil || p.element == nil {
log.Debug().Msgf("[streams] producer=%s", p.lastErr)
if err := p.Dial(); err != nil {
log.Debug().Msgf("[streams] producer=%s", err)
// TODO: dynamic timeout
time.AfterFunc(30*time.Second, func() {
p.reconnect(workerID)
@@ -157,27 +180,37 @@ func (p *Producer) reconnect(workerID int) {
return
}
medias := p.element.GetMedias()
for _, media := range p.conn.GetMedias() {
switch media.Direction {
case core.DirectionRecvonly:
for _, receiver := range p.receivers {
codec := media.MatchCodec(receiver.Codec)
if codec == nil {
continue
}
// convert all old producer tracks to new tracks
for i, oldTrack := range p.tracks {
// match new element medias with old track codec
for _, media := range medias {
codec := media.MatchCodec(oldTrack.Codec)
if codec == nil {
continue
track, err := p.conn.GetTrack(media, codec)
if err != nil {
continue
}
receiver.Replace(track)
break
}
// move sink from old track to new track
newTrack := p.element.GetTrack(media, codec)
newTrack.GetSink(oldTrack)
p.tracks[i] = newTrack
case core.DirectionSendonly:
for _, sender := range p.senders {
codec := media.MatchCodec(sender.Codec)
if codec == nil {
continue
}
break
_ = p.conn.(core.Consumer).AddTrack(media, codec, sender)
}
}
}
go p.worker(p.element, workerID)
go p.worker(p.conn, workerID)
}
func (p *Producer) stop() {
@@ -197,11 +230,12 @@ func (p *Producer) stop() {
log.Debug().Msgf("[streams] stop producer url=%s", p.url)
if p.element != nil {
_ = p.element.Stop()
p.element = nil
if p.conn != nil {
_ = p.conn.Stop()
p.conn = nil
}
p.state = stateNone
p.tracks = nil
p.receivers = nil
p.senders = nil
}
+63 -92
View File
@@ -4,7 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/AlexxIT/go2rtc/pkg/core"
"strings"
"sync"
"sync/atomic"
@@ -12,12 +12,12 @@ import (
type Stream struct {
producers []*Producer
consumers []*Consumer
consumers []core.Consumer
mu sync.Mutex
requests int32
}
func NewStream(source interface{}) *Stream {
func NewStream(source any) *Stream {
switch source := source.(type) {
case string:
s := new(Stream)
@@ -38,7 +38,7 @@ func NewStream(source interface{}) *Stream {
case nil:
return new(Stream)
default:
panic("wrong source type")
panic(core.Caller())
}
}
@@ -48,57 +48,71 @@ func (s *Stream) SetSource(source string) {
}
}
func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
func (s *Stream) AddConsumer(cons core.Consumer) (err error) {
// support for multiple simultaneous requests from different consumers
atomic.AddInt32(&s.requests, 1)
ic := len(s.consumers)
consumer := &Consumer{element: cons}
var producers []*Producer // matched producers for consumer
var codecs string
// Step 1. Get consumer medias
for icc, consMedia := range cons.GetMedias() {
log.Trace().Stringer("media", consMedia).
Msgf("[streams] consumer=%d candidate=%d", ic, icc)
for _, consMedia := range cons.GetMedias() {
producers:
for ip, prod := range s.producers {
// Step 2. Get producer medias (not tracks yet)
for ipc, prodMedia := range prod.GetMedias() {
log.Trace().Stringer("media", prodMedia).
Msgf("[streams] producer=%d candidate=%d", ip, ipc)
for _, prod := range s.producers {
if err = prod.Dial(); err != nil {
continue
}
// Step 2. Get producer medias (not tracks yet)
for _, prodMedia := range prod.GetMedias() {
collectCodecs(prodMedia, &codecs)
// Step 3. Match consumer/producer codecs list
prodCodec := prodMedia.MatchMedia(consMedia)
if prodCodec != nil {
log.Trace().Stringer("codec", prodCodec).
Msgf("[streams] match producer:%d:%d => consumer:%d:%d", ip, ipc, ic, icc)
prodCodec, consCodec := prodMedia.MatchMedia(consMedia)
if prodCodec == nil {
continue
}
// Step 4. Get producer track
prodTrack := prod.GetTrack(prodMedia, prodCodec)
if prodTrack == nil {
log.Warn().Str("url", prod.url).Msg("[streams] can't get track")
var track *core.Receiver
switch prodMedia.Direction {
case core.DirectionRecvonly:
// Step 4. Get recvonly track from producer
if track, err = prod.GetTrack(prodMedia, prodCodec); err != nil {
log.Info().Err(err).Msg("[streams] can't get track")
continue
}
// Step 5. Add track to consumer
if err = cons.AddTrack(consMedia, consCodec, track); err != nil {
log.Info().Err(err).Msg("[streams] can't add track")
continue
}
// Step 5. Add track to consumer and get new track
consTrack := consumer.element.AddTrack(consMedia, prodTrack)
consumer.tracks = append(consumer.tracks, consTrack)
producers = append(producers, prod)
if !consMedia.MatchAll() {
break producers
case core.DirectionSendonly:
// Step 4. Get recvonly track from consumer (backchannel)
if track, err = cons.(core.Producer).GetTrack(consMedia, consCodec); err != nil {
log.Info().Err(err).Msg("[streams] can't get track")
continue
}
// Step 5. Add track to producer
if err = prod.AddTrack(prodMedia, prodCodec, track); err != nil {
log.Info().Err(err).Msg("[streams] can't add track")
continue
}
}
producers = append(producers, prod)
if !consMedia.MatchAll() {
break producers
}
}
}
}
// stop producers if they don't have readers
if atomic.AddInt32(&s.requests, -1) == 0 {
s.stopProducers()
}
@@ -118,7 +132,7 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
}
s.mu.Lock()
s.consumers = append(s.consumers, consumer)
s.consumers = append(s.consumers, cons)
s.mu.Unlock()
// there may be duplicates, but that's not a problem
@@ -129,16 +143,13 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
return nil
}
func (s *Stream) RemoveConsumer(cons streamer.Consumer) {
func (s *Stream) RemoveConsumer(cons core.Consumer) {
_ = cons.Stop()
s.mu.Lock()
for i, consumer := range s.consumers {
if consumer.element == cons {
// remove consumer pads from all producers
for _, track := range consumer.tracks {
track.Unbind()
}
// remove consumer from slice
s.removeConsumer(i)
if consumer == cons {
s.consumers = append(s.consumers[:i], s.consumers[i+1:]...)
break
}
}
@@ -147,18 +158,18 @@ func (s *Stream) RemoveConsumer(cons streamer.Consumer) {
s.stopProducers()
}
func (s *Stream) AddProducer(prod streamer.Producer) {
producer := &Producer{element: prod, state: stateExternal}
func (s *Stream) AddProducer(prod core.Producer) {
producer := &Producer{conn: prod, state: stateExternal}
s.mu.Lock()
s.producers = append(s.producers, producer)
s.mu.Unlock()
}
func (s *Stream) RemoveProducer(prod streamer.Producer) {
func (s *Stream) RemoveProducer(prod core.Producer) {
s.mu.Lock()
for i, producer := range s.producers {
if producer.element == prod {
s.removeProducer(i)
if producer.conn == prod {
s.producers = append(s.producers[:i], s.producers[i+1:]...)
break
}
}
@@ -169,8 +180,8 @@ func (s *Stream) stopProducers() {
s.mu.Lock()
producers:
for _, producer := range s.producers {
for _, track := range producer.tracks {
if track.HasSink() {
for _, track := range producer.receivers {
if len(track.Senders()) > 0 {
continue producers
}
}
@@ -179,20 +190,6 @@ producers:
s.mu.Unlock()
}
//func (s *Stream) Active() bool {
// if len(s.consumers) > 0 {
// return true
// }
//
// for _, prod := range s.producers {
// if prod.element != nil {
// return true
// }
// }
//
// return false
//}
func (s *Stream) MarshalJSON() ([]byte, error) {
if !s.mu.TryLock() {
log.Warn().Msgf("[streams] json locked")
@@ -200,8 +197,8 @@ func (s *Stream) MarshalJSON() ([]byte, error) {
}
var info struct {
Producers []*Producer `json:"producers"`
Consumers []*Consumer `json:"consumers"`
Producers []*Producer `json:"producers"`
Consumers []core.Consumer `json:"consumers"`
}
info.Producers = s.producers
info.Consumers = s.consumers
@@ -211,40 +208,14 @@ func (s *Stream) MarshalJSON() ([]byte, error) {
return json.Marshal(info)
}
func (s *Stream) removeConsumer(i int) {
switch {
case len(s.consumers) == 1: // only one element
s.consumers = nil
case i == 0: // first element
s.consumers = s.consumers[1:]
case i == len(s.consumers)-1: // last element
s.consumers = s.consumers[:i]
default: // middle element
s.consumers = append(s.consumers[:i], s.consumers[i+1:]...)
}
}
func (s *Stream) removeProducer(i int) {
switch {
case len(s.producers) == 1: // only one element
s.producers = nil
case i == 0: // first element
s.producers = s.producers[1:]
case i == len(s.producers)-1: // last element
s.producers = s.producers[:i]
default: // middle element
s.producers = append(s.producers[:i], s.producers[i+1:]...)
}
}
func collectCodecs(media *streamer.Media, codecs *string) {
if media.Direction == streamer.DirectionRecvonly {
func collectCodecs(media *core.Media, codecs *string) {
if media.Direction == core.DirectionRecvonly {
return
}
for _, codec := range media.Codecs {
name := codec.Name
if name == streamer.CodecAAC {
if name == core.CodecAAC {
name = "AAC"
}
if strings.Contains(*codecs, name) {