summaryrefslogtreecommitdiff
path: root/cmd/dolmetschctld
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2019-02-13 20:12:31 +0100
committerChristian Pointner <equinox@spreadspace.org>2019-02-13 20:12:31 +0100
commit2154c5b763188e2ed1bedca911635805ff43b90a (patch)
tree7e44222f56ba63293da38a432660e758d235b5db /cmd/dolmetschctld
parentimplemented command to fetch list of languages (diff)
added state change subscription to state machine
Diffstat (limited to 'cmd/dolmetschctld')
-rw-r--r--cmd/dolmetschctld/statemachine.go68
-rw-r--r--cmd/dolmetschctld/web-static/socket.html1
-rw-r--r--cmd/dolmetschctld/web_socket.go49
3 files changed, 110 insertions, 8 deletions
diff --git a/cmd/dolmetschctld/statemachine.go b/cmd/dolmetschctld/statemachine.go
index 99416a5..2d8013f 100644
--- a/cmd/dolmetschctld/statemachine.go
+++ b/cmd/dolmetschctld/statemachine.go
@@ -23,6 +23,7 @@
package main
import (
+ "container/list"
"fmt"
"log"
"time"
@@ -70,6 +71,12 @@ func (s Language) MarshalText() (data []byte, err error) {
return
}
+type FullState struct {
+ state State
+ ratio float32
+ language Language
+}
+
type MixerChannelState struct {
level mixer.FaderLevel
mute mixer.Mute
@@ -109,15 +116,18 @@ type getOriginal2InterpreterRatioReq struct {
}
type getStateRes struct {
- state State
- ratio float32
- language Language
+ FullState
}
type getStateReq struct {
resCh chan getStateRes
}
+type subscribeStateReq struct {
+ ch chan<- FullState
+ resCh chan error
+}
+
type StateMachine struct {
mixer *mixer.Mixer
@@ -127,6 +137,7 @@ type StateMachine struct {
setOriginal2InterpreterRatioCh chan setOriginal2InterpreterRatioReq
getOriginal2InterpreterRatioCh chan getOriginal2InterpreterRatioReq
getStateCh chan getStateReq
+ subscribeStateCh chan subscribeStateReq
quitCh chan bool
exitedCh chan struct{}
mixerEventCh chan mixer.Event
@@ -134,10 +145,38 @@ type StateMachine struct {
languages map[Language]*MixerChannels
channel2lang map[mixer.Channel]Language
+ state State
original2InterpreterRatio float32
+ language Language
- state State
- language Language
+ stateSubscribers list.List
+}
+
+func (sm *StateMachine) publishState() {
+ var next *list.Element
+ for sub := sm.stateSubscribers.Front(); sub != nil; sub = next {
+ next = sub.Next()
+
+ ch, ok := (sub.Value).(chan<- FullState)
+ if !ok {
+ panic(fmt.Sprintf("statemachine: subscriber list element value has wrong type: %T", sub.Value))
+ }
+
+ select {
+ case ch <- FullState{sm.state, sm.original2InterpreterRatio, sm.language}:
+ default:
+ // subscriber is not responding...
+ // log.Printf("statemachine: removing subscriber '%v', because it is not responding", ch)
+ close(ch)
+ sm.stateSubscribers.Remove(sub)
+ }
+ }
+}
+
+func (sm *StateMachine) subscribeState(out chan<- FullState) error {
+ // log.Printf("statemachine: %v subscribed to state changes", out)
+ sm.stateSubscribers.PushBack(out)
+ return nil
}
func (sm *StateMachine) handleMixerEvent(ev mixer.Event) {
@@ -266,6 +305,10 @@ func (sm *StateMachine) run() {
sm.initMixer()
t := time.NewTicker(10 * time.Millisecond)
for {
+ oldState := sm.state
+ oldOriginal2InterpreterRadio := sm.original2InterpreterRatio
+ oldLanguage := sm.language
+
select {
case <-t.C:
if sm.state == StateSettling {
@@ -284,13 +327,19 @@ func (sm *StateMachine) run() {
case req := <-sm.getOriginal2InterpreterRatioCh:
req.resCh <- sm.original2InterpreterRatio
case req := <-sm.getStateCh:
- req.resCh <- getStateRes{sm.state, sm.original2InterpreterRatio, sm.language}
+ req.resCh <- getStateRes{FullState{sm.state, sm.original2InterpreterRatio, sm.language}}
+ case req := <-sm.subscribeStateCh:
+ req.resCh <- sm.subscribeState(req.ch)
case ev := <-sm.mixerEventCh:
sm.handleMixerEvent(ev)
sm.reconcile(false)
case <-sm.quitCh:
return
}
+
+ if oldState != sm.state || oldOriginal2InterpreterRadio != sm.original2InterpreterRatio || oldLanguage != sm.language {
+ sm.publishState()
+ }
}
}
@@ -302,6 +351,7 @@ func NewStateMachine(m *mixer.Mixer) (*StateMachine, error) {
sm.setOriginal2InterpreterRatioCh = make(chan setOriginal2InterpreterRatioReq, 10)
sm.getOriginal2InterpreterRatioCh = make(chan getOriginal2InterpreterRatioReq, 10)
sm.getStateCh = make(chan getStateReq, 10)
+ sm.subscribeStateCh = make(chan subscribeStateReq, 10)
sm.quitCh = make(chan bool, 1)
sm.exitedCh = make(chan struct{})
sm.mixerEventCh = make(chan mixer.Event, 1000)
@@ -387,6 +437,12 @@ func (sm *StateMachine) GetState() (State, float32, Language) {
return res.state, res.ratio, res.language
}
+func (sm *StateMachine) SubscribeState(out chan<- FullState) error {
+ resCh := make(chan error)
+ sm.subscribeStateCh <- subscribeStateReq{out, resCh}
+ return <-resCh
+}
+
func (sm *StateMachine) Shutdown() {
select {
case sm.quitCh <- true:
diff --git a/cmd/dolmetschctld/web-static/socket.html b/cmd/dolmetschctld/web-static/socket.html
index 0dd88c5..f1156c7 100644
--- a/cmd/dolmetschctld/web-static/socket.html
+++ b/cmd/dolmetschctld/web-static/socket.html
@@ -43,6 +43,7 @@
this.sock_onopen = function() {
this.send({ command: "languages" });
+ this.send({ command: "subscribe" });
$('#buttonstate').removeAttr('disabled','disabled');
$('#buttonlang').removeAttr('disabled','disabled');
}
diff --git a/cmd/dolmetschctld/web_socket.go b/cmd/dolmetschctld/web_socket.go
index 3879650..f904a1c 100644
--- a/cmd/dolmetschctld/web_socket.go
+++ b/cmd/dolmetschctld/web_socket.go
@@ -71,7 +71,7 @@ func sendWebSocketErrorResponse(ws *websocket.Conn, code int, errStr string) {
sendWebSocketResponse(ws, rd)
}
-func webSocketHandleCommand(ws *websocket.Conn, sm *StateMachine, req webSocketRequest) {
+func webSocketHandleRequest(ws *websocket.Conn, sm *StateMachine, req webSocketRequest, subCh chan<- FullState) {
switch req.Command {
case "state":
if len(req.Args) != 0 {
@@ -84,6 +84,22 @@ func webSocketHandleCommand(ws *websocket.Conn, sm *StateMachine, req webSocketR
result.Type = "state"
result.State, result.Ratio, result.Lang = sm.GetState()
sendWebSocketResponse(ws, result)
+ case "subscribe":
+ if len(req.Args) != 0 {
+ sendWebSocketErrorResponse(ws, http.StatusBadRequest, "command 'subscribe' expects no arguments")
+ return
+ }
+
+ if err := sm.SubscribeState(subCh); err != nil {
+ sendWebSocketErrorResponse(ws, http.StatusInternalServerError, err.Error())
+ return
+ }
+
+ var result webSocketResponseState
+ result.ResponseCode = http.StatusOK
+ result.Type = "state"
+ result.State, result.Ratio, result.Lang = sm.GetState()
+ sendWebSocketResponse(ws, result)
case "languages":
if len(req.Args) != 0 {
sendWebSocketErrorResponse(ws, http.StatusBadRequest, "command 'languages' expects no arguments")
@@ -116,6 +132,32 @@ func webSocketHandleCommand(ws *websocket.Conn, sm *StateMachine, req webSocketR
}
}
+func webSocketSessionHandler(ws *websocket.Conn, sm *StateMachine, reqCh <-chan webSocketRequest) {
+ defer ws.Close()
+
+ subCh := make(chan FullState, 100)
+ for {
+ select {
+ case state, ok := <-subCh:
+ if !ok {
+ return
+ }
+ var result webSocketResponseState
+ result.ResponseCode = http.StatusOK
+ result.Type = "state"
+ result.State = state.state
+ result.Ratio = state.ratio
+ result.Lang = state.language
+ sendWebSocketResponse(ws, result)
+ case req, ok := <-reqCh:
+ if !ok {
+ return
+ }
+ webSocketHandleRequest(ws, sm, req, subCh)
+ }
+ }
+}
+
func webSocketHandler(sm *StateMachine, w http.ResponseWriter, r *http.Request) {
ws, err := websocket.Upgrade(w, r, nil, 64*1024, 64*1024)
if _, ok := err.(websocket.HandshakeError); ok {
@@ -126,6 +168,9 @@ func webSocketHandler(sm *StateMachine, w http.ResponseWriter, r *http.Request)
return
}
log.Println("Web(socket) client", ws.RemoteAddr(), "connected")
+ reqCh := make(chan webSocketRequest)
+ go webSocketSessionHandler(ws, sm, reqCh)
+ defer close(reqCh)
for {
t, r, err := ws.NextReader()
@@ -148,7 +193,7 @@ func webSocketHandler(sm *StateMachine, w http.ResponseWriter, r *http.Request)
return
}
- webSocketHandleCommand(ws, sm, req)
+ reqCh <- req
case websocket.BinaryMessage:
sendWebSocketErrorResponse(ws, http.StatusBadRequest, "binary messages are not allowed")
io.Copy(ioutil.Discard, r) // consume all the data