summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/daq/s5proxy/src/s5proxy/stats.go33
1 files changed, 21 insertions, 12 deletions
diff --git a/src/daq/s5proxy/src/s5proxy/stats.go b/src/daq/s5proxy/src/s5proxy/stats.go
index 7b291b7..905c879 100644
--- a/src/daq/s5proxy/src/s5proxy/stats.go
+++ b/src/daq/s5proxy/src/s5proxy/stats.go
@@ -33,6 +33,7 @@
package main
import (
+ "encoding/json"
"errors"
"fmt"
"net"
@@ -114,10 +115,11 @@ func (sw StatsWorker) Run() {
}
type Stats struct {
- conf *Config
- sock net.Conn
- updates chan *DataUpdate
- workers map[string]StatsWorker
+ conf *Config
+ sock net.Conn
+ dataEncoder *json.Encoder
+ updates chan *DataUpdate
+ workers map[string]StatsWorker
}
func (s *Stats) GetUpdateChannel(url string) chan<- *ClientData {
@@ -181,8 +183,9 @@ func NewStats(conf *Config) (s *Stats, err error) {
return
}
s5l.Printf("STATS: connected to sfive-hub on '%s'", conf.SFive.Sock)
- s.updates = make(chan *DataUpdate, 100)
+ s.dataEncoder = json.NewEncoder(s.sock)
+ s.updates = make(chan *DataUpdate, 100)
s.workers = make(map[string]StatsWorker)
var content, format, quality []string
for _, desc := range conf.SFive.Streamer {
@@ -206,6 +209,18 @@ func NewStats(conf *Config) (s *Stats, err error) {
return
}
+func (s *Stats) sendUpdate(upd *DataUpdate) {
+ upd.Version = 1
+ upd.Hostname = s.conf.SFive.Hostname
+ upd.Tags = s.conf.SFive.Tags
+ upd.Duration = int64(s.conf.SFive.Duration)
+ // s5l.Printf("STATS: got data update for '%v'", upd)
+ if err := s.dataEncoder.Encode(upd); err != nil {
+ s5l.Printf("STATS: json encode(): %s", err)
+ // TODO: try reconnect
+ }
+}
+
func (s *Stats) Run() (err error) {
for _, worker := range s.workers {
go worker.Run()
@@ -228,13 +243,7 @@ func (s *Stats) Run() (err error) {
worker.trigger <- t
}
case upd := <-s.updates:
- upd.Version = 1
- upd.Hostname = s.conf.SFive.Hostname
- upd.Tags = s.conf.SFive.Tags
- upd.Duration = int64(s.conf.SFive.Duration)
- s5l.Printf("STATS: got data update for '%v'", upd)
- // TODO: send it out on socket
- // TODO: if sock error ... try reconnect
+ s.sendUpdate(upd)
}
}
return