From 7d04d05d11dd293ac4a29d450d8bd9159cb5680e Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sun, 16 Oct 2016 17:38:02 +0200 Subject: introduced statsworker --- src/daq/s5proxy/sample.json | 7 +++- src/daq/s5proxy/src/s5proxy/config.go | 4 +- src/daq/s5proxy/src/s5proxy/stats.go | 74 ++++++++++++++++++++++++++--------- 3 files changed, 65 insertions(+), 20 deletions(-) (limited to 'src/daq/s5proxy') diff --git a/src/daq/s5proxy/sample.json b/src/daq/s5proxy/sample.json index b501382..06a3604 100644 --- a/src/daq/s5proxy/sample.json +++ b/src/daq/s5proxy/sample.json @@ -14,6 +14,11 @@ { "op": "time", "header": "Expires", "value": "-1s" } ], "sfive": { - "socket": "/var/run/sfive/pipegram" + "socket": "/var/run/sfive/pipegram", + "format" : "/${content-id}-${format}-${quality}", + "streamer": [ + "av-dt,av-en/flash,webm/high,medium,low,mini", + "audio-orig/mp3,ogg/high,medium,low" + } } } diff --git a/src/daq/s5proxy/src/s5proxy/config.go b/src/daq/s5proxy/src/s5proxy/config.go index da7360e..5f8dd0a 100644 --- a/src/daq/s5proxy/src/s5proxy/config.go +++ b/src/daq/s5proxy/src/s5proxy/config.go @@ -93,7 +93,9 @@ type HeaderOperation struct { } type SFiveConf struct { - Sock string `json:"socket"` + Sock string `json:"socket"` + Format string `json:"format"` + Streamer []string `json:"streamer"` } type Config struct { diff --git a/src/daq/s5proxy/src/s5proxy/stats.go b/src/daq/s5proxy/src/s5proxy/stats.go index 1d12f74..e3499d1 100644 --- a/src/daq/s5proxy/src/s5proxy/stats.go +++ b/src/daq/s5proxy/src/s5proxy/stats.go @@ -33,15 +33,15 @@ package main import ( - "encoding/json" "net" + "strings" "time" ) // TODO: this is basically a copy from src/hub.... -type StreamId struct { - ContentId string `json:"content-id"` +type StreamID struct { + ContentID string `json:"content-id"` Format string `json:"format"` Quality string `json:"quality"` } @@ -62,24 +62,57 @@ type SourceData struct { type DataUpdate struct { Version uint `json:"version"` Hostname string `json:"hostname"` - StreamId StreamId `json:"streamer-id"` + StreamID StreamID `json:"streamer-id"` Tags []string `json:"tags,omitempty"` StartTime time.Time `json:"start-time"` Duration int64 `json:"duration-ms"` Data SourceData `json:"data"` } +type StatsWorker struct { + stream StreamID + current DataUpdate + trigger chan time.Time + output chan<- DataUpdate + input chan ClientData +} + +func NewStatsWorker(stream StreamID, updates chan<- DataUpdate) (sw StatsWorker) { + sw.stream = stream + sw.trigger = make(chan time.Time) + sw.output = updates + sw.input = make(chan ClientData, 100) + return +} + +func (sw StatsWorker) Run() { + for { + select { + case t := <-sw.trigger: + s5l.Printf("STATS(%v): sending report @%v", sw.stream, t) + // TODO: send out `current` on `output` and reset it + case upd := <-sw.input: + s5l.Printf("STATS(%v): got client data: IP: %s, UA: %s, %d Bytes", sw.stream, upd.IP, upd.UserAgent, upd.BytesSent) + // TODO: add `upd` to `current` + } + } +} + type Stats struct { - conf *Config - sock net.Conn - // TODO: create datastructure to hold update channels - updateCH chan ClientData + conf *Config + sock net.Conn + updates chan DataUpdate + workers map[string]StatsWorker } func (s *Stats) GetUpdateChannel(url string) chan<- ClientData { s5l.Printf("STATS: got new client for url: %s", url) - - return s.updateCH + for name, worker := range s.workers { + if strings.Contains(url, name) { + return worker.input + } + } + return nil } func NewStats(conf *Config) (s *Stats, err error) { @@ -88,23 +121,28 @@ func NewStats(conf *Config) (s *Stats, err error) { return } s5l.Printf("STATS: connected to sfive-hub on '%s'", conf.SFive.Sock) + s.updates = make(chan DataUpdate, 100) - // TODO: create update-channels according to conf - s.updateCH = make(chan ClientData, 512) + s.workers = make(map[string]StatsWorker) + // TODO: create stats-worker depending on conf.SFive.Streamer + // TODO: generate stats-worker name using conf.SFive.Format + s.workers["av-orig-webm-high"] = NewStatsWorker(StreamID{"av-orig", "webm", "high"}, s.updates) return } func (s *Stats) Run() (err error) { - update := DataUpdate{Version: 1, Hostname: "hugo", StartTime: time.Now()} - update.Tags = append(update.Tags, "hello", "world") - if err = json.NewEncoder(s.sock).Encode(update); err != nil { - return + for _, worker := range s.workers { + go worker.Run() } + // TODO: create ticker for { select { - case upd := <-s.updateCH: - s5l.Printf("STATS: got client data: IP: %s, UA: %s, %d Bytes", upd.IP, upd.UserAgent, upd.BytesSent) + //TODO: wait for ticker + case upd := <-s.updates: + s5l.Printf("STATS: got data update for '%v'", upd.StreamID) + // TODO: send it out on socket + // TODO: if sock error ... try reconnect } } return -- cgit v1.2.3