diff options
Diffstat (limited to 'cmd')
-rw-r--r-- | cmd/dolmetschctld/statemachine.go | 59 | ||||
-rw-r--r-- | cmd/dolmetschctld/web_socket.go | 64 |
2 files changed, 72 insertions, 51 deletions
diff --git a/cmd/dolmetschctld/statemachine.go b/cmd/dolmetschctld/statemachine.go index a779604..3e92f00 100644 --- a/cmd/dolmetschctld/statemachine.go +++ b/cmd/dolmetschctld/statemachine.go @@ -26,6 +26,7 @@ import ( "container/list" "fmt" "log" + "sync" "time" "spreadspace.org/dolmetschctl/pkg/mixer" @@ -83,6 +84,11 @@ type subscribeStateReq struct { resCh chan error } +type stateSubscriber struct { + publish chan<- types.FullState + unsubscribe <-chan struct{} +} + type StateMachine struct { mixer *mixer.Mixer @@ -92,7 +98,6 @@ type StateMachine struct { setOriginal2InterpreterRatioCh chan setOriginal2InterpreterRatioReq getOriginal2InterpreterRatioCh chan getOriginal2InterpreterRatioReq getStateCh chan getStateReq - subscribeStateCh chan subscribeStateReq quitCh chan bool exitedCh chan struct{} mixerEventCh chan mixer.Event @@ -104,36 +109,41 @@ type StateMachine struct { original2InterpreterRatio float32 language types.Language - stateSubscribers list.List + stateSubscribersLock sync.Mutex + stateSubscribers list.List } func (sm *StateMachine) publishState() { + sm.stateSubscribersLock.Lock() + defer sm.stateSubscribersLock.Unlock() + var next *list.Element - for sub := sm.stateSubscribers.Front(); sub != nil; sub = next { - next = sub.Next() + for entry := sm.stateSubscribers.Front(); entry != nil; entry = next { + next = entry.Next() - ch, ok := (sub.Value).(chan<- types.FullState) + sub, ok := (entry.Value).(stateSubscriber) if !ok { - panic(fmt.Sprintf("statemachine: subscriber list element value has wrong type: %T", sub.Value)) + panic(fmt.Sprintf("statemachine: subscriber list element value has wrong type: %T", entry.Value)) } select { - case ch <- types.FullState{sm.state, sm.original2InterpreterRatio, sm.language}: + case <-sub.unsubscribe: + log.Printf("statemachine: removing subscriber '%v', because it has unsubscribed", sub.publish) + close(sub.publish) + sm.stateSubscribers.Remove(entry) default: - // subscriber is not responding... - // log.Printf("statemachine: removing subscriber '%v', because it is not responding", ch) - close(ch) - sm.stateSubscribers.Remove(sub) + select { + case sub.publish <- types.FullState{sm.state, sm.original2InterpreterRatio, sm.language}: + default: + // subscriber is not responding... + log.Printf("statemachine: removing subscriber '%v', because it is not responding", sub.publish) + close(sub.publish) + sm.stateSubscribers.Remove(entry) + } } } } -func (sm *StateMachine) subscribeState(out chan<- types.FullState) error { - // log.Printf("statemachine: %v subscribed to state changes", out) - sm.stateSubscribers.PushBack(out) - return nil -} - func (sm *StateMachine) handleMixerEvent(ev mixer.Event) { lang, exists := sm.channel2lang[ev.Channel] if !exists { @@ -283,8 +293,6 @@ func (sm *StateMachine) run() { req.resCh <- sm.original2InterpreterRatio case req := <-sm.getStateCh: req.resCh <- getStateRes{types.FullState{sm.state, sm.original2InterpreterRatio, sm.language}} - case req := <-sm.subscribeStateCh: - req.resCh <- sm.subscribeState(req.ch) case ev := <-sm.mixerEventCh: sm.handleMixerEvent(ev) sm.reconcile(false) @@ -306,7 +314,6 @@ func NewStateMachine(m *mixer.Mixer) (*StateMachine, error) { sm.setOriginal2InterpreterRatioCh = make(chan setOriginal2InterpreterRatioReq, 10) sm.getOriginal2InterpreterRatioCh = make(chan getOriginal2InterpreterRatioReq, 10) sm.getStateCh = make(chan getStateReq, 10) - sm.subscribeStateCh = make(chan subscribeStateReq, 10) sm.quitCh = make(chan bool, 1) sm.exitedCh = make(chan struct{}) sm.mixerEventCh = make(chan mixer.Event, 1000) @@ -392,10 +399,14 @@ func (sm *StateMachine) GetState() (types.State, float32, types.Language) { return res.State, res.Ratio, res.Language } -func (sm *StateMachine) SubscribeState(out chan<- types.FullState) error { - resCh := make(chan error) - sm.subscribeStateCh <- subscribeStateReq{out, resCh} - return <-resCh +func (sm *StateMachine) SubscribeState(out chan<- types.FullState) chan<- struct{} { + sm.stateSubscribersLock.Lock() + defer sm.stateSubscribersLock.Unlock() + + log.Printf("statemachine: %v subscribed to state changes", out) + unsubscribe := make(chan struct{}) + sm.stateSubscribers.PushBack(stateSubscriber{publish: out, unsubscribe: unsubscribe}) + return unsubscribe } func (sm *StateMachine) Shutdown() { diff --git a/cmd/dolmetschctld/web_socket.go b/cmd/dolmetschctld/web_socket.go index 09f2efc..dcbb311 100644 --- a/cmd/dolmetschctld/web_socket.go +++ b/cmd/dolmetschctld/web_socket.go @@ -47,74 +47,83 @@ func sendWebSocketErrorResponse(ws *websocket.Conn, code int, errStr string) { sendWebSocketResponse(ws, rd) } -func webSocketHandleRequest(ws *websocket.Conn, sm *StateMachine, req types.WebSocketRequest, subCh chan<- types.FullState) { +type webSocketSession struct { + ws *websocket.Conn + sm *StateMachine + stateSubCh chan types.FullState + stateUnsubCh chan<- struct{} +} + +func (s *webSocketSession) handleRequest(req types.WebSocketRequest) { switch req.Command { case "state": if len(req.Args) != 0 { - sendWebSocketErrorResponse(ws, http.StatusBadRequest, "command 'state' expects no arguments") + sendWebSocketErrorResponse(s.ws, http.StatusBadRequest, "command 'state' expects no arguments") return } var result types.WebSocketResponseState result.ResponseCode = http.StatusOK result.Type = "state" - result.State, result.Ratio, result.Lang = sm.GetState() - sendWebSocketResponse(ws, result) + result.State, result.Ratio, result.Lang = s.sm.GetState() + sendWebSocketResponse(s.ws, result) case "subscribe": if len(req.Args) != 0 { - sendWebSocketErrorResponse(ws, http.StatusBadRequest, "command 'subscribe' expects no arguments") + sendWebSocketErrorResponse(s.ws, http.StatusBadRequest, "command 'subscribe' expects no arguments") return } - if err := sm.SubscribeState(subCh); err != nil { - sendWebSocketErrorResponse(ws, http.StatusInternalServerError, err.Error()) - return - } + s.stateUnsubCh = s.sm.SubscribeState(s.stateSubCh) var result types.WebSocketResponseState result.ResponseCode = http.StatusOK result.Type = "state" - result.State, result.Ratio, result.Lang = sm.GetState() - sendWebSocketResponse(ws, result) + result.State, result.Ratio, result.Lang = s.sm.GetState() + sendWebSocketResponse(s.ws, result) case "languages": if len(req.Args) != 0 { - sendWebSocketErrorResponse(ws, http.StatusBadRequest, "command 'languages' expects no arguments") + sendWebSocketErrorResponse(s.ws, http.StatusBadRequest, "command 'languages' expects no arguments") return } var result types.WebSocketResponseLanguages result.ResponseCode = http.StatusOK result.Type = "languages" - result.Languages = sm.GetLanguages() - sendWebSocketResponse(ws, result) + result.Languages = s.sm.GetLanguages() + sendWebSocketResponse(s.ws, result) case "language": if len(req.Args) != 1 { - sendWebSocketErrorResponse(ws, http.StatusBadRequest, "command 'language' expects exatly one argument") + sendWebSocketErrorResponse(s.ws, http.StatusBadRequest, "command 'language' expects exatly one argument") return } - if err := sm.SetLanguage(types.Language(req.Args[0])); err != nil { - sendWebSocketErrorResponse(ws, http.StatusBadRequest, err.Error()) + if err := s.sm.SetLanguage(types.Language(req.Args[0])); err != nil { + sendWebSocketErrorResponse(s.ws, http.StatusBadRequest, err.Error()) return } var result types.WebSocketResponseState result.ResponseCode = http.StatusOK result.Type = "state" - result.State, result.Ratio, result.Lang = sm.GetState() - sendWebSocketResponse(ws, result) + result.State, result.Ratio, result.Lang = s.sm.GetState() + sendWebSocketResponse(s.ws, result) default: - sendWebSocketErrorResponse(ws, http.StatusBadRequest, "unkown command: '"+req.Command+"'") + sendWebSocketErrorResponse(s.ws, http.StatusBadRequest, "unkown command: '"+req.Command+"'") } } -func webSocketSessionHandler(ws *websocket.Conn, sm *StateMachine, reqCh <-chan types.WebSocketRequest) { - defer ws.Close() +func (s *webSocketSession) Handler(reqCh <-chan types.WebSocketRequest) { + defer func() { + s.ws.Close() + if s.stateUnsubCh != nil { + close(s.stateUnsubCh) + } + }() - subCh := make(chan types.FullState, 100) + s.stateSubCh = make(chan types.FullState, 100) for { select { - case state, ok := <-subCh: + case state, ok := <-s.stateSubCh: if !ok { return } @@ -124,12 +133,12 @@ func webSocketSessionHandler(ws *websocket.Conn, sm *StateMachine, reqCh <-chan result.State = state.State result.Ratio = state.Ratio result.Lang = state.Language - sendWebSocketResponse(ws, result) + sendWebSocketResponse(s.ws, result) case req, ok := <-reqCh: if !ok { return } - webSocketHandleRequest(ws, sm, req, subCh) + s.handleRequest(req) } } } @@ -145,7 +154,8 @@ func webSocketHandler(sm *StateMachine, w http.ResponseWriter, r *http.Request) } log.Println("Web(socket) client", ws.RemoteAddr(), "connected") reqCh := make(chan types.WebSocketRequest) - go webSocketSessionHandler(ws, sm, reqCh) + session := webSocketSession{ws: ws, sm: sm} + go session.Handler(reqCh) defer close(reqCh) for { |