Add support backchannel for Tapo source

This commit is contained in:
Alexey Khit
2023-02-16 11:29:10 +03:00
parent 9fd783793e
commit 70c415a1d8
8 changed files with 498 additions and 76 deletions
+53
View File
@@ -0,0 +1,53 @@
package tapo
import (
"bytes"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/pion/rtp"
"strconv"
)
func (c *Client) backchannelWriter() streamer.WriterFunc {
w := mpegts.NewWriter()
w.AddPES(68, mpegts.StreamTypePCMATapo)
w.WritePAT()
w.WritePMT()
return func(packet *rtp.Packet) (err error) {
// don't know why 68 and 192
w.WritePES(68, 192, packet.Payload)
err = c.WriteBackchannel(w.Bytes())
w.Reset()
return
}
}
func (c *Client) SetupBackchannel() (err error) {
// if conn1 is not used - we will use it for backchannel
// or we need to start another conn for session2
if c.session1 != "" {
if c.conn2, err = c.newConn(); err != nil {
return
}
} else {
c.conn2 = c.conn1
}
c.session2, err = c.Request(c.conn2, []byte(`{"params":{"talk":{"mode":"aec"},"method":"get"},"seq":3,"type":"request"}`))
return
}
func (c *Client) WriteBackchannel(body []byte) (err error) {
// TODO: fixme (size)
buf := bytes.NewBuffer(nil)
buf.WriteString("----client-stream-boundary--\r\n")
buf.WriteString("Content-Type: audio/mp2t\r\n")
buf.WriteString("X-If-Encrypt: 0\r\n")
buf.WriteString("X-Session-Id: " + c.session2 + "\r\n")
buf.WriteString("Content-Length: " + strconv.Itoa(len(body)) + "\r\n\r\n")
buf.Write(body)
_, err = buf.WriteTo(c.conn2)
return
}
+83 -59
View File
@@ -1,9 +1,11 @@
package tapo
import (
"bytes"
"crypto/aes"
"crypto/cipher"
"crypto/md5"
"encoding/json"
"errors"
"fmt"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
@@ -14,7 +16,6 @@ import (
"net/http"
"net/url"
"strconv"
"strings"
)
type Client struct {
@@ -25,10 +26,13 @@ type Client struct {
medias []*streamer.Media
tracks map[byte]*streamer.Track
conn net.Conn
reader *multipart.Reader
conn1 net.Conn
conn2 net.Conn
decrypt func(b []byte) []byte
session1 string
session2 string
}
// block ciphers using cipher block chaining.
@@ -42,9 +46,14 @@ func NewClient(url string) *Client {
}
func (c *Client) Dial() (err error) {
c.conn1, err = c.newConn()
return
}
func (c *Client) newConn() (net.Conn, error) {
u, err := url.Parse(c.url)
if err != nil {
return
return nil, err
}
// support raw username/password
@@ -66,20 +75,28 @@ func (c *Client) Dial() (err error) {
req, err := http.NewRequest("POST", u.String(), nil)
if err != nil {
return
return nil, err
}
req.Header.Set("Content-Type", "multipart/mixed; boundary=--client-stream-boundary--")
res, err := tcp.Do(req)
if err != nil {
return
return nil, err
}
if res.StatusCode != http.StatusOK {
return errors.New(res.Status)
return nil, errors.New(res.Status)
}
if c.decrypt == nil {
c.newDectypter(res, username, password)
}
return res.Body.(net.Conn), nil
}
func (c *Client) newDectypter(res *http.Response, username, password string) {
// extract nonce from response
// cipher="AES_128_CBC" username="admin" padding="PKCS7_16" algorithm="MD5" nonce="***"
nonce := res.Header.Get("Key-Exchange")
@@ -106,52 +123,34 @@ func (c *Client) Dial() (err error) {
padSize := int(b[len(b)-1])
return b[:len(b)-padSize]
}
c.conn = res.Body.(net.Conn)
boundary := res.Header.Get("Content-Type")
_, boundary, _ = strings.Cut(boundary, "boundary=")
c.reader = multipart.NewReader(c.conn, boundary)
return nil
}
func (c *Client) Play() (err error) {
// audio: default, disable, enable
body := []byte(
"----client-stream-boundary--\r\n" +
"Content-Type: application/json\r\nContent-Length: 120\r\n\r\n" +
`{"params":{"preview":{"audio":["default"],"channels":[0],"resolutions":["HD"]},"method":"get"},"seq":1,"type":"request"}` +
"\r\n",
)
func (c *Client) SetupStream() (err error) {
if c.session1 != "" {
return
}
_, err = c.conn.Write(body)
return nil
// audio: default, disable, enable
c.session1, err = c.Request(c.conn1, []byte(`{"params":{"preview":{"audio":["default"],"channels":[0],"resolutions":["HD"]},"method":"get"},"seq":1,"type":"request"}`))
return
}
// Handle - first run will be in probe state
func (c *Client) Handle() error {
if c.tracks == nil {
c.tracks = map[byte]*streamer.Track{}
}
mpReader := multipart.NewReader(c.conn1, "--device-stream-boundary--")
tsReader := mpegts.NewReader()
reader := mpegts.NewReader()
probe := streamer.NewProbe(c.medias == nil)
for probe == nil || probe.Active() {
p, err := c.reader.NextRawPart()
for {
p, err := mpReader.NextRawPart()
if err != nil {
return err
}
ct := p.Header.Get("Content-Type")
if ct != "video/mp2t" {
if ct := p.Header.Get("Content-Type"); ct != "video/mp2t" {
continue
}
cl := p.Header.Get("Content-Length")
size, err := strconv.Atoi(cl)
if err != nil {
return err
@@ -169,37 +168,62 @@ func (c *Client) Handle() error {
}
body = c.decrypt(body)
reader.SetBuffer(body)
tsReader.SetBuffer(body)
for {
pkt := reader.GetPacket()
pkt := tsReader.GetPacket()
if pkt == nil {
break
}
track := c.tracks[pkt.PayloadType]
if track == nil {
// count track on probe state even if not support it
probe.Append(pkt.PayloadType)
media := mpegts.GetMedia(pkt)
if media == nil {
continue // unsupported codec
}
track = streamer.NewTrack2(media, nil)
c.medias = append(c.medias, media)
c.tracks[pkt.PayloadType] = track
if track := c.tracks[pkt.PayloadType]; track != nil {
_ = track.WriteRTP(pkt)
}
_ = track.WriteRTP(pkt)
}
}
return nil
}
func (c *Client) Close() error {
return c.conn.Close()
func (c *Client) Close() (err error) {
if c.conn1 != nil {
err = c.conn1.Close()
}
if c.conn2 != nil {
_ = c.conn2.Close()
}
return
}
func (c *Client) Request(conn net.Conn, body []byte) (string, error) {
// TODO: fixme (size)
buf := bytes.NewBuffer(nil)
buf.WriteString("----client-stream-boundary--\r\n")
buf.WriteString("Content-Type: application/json\r\n")
buf.WriteString("Content-Length: " + strconv.Itoa(len(body)) + "\r\n\r\n")
buf.Write(body)
buf.WriteString("\r\n")
if _, err := buf.WriteTo(conn); err != nil {
return "", err
}
mpReader := multipart.NewReader(conn, "--device-stream-boundary--")
for {
p, err := mpReader.NextRawPart()
if err != nil {
return "", err
}
var v struct {
Params struct {
SessionID string `json:"session_id"`
} `json:"params"`
}
if err = json.NewDecoder(p).Decode(&v); err != nil {
return "", err
}
return v.Params.SessionID, nil
}
}
+67 -3
View File
@@ -1,18 +1,82 @@
package tapo
import "github.com/AlexxIT/go2rtc/pkg/streamer"
import (
"github.com/AlexxIT/go2rtc/pkg/mpegts"
"github.com/AlexxIT/go2rtc/pkg/streamer"
)
func (c *Client) GetMedias() []*streamer.Media {
// producer should have persistent medias
if c.medias == nil {
// don't know if all Tapo has this capabilities...
c.medias = []*streamer.Media{
{
Kind: streamer.KindVideo,
Direction: streamer.DirectionSendonly,
Codecs: []*streamer.Codec{
{Name: streamer.CodecH264, ClockRate: 90000, PayloadType: streamer.PayloadTypeRAW},
},
},
{
Kind: streamer.KindAudio,
Direction: streamer.DirectionSendonly,
Codecs: []*streamer.Codec{
{Name: streamer.CodecPCMA, ClockRate: 8000, PayloadType: 8},
},
},
{
Kind: streamer.KindAudio,
Direction: streamer.DirectionRecvonly,
Codecs: []*streamer.Codec{
{Name: streamer.CodecPCMA, ClockRate: 8000, PayloadType: 8},
},
},
}
}
return c.medias
}
func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track {
func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) (track *streamer.Track) {
for _, track := range c.tracks {
if track.Codec == codec {
return track
}
}
return nil
if c.tracks == nil {
c.tracks = map[byte]*streamer.Track{}
}
if media.Kind == streamer.KindVideo {
if err := c.SetupStream(); err != nil {
return nil
}
track = streamer.NewTrack2(media, codec)
c.tracks[mpegts.StreamTypeH264] = track
} else {
if media.Direction == streamer.DirectionSendonly {
if err := c.SetupStream(); err != nil {
return nil
}
track = streamer.NewTrack2(media, codec)
c.tracks[mpegts.StreamTypePCMATapo] = track
} else {
if err := c.SetupBackchannel(); err != nil {
return nil
}
if w := c.backchannelWriter(); w != nil {
track = streamer.NewTrack2(media, codec)
track.Bind(w)
c.tracks[0] = track
}
}
}
return
}
func (c *Client) Start() error {