summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rw-r--r--pkg/controller/controller.go52
-rw-r--r--pkg/mixer/mixer.go54
2 files changed, 75 insertions, 31 deletions
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 644cdd1..36c3776 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -26,8 +26,9 @@ import (
"container/list"
"errors"
"fmt"
- // "log"
+ "log"
"strings"
+ "sync"
"github.com/scgolang/midi"
)
@@ -71,9 +72,15 @@ type Event struct {
Type EventType
}
+type subscriber struct {
+ publish chan<- Event
+ unsubscribe <-chan struct{}
+}
+
type Controller struct {
- Dev *midi.Device
- subscribers list.List
+ Dev *midi.Device
+ subscribersLock sync.Mutex
+ subscribers list.List
}
func openDevice(devices []*midi.Device, prefix string) (d *midi.Device, err error) {
@@ -104,22 +111,32 @@ func NewController(c Config) (*Controller, error) {
}
func (c *Controller) sendEvent(ev Event) {
+ c.subscribersLock.Lock()
+ defer c.subscribersLock.Unlock()
+
var next *list.Element
- for sub := c.subscribers.Front(); sub != nil; sub = next {
- next = sub.Next()
+ for entry := c.subscribers.Front(); entry != nil; entry = next {
+ next = entry.Next()
- ch, ok := (sub.Value).(chan<- Event)
+ sub, ok := (entry.Value).(subscriber)
if !ok {
- panic(fmt.Sprintf("controller: subscriber list element value has wrong type: %T", sub.Value))
+ panic(fmt.Sprintf("controller: subscriber list element value has wrong type: %T", entry.Value))
}
select {
- case ch <- ev:
+ case <-sub.unsubscribe:
+ log.Printf("controller: removing subscriber '%v', because it has unsubscribed", sub.publish)
+ close(sub.publish)
+ c.subscribers.Remove(entry)
default:
- // subscriber is not responding...
- // log.Printf("controller: removing subscriber '%v', because it is not responding", ch)
- close(ch)
- c.subscribers.Remove(sub)
+ select {
+ case sub.publish <- ev:
+ default:
+ // subscriber is not responding...
+ log.Printf("controller: removing subscriber '%v', because it is not responding", sub.publish)
+ close(sub.publish)
+ c.subscribers.Remove(entry)
+ }
}
}
}
@@ -197,7 +214,12 @@ func (c *Controller) LedBlink(num, wait byte) error {
return c.setLed(num, wait+LED_BLINK_MIN)
}
-func (c *Controller) Subscribe(out chan<- Event) {
- // log.Printf("controller: subscribing '%v' to events", out)
- c.subscribers.PushBack(out)
+func (c *Controller) Subscribe(out chan<- Event) chan<- struct{} {
+ c.subscribersLock.Lock()
+ defer c.subscribersLock.Unlock()
+
+ log.Printf("controller: subscribing '%v' to events", out)
+ unsubscribe := make(chan struct{})
+ c.subscribers.PushBack(subscriber{publish: out, unsubscribe: unsubscribe})
+ return unsubscribe
}
diff --git a/pkg/mixer/mixer.go b/pkg/mixer/mixer.go
index 09817ac..53b0410 100644
--- a/pkg/mixer/mixer.go
+++ b/pkg/mixer/mixer.go
@@ -26,8 +26,9 @@ import (
"container/list"
"errors"
"fmt"
- // "log"
+ "log"
"strings"
+ "sync"
"github.com/scgolang/midi"
)
@@ -114,10 +115,16 @@ func (e Event) String() string {
return fmt.Sprintf("Event(%s) for channel %d: level=%s, muted=%s", e.Type, e.Channel, e.Level, e.Mute)
}
+type subscriber struct {
+ publish chan<- Event
+ unsubscribe <-chan struct{}
+}
+
type Mixer struct {
- DevIn *midi.Device
- DevOut *midi.Device
- subscribers map[Channel]*list.List
+ DevIn *midi.Device
+ DevOut *midi.Device
+ subscribersLock sync.Mutex
+ subscribers map[Channel]*list.List
}
func openDevice(devices []*midi.Device, prefix string) (d *midi.Device, err error) {
@@ -153,24 +160,34 @@ func NewMixer(c Config) (*Mixer, error) {
}
func (m *Mixer) sendEvent(ev Event) {
+ m.subscribersLock.Lock()
+ defer m.subscribersLock.Unlock()
+
subs, exists := m.subscribers[ev.Channel]
if exists && subs != nil {
var next *list.Element
- for sub := subs.Front(); sub != nil; sub = next {
- next = sub.Next()
+ for entry := subs.Front(); entry != nil; entry = next {
+ next = entry.Next()
- ch, ok := (sub.Value).(chan<- Event)
+ sub, ok := (entry.Value).(subscriber)
if !ok {
- panic(fmt.Sprintf("mixer: subscriber list element value has wrong type: %T", sub.Value))
+ panic(fmt.Sprintf("mixer: subscriber list element value has wrong type: %T", entry.Value))
}
select {
- case ch <- ev:
+ case <-sub.unsubscribe:
+ log.Printf("mixer: removing subscriber '%v', because it has unsubscribed", sub.publish)
+ close(sub.publish)
+ subs.Remove(entry)
default:
- // subscriber is not responding...
- // log.Printf("mixer: removing subscriber '%v', because it is not responding", ch)
- close(ch)
- subs.Remove(sub)
+ select {
+ case sub.publish <- ev:
+ default:
+ // subscriber is not responding...
+ log.Printf("mixer: removing subscriber '%v', because it is not responding", sub.publish)
+ close(sub.publish)
+ subs.Remove(entry)
+ }
}
}
}
@@ -257,13 +274,18 @@ func (m *Mixer) SetLevel(ch Channel, level FaderLevel) error {
return nil
}
-func (m *Mixer) Subscribe(ch Channel, out chan<- Event) {
+func (m *Mixer) Subscribe(ch Channel, out chan<- Event) chan<- struct{} {
+ m.subscribersLock.Lock()
+ defer m.subscribersLock.Unlock()
+
subs, exists := m.subscribers[ch]
if !exists {
subs = list.New()
m.subscribers[ch] = subs
}
- // log.Printf("mixer: subscribing '%v' to events for channel: %v", out, ch)
- subs.PushBack(out)
+ log.Printf("mixer: subscribing '%v' to events for channel: %v", out, ch)
+ unsubscribe := make(chan struct{})
+ subs.PushBack(subscriber{publish: out, unsubscribe: unsubscribe})
+ return unsubscribe
}