This commit is contained in:
seydx
2025-05-21 13:16:49 +02:00
parent adb1b21e81
commit c90fcd1ce1
6 changed files with 702 additions and 632 deletions
+112 -293
View File
@@ -6,103 +6,24 @@ import (
"fmt"
"net/url"
"strconv"
"sync"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/webrtc"
"github.com/google/uuid"
"github.com/gorilla/websocket"
pion "github.com/pion/webrtc/v4"
)
type Client struct {
api *RingRestClient
ws *websocket.Conn
api *RingApi
wsClient *WSClient
prod core.Producer
cameraID int
dialogID string
sessionID string
wsMutex sync.Mutex
done chan struct{}
connected core.Waiter
closed bool
}
type SessionBody struct {
DoorbotID int `json:"doorbot_id"`
SessionID string `json:"session_id"`
}
type AnswerMessage struct {
Method string `json:"method"` // "sdp"
Body struct {
SessionBody
SDP string `json:"sdp"`
Type string `json:"type"` // "answer"
} `json:"body"`
}
type IceCandidateMessage struct {
Method string `json:"method"` // "ice"
Body struct {
SessionBody
Ice string `json:"ice"`
MLineIndex int `json:"mlineindex"`
} `json:"body"`
}
type SessionMessage struct {
Method string `json:"method"` // "session_created" or "session_started"
Body SessionBody `json:"body"`
}
type PongMessage struct {
Method string `json:"method"` // "pong"
Body SessionBody `json:"body"`
}
type NotificationMessage struct {
Method string `json:"method"` // "notification"
Body struct {
SessionBody
IsOK bool `json:"is_ok"`
Text string `json:"text"`
} `json:"body"`
}
type StreamInfoMessage struct {
Method string `json:"method"` // "stream_info"
Body struct {
SessionBody
Transcoding bool `json:"transcoding"`
TranscodingReason string `json:"transcoding_reason"`
} `json:"body"`
}
type CloseMessage struct {
Method string `json:"method"` // "close"
Body struct {
SessionBody
Reason struct {
Code int `json:"code"`
Text string `json:"text"`
} `json:"reason"`
} `json:"body"`
}
type BaseMessage struct {
Method string `json:"method"`
Body map[string]any `json:"body"`
}
// Close reason codes
const (
CloseReasonNormalClose = 0
CloseReasonAuthenticationFailed = 5
CloseReasonTimeout = 6
)
func Dial(rawURL string) (*Client, error) {
// 1. Parse URL and validate basic params
u, err := url.Parse(rawURL)
if err != nil {
return nil, err
@@ -114,55 +35,38 @@ func Dial(rawURL string) (*Client, error) {
deviceID := query.Get("device_id")
_, isSnapshot := query["snapshot"]
if encodedToken == "" || deviceID == "" {
if encodedToken == "" || deviceID == "" || cameraID == "" {
return nil, errors.New("ring: wrong query")
}
camID, err := strconv.Atoi(cameraID)
client := &Client{
dialogID: uuid.NewString(),
}
client.cameraID, err = strconv.Atoi(cameraID)
if err != nil {
return nil, fmt.Errorf("ring: invalid camera_id: %w", err)
}
// URL-decode the refresh token
refreshToken, err := url.QueryUnescape(encodedToken)
if err != nil {
return nil, fmt.Errorf("ring: invalid refresh token encoding: %w", err)
}
// Initialize Ring API client
ringAPI, err := NewRingRestClient(RefreshTokenAuth{RefreshToken: refreshToken}, nil)
client.api, err = NewRestClient(RefreshTokenAuth{RefreshToken: refreshToken}, nil)
if err != nil {
return nil, err
}
// Create base client
client := &Client{
api: ringAPI,
cameraID: camID,
dialogID: uuid.NewString(),
done: make(chan struct{}),
}
// Check if snapshot request
// Snapshot Flow
if isSnapshot {
client.prod = NewSnapshotProducer(ringAPI, cameraID)
client.prod = NewSnapshotProducer(client.api, client.cameraID)
return client, nil
}
// If not snapshot, continue with WebRTC setup
ticket, err := ringAPI.GetSocketTicket()
if err != nil {
return nil, err
}
// Create WebSocket connection
wsURL := fmt.Sprintf("wss://api.prod.signalling.ring.devices.a2z.com/ws?api_version=4.0&auth_type=ring_solutions&client_id=ring_site-%s&token=%s",
uuid.NewString(), url.QueryEscape(ticket.Ticket))
client.ws, _, err = websocket.DefaultDialer.Dial(wsURL, map[string][]string{
"User-Agent": {"android:com.ringapp"},
})
client.wsClient, err = StartWebsocket(client.cameraID, client.api)
if err != nil {
client.Stop()
return nil, err
}
@@ -186,13 +90,13 @@ func Dial(rawURL string) (*Client, error) {
api, err := webrtc.NewAPI()
if err != nil {
client.ws.Close()
client.Stop()
return nil, err
}
pc, err := api.NewPeerConnection(conf)
if err != nil {
client.ws.Close()
client.Stop()
return nil, err
}
@@ -202,16 +106,27 @@ func Dial(rawURL string) (*Client, error) {
// protect from blocking on errors
defer sendOffer.Done(nil)
// waiter will wait PC error or WS error or nil (connection OK)
var connState core.Waiter
prod := webrtc.NewConn(pc)
prod.FormatName = "ring/webrtc"
prod.Mode = core.ModeActiveProducer
prod.Protocol = "ws"
prod.URL = rawURL
client.prod = prod
client.wsClient.onMessage = func(msg WSMessage) {
client.onWSMessage(msg)
}
client.wsClient.onError = func(err error) {
// fmt.Printf("ring: error: %s\n", err.Error())
client.Stop()
client.connected.Done(err)
}
client.wsClient.onClose = func() {
// fmt.Println("ring: disconnect")
client.Stop()
client.connected.Done(errors.New("ring: disconnect"))
}
prod.Listen(func(msg any) {
switch msg := msg.(type) {
@@ -230,8 +145,8 @@ func Dial(rawURL string) (*Client, error) {
"mlineindex": iceCandidate.SDPMLineIndex,
}
if err = client.sendSessionMessage("ice", icePayload); err != nil {
connState.Done(err)
if err = client.wsClient.sendSessionMessage("ice", icePayload); err != nil {
client.connected.Done(err)
return
}
@@ -242,13 +157,16 @@ func Dial(rawURL string) (*Client, error) {
case pion.PeerConnectionStateConnecting:
break
case pion.PeerConnectionStateConnected:
connState.Done(nil)
client.connected.Done(nil)
default:
connState.Done(errors.New("ring: " + msg.String()))
client.Stop()
client.connected.Done(errors.New("ring: " + msg.String()))
}
}
})
client.prod = prod
// Setup media configuration
medias := []*core.Media{
{
@@ -290,188 +208,103 @@ func Dial(rawURL string) (*Client, error) {
"sdp": offer,
}
if err = client.sendSessionMessage("live_view", offerPayload); err != nil {
if err = client.wsClient.sendSessionMessage("live_view", offerPayload); err != nil {
client.Stop()
return nil, err
}
sendOffer.Done(nil)
// Ring expects a ping message every 5 seconds
go client.startPingLoop(pc)
go client.startMessageLoop(&connState)
if err = connState.Wait(); err != nil {
if err = client.connected.Wait(); err != nil {
return nil, err
}
return client, nil
}
func (c *Client) startPingLoop(pc *pion.PeerConnection) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
func (c *Client) onWSMessage(msg WSMessage) {
rawMsg, _ := json.Marshal(msg)
for {
select {
case <-c.done:
return
case <-ticker.C:
if pc.ConnectionState() == pion.PeerConnectionStateConnected {
if err := c.sendSessionMessage("ping", nil); err != nil {
return
}
}
// fmt.Printf("ring: onWSMessage: %s\n", string(rawMsg))
// check if "doorbot_id" is present
if _, ok := msg.Body["doorbot_id"]; !ok {
return
}
// check if the message is from the correct doorbot
doorbotID := msg.Body["doorbot_id"].(float64)
if int(doorbotID) != c.cameraID {
return
}
if msg.Method == "session_created" || msg.Method == "session_started" {
if _, ok := msg.Body["session_id"]; ok && c.wsClient.sessionID == "" {
c.wsClient.sessionID = msg.Body["session_id"].(string)
}
}
}
func (c *Client) startMessageLoop(connState *core.Waiter) {
var err error
// will be closed when conn will be closed
defer func() {
connState.Done(err)
}()
for {
select {
case <-c.done:
// check if the message is from the correct session
if _, ok := msg.Body["session_id"]; ok {
if msg.Body["session_id"].(string) != c.wsClient.sessionID {
return
default:
var res BaseMessage
if err = c.ws.ReadJSON(&res); err != nil {
select {
case <-c.done:
return
default:
}
}
}
switch msg.Method {
case "sdp":
if prod, ok := c.prod.(*webrtc.Conn); ok {
// Get answer
var msg AnswerMessage
if err := json.Unmarshal(rawMsg, &msg); err != nil {
c.Stop()
c.connected.Done(err)
return
}
// check if "doorbot_id" is present
if _, ok := res.Body["doorbot_id"]; !ok {
continue
}
// check if the message is from the correct doorbot
doorbotID := res.Body["doorbot_id"].(float64)
if int(doorbotID) != c.cameraID {
continue
}
// check if the message is from the correct session
if res.Method == "session_created" || res.Method == "session_started" {
if _, ok := res.Body["session_id"]; ok && c.sessionID == "" {
c.sessionID = res.Body["session_id"].(string)
}
}
if _, ok := res.Body["session_id"]; ok {
if res.Body["session_id"].(string) != c.sessionID {
continue
}
}
rawMsg, _ := json.Marshal(res)
switch res.Method {
case "sdp":
if prod, ok := c.prod.(*webrtc.Conn); ok {
// Get answer
var msg AnswerMessage
if err = json.Unmarshal(rawMsg, &msg); err != nil {
c.Stop()
return
}
if err = prod.SetAnswer(msg.Body.SDP); err != nil {
c.Stop()
return
}
if err = c.activateSession(); err != nil {
c.Stop()
return
}
prod.SDP = msg.Body.SDP
}
case "ice":
if prod, ok := c.prod.(*webrtc.Conn); ok {
// Continue to receiving candidates
var msg IceCandidateMessage
if err = json.Unmarshal(rawMsg, &msg); err != nil {
break
}
// check for empty ICE candidate
if msg.Body.Ice == "" {
break
}
if err = prod.AddCandidate(msg.Body.Ice); err != nil {
c.Stop()
return
}
}
case "close":
if err := prod.SetAnswer(msg.Body.SDP); err != nil {
c.Stop()
c.connected.Done(err)
return
}
case "pong":
// Ignore
continue
if err := c.wsClient.activateSession(); err != nil {
c.Stop()
c.connected.Done(err)
return
}
prod.SDP = msg.Body.SDP
}
case "ice":
if prod, ok := c.prod.(*webrtc.Conn); ok {
var msg IceCandidateMessage
if err := json.Unmarshal(rawMsg, &msg); err != nil {
break
}
// Skip empty candidates
if msg.Body.Ice == "" {
break
}
if err := prod.AddCandidate(msg.Body.Ice); err != nil {
c.Stop()
c.connected.Done(err)
return
}
}
case "close":
c.Stop()
c.connected.Done(errors.New("ring: close"))
case "pong":
// Ignore
}
}
func (c *Client) activateSession() error {
if err := c.sendSessionMessage("activate_session", nil); err != nil {
return err
}
streamPayload := map[string]interface{}{
"audio_enabled": true,
"video_enabled": true,
}
if err := c.sendSessionMessage("stream_options", streamPayload); err != nil {
return err
}
return nil
}
func (c *Client) sendSessionMessage(method string, body map[string]interface{}) error {
c.wsMutex.Lock()
defer c.wsMutex.Unlock()
if body == nil {
body = make(map[string]interface{})
}
body["doorbot_id"] = c.cameraID
if c.sessionID != "" {
body["session_id"] = c.sessionID
}
msg := map[string]interface{}{
"method": method,
"dialog_id": c.dialogID,
"body": body,
}
if err := c.ws.WriteJSON(msg); err != nil {
return err
}
return nil
}
func (c *Client) GetMedias() []*core.Media {
return c.prod.GetMedias()
}
@@ -487,7 +320,7 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece
speakerPayload := map[string]interface{}{
"stealth_mode": false,
}
_ = c.sendSessionMessage("camera_options", speakerPayload)
_ = c.wsClient.sendSessionMessage("camera_options", speakerPayload)
}
return webrtcProd.AddTrack(media, codec, track)
}
@@ -500,37 +333,23 @@ func (c *Client) Start() error {
}
func (c *Client) Stop() error {
select {
case <-c.done:
if c.closed {
return nil
default:
close(c.done)
}
c.closed = true
if c.prod != nil {
_ = c.prod.Stop()
}
if c.ws != nil {
closePayload := map[string]interface{}{
"reason": map[string]interface{}{
"code": CloseReasonNormalClose,
"text": "",
},
}
_ = c.sendSessionMessage("close", closePayload)
_ = c.ws.Close()
c.ws = nil
if c.wsClient != nil {
_ = c.wsClient.Close()
}
return nil
}
func (c *Client) MarshalJSON() ([]byte, error) {
if webrtcProd, ok := c.prod.(*webrtc.Conn); ok {
return webrtcProd.MarshalJSON()
}
return json.Marshal(c.prod)
}