summaryrefslogtreecommitdiff
path: root/cmd/dolmetschctld/statemachine.go
diff options
context:
space:
mode:
Diffstat (limited to 'cmd/dolmetschctld/statemachine.go')
-rw-r--r--cmd/dolmetschctld/statemachine.go59
1 files changed, 35 insertions, 24 deletions
diff --git a/cmd/dolmetschctld/statemachine.go b/cmd/dolmetschctld/statemachine.go
index a779604..3e92f00 100644
--- a/cmd/dolmetschctld/statemachine.go
+++ b/cmd/dolmetschctld/statemachine.go
@@ -26,6 +26,7 @@ import (
"container/list"
"fmt"
"log"
+ "sync"
"time"
"spreadspace.org/dolmetschctl/pkg/mixer"
@@ -83,6 +84,11 @@ type subscribeStateReq struct {
resCh chan error
}
+type stateSubscriber struct {
+ publish chan<- types.FullState
+ unsubscribe <-chan struct{}
+}
+
type StateMachine struct {
mixer *mixer.Mixer
@@ -92,7 +98,6 @@ type StateMachine struct {
setOriginal2InterpreterRatioCh chan setOriginal2InterpreterRatioReq
getOriginal2InterpreterRatioCh chan getOriginal2InterpreterRatioReq
getStateCh chan getStateReq
- subscribeStateCh chan subscribeStateReq
quitCh chan bool
exitedCh chan struct{}
mixerEventCh chan mixer.Event
@@ -104,36 +109,41 @@ type StateMachine struct {
original2InterpreterRatio float32
language types.Language
- stateSubscribers list.List
+ stateSubscribersLock sync.Mutex
+ stateSubscribers list.List
}
func (sm *StateMachine) publishState() {
+ sm.stateSubscribersLock.Lock()
+ defer sm.stateSubscribersLock.Unlock()
+
var next *list.Element
- for sub := sm.stateSubscribers.Front(); sub != nil; sub = next {
- next = sub.Next()
+ for entry := sm.stateSubscribers.Front(); entry != nil; entry = next {
+ next = entry.Next()
- ch, ok := (sub.Value).(chan<- types.FullState)
+ sub, ok := (entry.Value).(stateSubscriber)
if !ok {
- panic(fmt.Sprintf("statemachine: subscriber list element value has wrong type: %T", sub.Value))
+ panic(fmt.Sprintf("statemachine: subscriber list element value has wrong type: %T", entry.Value))
}
select {
- case ch <- types.FullState{sm.state, sm.original2InterpreterRatio, sm.language}:
+ case <-sub.unsubscribe:
+ log.Printf("statemachine: removing subscriber '%v', because it has unsubscribed", sub.publish)
+ close(sub.publish)
+ sm.stateSubscribers.Remove(entry)
default:
- // subscriber is not responding...
- // log.Printf("statemachine: removing subscriber '%v', because it is not responding", ch)
- close(ch)
- sm.stateSubscribers.Remove(sub)
+ select {
+ case sub.publish <- types.FullState{sm.state, sm.original2InterpreterRatio, sm.language}:
+ default:
+ // subscriber is not responding...
+ log.Printf("statemachine: removing subscriber '%v', because it is not responding", sub.publish)
+ close(sub.publish)
+ sm.stateSubscribers.Remove(entry)
+ }
}
}
}
-func (sm *StateMachine) subscribeState(out chan<- types.FullState) error {
- // log.Printf("statemachine: %v subscribed to state changes", out)
- sm.stateSubscribers.PushBack(out)
- return nil
-}
-
func (sm *StateMachine) handleMixerEvent(ev mixer.Event) {
lang, exists := sm.channel2lang[ev.Channel]
if !exists {
@@ -283,8 +293,6 @@ func (sm *StateMachine) run() {
req.resCh <- sm.original2InterpreterRatio
case req := <-sm.getStateCh:
req.resCh <- getStateRes{types.FullState{sm.state, sm.original2InterpreterRatio, sm.language}}
- case req := <-sm.subscribeStateCh:
- req.resCh <- sm.subscribeState(req.ch)
case ev := <-sm.mixerEventCh:
sm.handleMixerEvent(ev)
sm.reconcile(false)
@@ -306,7 +314,6 @@ func NewStateMachine(m *mixer.Mixer) (*StateMachine, error) {
sm.setOriginal2InterpreterRatioCh = make(chan setOriginal2InterpreterRatioReq, 10)
sm.getOriginal2InterpreterRatioCh = make(chan getOriginal2InterpreterRatioReq, 10)
sm.getStateCh = make(chan getStateReq, 10)
- sm.subscribeStateCh = make(chan subscribeStateReq, 10)
sm.quitCh = make(chan bool, 1)
sm.exitedCh = make(chan struct{})
sm.mixerEventCh = make(chan mixer.Event, 1000)
@@ -392,10 +399,14 @@ func (sm *StateMachine) GetState() (types.State, float32, types.Language) {
return res.State, res.Ratio, res.Language
}
-func (sm *StateMachine) SubscribeState(out chan<- types.FullState) error {
- resCh := make(chan error)
- sm.subscribeStateCh <- subscribeStateReq{out, resCh}
- return <-resCh
+func (sm *StateMachine) SubscribeState(out chan<- types.FullState) chan<- struct{} {
+ sm.stateSubscribersLock.Lock()
+ defer sm.stateSubscribersLock.Unlock()
+
+ log.Printf("statemachine: %v subscribed to state changes", out)
+ unsubscribe := make(chan struct{})
+ sm.stateSubscribers.PushBack(stateSubscriber{publish: out, unsubscribe: unsubscribe})
+ return unsubscribe
}
func (sm *StateMachine) Shutdown() {