Code refactoring for #1758

This commit is contained in:
Alex X
2025-10-09 21:07:20 +03:00
parent a667acad07
commit fde1fdc592
4 changed files with 178 additions and 478 deletions
+58 -160
View File
@@ -2,6 +2,7 @@ package rtsp
import (
"bufio"
"context"
"encoding/binary"
"fmt"
"io"
@@ -13,7 +14,6 @@ import (
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/pion/rtcp"
"github.com/pion/rtp"
)
@@ -49,27 +49,10 @@ type Conn struct {
state State
stateMu sync.Mutex
// UDP
udpRtpConns map[byte]*UDPConnection
udpRtcpConns map[byte]*UDPConnection
udpRtpListeners map[byte]*UDPConnection
udpRtcpListeners map[byte]*UDPConnection
portToChannel map[int]byte
channelCounter byte
udpConn []*net.UDPConn
udpAddr []*net.UDPAddr
}
type UDPConnection struct {
Conn net.UDPConn
Channel byte
}
type TransportMode int
const (
ReceiveMTU = 1500
)
const (
ProtoRTSP = "RTSP/1.0"
MethodOptions = "OPTIONS"
@@ -108,23 +91,25 @@ const (
func (c *Conn) Handle() (err error) {
var timeout time.Duration
var keepaliveDT time.Duration
var keepaliveTS time.Time
switch c.mode {
case core.ModeActiveProducer:
var keepaliveDT time.Duration
if c.keepalive > 5 {
keepaliveDT = time.Duration(c.keepalive-5) * time.Second
} else {
keepaliveDT = 25 * time.Second
}
keepaliveTS = time.Now().Add(keepaliveDT)
ctx, cancel := context.WithCancel(context.Background())
go c.handleKeepalive(ctx, keepaliveDT)
defer cancel()
if c.Timeout == 0 {
// polling frames from remote RTSP Server (ex Camera)
timeout = time.Second * 5
if len(c.Receivers) == 0 {
if len(c.Receivers) == 0 || c.Transport == "udp" {
// if we only send audio to camera
// https://github.com/AlexxIT/go2rtc/issues/659
timeout += keepaliveDT
@@ -149,150 +134,58 @@ func (c *Conn) Handle() (err error) {
return fmt.Errorf("wrong RTSP conn mode: %d", c.mode)
}
for i := 0; i < len(c.udpConn); i++ {
go c.handleUDPData(byte(i))
}
for c.state != StateNone {
ts := time.Now()
time := ts.Add(timeout)
if err = c.conn.SetReadDeadline(time); err != nil {
_ = c.conn.SetReadDeadline(ts.Add(timeout))
if err = c.handleTCPData(); err != nil {
return
}
if c.Transport == "udp" {
if err = c.handleUDPClientData(time); err != nil {
return err
}
} else {
if err = c.handleTCPClientData(); err != nil {
return err
}
}
if keepaliveDT != 0 && ts.After(keepaliveTS) {
req := &tcp.Request{Method: MethodOptions, URL: c.URL}
if err = c.WriteRequest(req); err != nil {
return
}
keepaliveTS = ts.Add(keepaliveDT)
}
}
return
}
func (c *Conn) handleUDPClientData(time time.Time) error {
if c.playErr != nil {
return c.playErr
}
if c.state == StatePlay && c.playOK {
return nil
}
var buf4 []byte
buf4, err := c.reader.Peek(4)
if err != nil {
return err
}
switch string(buf4) {
case "RTSP":
var res *tcp.Response
if res, err = c.ReadResponse(); err != nil {
return err
}
c.Fire(res)
c.playOK = true
for _, listener := range c.udpRtpListeners {
go func(listener *UDPConnection) {
defer listener.Conn.Close()
for c.state != StateNone {
if err := listener.Conn.SetReadDeadline(time); err != nil {
c.playErr = err
return
}
buffer := make([]byte, ReceiveMTU)
n, _, err := listener.Conn.ReadFromUDP(buffer)
if err != nil {
c.playErr = err
break
}
packet := &rtp.Packet{}
if err := packet.Unmarshal(buffer[:n]); err != nil {
c.playErr = err
return
}
for _, receiver := range c.Receivers {
if receiver.ID == listener.Channel {
receiver.WriteRTP(packet)
break
}
}
c.Recv += len(buffer[:n])
}
}(listener)
}
for _, listener := range c.udpRtcpListeners {
go func(listener *UDPConnection) {
defer listener.Conn.Close()
for c.state != StateNone {
if err := listener.Conn.SetReadDeadline(time); err != nil {
return
}
buffer := make([]byte, ReceiveMTU)
n, _, err := listener.Conn.ReadFromUDP(buffer)
if err != nil {
break
}
msg := &RTCP{Channel: listener.Channel}
if err := msg.Header.Unmarshal(buffer[:n]); err != nil {
continue
}
msg.Packets, err = rtcp.Unmarshal(buffer[:n])
if err != nil {
continue
}
c.Fire(msg)
}
}(listener)
}
case "OPTI", "TEAR", "DESC", "SETU", "PLAY", "PAUS", "RECO", "ANNO", "GET_", "SET_":
var req *tcp.Request
if req, err = c.ReadRequest(); err != nil {
return err
}
c.Fire(req)
if req.Method == MethodOptions {
res := &tcp.Response{Request: req}
if err = c.WriteResponse(res); err != nil {
return err
func (c *Conn) handleKeepalive(ctx context.Context, d time.Duration) {
ticker := time.NewTicker(d)
for {
select {
case <-ticker.C:
req := &tcp.Request{Method: MethodOptions, URL: c.URL}
if err := c.WriteRequest(req); err != nil {
return
}
case <-ctx.Done():
return
}
default:
return fmt.Errorf("RTSP wrong input")
}
return nil
}
func (c *Conn) handleTCPClientData() error {
func (c *Conn) handleUDPData(channel byte) {
// TODO: handle timeouts and drop TCP connection after any error
conn := c.udpConn[channel]
for {
// TP-Link Tapo camera has crazy 10000 bytes packet size
buf := make([]byte, 10240)
n, _, err := conn.ReadFromUDP(buf)
if err != nil {
return
}
if err = c.handleRawPacket(channel, buf[:n]); err != nil {
return
}
}
}
func (c *Conn) handleTCPData() error {
// we can read:
// 1. RTP interleaved: `$` + 1B channel number + 2B size
// 2. RTSP response: RTSP/1.0 200 OK
@@ -390,9 +283,13 @@ func (c *Conn) handleTCPClientData() error {
c.Recv += int(size)
return c.handleRawPacket(channel, buf)
}
func (c *Conn) handleRawPacket(channel byte, buf []byte) error {
if channel&1 == 0 {
packet := &rtp.Packet{}
if err = packet.Unmarshal(buf); err != nil {
if err := packet.Unmarshal(buf); err != nil {
return err
}
@@ -405,14 +302,15 @@ func (c *Conn) handleTCPClientData() error {
} else {
msg := &RTCP{Channel: channel}
if err = msg.Header.Unmarshal(buf); err != nil {
if err := msg.Header.Unmarshal(buf); err != nil {
return nil
}
msg.Packets, err = rtcp.Unmarshal(buf)
if err != nil {
return nil
}
//var err error
//msg.Packets, err = rtcp.Unmarshal(buf)
//if err != nil {
// return nil
//}
c.Fire(msg)
}