summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/dolmetschctld/statemachine.go59
-rw-r--r--cmd/dolmetschctld/web_socket.go64
2 files changed, 72 insertions, 51 deletions
diff --git a/cmd/dolmetschctld/statemachine.go b/cmd/dolmetschctld/statemachine.go
index a779604..3e92f00 100644
--- a/cmd/dolmetschctld/statemachine.go
+++ b/cmd/dolmetschctld/statemachine.go
@@ -26,6 +26,7 @@ import (
"container/list"
"fmt"
"log"
+ "sync"
"time"
"spreadspace.org/dolmetschctl/pkg/mixer"
@@ -83,6 +84,11 @@ type subscribeStateReq struct {
resCh chan error
}
+type stateSubscriber struct {
+ publish chan<- types.FullState
+ unsubscribe <-chan struct{}
+}
+
type StateMachine struct {
mixer *mixer.Mixer
@@ -92,7 +98,6 @@ type StateMachine struct {
setOriginal2InterpreterRatioCh chan setOriginal2InterpreterRatioReq
getOriginal2InterpreterRatioCh chan getOriginal2InterpreterRatioReq
getStateCh chan getStateReq
- subscribeStateCh chan subscribeStateReq
quitCh chan bool
exitedCh chan struct{}
mixerEventCh chan mixer.Event
@@ -104,36 +109,41 @@ type StateMachine struct {
original2InterpreterRatio float32
language types.Language
- stateSubscribers list.List
+ stateSubscribersLock sync.Mutex
+ stateSubscribers list.List
}
func (sm *StateMachine) publishState() {
+ sm.stateSubscribersLock.Lock()
+ defer sm.stateSubscribersLock.Unlock()
+
var next *list.Element
- for sub := sm.stateSubscribers.Front(); sub != nil; sub = next {
- next = sub.Next()
+ for entry := sm.stateSubscribers.Front(); entry != nil; entry = next {
+ next = entry.Next()
- ch, ok := (sub.Value).(chan<- types.FullState)
+ sub, ok := (entry.Value).(stateSubscriber)
if !ok {
- panic(fmt.Sprintf("statemachine: subscriber list element value has wrong type: %T", sub.Value))
+ panic(fmt.Sprintf("statemachine: subscriber list element value has wrong type: %T", entry.Value))
}
select {
- case ch <- types.FullState{sm.state, sm.original2InterpreterRatio, sm.language}:
+ case <-sub.unsubscribe:
+ log.Printf("statemachine: removing subscriber '%v', because it has unsubscribed", sub.publish)
+ close(sub.publish)
+ sm.stateSubscribers.Remove(entry)
default:
- // subscriber is not responding...
- // log.Printf("statemachine: removing subscriber '%v', because it is not responding", ch)
- close(ch)
- sm.stateSubscribers.Remove(sub)
+ select {
+ case sub.publish <- types.FullState{sm.state, sm.original2InterpreterRatio, sm.language}:
+ default:
+ // subscriber is not responding...
+ log.Printf("statemachine: removing subscriber '%v', because it is not responding", sub.publish)
+ close(sub.publish)
+ sm.stateSubscribers.Remove(entry)
+ }
}
}
}
-func (sm *StateMachine) subscribeState(out chan<- types.FullState) error {
- // log.Printf("statemachine: %v subscribed to state changes", out)
- sm.stateSubscribers.PushBack(out)
- return nil
-}
-
func (sm *StateMachine) handleMixerEvent(ev mixer.Event) {
lang, exists := sm.channel2lang[ev.Channel]
if !exists {
@@ -283,8 +293,6 @@ func (sm *StateMachine) run() {
req.resCh <- sm.original2InterpreterRatio
case req := <-sm.getStateCh:
req.resCh <- getStateRes{types.FullState{sm.state, sm.original2InterpreterRatio, sm.language}}
- case req := <-sm.subscribeStateCh:
- req.resCh <- sm.subscribeState(req.ch)
case ev := <-sm.mixerEventCh:
sm.handleMixerEvent(ev)
sm.reconcile(false)
@@ -306,7 +314,6 @@ func NewStateMachine(m *mixer.Mixer) (*StateMachine, error) {
sm.setOriginal2InterpreterRatioCh = make(chan setOriginal2InterpreterRatioReq, 10)
sm.getOriginal2InterpreterRatioCh = make(chan getOriginal2InterpreterRatioReq, 10)
sm.getStateCh = make(chan getStateReq, 10)
- sm.subscribeStateCh = make(chan subscribeStateReq, 10)
sm.quitCh = make(chan bool, 1)
sm.exitedCh = make(chan struct{})
sm.mixerEventCh = make(chan mixer.Event, 1000)
@@ -392,10 +399,14 @@ func (sm *StateMachine) GetState() (types.State, float32, types.Language) {
return res.State, res.Ratio, res.Language
}
-func (sm *StateMachine) SubscribeState(out chan<- types.FullState) error {
- resCh := make(chan error)
- sm.subscribeStateCh <- subscribeStateReq{out, resCh}
- return <-resCh
+func (sm *StateMachine) SubscribeState(out chan<- types.FullState) chan<- struct{} {
+ sm.stateSubscribersLock.Lock()
+ defer sm.stateSubscribersLock.Unlock()
+
+ log.Printf("statemachine: %v subscribed to state changes", out)
+ unsubscribe := make(chan struct{})
+ sm.stateSubscribers.PushBack(stateSubscriber{publish: out, unsubscribe: unsubscribe})
+ return unsubscribe
}
func (sm *StateMachine) Shutdown() {
diff --git a/cmd/dolmetschctld/web_socket.go b/cmd/dolmetschctld/web_socket.go
index 09f2efc..dcbb311 100644
--- a/cmd/dolmetschctld/web_socket.go
+++ b/cmd/dolmetschctld/web_socket.go
@@ -47,74 +47,83 @@ func sendWebSocketErrorResponse(ws *websocket.Conn, code int, errStr string) {
sendWebSocketResponse(ws, rd)
}
-func webSocketHandleRequest(ws *websocket.Conn, sm *StateMachine, req types.WebSocketRequest, subCh chan<- types.FullState) {
+type webSocketSession struct {
+ ws *websocket.Conn
+ sm *StateMachine
+ stateSubCh chan types.FullState
+ stateUnsubCh chan<- struct{}
+}
+
+func (s *webSocketSession) handleRequest(req types.WebSocketRequest) {
switch req.Command {
case "state":
if len(req.Args) != 0 {
- sendWebSocketErrorResponse(ws, http.StatusBadRequest, "command 'state' expects no arguments")
+ sendWebSocketErrorResponse(s.ws, http.StatusBadRequest, "command 'state' expects no arguments")
return
}
var result types.WebSocketResponseState
result.ResponseCode = http.StatusOK
result.Type = "state"
- result.State, result.Ratio, result.Lang = sm.GetState()
- sendWebSocketResponse(ws, result)
+ result.State, result.Ratio, result.Lang = s.sm.GetState()
+ sendWebSocketResponse(s.ws, result)
case "subscribe":
if len(req.Args) != 0 {
- sendWebSocketErrorResponse(ws, http.StatusBadRequest, "command 'subscribe' expects no arguments")
+ sendWebSocketErrorResponse(s.ws, http.StatusBadRequest, "command 'subscribe' expects no arguments")
return
}
- if err := sm.SubscribeState(subCh); err != nil {
- sendWebSocketErrorResponse(ws, http.StatusInternalServerError, err.Error())
- return
- }
+ s.stateUnsubCh = s.sm.SubscribeState(s.stateSubCh)
var result types.WebSocketResponseState
result.ResponseCode = http.StatusOK
result.Type = "state"
- result.State, result.Ratio, result.Lang = sm.GetState()
- sendWebSocketResponse(ws, result)
+ result.State, result.Ratio, result.Lang = s.sm.GetState()
+ sendWebSocketResponse(s.ws, result)
case "languages":
if len(req.Args) != 0 {
- sendWebSocketErrorResponse(ws, http.StatusBadRequest, "command 'languages' expects no arguments")
+ sendWebSocketErrorResponse(s.ws, http.StatusBadRequest, "command 'languages' expects no arguments")
return
}
var result types.WebSocketResponseLanguages
result.ResponseCode = http.StatusOK
result.Type = "languages"
- result.Languages = sm.GetLanguages()
- sendWebSocketResponse(ws, result)
+ result.Languages = s.sm.GetLanguages()
+ sendWebSocketResponse(s.ws, result)
case "language":
if len(req.Args) != 1 {
- sendWebSocketErrorResponse(ws, http.StatusBadRequest, "command 'language' expects exatly one argument")
+ sendWebSocketErrorResponse(s.ws, http.StatusBadRequest, "command 'language' expects exatly one argument")
return
}
- if err := sm.SetLanguage(types.Language(req.Args[0])); err != nil {
- sendWebSocketErrorResponse(ws, http.StatusBadRequest, err.Error())
+ if err := s.sm.SetLanguage(types.Language(req.Args[0])); err != nil {
+ sendWebSocketErrorResponse(s.ws, http.StatusBadRequest, err.Error())
return
}
var result types.WebSocketResponseState
result.ResponseCode = http.StatusOK
result.Type = "state"
- result.State, result.Ratio, result.Lang = sm.GetState()
- sendWebSocketResponse(ws, result)
+ result.State, result.Ratio, result.Lang = s.sm.GetState()
+ sendWebSocketResponse(s.ws, result)
default:
- sendWebSocketErrorResponse(ws, http.StatusBadRequest, "unkown command: '"+req.Command+"'")
+ sendWebSocketErrorResponse(s.ws, http.StatusBadRequest, "unkown command: '"+req.Command+"'")
}
}
-func webSocketSessionHandler(ws *websocket.Conn, sm *StateMachine, reqCh <-chan types.WebSocketRequest) {
- defer ws.Close()
+func (s *webSocketSession) Handler(reqCh <-chan types.WebSocketRequest) {
+ defer func() {
+ s.ws.Close()
+ if s.stateUnsubCh != nil {
+ close(s.stateUnsubCh)
+ }
+ }()
- subCh := make(chan types.FullState, 100)
+ s.stateSubCh = make(chan types.FullState, 100)
for {
select {
- case state, ok := <-subCh:
+ case state, ok := <-s.stateSubCh:
if !ok {
return
}
@@ -124,12 +133,12 @@ func webSocketSessionHandler(ws *websocket.Conn, sm *StateMachine, reqCh <-chan
result.State = state.State
result.Ratio = state.Ratio
result.Lang = state.Language
- sendWebSocketResponse(ws, result)
+ sendWebSocketResponse(s.ws, result)
case req, ok := <-reqCh:
if !ok {
return
}
- webSocketHandleRequest(ws, sm, req, subCh)
+ s.handleRequest(req)
}
}
}
@@ -145,7 +154,8 @@ func webSocketHandler(sm *StateMachine, w http.ResponseWriter, r *http.Request)
}
log.Println("Web(socket) client", ws.RemoteAddr(), "connected")
reqCh := make(chan types.WebSocketRequest)
- go webSocketSessionHandler(ws, sm, reqCh)
+ session := webSocketSession{ws: ws, sm: sm}
+ go session.Handler(reqCh)
defer close(reqCh)
for {