diff options
Diffstat (limited to 'cmd/dolmetschctld/statemachine.go')
-rw-r--r-- | cmd/dolmetschctld/statemachine.go | 59 |
1 files changed, 35 insertions, 24 deletions
diff --git a/cmd/dolmetschctld/statemachine.go b/cmd/dolmetschctld/statemachine.go index a779604..3e92f00 100644 --- a/cmd/dolmetschctld/statemachine.go +++ b/cmd/dolmetschctld/statemachine.go @@ -26,6 +26,7 @@ import ( "container/list" "fmt" "log" + "sync" "time" "spreadspace.org/dolmetschctl/pkg/mixer" @@ -83,6 +84,11 @@ type subscribeStateReq struct { resCh chan error } +type stateSubscriber struct { + publish chan<- types.FullState + unsubscribe <-chan struct{} +} + type StateMachine struct { mixer *mixer.Mixer @@ -92,7 +98,6 @@ type StateMachine struct { setOriginal2InterpreterRatioCh chan setOriginal2InterpreterRatioReq getOriginal2InterpreterRatioCh chan getOriginal2InterpreterRatioReq getStateCh chan getStateReq - subscribeStateCh chan subscribeStateReq quitCh chan bool exitedCh chan struct{} mixerEventCh chan mixer.Event @@ -104,36 +109,41 @@ type StateMachine struct { original2InterpreterRatio float32 language types.Language - stateSubscribers list.List + stateSubscribersLock sync.Mutex + stateSubscribers list.List } func (sm *StateMachine) publishState() { + sm.stateSubscribersLock.Lock() + defer sm.stateSubscribersLock.Unlock() + var next *list.Element - for sub := sm.stateSubscribers.Front(); sub != nil; sub = next { - next = sub.Next() + for entry := sm.stateSubscribers.Front(); entry != nil; entry = next { + next = entry.Next() - ch, ok := (sub.Value).(chan<- types.FullState) + sub, ok := (entry.Value).(stateSubscriber) if !ok { - panic(fmt.Sprintf("statemachine: subscriber list element value has wrong type: %T", sub.Value)) + panic(fmt.Sprintf("statemachine: subscriber list element value has wrong type: %T", entry.Value)) } select { - case ch <- types.FullState{sm.state, sm.original2InterpreterRatio, sm.language}: + case <-sub.unsubscribe: + log.Printf("statemachine: removing subscriber '%v', because it has unsubscribed", sub.publish) + close(sub.publish) + sm.stateSubscribers.Remove(entry) default: - // subscriber is not responding... - // log.Printf("statemachine: removing subscriber '%v', because it is not responding", ch) - close(ch) - sm.stateSubscribers.Remove(sub) + select { + case sub.publish <- types.FullState{sm.state, sm.original2InterpreterRatio, sm.language}: + default: + // subscriber is not responding... + log.Printf("statemachine: removing subscriber '%v', because it is not responding", sub.publish) + close(sub.publish) + sm.stateSubscribers.Remove(entry) + } } } } -func (sm *StateMachine) subscribeState(out chan<- types.FullState) error { - // log.Printf("statemachine: %v subscribed to state changes", out) - sm.stateSubscribers.PushBack(out) - return nil -} - func (sm *StateMachine) handleMixerEvent(ev mixer.Event) { lang, exists := sm.channel2lang[ev.Channel] if !exists { @@ -283,8 +293,6 @@ func (sm *StateMachine) run() { req.resCh <- sm.original2InterpreterRatio case req := <-sm.getStateCh: req.resCh <- getStateRes{types.FullState{sm.state, sm.original2InterpreterRatio, sm.language}} - case req := <-sm.subscribeStateCh: - req.resCh <- sm.subscribeState(req.ch) case ev := <-sm.mixerEventCh: sm.handleMixerEvent(ev) sm.reconcile(false) @@ -306,7 +314,6 @@ func NewStateMachine(m *mixer.Mixer) (*StateMachine, error) { sm.setOriginal2InterpreterRatioCh = make(chan setOriginal2InterpreterRatioReq, 10) sm.getOriginal2InterpreterRatioCh = make(chan getOriginal2InterpreterRatioReq, 10) sm.getStateCh = make(chan getStateReq, 10) - sm.subscribeStateCh = make(chan subscribeStateReq, 10) sm.quitCh = make(chan bool, 1) sm.exitedCh = make(chan struct{}) sm.mixerEventCh = make(chan mixer.Event, 1000) @@ -392,10 +399,14 @@ func (sm *StateMachine) GetState() (types.State, float32, types.Language) { return res.State, res.Ratio, res.Language } -func (sm *StateMachine) SubscribeState(out chan<- types.FullState) error { - resCh := make(chan error) - sm.subscribeStateCh <- subscribeStateReq{out, resCh} - return <-resCh +func (sm *StateMachine) SubscribeState(out chan<- types.FullState) chan<- struct{} { + sm.stateSubscribersLock.Lock() + defer sm.stateSubscribersLock.Unlock() + + log.Printf("statemachine: %v subscribed to state changes", out) + unsubscribe := make(chan struct{}) + sm.stateSubscribers.PushBack(stateSubscriber{publish: out, unsubscribe: unsubscribe}) + return unsubscribe } func (sm *StateMachine) Shutdown() { |