diff options
Diffstat (limited to 'pkg/controller')
-rw-r--r-- | pkg/controller/controller.go | 52 |
1 files changed, 37 insertions, 15 deletions
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 644cdd1..36c3776 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -26,8 +26,9 @@ import ( "container/list" "errors" "fmt" - // "log" + "log" "strings" + "sync" "github.com/scgolang/midi" ) @@ -71,9 +72,15 @@ type Event struct { Type EventType } +type subscriber struct { + publish chan<- Event + unsubscribe <-chan struct{} +} + type Controller struct { - Dev *midi.Device - subscribers list.List + Dev *midi.Device + subscribersLock sync.Mutex + subscribers list.List } func openDevice(devices []*midi.Device, prefix string) (d *midi.Device, err error) { @@ -104,22 +111,32 @@ func NewController(c Config) (*Controller, error) { } func (c *Controller) sendEvent(ev Event) { + c.subscribersLock.Lock() + defer c.subscribersLock.Unlock() + var next *list.Element - for sub := c.subscribers.Front(); sub != nil; sub = next { - next = sub.Next() + for entry := c.subscribers.Front(); entry != nil; entry = next { + next = entry.Next() - ch, ok := (sub.Value).(chan<- Event) + sub, ok := (entry.Value).(subscriber) if !ok { - panic(fmt.Sprintf("controller: subscriber list element value has wrong type: %T", sub.Value)) + panic(fmt.Sprintf("controller: subscriber list element value has wrong type: %T", entry.Value)) } select { - case ch <- ev: + case <-sub.unsubscribe: + log.Printf("controller: removing subscriber '%v', because it has unsubscribed", sub.publish) + close(sub.publish) + c.subscribers.Remove(entry) default: - // subscriber is not responding... - // log.Printf("controller: removing subscriber '%v', because it is not responding", ch) - close(ch) - c.subscribers.Remove(sub) + select { + case sub.publish <- ev: + default: + // subscriber is not responding... + log.Printf("controller: removing subscriber '%v', because it is not responding", sub.publish) + close(sub.publish) + c.subscribers.Remove(entry) + } } } } @@ -197,7 +214,12 @@ func (c *Controller) LedBlink(num, wait byte) error { return c.setLed(num, wait+LED_BLINK_MIN) } -func (c *Controller) Subscribe(out chan<- Event) { - // log.Printf("controller: subscribing '%v' to events", out) - c.subscribers.PushBack(out) +func (c *Controller) Subscribe(out chan<- Event) chan<- struct{} { + c.subscribersLock.Lock() + defer c.subscribersLock.Unlock() + + log.Printf("controller: subscribing '%v' to events", out) + unsubscribe := make(chan struct{}) + c.subscribers.PushBack(subscriber{publish: out, unsubscribe: unsubscribe}) + return unsubscribe } |