diff options
-rw-r--r-- | cmd/dolmetschctl/dolmetschctl.go | 3 | ||||
-rw-r--r-- | pkg/controller/controller.go | 52 | ||||
-rw-r--r-- | pkg/mixer/mixer.go | 54 |
3 files changed, 77 insertions, 32 deletions
diff --git a/cmd/dolmetschctl/dolmetschctl.go b/cmd/dolmetschctl/dolmetschctl.go index 9e878a8..86fde95 100644 --- a/cmd/dolmetschctl/dolmetschctl.go +++ b/cmd/dolmetschctl/dolmetschctl.go @@ -122,7 +122,8 @@ func handleWebSocketMessage(ctrl *controller.Controller, ws *websocket.Conn, msg func run(ctrl *controller.Controller, ws *websocket.Conn) error { ctrlCh := make(chan controller.Event, 100) - ctrl.Subscribe(ctrlCh) + unsub := ctrl.Subscribe(ctrlCh) + defer close(unsub) wsCh := make(chan types.WebSocketResponseFull) go wsReader(ws, wsCh) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 644cdd1..36c3776 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -26,8 +26,9 @@ import ( "container/list" "errors" "fmt" - // "log" + "log" "strings" + "sync" "github.com/scgolang/midi" ) @@ -71,9 +72,15 @@ type Event struct { Type EventType } +type subscriber struct { + publish chan<- Event + unsubscribe <-chan struct{} +} + type Controller struct { - Dev *midi.Device - subscribers list.List + Dev *midi.Device + subscribersLock sync.Mutex + subscribers list.List } func openDevice(devices []*midi.Device, prefix string) (d *midi.Device, err error) { @@ -104,22 +111,32 @@ func NewController(c Config) (*Controller, error) { } func (c *Controller) sendEvent(ev Event) { + c.subscribersLock.Lock() + defer c.subscribersLock.Unlock() + var next *list.Element - for sub := c.subscribers.Front(); sub != nil; sub = next { - next = sub.Next() + for entry := c.subscribers.Front(); entry != nil; entry = next { + next = entry.Next() - ch, ok := (sub.Value).(chan<- Event) + sub, ok := (entry.Value).(subscriber) if !ok { - panic(fmt.Sprintf("controller: subscriber list element value has wrong type: %T", sub.Value)) + panic(fmt.Sprintf("controller: subscriber list element value has wrong type: %T", entry.Value)) } select { - case ch <- ev: + case <-sub.unsubscribe: + log.Printf("controller: removing subscriber '%v', because it has unsubscribed", sub.publish) + close(sub.publish) + c.subscribers.Remove(entry) default: - // subscriber is not responding... - // log.Printf("controller: removing subscriber '%v', because it is not responding", ch) - close(ch) - c.subscribers.Remove(sub) + select { + case sub.publish <- ev: + default: + // subscriber is not responding... + log.Printf("controller: removing subscriber '%v', because it is not responding", sub.publish) + close(sub.publish) + c.subscribers.Remove(entry) + } } } } @@ -197,7 +214,12 @@ func (c *Controller) LedBlink(num, wait byte) error { return c.setLed(num, wait+LED_BLINK_MIN) } -func (c *Controller) Subscribe(out chan<- Event) { - // log.Printf("controller: subscribing '%v' to events", out) - c.subscribers.PushBack(out) +func (c *Controller) Subscribe(out chan<- Event) chan<- struct{} { + c.subscribersLock.Lock() + defer c.subscribersLock.Unlock() + + log.Printf("controller: subscribing '%v' to events", out) + unsubscribe := make(chan struct{}) + c.subscribers.PushBack(subscriber{publish: out, unsubscribe: unsubscribe}) + return unsubscribe } diff --git a/pkg/mixer/mixer.go b/pkg/mixer/mixer.go index 09817ac..53b0410 100644 --- a/pkg/mixer/mixer.go +++ b/pkg/mixer/mixer.go @@ -26,8 +26,9 @@ import ( "container/list" "errors" "fmt" - // "log" + "log" "strings" + "sync" "github.com/scgolang/midi" ) @@ -114,10 +115,16 @@ func (e Event) String() string { return fmt.Sprintf("Event(%s) for channel %d: level=%s, muted=%s", e.Type, e.Channel, e.Level, e.Mute) } +type subscriber struct { + publish chan<- Event + unsubscribe <-chan struct{} +} + type Mixer struct { - DevIn *midi.Device - DevOut *midi.Device - subscribers map[Channel]*list.List + DevIn *midi.Device + DevOut *midi.Device + subscribersLock sync.Mutex + subscribers map[Channel]*list.List } func openDevice(devices []*midi.Device, prefix string) (d *midi.Device, err error) { @@ -153,24 +160,34 @@ func NewMixer(c Config) (*Mixer, error) { } func (m *Mixer) sendEvent(ev Event) { + m.subscribersLock.Lock() + defer m.subscribersLock.Unlock() + subs, exists := m.subscribers[ev.Channel] if exists && subs != nil { var next *list.Element - for sub := subs.Front(); sub != nil; sub = next { - next = sub.Next() + for entry := subs.Front(); entry != nil; entry = next { + next = entry.Next() - ch, ok := (sub.Value).(chan<- Event) + sub, ok := (entry.Value).(subscriber) if !ok { - panic(fmt.Sprintf("mixer: subscriber list element value has wrong type: %T", sub.Value)) + panic(fmt.Sprintf("mixer: subscriber list element value has wrong type: %T", entry.Value)) } select { - case ch <- ev: + case <-sub.unsubscribe: + log.Printf("mixer: removing subscriber '%v', because it has unsubscribed", sub.publish) + close(sub.publish) + subs.Remove(entry) default: - // subscriber is not responding... - // log.Printf("mixer: removing subscriber '%v', because it is not responding", ch) - close(ch) - subs.Remove(sub) + select { + case sub.publish <- ev: + default: + // subscriber is not responding... + log.Printf("mixer: removing subscriber '%v', because it is not responding", sub.publish) + close(sub.publish) + subs.Remove(entry) + } } } } @@ -257,13 +274,18 @@ func (m *Mixer) SetLevel(ch Channel, level FaderLevel) error { return nil } -func (m *Mixer) Subscribe(ch Channel, out chan<- Event) { +func (m *Mixer) Subscribe(ch Channel, out chan<- Event) chan<- struct{} { + m.subscribersLock.Lock() + defer m.subscribersLock.Unlock() + subs, exists := m.subscribers[ch] if !exists { subs = list.New() m.subscribers[ch] = subs } - // log.Printf("mixer: subscribing '%v' to events for channel: %v", out, ch) - subs.PushBack(out) + log.Printf("mixer: subscribing '%v' to events for channel: %v", out, ch) + unsubscribe := make(chan struct{}) + subs.PushBack(subscriber{publish: out, unsubscribe: unsubscribe}) + return unsubscribe } |