Merge branch 'AlexxIT:master' into onvif-client

This commit is contained in:
seydx
2025-04-26 00:24:59 +02:00
committed by GitHub
30 changed files with 1781 additions and 166 deletions
+33
View File
@@ -249,3 +249,36 @@ func DecodeH264(fmtp string) (profile string, level byte) {
}
return
}
func ParseCodecString(s string) *Codec {
var codec Codec
ss := strings.Split(s, "/")
switch strings.ToLower(ss[0]) {
case "pcm_s16be", "s16be", "pcm":
codec.Name = CodecPCM
case "pcm_s16le", "s16le", "pcml":
codec.Name = CodecPCML
case "pcm_alaw", "alaw", "pcma":
codec.Name = CodecPCMA
case "pcm_mulaw", "mulaw", "pcmu":
codec.Name = CodecPCMU
case "aac", "mpeg4-generic":
codec.Name = CodecAAC
case "opus":
codec.Name = CodecOpus
case "flac":
codec.Name = CodecFLAC
default:
return nil
}
if len(ss) >= 2 {
codec.ClockRate = uint32(Atoi(ss[1]))
}
if len(ss) >= 3 {
codec.Channels = uint8(Atoi(ss[1]))
}
return &codec
}
+22 -6
View File
@@ -6,17 +6,24 @@ import (
"io"
"net/http"
"regexp"
"strings"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/expr-lang/expr"
"github.com/expr-lang/expr/vm"
)
func newRequest(method, url string, headers map[string]any) (*http.Request, error) {
func newRequest(method, url string, headers map[string]any, body string) (*http.Request, error) {
var rd io.Reader
if method == "" {
method = "GET"
}
if body != "" {
rd = strings.NewReader(body)
}
req, err := http.NewRequest(method, url, nil)
req, err := http.NewRequest(method, url, rd)
if err != nil {
return nil, err
}
@@ -55,7 +62,8 @@ var Options = []expr.Option{
options := params[1].(map[string]any)
method, _ := options["method"].(string)
headers, _ := options["headers"].(map[string]any)
req, err = newRequest(method, url, headers)
body, _ := options["body"].(string)
req, err = newRequest(method, url, headers, body)
} else {
req, err = http.NewRequest("GET", url, nil)
}
@@ -105,11 +113,19 @@ var Options = []expr.Option{
),
}
func Run(input string) (any, error) {
program, err := expr.Compile(input, Options...)
func Compile(input string) (*vm.Program, error) {
return expr.Compile(input, Options...)
}
func Eval(input string, env any) (any, error) {
program, err := Compile(input)
if err != nil {
return nil, err
}
return expr.Run(program, nil)
return expr.Run(program, env)
}
func Run(program *vm.Program, env any) (any, error) {
return vm.Run(program, env)
}
+2 -2
View File
@@ -7,11 +7,11 @@ import (
)
func TestMatchHost(t *testing.T) {
v, err := Run(`
v, err := Eval(`
let url = "rtsp://user:pass@192.168.1.123/cam/realmonitor?...";
let host = match(url, "//[^/]+")[0][2:];
host
`)
`, nil)
require.Nil(t, err)
require.Equal(t, "user:pass@192.168.1.123", v)
}
+69
View File
@@ -0,0 +1,69 @@
package pcm
import (
"errors"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/shell"
"github.com/pion/rtp"
)
type Backchannel struct {
core.Connection
cmd *shell.Command
}
func NewBackchannel(cmd *shell.Command, audio string) (core.Producer, error) {
var codec *core.Codec
if audio == "" {
// default codec
codec = &core.Codec{Name: core.CodecPCML, ClockRate: 16000}
} else if codec = core.ParseCodecString(audio); codec == nil {
return nil, errors.New("pcm: unsupported audio format: " + audio)
}
medias := []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{codec},
},
}
return &Backchannel{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "pcm",
Protocol: "pipe",
Medias: medias,
Transport: cmd,
},
cmd: cmd,
}, nil
}
func (c *Backchannel) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
return nil, core.ErrCantGetTrack
}
func (c *Backchannel) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
wr, err := c.cmd.StdinPipe()
if err != nil {
return err
}
sender := core.NewSender(media, track.Codec)
sender.Handler = func(packet *rtp.Packet) {
if n, err := wr.Write(packet.Payload); err != nil {
c.Send += n
}
}
sender.HandleRTP(track)
c.Senders = append(c.Senders, sender)
return nil
}
func (c *Backchannel) Start() error {
return c.cmd.Run()
}
+19 -9
View File
@@ -2,6 +2,7 @@ package pcm
import (
"sync"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
@@ -82,18 +83,27 @@ func TranscodeHandler(dst, src *core.Codec, handler core.HandlerFunc) core.Handl
}
}
func BytesPerFrame(codec *core.Codec) byte {
channels := byte(codec.Channels)
if channels == 0 {
channels = 1
}
func BytesPerSample(codec *core.Codec) int {
switch codec.Name {
case core.CodecPCML, core.CodecPCM:
return 2 * channels
return 2
case core.CodecPCMU, core.CodecPCMA:
return channels
return 1
}
return 0
}
func BytesPerFrame(codec *core.Codec) int {
if codec.Channels <= 1 {
return BytesPerSample(codec)
}
return int(codec.Channels) * BytesPerSample(codec)
}
func FramesPerDuration(codec *core.Codec, duration time.Duration) int {
return int(time.Duration(codec.ClockRate) * duration / time.Second)
}
func BytesPerDuration(codec *core.Codec, duration time.Duration) int {
return BytesPerFrame(codec) * FramesPerDuration(codec, duration)
}
+36 -3
View File
@@ -1,13 +1,25 @@
package pcm
import "github.com/AlexxIT/go2rtc/pkg/core"
import (
"math"
"github.com/AlexxIT/go2rtc/pkg/core"
)
func ceil(x float32) int {
d, fract := math.Modf(float64(x))
if fract == 0.0 {
return int(d)
}
return int(d) + 1
}
func Downsample(k float32) func([]int16) []int16 {
var sampleN, sampleSum float32
return func(src []int16) (dst []int16) {
var i int
dst = make([]int16, int((float32(len(src))+sampleN)/k))
dst = make([]int16, ceil((float32(len(src))+sampleN)/k))
for _, sample := range src {
sampleSum += float32(sample)
sampleN++
@@ -28,7 +40,7 @@ func Upsample(k float32) func([]int16) []int16 {
return func(src []int16) (dst []int16) {
var i int
dst = make([]int16, int(k*float32(len(src))))
dst = make([]int16, ceil(k*float32(len(src))))
for _, sample := range src {
sampleN += k
for sampleN > 0 {
@@ -185,3 +197,24 @@ func Transcode(dst, src *core.Codec) func([]byte) []byte {
return writer(samples)
}
}
func ConsumerCodecs() []*core.Codec {
return []*core.Codec{
{Name: core.CodecPCML},
{Name: core.CodecPCM},
{Name: core.CodecPCMA},
{Name: core.CodecPCMU},
}
}
func ProducerCodecs() []*core.Codec {
return []*core.Codec{
{Name: core.CodecPCML, ClockRate: 16000},
{Name: core.CodecPCM, ClockRate: 16000},
{Name: core.CodecPCML, ClockRate: 8000},
{Name: core.CodecPCM, ClockRate: 8000},
{Name: core.CodecPCMA, ClockRate: 8000},
{Name: core.CodecPCMU, ClockRate: 8000},
{Name: core.CodecPCML, ClockRate: 22050}, // wyoming-snd-external
}
}
+96
View File
@@ -0,0 +1,96 @@
package pcm
import (
"io"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
)
type ProducerSync struct {
core.Connection
src *core.Codec
rd io.Reader
onClose func()
}
func OpenSync(codec *core.Codec, rd io.Reader) *ProducerSync {
medias := []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionRecvonly,
Codecs: ProducerCodecs(),
},
}
return &ProducerSync{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "pcm",
Medias: medias,
Transport: rd,
},
src: codec,
rd: rd,
}
}
func (p *ProducerSync) OnClose(f func()) {
p.onClose = f
}
func (p *ProducerSync) Start() error {
if len(p.Receivers) == 0 {
return nil
}
var pktSeq uint16
var pktTS uint32 // time in frames
var pktTime time.Duration // time in seconds
t0 := time.Now()
dst := p.Receivers[0].Codec
transcode := Transcode(dst, p.src)
const chunkDuration = 20 * time.Millisecond
chunkBytes := BytesPerDuration(p.src, chunkDuration)
chunkFrames := uint32(FramesPerDuration(dst, chunkDuration))
for {
buf := make([]byte, chunkBytes)
n, _ := io.ReadFull(p.rd, buf)
if n == 0 {
break
}
pkt := &core.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
SequenceNumber: pktSeq,
Timestamp: pktTS,
},
Payload: transcode(buf[:n]),
}
if d := pktTime - time.Since(t0); d > 0 {
time.Sleep(d)
}
p.Receivers[0].WriteRTP(pkt)
p.Recv += n
pktSeq++
pktTS += chunkFrames
pktTime += chunkDuration
}
if p.onClose != nil {
p.onClose()
}
return nil
}
+42
View File
@@ -0,0 +1,42 @@
package s16le
func PeaksRMS(b []byte) int16 {
// RMS of sine wave = peak / sqrt2
// https://en.wikipedia.org/wiki/Root_mean_square
// https://www.youtube.com/watch?v=MUDkL4KZi0I
var peaks int32
var peaksSum int32
var prevSample int16
var prevUp bool
var i int
for n := len(b); i < n; {
lo := b[i]
i++
hi := b[i]
i++
sample := int16(hi)<<8 | int16(lo)
up := sample >= prevSample
if i >= 4 {
if up != prevUp {
if prevSample >= 0 {
peaksSum += int32(prevSample)
} else {
peaksSum -= int32(prevSample)
}
peaks++
}
}
prevSample = sample
prevUp = up
}
if peaks == 0 {
return 0
}
return int16(peaksSum / peaks)
}
-59
View File
@@ -1,59 +0,0 @@
package stdin
import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
)
func (c *Client) GetMedias() []*core.Media {
return c.medias
}
func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
return nil, core.ErrCantGetTrack
}
func (c *Client) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error {
if c.sender == nil {
stdin, err := c.cmd.StdinPipe()
if err != nil {
return err
}
c.sender = core.NewSender(media, track.Codec)
c.sender.Handler = func(packet *rtp.Packet) {
_, _ = stdin.Write(packet.Payload)
c.send += len(packet.Payload)
}
}
c.sender.HandleRTP(track)
return nil
}
func (c *Client) Start() (err error) {
return c.cmd.Run()
}
func (c *Client) Stop() (err error) {
if c.sender != nil {
c.sender.Close()
}
return c.cmd.Close()
}
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Connection{
ID: core.ID(c),
FormatName: "exec",
Protocol: "pipe",
Medias: c.medias,
Send: c.send,
}
if c.sender != nil {
info.Senders = []*core.Sender{c.sender}
}
return json.Marshal(info)
}
-33
View File
@@ -1,33 +0,0 @@
package stdin
import (
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/shell"
)
// Deprecated: should be rewritten to core.Connection
type Client struct {
cmd *shell.Command
medias []*core.Media
sender *core.Sender
send int
}
func NewClient(cmd *shell.Command) (*Client, error) {
c := &Client{
cmd: cmd,
medias: []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecPCMA, ClockRate: 8000},
{Name: core.CodecPCM},
},
},
},
}
return c, nil
}
+67
View File
@@ -0,0 +1,67 @@
package wav
import (
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/shell"
"github.com/pion/rtp"
)
type Backchannel struct {
core.Connection
cmd *shell.Command
}
func NewBackchannel(cmd *shell.Command) (core.Producer, error) {
medias := []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
//{Name: core.CodecPCML},
{Name: core.CodecPCMA},
{Name: core.CodecPCMU},
},
},
}
return &Backchannel{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "wav",
Protocol: "pipe",
Medias: medias,
Transport: cmd,
},
cmd: cmd,
}, nil
}
func (c *Backchannel) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
return nil, core.ErrCantGetTrack
}
func (c *Backchannel) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
wr, err := c.cmd.StdinPipe()
if err != nil {
return err
}
b := Header(track.Codec)
if _, err = wr.Write(b); err != nil {
return err
}
sender := core.NewSender(media, track.Codec)
sender.Handler = func(packet *rtp.Packet) {
if n, err := wr.Write(packet.Payload); err != nil {
c.Send += n
}
}
sender.HandleRTP(track)
c.Senders = append(c.Senders, sender)
return nil
}
func (c *Backchannel) Start() error {
return c.cmd.Run()
}
+2 -46
View File
@@ -2,7 +2,6 @@ package wav
import (
"bufio"
"encoding/binary"
"errors"
"io"
@@ -17,39 +16,11 @@ func Open(r io.Reader) (*Producer, error) {
// https://www.mmsp.ece.mcgill.ca/Documents/AudioFormats/WAVE/WAVE.html
rd := bufio.NewReaderSize(r, core.BufferSize)
// skip Master RIFF chunk
if _, err := rd.Discard(12); err != nil {
codec, err := ReadHeader(r)
if err != nil {
return nil, err
}
codec := &core.Codec{}
for {
chunkID, data, err := readChunk(rd)
if err != nil {
return nil, err
}
if chunkID == "data" {
break
}
if chunkID == "fmt " {
// https://audiocoding.cc/articles/2008-05-22-wav-file-structure/wav_formats.txt
switch data[0] {
case 1:
codec.Name = core.CodecPCML
case 6:
codec.Name = core.CodecPCMA
case 7:
codec.Name = core.CodecPCMU
}
codec.Channels = data[2]
codec.ClockRate = binary.LittleEndian.Uint32(data[4:])
}
}
if codec.Name == "" {
return nil, errors.New("waw: unsupported codec")
}
@@ -110,18 +81,3 @@ func (c *Producer) Start() error {
ts += PacketSize
}
}
func readChunk(r io.Reader) (chunkID string, data []byte, err error) {
b := make([]byte, 8)
if _, err = io.ReadFull(r, b); err != nil {
return
}
if chunkID = string(b[:4]); chunkID != "data" {
size := binary.LittleEndian.Uint32(b[4:])
data = make([]byte, size)
_, err = io.ReadFull(r, data)
}
return
}
+103
View File
@@ -0,0 +1,103 @@
package wav
import (
"encoding/binary"
"io"
"github.com/AlexxIT/go2rtc/pkg/core"
)
func Header(codec *core.Codec) []byte {
var fmt, size, extra byte
switch codec.Name {
case core.CodecPCML:
fmt = 1
size = 2
case core.CodecPCMA:
fmt = 6
size = 1
extra = 2
case core.CodecPCMU:
fmt = 7
size = 1
extra = 2
default:
return nil
}
channels := byte(codec.Channels)
if channels == 0 {
channels = 1
}
b := make([]byte, 0, 46) // cap with extra
b = append(b, "RIFF\xFF\xFF\xFF\xFFWAVEfmt "...)
b = append(b, 0x10+extra, 0, 0, 0)
b = append(b, fmt, 0)
b = append(b, channels, 0)
b = binary.LittleEndian.AppendUint32(b, codec.ClockRate)
b = binary.LittleEndian.AppendUint32(b, uint32(size*channels)*codec.ClockRate)
b = append(b, size*channels, 0)
b = append(b, size*8, 0)
if extra > 0 {
b = append(b, 0, 0) // ExtraParamSize (if PCM, then doesn't exist)
}
b = append(b, "data\xFF\xFF\xFF\xFF"...)
return b
}
func ReadHeader(r io.Reader) (*core.Codec, error) {
// skip Master RIFF chunk
if _, err := io.ReadFull(r, make([]byte, 12)); err != nil {
return nil, err
}
var codec core.Codec
for {
chunkID, data, err := readChunk(r)
if err != nil {
return nil, err
}
if chunkID == "data" {
break
}
if chunkID == "fmt " {
// https://audiocoding.cc/articles/2008-05-22-wav-file-structure/wav_formats.txt
switch data[0] {
case 1:
codec.Name = core.CodecPCML
case 6:
codec.Name = core.CodecPCMA
case 7:
codec.Name = core.CodecPCMU
}
codec.Channels = data[2]
codec.ClockRate = binary.LittleEndian.Uint32(data[4:])
}
}
return &codec, nil
}
func readChunk(r io.Reader) (chunkID string, data []byte, err error) {
b := make([]byte, 8)
if _, err = io.ReadFull(r, b); err != nil {
return
}
if chunkID = string(b[:4]); chunkID != "data" {
size := binary.LittleEndian.Uint32(b[4:])
data = make([]byte, size)
_, err = io.ReadFull(r, data)
}
return
}
+14
View File
@@ -0,0 +1,14 @@
## Default wake words
- alexa_v0.1
- hey_jarvis_v0.1
- hey_mycroft_v0.1
- hey_rhasspy_v0.1
- ok_nabu_v0.1
## Useful Links
- https://github.com/rhasspy/wyoming-satellite
- https://github.com/rhasspy/wyoming-openwakeword
- https://github.com/fwartner/home-assistant-wakewords-collection
- https://github.com/esphome/micro-wake-word-models/tree/main?tab=readme-ov-file
+99
View File
@@ -0,0 +1,99 @@
package wyoming
import (
"bufio"
"encoding/json"
"io"
"net"
"github.com/AlexxIT/go2rtc/pkg/core"
)
type API struct {
conn net.Conn
rd *bufio.Reader
}
func DialAPI(address string) (*API, error) {
conn, err := net.DialTimeout("tcp", address, core.ConnDialTimeout)
if err != nil {
return nil, err
}
return NewAPI(conn), nil
}
const Version = "1.5.4"
func NewAPI(conn net.Conn) *API {
return &API{conn: conn, rd: bufio.NewReader(conn)}
}
func (w *API) WriteEvent(evt *Event) (err error) {
hdr := EventHeader{
Type: evt.Type,
Version: Version,
DataLength: len(evt.Data),
PayloadLength: len(evt.Payload),
}
buf, err := json.Marshal(hdr)
if err != nil {
return err
}
buf = append(buf, '\n')
buf = append(buf, evt.Data...)
buf = append(buf, evt.Payload...)
_, err = w.conn.Write(buf)
return err
}
func (w *API) ReadEvent() (*Event, error) {
data, err := w.rd.ReadBytes('\n')
if err != nil {
return nil, err
}
var hdr EventHeader
if err = json.Unmarshal(data, &hdr); err != nil {
return nil, err
}
evt := Event{Type: hdr.Type}
if hdr.DataLength > 0 {
data = make([]byte, hdr.DataLength)
if _, err = io.ReadFull(w.rd, data); err != nil {
return nil, err
}
evt.Data = string(data)
}
if hdr.PayloadLength > 0 {
evt.Payload = make([]byte, hdr.PayloadLength)
if _, err = io.ReadFull(w.rd, evt.Payload); err != nil {
return nil, err
}
}
return &evt, nil
}
func (w *API) Close() error {
return w.conn.Close()
}
type Event struct {
Type string
Data string
Payload []byte
}
type EventHeader struct {
Type string `json:"type"`
Version string `json:"version"`
DataLength int `json:"data_length,omitempty"`
PayloadLength int `json:"payload_length,omitempty"`
}
+63
View File
@@ -0,0 +1,63 @@
package wyoming
import (
"fmt"
"net"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
)
type Backchannel struct {
core.Connection
api *API
}
func newBackchannel(conn net.Conn) *Backchannel {
return &Backchannel{
core.Connection{
ID: core.NewID(),
FormatName: "wyoming",
Medias: []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecPCML, ClockRate: 22050},
},
},
},
Transport: conn,
},
NewAPI(conn),
}
}
func (b *Backchannel) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
return nil, core.ErrCantGetTrack
}
func (b *Backchannel) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
sender := core.NewSender(media, codec)
sender.Handler = func(pkt *rtp.Packet) {
ts := time.Now().Nanosecond()
evt := &Event{
Type: "audio-chunk",
Data: fmt.Sprintf(`{"rate":22050,"width":2,"channels":1,"timestamp":%d}`, ts),
Payload: pkt.Payload,
}
_ = b.api.WriteEvent(evt)
}
sender.HandleRTP(track)
b.Senders = append(b.Senders, sender)
return nil
}
func (b *Backchannel) Start() error {
for {
if _, err := b.api.ReadEvent(); err != nil {
return err
}
}
}
+138
View File
@@ -0,0 +1,138 @@
package wyoming
import (
"bytes"
"fmt"
"os"
"time"
"github.com/AlexxIT/go2rtc/pkg/expr"
"github.com/AlexxIT/go2rtc/pkg/wav"
)
type env struct {
*satellite
Type string
Data string
}
func (s *satellite) handleEvent(evt *Event) {
switch evt.Type {
case "describe":
// {"asr": [], "tts": [], "handle": [], "intent": [], "wake": [], "satellite": {"name": "my satellite", "attribution": {"name": "", "url": ""}, "installed": true, "description": "my satellite", "version": "1.4.1", "area": null, "snd_format": null}}
data := fmt.Sprintf(`{"satellite":{"name":%q,"attribution":{"name":"go2rtc","url":"https://github.com/AlexxIT/go2rtc"},"installed":true}}`, s.srv.Name)
s.WriteEvent("info", data)
case "run-satellite":
s.Detect()
case "pause-satellite":
s.Stop()
case "detect": // WAKE_WORD_START {"names": null}
case "detection": // WAKE_WORD_END {"name": "ok_nabu_v0.1", "timestamp": 17580, "speaker": null}
case "transcribe": // STT_START {"language": "en"}
case "voice-started": // STT_VAD_START {"timestamp": 1160}
case "voice-stopped": // STT_VAD_END {"timestamp": 2470}
s.Pause()
case "transcript": // STT_END {"text": "how are you"}
case "synthesize": // TTS_START {"text": "Sorry, I couldn't understand that", "voice": {"language": "en"}}
case "audio-start": // TTS_END {"rate": 22050, "width": 2, "channels": 1, "timestamp": 0}
case "audio-chunk": // {"rate": 22050, "width": 2, "channels": 1, "timestamp": 0}
case "audio-stop": // {"timestamp": 2.880000000000002}
// run async because PlayAudio takes some time
go func() {
s.PlayAudio()
s.WriteEvent("played")
s.Detect()
}()
case "error":
s.Detect()
case "internal-run":
s.WriteEvent("run-pipeline", `{"start_stage":"wake","end_stage":"tts"}`)
s.Stream()
case "internal-detection":
s.WriteEvent("run-pipeline", `{"start_stage":"asr","end_stage":"tts"}`)
s.Stream()
}
}
func (s *satellite) handleScript(evt *Event) {
var script string
if s.srv.Event != nil {
script = s.srv.Event[evt.Type]
}
s.srv.Trace("event=%s data=%s payload size=%d", evt.Type, evt.Data, len(evt.Payload))
if script == "" {
s.handleEvent(evt)
return
}
// run async because script can have sleeps
go func() {
e := &env{satellite: s, Type: evt.Type, Data: evt.Data}
if res, err := expr.Eval(script, e); err != nil {
s.srv.Trace("event=%s expr error=%s", evt.Type, err)
s.handleEvent(evt)
} else {
s.srv.Trace("event=%s expr result=%v", evt.Type, res)
}
}()
}
func (s *satellite) Detect() bool {
return s.setMicState(stateWaitVAD)
}
func (s *satellite) Stream() bool {
return s.setMicState(stateActive)
}
func (s *satellite) Pause() bool {
return s.setMicState(stateIdle)
}
func (s *satellite) Stop() bool {
s.micStop()
return true
}
func (s *satellite) WriteEvent(args ...string) bool {
if len(args) == 0 {
return false
}
evt := &Event{Type: args[0]}
if len(args) > 1 {
evt.Data = args[1]
}
if err := s.api.WriteEvent(evt); err != nil {
return false
}
return true
}
func (s *satellite) PlayAudio() bool {
return s.playAudio(sndCodec, bytes.NewReader(s.sndAudio))
}
func (s *satellite) PlayFile(path string) bool {
f, err := os.Open(path)
if err != nil {
return false
}
codec, err := wav.ReadHeader(f)
if err != nil {
return false
}
return s.playAudio(codec, f)
}
func (e *env) Sleep(s string) bool {
d, err := time.ParseDuration(s)
if err != nil {
return false
}
time.Sleep(d)
return true
}
+35
View File
@@ -0,0 +1,35 @@
package wyoming
import (
"fmt"
"net"
"github.com/AlexxIT/go2rtc/pkg/core"
)
func (s *Server) HandleMic(conn net.Conn) {
defer conn.Close()
var closed core.Waiter
var timestamp int
api := NewAPI(conn)
mic := newMicConsumer(func(chunk []byte) {
data := fmt.Sprintf(`{"rate":16000,"width":2,"channels":1,"timestamp":%d}`, timestamp)
evt := &Event{Type: "audio-chunk", Data: data, Payload: chunk}
if err := api.WriteEvent(evt); err != nil {
closed.Done(nil)
}
timestamp += len(chunk) / 2
})
mic.RemoteAddr = api.conn.RemoteAddr().String()
if err := s.MicHandler(mic); err != nil {
s.Error("mic error: %s", err)
return
}
_ = closed.Wait()
_ = mic.Stop()
}
+65
View File
@@ -0,0 +1,65 @@
package wyoming
import (
"net"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
)
type Producer struct {
core.Connection
api *API
}
func newProducer(conn net.Conn) *Producer {
return &Producer{
core.Connection{
ID: core.NewID(),
FormatName: "wyoming",
Medias: []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{
{Name: core.CodecPCML, ClockRate: 16000},
},
},
},
Transport: conn,
},
NewAPI(conn),
}
}
func (p *Producer) Start() error {
var seq uint16
var ts uint32
for {
evt, err := p.api.ReadEvent()
if err != nil {
return err
}
if evt.Type != "audio-chunk" {
continue
}
p.Recv += len(evt.Payload)
pkt := &core.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
SequenceNumber: seq,
Timestamp: ts,
},
Payload: evt.Payload,
}
p.Receivers[0].WriteRTP(pkt)
seq++
ts += uint32(len(evt.Payload) / 2)
}
}
+275
View File
@@ -0,0 +1,275 @@
package wyoming
import (
"context"
"fmt"
"io"
"net"
"sync"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/pcm"
"github.com/AlexxIT/go2rtc/pkg/pcm/s16le"
"github.com/pion/rtp"
)
type Server struct {
Name string
Event map[string]string
VADThreshold int16
WakeURI string
MicHandler func(cons core.Consumer) error
SndHandler func(prod core.Producer) error
Trace func(format string, v ...any)
Error func(format string, v ...any)
}
func (s *Server) Serve(l net.Listener) error {
for {
conn, err := l.Accept()
if err != nil {
return err
}
go s.Handle(conn)
}
}
func (s *Server) Handle(conn net.Conn) {
api := NewAPI(conn)
sat := newSatellite(api, s)
defer sat.Close()
for {
evt, err := api.ReadEvent()
if err != nil {
return
}
switch evt.Type {
case "ping": // {"text": null}
_ = api.WriteEvent(&Event{Type: "pong", Data: evt.Data})
case "audio-start": // TTS_END {"rate": 22050, "width": 2, "channels": 1, "timestamp": 0}
sat.sndAudio = sat.sndAudio[:0]
case "audio-chunk": // {"rate": 22050, "width": 2, "channels": 1, "timestamp": 0}
sat.sndAudio = append(sat.sndAudio, evt.Payload...)
default:
sat.handleScript(evt)
}
}
}
// states like http.ConnState
const (
stateError = -2
stateClosed = -1
stateNew = 0
stateIdle = 1
stateWaitVAD = 2 // aka wait VAD
stateWaitWakeWord = 3
stateActive = 4
)
type satellite struct {
api *API
srv *Server
micState int8
micTS int
micMu sync.Mutex
sndAudio []byte
mic *micConsumer
wake *WakeWord
}
func newSatellite(api *API, srv *Server) *satellite {
sat := &satellite{api: api, srv: srv}
return sat
}
func (s *satellite) Close() error {
s.Stop()
return s.api.Close()
}
const wakeTimeout = 5 * 2 * 16000 // 5 seconds
func (s *satellite) setMicState(state int8) bool {
s.micMu.Lock()
defer s.micMu.Unlock()
if s.micState == stateNew {
s.mic = newMicConsumer(s.onMicChunk)
s.mic.RemoteAddr = s.api.conn.RemoteAddr().String()
if err := s.srv.MicHandler(s.mic); err != nil {
s.micState = stateError
s.srv.Error("can't get mic: %w", err)
_ = s.api.Close()
} else {
s.micState = stateIdle
}
}
if s.micState < stateIdle {
return false
}
s.micState = state
s.micTS = 0
return true
}
func (s *satellite) micStop() {
s.micMu.Lock()
s.micState = stateClosed
if s.mic != nil {
_ = s.mic.Stop()
s.mic = nil
}
if s.wake != nil {
_ = s.wake.Close()
s.wake = nil
}
s.micMu.Unlock()
}
func (s *satellite) onMicChunk(chunk []byte) {
s.micMu.Lock()
defer s.micMu.Unlock()
if s.micState == stateIdle {
return
}
if s.micState == stateWaitVAD {
// tests show that values over 1000 are most likely speech
if s.srv.VADThreshold == 0 || s16le.PeaksRMS(chunk) > s.srv.VADThreshold {
if s.wake == nil && s.srv.WakeURI != "" {
s.wake, _ = DialWakeWord(s.srv.WakeURI)
}
if s.wake == nil {
// some problems with wake word - redirect to HA
s.micState = stateIdle
go s.handleScript(&Event{Type: "internal-run"})
} else {
s.micState = stateWaitWakeWord
}
s.micTS = 0
}
}
if s.micState == stateWaitWakeWord {
if s.wake.Detection != "" {
// check if wake word detected
s.micState = stateIdle
go s.handleScript(&Event{Type: "internal-detection", Data: `{"name":"` + s.wake.Detection + `"}`})
} else if err := s.wake.WriteChunk(chunk); err != nil {
// wake word service failed
s.micState = stateWaitVAD
_ = s.wake.Close()
s.wake = nil
} else if s.micTS > wakeTimeout {
// wake word detection timeout
s.micState = stateWaitVAD
}
} else if s.wake != nil {
_ = s.wake.Close()
s.wake = nil
}
if s.micState == stateActive {
data := fmt.Sprintf(`{"rate":16000,"width":2,"channels":1,"timestamp":%d}`, s.micTS)
evt := &Event{Type: "audio-chunk", Data: data, Payload: chunk}
_ = s.api.WriteEvent(evt)
}
s.micTS += len(chunk) / 2
}
func (s *satellite) playAudio(codec *core.Codec, rd io.Reader) bool {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
prod := pcm.OpenSync(codec, rd)
prod.OnClose(cancel)
if err := s.srv.SndHandler(prod); err != nil {
return false
} else {
<-ctx.Done()
return true
}
}
type micConsumer struct {
core.Connection
onData func(chunk []byte)
onClose func()
}
func newMicConsumer(onData func(chunk []byte)) *micConsumer {
medias := []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionSendonly,
Codecs: pcm.ConsumerCodecs(),
},
}
return &micConsumer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "wyoming",
Protocol: "tcp",
Medias: medias,
},
onData: onData,
}
}
func (c *micConsumer) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
src := track.Codec
dst := &core.Codec{
Name: core.CodecPCML,
ClockRate: 16000,
Channels: 1,
}
sender := core.NewSender(media, dst)
sender.Handler = pcm.TranscodeHandler(dst, src,
repack(func(packet *core.Packet) {
c.onData(packet.Payload)
}),
)
sender.HandleRTP(track)
c.Senders = append(c.Senders, sender)
return nil
}
func (c *micConsumer) Stop() error {
if c.onClose != nil {
c.onClose()
}
return c.Connection.Stop()
}
func repack(handler core.HandlerFunc) core.HandlerFunc {
const PacketSize = 2 * 16000 / 50 // 20ms
var buf []byte
return func(pkt *rtp.Packet) {
buf = append(buf, pkt.Payload...)
for len(buf) >= PacketSize {
pkt = &core.Packet{Payload: buf[:PacketSize]}
buf = buf[PacketSize:]
handler(pkt)
}
}
}
+40
View File
@@ -0,0 +1,40 @@
package wyoming
import (
"bytes"
"net"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/pcm"
)
func (s *Server) HandleSnd(conn net.Conn) {
defer conn.Close()
var snd []byte
api := NewAPI(conn)
for {
evt, err := api.ReadEvent()
if err != nil {
return
}
s.Trace("event: %s data: %s payload: %d", evt.Type, evt.Data, len(evt.Payload))
switch evt.Type {
case "audio-start":
snd = snd[:0]
case "audio-chunk":
snd = append(snd, evt.Payload...)
case "audio-stop":
prod := pcm.OpenSync(sndCodec, bytes.NewReader(snd))
if err = s.SndHandler(prod); err != nil {
s.Error("snd error: %s", err)
return
}
}
}
}
var sndCodec = &core.Codec{Name: core.CodecPCML, ClockRate: 22050}
+120
View File
@@ -0,0 +1,120 @@
package wyoming
import (
"encoding/json"
"fmt"
"net/url"
)
type WakeWord struct {
*API
names []string
send int
Detection string
}
func DialWakeWord(rawURL string) (*WakeWord, error) {
u, err := url.Parse(rawURL)
if err != nil {
return nil, err
}
api, err := DialAPI(u.Host)
if err != nil {
return nil, err
}
names := u.Query()["name"]
if len(names) == 0 {
names = []string{"ok_nabu_v0.1"}
}
wake := &WakeWord{API: api, names: names}
if err = wake.Start(); err != nil {
_ = wake.Close()
return nil, err
}
go wake.handle()
return wake, nil
}
func (w *WakeWord) handle() {
defer w.Close()
for {
evt, err := w.ReadEvent()
if err != nil {
return
}
if evt.Type == "detection" {
var data struct {
Name string `json:"name"`
}
if err = json.Unmarshal([]byte(evt.Data), &data); err != nil {
return
}
w.Detection = data.Name
}
}
}
//func (w *WakeWord) Describe() error {
// if err := w.WriteEvent(&Event{Type: "describe"}); err != nil {
// return err
// }
//
// evt, err := w.ReadEvent()
// if err != nil {
// return err
// }
//
// var info struct {
// Wake []struct {
// Models []struct {
// Name string `json:"name"`
// } `json:"models"`
// } `json:"wake"`
// }
// if err = json.Unmarshal(evt.Data, &info); err != nil {
// return err
// }
//
// return nil
//}
func (w *WakeWord) Start() error {
msg := struct {
Names []string `json:"names"`
}{
Names: w.names,
}
data, err := json.Marshal(msg)
if err != nil {
return err
}
evt := &Event{Type: "detect", Data: string(data)}
if err := w.WriteEvent(evt); err != nil {
return err
}
evt = &Event{Type: "audio-start", Data: audioData(0)}
return w.WriteEvent(evt)
}
func (w *WakeWord) Close() error {
return w.conn.Close()
}
func (w *WakeWord) WriteChunk(payload []byte) error {
evt := &Event{Type: "audio-chunk", Data: audioData(w.send), Payload: payload}
w.send += len(payload)
return w.WriteEvent(evt)
}
func audioData(send int) string {
// timestamp in ms = send / 2 * 1000 / 16000 = send / 32
return fmt.Sprintf(`{"rate":16000,"width":2,"channels":1,"timestamp":%d}`, send/32)
}
+26
View File
@@ -0,0 +1,26 @@
package wyoming
import (
"net"
"net/url"
"github.com/AlexxIT/go2rtc/pkg/core"
)
func Dial(rawURL string) (core.Producer, error) {
u, err := url.Parse(rawURL)
if err != nil {
return nil, err
}
conn, err := net.DialTimeout("tcp", u.Host, core.ConnDialTimeout)
if err != nil {
return nil, err
}
if u.Query().Get("backchannel") != "1" {
return newProducer(conn), nil
} else {
return newBackchannel(conn), nil
}
}