summaryrefslogtreecommitdiff
path: root/pkg/controller
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2019-02-14 20:03:00 +0100
committerChristian Pointner <equinox@spreadspace.org>2019-02-14 20:03:00 +0100
commit1dd2b0a9ea1119139fdbd97d6ac810c7ddb5698f (patch)
tree51dc0ef0a74f560cb01f054446a345814f96992a /pkg/controller
parentmake code a little nicer (diff)
make subscribtions thread safe and more bullet proof
Diffstat (limited to 'pkg/controller')
-rw-r--r--pkg/controller/controller.go52
1 files changed, 37 insertions, 15 deletions
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 644cdd1..36c3776 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -26,8 +26,9 @@ import (
"container/list"
"errors"
"fmt"
- // "log"
+ "log"
"strings"
+ "sync"
"github.com/scgolang/midi"
)
@@ -71,9 +72,15 @@ type Event struct {
Type EventType
}
+type subscriber struct {
+ publish chan<- Event
+ unsubscribe <-chan struct{}
+}
+
type Controller struct {
- Dev *midi.Device
- subscribers list.List
+ Dev *midi.Device
+ subscribersLock sync.Mutex
+ subscribers list.List
}
func openDevice(devices []*midi.Device, prefix string) (d *midi.Device, err error) {
@@ -104,22 +111,32 @@ func NewController(c Config) (*Controller, error) {
}
func (c *Controller) sendEvent(ev Event) {
+ c.subscribersLock.Lock()
+ defer c.subscribersLock.Unlock()
+
var next *list.Element
- for sub := c.subscribers.Front(); sub != nil; sub = next {
- next = sub.Next()
+ for entry := c.subscribers.Front(); entry != nil; entry = next {
+ next = entry.Next()
- ch, ok := (sub.Value).(chan<- Event)
+ sub, ok := (entry.Value).(subscriber)
if !ok {
- panic(fmt.Sprintf("controller: subscriber list element value has wrong type: %T", sub.Value))
+ panic(fmt.Sprintf("controller: subscriber list element value has wrong type: %T", entry.Value))
}
select {
- case ch <- ev:
+ case <-sub.unsubscribe:
+ log.Printf("controller: removing subscriber '%v', because it has unsubscribed", sub.publish)
+ close(sub.publish)
+ c.subscribers.Remove(entry)
default:
- // subscriber is not responding...
- // log.Printf("controller: removing subscriber '%v', because it is not responding", ch)
- close(ch)
- c.subscribers.Remove(sub)
+ select {
+ case sub.publish <- ev:
+ default:
+ // subscriber is not responding...
+ log.Printf("controller: removing subscriber '%v', because it is not responding", sub.publish)
+ close(sub.publish)
+ c.subscribers.Remove(entry)
+ }
}
}
}
@@ -197,7 +214,12 @@ func (c *Controller) LedBlink(num, wait byte) error {
return c.setLed(num, wait+LED_BLINK_MIN)
}
-func (c *Controller) Subscribe(out chan<- Event) {
- // log.Printf("controller: subscribing '%v' to events", out)
- c.subscribers.PushBack(out)
+func (c *Controller) Subscribe(out chan<- Event) chan<- struct{} {
+ c.subscribersLock.Lock()
+ defer c.subscribersLock.Unlock()
+
+ log.Printf("controller: subscribing '%v' to events", out)
+ unsubscribe := make(chan struct{})
+ c.subscribers.PushBack(subscriber{publish: out, unsubscribe: unsubscribe})
+ return unsubscribe
}