summaryrefslogtreecommitdiff
path: root/pkg/mixer/mixer.go
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2019-02-14 20:03:00 +0100
committerChristian Pointner <equinox@spreadspace.org>2019-02-14 20:03:00 +0100
commit1dd2b0a9ea1119139fdbd97d6ac810c7ddb5698f (patch)
tree51dc0ef0a74f560cb01f054446a345814f96992a /pkg/mixer/mixer.go
parentmake code a little nicer (diff)
make subscribtions thread safe and more bullet proof
Diffstat (limited to 'pkg/mixer/mixer.go')
-rw-r--r--pkg/mixer/mixer.go54
1 files changed, 38 insertions, 16 deletions
diff --git a/pkg/mixer/mixer.go b/pkg/mixer/mixer.go
index 09817ac..53b0410 100644
--- a/pkg/mixer/mixer.go
+++ b/pkg/mixer/mixer.go
@@ -26,8 +26,9 @@ import (
"container/list"
"errors"
"fmt"
- // "log"
+ "log"
"strings"
+ "sync"
"github.com/scgolang/midi"
)
@@ -114,10 +115,16 @@ func (e Event) String() string {
return fmt.Sprintf("Event(%s) for channel %d: level=%s, muted=%s", e.Type, e.Channel, e.Level, e.Mute)
}
+type subscriber struct {
+ publish chan<- Event
+ unsubscribe <-chan struct{}
+}
+
type Mixer struct {
- DevIn *midi.Device
- DevOut *midi.Device
- subscribers map[Channel]*list.List
+ DevIn *midi.Device
+ DevOut *midi.Device
+ subscribersLock sync.Mutex
+ subscribers map[Channel]*list.List
}
func openDevice(devices []*midi.Device, prefix string) (d *midi.Device, err error) {
@@ -153,24 +160,34 @@ func NewMixer(c Config) (*Mixer, error) {
}
func (m *Mixer) sendEvent(ev Event) {
+ m.subscribersLock.Lock()
+ defer m.subscribersLock.Unlock()
+
subs, exists := m.subscribers[ev.Channel]
if exists && subs != nil {
var next *list.Element
- for sub := subs.Front(); sub != nil; sub = next {
- next = sub.Next()
+ for entry := subs.Front(); entry != nil; entry = next {
+ next = entry.Next()
- ch, ok := (sub.Value).(chan<- Event)
+ sub, ok := (entry.Value).(subscriber)
if !ok {
- panic(fmt.Sprintf("mixer: subscriber list element value has wrong type: %T", sub.Value))
+ panic(fmt.Sprintf("mixer: subscriber list element value has wrong type: %T", entry.Value))
}
select {
- case ch <- ev:
+ case <-sub.unsubscribe:
+ log.Printf("mixer: removing subscriber '%v', because it has unsubscribed", sub.publish)
+ close(sub.publish)
+ subs.Remove(entry)
default:
- // subscriber is not responding...
- // log.Printf("mixer: removing subscriber '%v', because it is not responding", ch)
- close(ch)
- subs.Remove(sub)
+ select {
+ case sub.publish <- ev:
+ default:
+ // subscriber is not responding...
+ log.Printf("mixer: removing subscriber '%v', because it is not responding", sub.publish)
+ close(sub.publish)
+ subs.Remove(entry)
+ }
}
}
}
@@ -257,13 +274,18 @@ func (m *Mixer) SetLevel(ch Channel, level FaderLevel) error {
return nil
}
-func (m *Mixer) Subscribe(ch Channel, out chan<- Event) {
+func (m *Mixer) Subscribe(ch Channel, out chan<- Event) chan<- struct{} {
+ m.subscribersLock.Lock()
+ defer m.subscribersLock.Unlock()
+
subs, exists := m.subscribers[ch]
if !exists {
subs = list.New()
m.subscribers[ch] = subs
}
- // log.Printf("mixer: subscribing '%v' to events for channel: %v", out, ch)
- subs.PushBack(out)
+ log.Printf("mixer: subscribing '%v' to events for channel: %v", out, ch)
+ unsubscribe := make(chan struct{})
+ subs.PushBack(subscriber{publish: out, unsubscribe: unsubscribe})
+ return unsubscribe
}