diff options
author | Christian Pointner <equinox@spreadspace.org> | 2019-02-13 20:12:31 +0100 |
---|---|---|
committer | Christian Pointner <equinox@spreadspace.org> | 2019-02-13 20:12:31 +0100 |
commit | 2154c5b763188e2ed1bedca911635805ff43b90a (patch) | |
tree | 7e44222f56ba63293da38a432660e758d235b5db /cmd/dolmetschctld | |
parent | implemented command to fetch list of languages (diff) |
added state change subscription to state machine
Diffstat (limited to 'cmd/dolmetschctld')
-rw-r--r-- | cmd/dolmetschctld/statemachine.go | 68 | ||||
-rw-r--r-- | cmd/dolmetschctld/web-static/socket.html | 1 | ||||
-rw-r--r-- | cmd/dolmetschctld/web_socket.go | 49 |
3 files changed, 110 insertions, 8 deletions
diff --git a/cmd/dolmetschctld/statemachine.go b/cmd/dolmetschctld/statemachine.go index 99416a5..2d8013f 100644 --- a/cmd/dolmetschctld/statemachine.go +++ b/cmd/dolmetschctld/statemachine.go @@ -23,6 +23,7 @@ package main import ( + "container/list" "fmt" "log" "time" @@ -70,6 +71,12 @@ func (s Language) MarshalText() (data []byte, err error) { return } +type FullState struct { + state State + ratio float32 + language Language +} + type MixerChannelState struct { level mixer.FaderLevel mute mixer.Mute @@ -109,15 +116,18 @@ type getOriginal2InterpreterRatioReq struct { } type getStateRes struct { - state State - ratio float32 - language Language + FullState } type getStateReq struct { resCh chan getStateRes } +type subscribeStateReq struct { + ch chan<- FullState + resCh chan error +} + type StateMachine struct { mixer *mixer.Mixer @@ -127,6 +137,7 @@ type StateMachine struct { setOriginal2InterpreterRatioCh chan setOriginal2InterpreterRatioReq getOriginal2InterpreterRatioCh chan getOriginal2InterpreterRatioReq getStateCh chan getStateReq + subscribeStateCh chan subscribeStateReq quitCh chan bool exitedCh chan struct{} mixerEventCh chan mixer.Event @@ -134,10 +145,38 @@ type StateMachine struct { languages map[Language]*MixerChannels channel2lang map[mixer.Channel]Language + state State original2InterpreterRatio float32 + language Language - state State - language Language + stateSubscribers list.List +} + +func (sm *StateMachine) publishState() { + var next *list.Element + for sub := sm.stateSubscribers.Front(); sub != nil; sub = next { + next = sub.Next() + + ch, ok := (sub.Value).(chan<- FullState) + if !ok { + panic(fmt.Sprintf("statemachine: subscriber list element value has wrong type: %T", sub.Value)) + } + + select { + case ch <- FullState{sm.state, sm.original2InterpreterRatio, sm.language}: + default: + // subscriber is not responding... + // log.Printf("statemachine: removing subscriber '%v', because it is not responding", ch) + close(ch) + sm.stateSubscribers.Remove(sub) + } + } +} + +func (sm *StateMachine) subscribeState(out chan<- FullState) error { + // log.Printf("statemachine: %v subscribed to state changes", out) + sm.stateSubscribers.PushBack(out) + return nil } func (sm *StateMachine) handleMixerEvent(ev mixer.Event) { @@ -266,6 +305,10 @@ func (sm *StateMachine) run() { sm.initMixer() t := time.NewTicker(10 * time.Millisecond) for { + oldState := sm.state + oldOriginal2InterpreterRadio := sm.original2InterpreterRatio + oldLanguage := sm.language + select { case <-t.C: if sm.state == StateSettling { @@ -284,13 +327,19 @@ func (sm *StateMachine) run() { case req := <-sm.getOriginal2InterpreterRatioCh: req.resCh <- sm.original2InterpreterRatio case req := <-sm.getStateCh: - req.resCh <- getStateRes{sm.state, sm.original2InterpreterRatio, sm.language} + req.resCh <- getStateRes{FullState{sm.state, sm.original2InterpreterRatio, sm.language}} + case req := <-sm.subscribeStateCh: + req.resCh <- sm.subscribeState(req.ch) case ev := <-sm.mixerEventCh: sm.handleMixerEvent(ev) sm.reconcile(false) case <-sm.quitCh: return } + + if oldState != sm.state || oldOriginal2InterpreterRadio != sm.original2InterpreterRatio || oldLanguage != sm.language { + sm.publishState() + } } } @@ -302,6 +351,7 @@ func NewStateMachine(m *mixer.Mixer) (*StateMachine, error) { sm.setOriginal2InterpreterRatioCh = make(chan setOriginal2InterpreterRatioReq, 10) sm.getOriginal2InterpreterRatioCh = make(chan getOriginal2InterpreterRatioReq, 10) sm.getStateCh = make(chan getStateReq, 10) + sm.subscribeStateCh = make(chan subscribeStateReq, 10) sm.quitCh = make(chan bool, 1) sm.exitedCh = make(chan struct{}) sm.mixerEventCh = make(chan mixer.Event, 1000) @@ -387,6 +437,12 @@ func (sm *StateMachine) GetState() (State, float32, Language) { return res.state, res.ratio, res.language } +func (sm *StateMachine) SubscribeState(out chan<- FullState) error { + resCh := make(chan error) + sm.subscribeStateCh <- subscribeStateReq{out, resCh} + return <-resCh +} + func (sm *StateMachine) Shutdown() { select { case sm.quitCh <- true: diff --git a/cmd/dolmetschctld/web-static/socket.html b/cmd/dolmetschctld/web-static/socket.html index 0dd88c5..f1156c7 100644 --- a/cmd/dolmetschctld/web-static/socket.html +++ b/cmd/dolmetschctld/web-static/socket.html @@ -43,6 +43,7 @@ this.sock_onopen = function() { this.send({ command: "languages" }); + this.send({ command: "subscribe" }); $('#buttonstate').removeAttr('disabled','disabled'); $('#buttonlang').removeAttr('disabled','disabled'); } diff --git a/cmd/dolmetschctld/web_socket.go b/cmd/dolmetschctld/web_socket.go index 3879650..f904a1c 100644 --- a/cmd/dolmetschctld/web_socket.go +++ b/cmd/dolmetschctld/web_socket.go @@ -71,7 +71,7 @@ func sendWebSocketErrorResponse(ws *websocket.Conn, code int, errStr string) { sendWebSocketResponse(ws, rd) } -func webSocketHandleCommand(ws *websocket.Conn, sm *StateMachine, req webSocketRequest) { +func webSocketHandleRequest(ws *websocket.Conn, sm *StateMachine, req webSocketRequest, subCh chan<- FullState) { switch req.Command { case "state": if len(req.Args) != 0 { @@ -84,6 +84,22 @@ func webSocketHandleCommand(ws *websocket.Conn, sm *StateMachine, req webSocketR result.Type = "state" result.State, result.Ratio, result.Lang = sm.GetState() sendWebSocketResponse(ws, result) + case "subscribe": + if len(req.Args) != 0 { + sendWebSocketErrorResponse(ws, http.StatusBadRequest, "command 'subscribe' expects no arguments") + return + } + + if err := sm.SubscribeState(subCh); err != nil { + sendWebSocketErrorResponse(ws, http.StatusInternalServerError, err.Error()) + return + } + + var result webSocketResponseState + result.ResponseCode = http.StatusOK + result.Type = "state" + result.State, result.Ratio, result.Lang = sm.GetState() + sendWebSocketResponse(ws, result) case "languages": if len(req.Args) != 0 { sendWebSocketErrorResponse(ws, http.StatusBadRequest, "command 'languages' expects no arguments") @@ -116,6 +132,32 @@ func webSocketHandleCommand(ws *websocket.Conn, sm *StateMachine, req webSocketR } } +func webSocketSessionHandler(ws *websocket.Conn, sm *StateMachine, reqCh <-chan webSocketRequest) { + defer ws.Close() + + subCh := make(chan FullState, 100) + for { + select { + case state, ok := <-subCh: + if !ok { + return + } + var result webSocketResponseState + result.ResponseCode = http.StatusOK + result.Type = "state" + result.State = state.state + result.Ratio = state.ratio + result.Lang = state.language + sendWebSocketResponse(ws, result) + case req, ok := <-reqCh: + if !ok { + return + } + webSocketHandleRequest(ws, sm, req, subCh) + } + } +} + func webSocketHandler(sm *StateMachine, w http.ResponseWriter, r *http.Request) { ws, err := websocket.Upgrade(w, r, nil, 64*1024, 64*1024) if _, ok := err.(websocket.HandshakeError); ok { @@ -126,6 +168,9 @@ func webSocketHandler(sm *StateMachine, w http.ResponseWriter, r *http.Request) return } log.Println("Web(socket) client", ws.RemoteAddr(), "connected") + reqCh := make(chan webSocketRequest) + go webSocketSessionHandler(ws, sm, reqCh) + defer close(reqCh) for { t, r, err := ws.NextReader() @@ -148,7 +193,7 @@ func webSocketHandler(sm *StateMachine, w http.ResponseWriter, r *http.Request) return } - webSocketHandleCommand(ws, sm, req) + reqCh <- req case websocket.BinaryMessage: sendWebSocketErrorResponse(ws, http.StatusBadRequest, "binary messages are not allowed") io.Copy(ioutil.Discard, r) // consume all the data |