summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/daq/s5proxy/src/s5proxy/stats.go60
1 files changed, 39 insertions, 21 deletions
diff --git a/src/daq/s5proxy/src/s5proxy/stats.go b/src/daq/s5proxy/src/s5proxy/stats.go
index 905c879..2b3c5e2 100644
--- a/src/daq/s5proxy/src/s5proxy/stats.go
+++ b/src/daq/s5proxy/src/s5proxy/stats.go
@@ -101,7 +101,11 @@ func (sw StatsWorker) Run() {
upd.Data.Clients = append(upd.Data.Clients, *c)
upd.Data.BytesSent += c.BytesSent
}
- sw.output <- upd
+ select {
+ case sw.output <- upd:
+ default:
+ s5l.Printf("STATS: worker(%v): writing to output channel would block, dropping data update...", sw.stream)
+ }
}
sw.current = make(map[string]*ClientData)
case upd := <-sw.input:
@@ -179,11 +183,9 @@ func generateStreamerName(format, c, f, q string) string {
func NewStats(conf *Config) (s *Stats, err error) {
s = &Stats{conf: conf}
- if s.sock, err = net.DialTimeout("unixgram", conf.SFive.Sock, time.Second); err != nil {
+ if err = s.connectToHub(); err != nil {
return
}
- s5l.Printf("STATS: connected to sfive-hub on '%s'", conf.SFive.Sock)
- s.dataEncoder = json.NewEncoder(s.sock)
s.updates = make(chan *DataUpdate, 100)
s.workers = make(map[string]StatsWorker)
@@ -209,15 +211,34 @@ func NewStats(conf *Config) (s *Stats, err error) {
return
}
-func (s *Stats) sendUpdate(upd *DataUpdate) {
- upd.Version = 1
- upd.Hostname = s.conf.SFive.Hostname
- upd.Tags = s.conf.SFive.Tags
- upd.Duration = int64(s.conf.SFive.Duration)
- // s5l.Printf("STATS: got data update for '%v'", upd)
- if err := s.dataEncoder.Encode(upd); err != nil {
- s5l.Printf("STATS: json encode(): %s", err)
- // TODO: try reconnect
+func (s *Stats) connectToHub() (err error) {
+ if s.sock, err = net.DialTimeout("unixgram", s.conf.SFive.Sock, time.Second); err != nil {
+ return
+ }
+ s5l.Printf("STATS: connected to sfive-hub on '%s'", s.conf.SFive.Sock)
+ s.dataEncoder = json.NewEncoder(s.sock)
+ return
+}
+
+func (s *Stats) sendUpdates() {
+ for {
+ upd := <-s.updates
+ upd.Version = 1
+ upd.Hostname = s.conf.SFive.Hostname
+ upd.Tags = s.conf.SFive.Tags
+ upd.Duration = int64(s.conf.SFive.Duration)
+ // s5l.Printf("STATS: got data update for '%v'", upd)
+ if err := s.dataEncoder.Encode(upd); err != nil {
+ s5l.Printf("STATS: json encode(): %s", err)
+ // TODO: check if error comes from the socket...
+ for {
+ time.Sleep(5 * time.Second)
+ s5l.Printf("STATS: attempting reconnect to hub")
+ if err := s.connectToHub(); err == nil {
+ break
+ }
+ }
+ }
}
}
@@ -235,15 +256,12 @@ func (s *Stats) Run() (err error) {
for _, worker := range s.workers {
worker.trigger <- time.Unix(0, 0) // stats-worker will not send data but just reset
}
+ go s.sendUpdates()
for {
- select {
- case t := <-ticker.C:
- t = t.Add(-1 * d)
- for _, worker := range s.workers {
- worker.trigger <- t
- }
- case upd := <-s.updates:
- s.sendUpdate(upd)
+ t := <-ticker.C
+ t = t.Add(-1 * d)
+ for _, worker := range s.workers {
+ worker.trigger <- t
}
}
return