summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2016-10-17 02:36:45 +0200
committerChristian Pointner <equinox@spreadspace.org>2016-10-17 02:36:45 +0200
commitef13dfe3602de2e3c1792d213afb6e98fda643d7 (patch)
treef8372f1a783c1ee4fc0b7f9dca8d66dc353ff127
parentgenerating streamer according to config works now (diff)
data collection works now
-rw-r--r--src/daq/s5proxy/sample.json5
-rw-r--r--src/daq/s5proxy/src/s5proxy/config.go2
-rw-r--r--src/daq/s5proxy/src/s5proxy/stats.go30
3 files changed, 26 insertions, 11 deletions
diff --git a/src/daq/s5proxy/sample.json b/src/daq/s5proxy/sample.json
index b746f03..2cf9074 100644
--- a/src/daq/s5proxy/sample.json
+++ b/src/daq/s5proxy/sample.json
@@ -15,11 +15,12 @@
],
"sfive": {
"socket": "/var/run/sfive/pipegram",
+ "hostname": "public1",
+ "tags": [ "hello", "world" ],
"duration": "15s",
"format" : "/${content-id}-${format}-${quality}",
"streamer": [
- "av-dt, av-en / flash, webm / high,medium ,low,mini",
- "audio-orig/mp3,ogg/high,medium,low"
+ "av-orig/flash,webm/high,medium,low,mini"
]
}
}
diff --git a/src/daq/s5proxy/src/s5proxy/config.go b/src/daq/s5proxy/src/s5proxy/config.go
index cf03bbc..c36d60d 100644
--- a/src/daq/s5proxy/src/s5proxy/config.go
+++ b/src/daq/s5proxy/src/s5proxy/config.go
@@ -107,6 +107,8 @@ func (d *Duration) UnmarshalText(data []byte) (err error) {
type SFiveConf struct {
Sock string `json:"socket"`
+ Hostname string `json:"hostname"`
+ Tags []string `json:"tags"`
Duration Duration `json:"duration"`
Format string `json:"format"`
Streamer []string `json:"streamer"`
diff --git a/src/daq/s5proxy/src/s5proxy/stats.go b/src/daq/s5proxy/src/s5proxy/stats.go
index acee5ec..ddc3885 100644
--- a/src/daq/s5proxy/src/s5proxy/stats.go
+++ b/src/daq/s5proxy/src/s5proxy/stats.go
@@ -74,7 +74,7 @@ type DataUpdate struct {
type StatsWorker struct {
stream StreamID
- current *DataUpdate
+ current map[string]*ClientData
trigger chan time.Time
output chan<- *DataUpdate
input chan *ClientData
@@ -82,7 +82,7 @@ type StatsWorker struct {
func NewStatsWorker(stream StreamID, updates chan<- *DataUpdate) (sw StatsWorker) {
sw.stream = stream
- sw.current = &DataUpdate{StreamID: sw.stream}
+ sw.current = make(map[string]*ClientData)
sw.trigger = make(chan time.Time)
sw.output = updates
sw.input = make(chan *ClientData, 100)
@@ -94,13 +94,21 @@ func (sw StatsWorker) Run() {
select {
case t := <-sw.trigger:
if t.UnixNano() != 0 {
- s5l.Printf("STATS(%v): sending report @%v", sw.stream, t)
- // TODO: send out `current` on `output`
+ upd := &DataUpdate{StreamID: sw.stream, StartTime: t}
+ upd.Data.ClientCount = uint(len(sw.current))
+ for _, c := range sw.current {
+ upd.Data.Clients = append(upd.Data.Clients, *c)
+ upd.Data.BytesSent += c.BytesSent
+ }
+ sw.output <- upd
}
- // TODO: reset `current`
+ sw.current = make(map[string]*ClientData)
case upd := <-sw.input:
- s5l.Printf("STATS(%v): got client data: IP: %s, UA: %s, %d Bytes", sw.stream, upd.IP, upd.UserAgent, upd.BytesSent)
- // TODO: add `upd` to `current`
+ if data, exists := sw.current[upd.IP]; exists {
+ data.BytesSent += upd.BytesSent
+ } else {
+ sw.current[upd.IP] = upd
+ }
}
}
}
@@ -219,8 +227,12 @@ func (s *Stats) Run() (err error) {
worker.trigger <- t
}
case upd := <-s.updates:
- s5l.Printf("STATS: got data update for '%v'", upd.StreamID)
- // TODO: fill missing info and send it out on socket
+ upd.Version = 1
+ upd.Hostname = s.conf.SFive.Hostname
+ upd.Tags = s.conf.SFive.Tags
+ upd.Duration = int64(s.conf.SFive.Duration.Value / time.Millisecond)
+ s5l.Printf("STATS: got data update for '%v'", upd)
+ // TODO: send it out on socket
// TODO: if sock error ... try reconnect
}
}