From ef13dfe3602de2e3c1792d213afb6e98fda643d7 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Mon, 17 Oct 2016 02:36:45 +0200 Subject: data collection works now --- src/daq/s5proxy/sample.json | 5 +++-- src/daq/s5proxy/src/s5proxy/config.go | 2 ++ src/daq/s5proxy/src/s5proxy/stats.go | 30 +++++++++++++++++++++--------- 3 files changed, 26 insertions(+), 11 deletions(-) (limited to 'src') diff --git a/src/daq/s5proxy/sample.json b/src/daq/s5proxy/sample.json index b746f03..2cf9074 100644 --- a/src/daq/s5proxy/sample.json +++ b/src/daq/s5proxy/sample.json @@ -15,11 +15,12 @@ ], "sfive": { "socket": "/var/run/sfive/pipegram", + "hostname": "public1", + "tags": [ "hello", "world" ], "duration": "15s", "format" : "/${content-id}-${format}-${quality}", "streamer": [ - "av-dt, av-en / flash, webm / high,medium ,low,mini", - "audio-orig/mp3,ogg/high,medium,low" + "av-orig/flash,webm/high,medium,low,mini" ] } } diff --git a/src/daq/s5proxy/src/s5proxy/config.go b/src/daq/s5proxy/src/s5proxy/config.go index cf03bbc..c36d60d 100644 --- a/src/daq/s5proxy/src/s5proxy/config.go +++ b/src/daq/s5proxy/src/s5proxy/config.go @@ -107,6 +107,8 @@ func (d *Duration) UnmarshalText(data []byte) (err error) { type SFiveConf struct { Sock string `json:"socket"` + Hostname string `json:"hostname"` + Tags []string `json:"tags"` Duration Duration `json:"duration"` Format string `json:"format"` Streamer []string `json:"streamer"` diff --git a/src/daq/s5proxy/src/s5proxy/stats.go b/src/daq/s5proxy/src/s5proxy/stats.go index acee5ec..ddc3885 100644 --- a/src/daq/s5proxy/src/s5proxy/stats.go +++ b/src/daq/s5proxy/src/s5proxy/stats.go @@ -74,7 +74,7 @@ type DataUpdate struct { type StatsWorker struct { stream StreamID - current *DataUpdate + current map[string]*ClientData trigger chan time.Time output chan<- *DataUpdate input chan *ClientData @@ -82,7 +82,7 @@ type StatsWorker struct { func NewStatsWorker(stream StreamID, updates chan<- *DataUpdate) (sw StatsWorker) { sw.stream = stream - sw.current = &DataUpdate{StreamID: sw.stream} + sw.current = make(map[string]*ClientData) sw.trigger = make(chan time.Time) sw.output = updates sw.input = make(chan *ClientData, 100) @@ -94,13 +94,21 @@ func (sw StatsWorker) Run() { select { case t := <-sw.trigger: if t.UnixNano() != 0 { - s5l.Printf("STATS(%v): sending report @%v", sw.stream, t) - // TODO: send out `current` on `output` + upd := &DataUpdate{StreamID: sw.stream, StartTime: t} + upd.Data.ClientCount = uint(len(sw.current)) + for _, c := range sw.current { + upd.Data.Clients = append(upd.Data.Clients, *c) + upd.Data.BytesSent += c.BytesSent + } + sw.output <- upd } - // TODO: reset `current` + sw.current = make(map[string]*ClientData) case upd := <-sw.input: - s5l.Printf("STATS(%v): got client data: IP: %s, UA: %s, %d Bytes", sw.stream, upd.IP, upd.UserAgent, upd.BytesSent) - // TODO: add `upd` to `current` + if data, exists := sw.current[upd.IP]; exists { + data.BytesSent += upd.BytesSent + } else { + sw.current[upd.IP] = upd + } } } } @@ -219,8 +227,12 @@ func (s *Stats) Run() (err error) { worker.trigger <- t } case upd := <-s.updates: - s5l.Printf("STATS: got data update for '%v'", upd.StreamID) - // TODO: fill missing info and send it out on socket + upd.Version = 1 + upd.Hostname = s.conf.SFive.Hostname + upd.Tags = s.conf.SFive.Tags + upd.Duration = int64(s.conf.SFive.Duration.Value / time.Millisecond) + s5l.Printf("STATS: got data update for '%v'", upd) + // TODO: send it out on socket // TODO: if sock error ... try reconnect } } -- cgit v1.2.3