diff options
author | Christian Pointner <equinox@spreadspace.org> | 2019-02-14 22:19:48 +0100 |
---|---|---|
committer | Christian Pointner <equinox@spreadspace.org> | 2019-02-14 22:19:48 +0100 |
commit | df7430d20269c45843ae6cc56b5424bc98be2b31 (patch) | |
tree | 8b20a79d217bbded9184ce6ed8230e7f39972e44 /pkg/mixer/mixer.go | |
parent | controller error handling (diff) |
automatic reconnect for mixer devices
Diffstat (limited to 'pkg/mixer/mixer.go')
-rw-r--r-- | pkg/mixer/mixer.go | 152 |
1 files changed, 118 insertions, 34 deletions
diff --git a/pkg/mixer/mixer.go b/pkg/mixer/mixer.go index 96fbf8d..0eb5746 100644 --- a/pkg/mixer/mixer.go +++ b/pkg/mixer/mixer.go @@ -29,6 +29,7 @@ import ( "log" "strings" "sync" + "time" "github.com/scgolang/midi" ) @@ -121,8 +122,10 @@ type subscriber struct { } type Mixer struct { - DevIn *midi.Device - DevOut *midi.Device + config Config + devIn *midi.Device + devOutLock sync.Mutex + devOut *midi.Device subscribersLock sync.Mutex subscribers map[Channel]*list.List } @@ -145,13 +148,13 @@ func NewMixer(c Config) (*Mixer, error) { return nil, err } - // TODO: add support for DevIn == DevOut - m := &Mixer{} - if m.DevIn, err = openDevice(devices, c.DevIn); err != nil { + // TODO: add support for devIn == devOut + m := &Mixer{config: c} + if m.devIn, err = openDevice(devices, c.DevIn); err != nil { return nil, err } - m.DevIn.QueueSize = 100 - if m.DevOut, err = openDevice(devices, c.DevOut); err != nil { + m.devIn.QueueSize = 100 + if m.devOut, err = openDevice(devices, c.DevOut); err != nil { return nil, err } @@ -159,6 +162,57 @@ func NewMixer(c Config) (*Mixer, error) { return m, nil } +func (m *Mixer) reopenInput() { + for { + time.Sleep(time.Second) + + devices, err := midi.Devices() + if err != nil { + log.Printf("mixer: error listing midi devices: %v, retrying...", err) + continue + } + + if m.devIn, err = openDevice(devices, m.config.DevIn); err != nil { + log.Printf("mixer: error re-opening midi input device: %v, retrying...", err) + continue + } + + if err = m.Init(); err != nil { + log.Printf("mixer: error re-initializing midi input device: %v, retrying...", err) + + continue + } + + log.Printf("mixer: successfully re-initialized midi input device") + break + } +} + +func (m *Mixer) reopenOutput() { + for { + time.Sleep(time.Second) + + devices, err := midi.Devices() + if err != nil { + log.Printf("mixer: error listing midi devices: %v, retrying...", err) + continue + } + + newOut, err := openDevice(devices, m.config.DevOut) + if err != nil { + log.Printf("mixer: error re-opening midi output device: %v, retrying...", err) + continue + } + + log.Printf("mixer: successfully re-opened midi output device") + + m.devOutLock.Lock() + defer m.devOutLock.Unlock() + m.devOut = newOut + break + } +} + func (m *Mixer) publishEvent(ev Event) { m.subscribersLock.Lock() defer m.subscribersLock.Unlock() @@ -176,15 +230,14 @@ func (m *Mixer) publishEvent(ev Event) { select { case <-sub.unsubscribe: - log.Printf("mixer: removing subscriber '%v', because it has unsubscribed", sub.publish) + // log.Printf("mixer: removing subscriber '%v', because it has unsubscribed", sub.publish) close(sub.publish) subs.Remove(entry) default: select { case sub.publish <- ev: default: - // subscriber is not responding... - log.Printf("mixer: removing subscriber '%v', because it is not responding", sub.publish) + // log.Printf("mixer: removing subscriber '%v', because it is not responding", sub.publish) close(sub.publish) subs.Remove(entry) } @@ -213,44 +266,83 @@ func (m *Mixer) handleMidiPacket(p midi.Packet) { } func (m *Mixer) Init() error { - ch, err := m.DevIn.Packets() + ch, err := m.devIn.Packets() if err != nil { return err } go func() { for { - // TODO: handle Errors (reopen the device!) - m.handleMidiPacket(<-ch) + p := <-ch + if p.Err != nil { + log.Printf("mixer: got fatal error from midi input device: %v, trying to reopen it...", p.Err) + m.devIn.Close() + m.devIn = nil + go m.reopenInput() + return + } + m.handleMidiPacket(p) } }() return nil } func (m *Mixer) Shutdown() error { - if m.DevIn != nil { - m.DevIn.Close() + if m.devIn != nil { + m.devIn.Close() + m.devIn = nil } - if m.DevOut != nil { - m.DevOut.Close() + if m.devOut != nil { + m.devOut.Close() + m.devOut = nil + } + + m.subscribersLock.Lock() + defer m.subscribersLock.Unlock() + + for _, subs := range m.subscribers { + var next *list.Element + for entry := subs.Front(); entry != nil; entry = next { + next = entry.Next() + + sub, ok := (entry.Value).(subscriber) + if !ok { + panic(fmt.Sprintf("mixer: subscriber list element value has wrong type: %T", entry.Value)) + } + + close(sub.publish) + subs.Remove(entry) + } } - // TODO: also close all subscribed channels - // terminate go-routine started by Init() return nil } -func (m *Mixer) sendMute(channel byte, value byte) error { - n, err := m.DevOut.Write([]byte{CC_MUTE, channel, value}) +func (m *Mixer) sendMidiMessage(msg []byte) error { + m.devOutLock.Lock() + defer m.devOutLock.Unlock() + + if m.devOut == nil { + return errors.New("mixer: output device is not ready.") + } + + n, err := m.devOut.Write(msg) if err != nil { - // reopen device? + log.Printf("mixer: got fatal error from midi output device: %v, trying to reopen it...", err) + m.devOut.Close() + m.devOut = nil + go m.reopenOutput() return err } - if n != 3 { - return errors.New("sending mute command failed.") + if n != len(msg) { + return errors.New("sending midi message failed: short write") } return nil } +func (m *Mixer) sendMute(channel byte, value byte) error { + return m.sendMidiMessage([]byte{CC_MUTE, channel, value}) +} + func (m *Mixer) Mute(ch Channel) error { return m.sendMute(byte(ch), 0x7F) } @@ -263,15 +355,7 @@ func (m *Mixer) SetLevel(ch Channel, level FaderLevel) error { if level > FaderLevelMax { level = FaderLevelMax } - n, err := m.DevOut.Write([]byte{CC_FADER, byte(ch), byte(level)}) - if err != nil { - // reopen device? - return err - } - if n != 3 { - return errors.New("setting fader level failed.") - } - return nil + return m.sendMidiMessage([]byte{CC_FADER, byte(ch), byte(level)}) } func (m *Mixer) Subscribe(ch Channel, out chan<- Event) chan<- struct{} { @@ -284,7 +368,7 @@ func (m *Mixer) Subscribe(ch Channel, out chan<- Event) chan<- struct{} { m.subscribers[ch] = subs } - log.Printf("mixer: subscribing '%v' to events for channel: %v", out, ch) + // log.Printf("mixer: subscribing '%v' to events for channel: %v", out, ch) unsubscribe := make(chan struct{}) subs.PushBack(subscriber{publish: out, unsubscribe: unsubscribe}) return unsubscribe |