support rtsp udp transport

This commit is contained in:
seydx
2025-06-01 01:44:01 +03:00
parent ae8145f266
commit 859cd1cbe6
5 changed files with 626 additions and 150 deletions
+272 -119
View File
@@ -40,6 +40,7 @@ type Conn struct {
keepalive int
mode core.Mode
playOK bool
playErr error
reader *bufio.Reader
sequence int
session string
@@ -47,8 +48,32 @@ type Conn struct {
state State
stateMu sync.Mutex
transportMode TransportMode
// UDP
udpRtpConns map[byte]*UDPConnection
udpRtcpConns map[byte]*UDPConnection
udpRtpListeners map[byte]*UDPConnection
udpRtcpListeners map[byte]*UDPConnection
portToChannel map[int]byte
channelCounter byte
}
type UDPConnection struct {
Conn net.UDPConn
Channel byte
}
type TransportMode int
const (
TransportTCP TransportMode = iota
TransportUDP
ReceiveMTU = 1500
)
const (
ProtoRTSP = "RTSP/1.0"
MethodOptions = "OPTIONS"
@@ -68,7 +93,6 @@ func (s State) String() string {
case StateNone:
return "NONE"
case StateConn:
return "CONN"
case StateSetup:
return MethodSetup
@@ -131,133 +155,22 @@ func (c *Conn) Handle() (err error) {
for c.state != StateNone {
ts := time.Now()
time := ts.Add(timeout)
if err = c.conn.SetReadDeadline(ts.Add(timeout)); err != nil {
if err = c.conn.SetReadDeadline(time); err != nil {
return
}
// we can read:
// 1. RTP interleaved: `$` + 1B channel number + 2B size
// 2. RTSP response: RTSP/1.0 200 OK
// 3. RTSP request: OPTIONS ...
var buf4 []byte // `$` + 1B channel number + 2B size
buf4, err = c.reader.Peek(4)
if err != nil {
return
}
var channelID byte
var size uint16
if buf4[0] != '$' {
switch string(buf4) {
case "RTSP":
var res *tcp.Response
if res, err = c.ReadResponse(); err != nil {
return
}
c.Fire(res)
// for playing backchannel only after OK response on play
c.playOK = true
continue
case "OPTI", "TEAR", "DESC", "SETU", "PLAY", "PAUS", "RECO", "ANNO", "GET_", "SET_":
var req *tcp.Request
if req, err = c.ReadRequest(); err != nil {
return
}
c.Fire(req)
if req.Method == MethodOptions {
res := &tcp.Response{Request: req}
if err = c.WriteResponse(res); err != nil {
return
}
}
continue
default:
c.Fire("RTSP wrong input")
for i := 0; ; i++ {
// search next start symbol
if _, err = c.reader.ReadBytes('$'); err != nil {
return err
}
if channelID, err = c.reader.ReadByte(); err != nil {
return err
}
// TODO: better check maximum good channel ID
if channelID >= 20 {
continue
}
buf4 = make([]byte, 2)
if _, err = io.ReadFull(c.reader, buf4); err != nil {
return err
}
// check if size good for RTP
size = binary.BigEndian.Uint16(buf4)
if size <= 1500 {
break
}
// 10 tries to find good packet
if i >= 10 {
return fmt.Errorf("RTSP wrong input")
}
}
if c.transportMode == TransportUDP {
if err = c.handleUDPClientData(time); err != nil {
return err
}
} else {
// hope that the odd channels are always RTCP
channelID = buf4[1]
// get data size
size = binary.BigEndian.Uint16(buf4[2:])
// skip 4 bytes from c.reader.Peek
if _, err = c.reader.Discard(4); err != nil {
return
if err = c.handleTCPClientData(); err != nil {
return err
}
}
// init memory for data
buf := make([]byte, size)
if _, err = io.ReadFull(c.reader, buf); err != nil {
return
}
c.Recv += int(size)
if channelID&1 == 0 {
packet := &rtp.Packet{}
if err = packet.Unmarshal(buf); err != nil {
return
}
for _, receiver := range c.Receivers {
if receiver.ID == channelID {
receiver.WriteRTP(packet)
break
}
}
} else {
msg := &RTCP{Channel: channelID}
if err = msg.Header.Unmarshal(buf); err != nil {
continue
}
msg.Packets, err = rtcp.Unmarshal(buf)
if err != nil {
continue
}
c.Fire(msg)
}
if keepaliveDT != 0 && ts.After(keepaliveTS) {
req := &tcp.Request{Method: MethodOptions, URL: c.URL}
if err = c.WriteRequest(req); err != nil {
@@ -271,6 +184,246 @@ func (c *Conn) Handle() (err error) {
return
}
func (c *Conn) handleUDPClientData(time time.Time) error {
if c.playErr != nil {
return c.playErr
}
if c.state == StatePlay && c.playOK {
return nil
}
var buf4 []byte
buf4, err := c.reader.Peek(4)
if err != nil {
return err
}
switch string(buf4) {
case "RTSP":
var res *tcp.Response
if res, err = c.ReadResponse(); err != nil {
return err
}
c.Fire(res)
c.playOK = true
for _, listener := range c.udpRtpListeners {
go func(listener *UDPConnection) {
defer listener.Conn.Close()
for c.state != StateNone {
if err := listener.Conn.SetReadDeadline(time); err != nil {
c.playErr = err
return
}
buffer := make([]byte, ReceiveMTU)
n, _, err := listener.Conn.ReadFromUDP(buffer)
if err != nil {
c.playErr = err
break
}
packet := &rtp.Packet{}
if err := packet.Unmarshal(buffer[:n]); err != nil {
c.playErr = err
return
}
for _, receiver := range c.Receivers {
if receiver.ID == listener.Channel {
receiver.WriteRTP(packet)
break
}
}
c.Recv += len(buffer[:n])
}
}(listener)
}
for _, listener := range c.udpRtcpListeners {
go func(listener *UDPConnection) {
defer listener.Conn.Close()
for c.state != StateNone {
if err := listener.Conn.SetReadDeadline(time); err != nil {
return
}
buffer := make([]byte, ReceiveMTU)
n, _, err := listener.Conn.ReadFromUDP(buffer)
if err != nil {
break
}
msg := &RTCP{Channel: listener.Channel}
if err := msg.Header.Unmarshal(buffer[:n]); err != nil {
continue
}
msg.Packets, err = rtcp.Unmarshal(buffer[:n])
if err != nil {
continue
}
c.Fire(msg)
}
}(listener)
}
case "OPTI", "TEAR", "DESC", "SETU", "PLAY", "PAUS", "RECO", "ANNO", "GET_", "SET_":
var req *tcp.Request
if req, err = c.ReadRequest(); err != nil {
return err
}
c.Fire(req)
if req.Method == MethodOptions {
res := &tcp.Response{Request: req}
if err = c.WriteResponse(res); err != nil {
return err
}
}
default:
return fmt.Errorf("RTSP wrong input")
}
return nil
}
func (c *Conn) handleTCPClientData() error {
// we can read:
// 1. RTP interleaved: `$` + 1B channel number + 2B size
// 2. RTSP response: RTSP/1.0 200 OK
// 3. RTSP request: OPTIONS ...
var buf4 []byte // `$` + 1B channel number + 2B size
var err error
buf4, err = c.reader.Peek(4)
if err != nil {
return err
}
var channel byte
var size uint16
if buf4[0] != '$' {
switch string(buf4) {
case "RTSP":
var res *tcp.Response
if res, err = c.ReadResponse(); err != nil {
return err
}
c.Fire(res)
// for playing backchannel only after OK response on play
c.playOK = true
return nil
case "OPTI", "TEAR", "DESC", "SETU", "PLAY", "PAUS", "RECO", "ANNO", "GET_", "SET_":
var req *tcp.Request
if req, err = c.ReadRequest(); err != nil {
return err
}
c.Fire(req)
if req.Method == MethodOptions {
res := &tcp.Response{Request: req}
if err = c.WriteResponse(res); err != nil {
return err
}
}
return nil
default:
c.Fire("RTSP wrong input")
for i := 0; ; i++ {
// search next start symbol
if _, err = c.reader.ReadBytes('$'); err != nil {
return err
}
if channel, err = c.reader.ReadByte(); err != nil {
return err
}
// TODO: better check maximum good channel ID
if channel >= 20 {
continue
}
buf4 = make([]byte, 2)
if _, err = io.ReadFull(c.reader, buf4); err != nil {
return err
}
// check if size good for RTP
size = binary.BigEndian.Uint16(buf4)
if size <= 1500 {
break
}
// 10 tries to find good packet
if i >= 10 {
return fmt.Errorf("RTSP wrong input")
}
}
}
} else {
// hope that the odd channels are always RTCP
channel = buf4[1]
// get data size
size = binary.BigEndian.Uint16(buf4[2:])
// skip 4 bytes from c.reader.Peek
if _, err = c.reader.Discard(4); err != nil {
return err
}
}
// init memory for data
buf := make([]byte, size)
if _, err = io.ReadFull(c.reader, buf); err != nil {
return err
}
c.Recv += int(size)
if channel&1 == 0 {
packet := &rtp.Packet{}
if err = packet.Unmarshal(buf); err != nil {
return err
}
for _, receiver := range c.Receivers {
if receiver.ID == channel {
receiver.WriteRTP(packet)
break
}
}
} else {
msg := &RTCP{Channel: channel}
if err = msg.Header.Unmarshal(buf); err != nil {
return nil
}
msg.Packets, err = rtcp.Unmarshal(buf)
if err != nil {
return nil
}
c.Fire(msg)
}
return nil
}
func (c *Conn) WriteRequest(req *tcp.Request) error {
if req.Proto == "" {
req.Proto = ProtoRTSP