diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/daq/s5proxy/src/s5proxy/stats.go | 60 |
1 files changed, 39 insertions, 21 deletions
diff --git a/src/daq/s5proxy/src/s5proxy/stats.go b/src/daq/s5proxy/src/s5proxy/stats.go index 905c879..2b3c5e2 100644 --- a/src/daq/s5proxy/src/s5proxy/stats.go +++ b/src/daq/s5proxy/src/s5proxy/stats.go @@ -101,7 +101,11 @@ func (sw StatsWorker) Run() { upd.Data.Clients = append(upd.Data.Clients, *c) upd.Data.BytesSent += c.BytesSent } - sw.output <- upd + select { + case sw.output <- upd: + default: + s5l.Printf("STATS: worker(%v): writing to output channel would block, dropping data update...", sw.stream) + } } sw.current = make(map[string]*ClientData) case upd := <-sw.input: @@ -179,11 +183,9 @@ func generateStreamerName(format, c, f, q string) string { func NewStats(conf *Config) (s *Stats, err error) { s = &Stats{conf: conf} - if s.sock, err = net.DialTimeout("unixgram", conf.SFive.Sock, time.Second); err != nil { + if err = s.connectToHub(); err != nil { return } - s5l.Printf("STATS: connected to sfive-hub on '%s'", conf.SFive.Sock) - s.dataEncoder = json.NewEncoder(s.sock) s.updates = make(chan *DataUpdate, 100) s.workers = make(map[string]StatsWorker) @@ -209,15 +211,34 @@ func NewStats(conf *Config) (s *Stats, err error) { return } -func (s *Stats) sendUpdate(upd *DataUpdate) { - upd.Version = 1 - upd.Hostname = s.conf.SFive.Hostname - upd.Tags = s.conf.SFive.Tags - upd.Duration = int64(s.conf.SFive.Duration) - // s5l.Printf("STATS: got data update for '%v'", upd) - if err := s.dataEncoder.Encode(upd); err != nil { - s5l.Printf("STATS: json encode(): %s", err) - // TODO: try reconnect +func (s *Stats) connectToHub() (err error) { + if s.sock, err = net.DialTimeout("unixgram", s.conf.SFive.Sock, time.Second); err != nil { + return + } + s5l.Printf("STATS: connected to sfive-hub on '%s'", s.conf.SFive.Sock) + s.dataEncoder = json.NewEncoder(s.sock) + return +} + +func (s *Stats) sendUpdates() { + for { + upd := <-s.updates + upd.Version = 1 + upd.Hostname = s.conf.SFive.Hostname + upd.Tags = s.conf.SFive.Tags + upd.Duration = int64(s.conf.SFive.Duration) + // s5l.Printf("STATS: got data update for '%v'", upd) + if err := s.dataEncoder.Encode(upd); err != nil { + s5l.Printf("STATS: json encode(): %s", err) + // TODO: check if error comes from the socket... + for { + time.Sleep(5 * time.Second) + s5l.Printf("STATS: attempting reconnect to hub") + if err := s.connectToHub(); err == nil { + break + } + } + } } } @@ -235,15 +256,12 @@ func (s *Stats) Run() (err error) { for _, worker := range s.workers { worker.trigger <- time.Unix(0, 0) // stats-worker will not send data but just reset } + go s.sendUpdates() for { - select { - case t := <-ticker.C: - t = t.Add(-1 * d) - for _, worker := range s.workers { - worker.trigger <- t - } - case upd := <-s.updates: - s.sendUpdate(upd) + t := <-ticker.C + t = t.Add(-1 * d) + for _, worker := range s.workers { + worker.trigger <- t } } return |