From 9c33cf9bf5bbc9eaed2cbd9c66d897c099a948cf Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sun, 16 Oct 2016 19:02:05 +0200 Subject: generating streamer according to config works now --- src/daq/s5proxy/sample.json | 5 +- src/daq/s5proxy/src/s5proxy/config.go | 15 +++++ src/daq/s5proxy/src/s5proxy/main.go | 111 +++++++++++++++++++++++++++++++++ src/daq/s5proxy/src/s5proxy/proxy.go | 4 +- src/daq/s5proxy/src/s5proxy/s5proxy.go | 111 --------------------------------- src/daq/s5proxy/src/s5proxy/stats.go | 111 ++++++++++++++++++++++++++++----- 6 files changed, 226 insertions(+), 131 deletions(-) create mode 100644 src/daq/s5proxy/src/s5proxy/main.go delete mode 100644 src/daq/s5proxy/src/s5proxy/s5proxy.go diff --git a/src/daq/s5proxy/sample.json b/src/daq/s5proxy/sample.json index 06a3604..b746f03 100644 --- a/src/daq/s5proxy/sample.json +++ b/src/daq/s5proxy/sample.json @@ -15,10 +15,11 @@ ], "sfive": { "socket": "/var/run/sfive/pipegram", + "duration": "15s", "format" : "/${content-id}-${format}-${quality}", "streamer": [ - "av-dt,av-en/flash,webm/high,medium,low,mini", + "av-dt, av-en / flash, webm / high,medium ,low,mini", "audio-orig/mp3,ogg/high,medium,low" - } + ] } } diff --git a/src/daq/s5proxy/src/s5proxy/config.go b/src/daq/s5proxy/src/s5proxy/config.go index 5f8dd0a..cf03bbc 100644 --- a/src/daq/s5proxy/src/s5proxy/config.go +++ b/src/daq/s5proxy/src/s5proxy/config.go @@ -36,6 +36,7 @@ import ( "encoding/json" "errors" "os" + "time" ) type Operation uint @@ -92,8 +93,21 @@ type HeaderOperation struct { Value string `json:"value,omitempty"` } +type Duration struct { + Value time.Duration +} + +func (d *Duration) UnmarshalText(data []byte) (err error) { + d.Value, err = time.ParseDuration(string(data)) + if err == nil && d.Value < 0 { + err = errors.New("negative durations are not allowed") + } + return +} + type SFiveConf struct { Sock string `json:"socket"` + Duration Duration `json:"duration"` Format string `json:"format"` Streamer []string `json:"streamer"` } @@ -118,6 +132,7 @@ func readConfig(configfile string) (conf *Config, err error) { defer f.Close() conf = &Config{} + conf.SFive.Duration.Value = 10 * time.Second err = json.NewDecoder(f).Decode(conf) return } diff --git a/src/daq/s5proxy/src/s5proxy/main.go b/src/daq/s5proxy/src/s5proxy/main.go new file mode 100644 index 0000000..fd57e42 --- /dev/null +++ b/src/daq/s5proxy/src/s5proxy/main.go @@ -0,0 +1,111 @@ +// +// sfive +// +// sfive - spreadspace streaming statistics suite is a generic +// statistic collection tool for streaming server infrastuctures. +// The system collects and stores meta data like number of views +// and throughput from a number of streaming servers and stores +// it in a global data store. +// The data acquisition is designed to be generic and extensible in +// order to support different streaming software. +// sfive also contains tools and applications to filter and visualize +// live and recorded data. +// +// +// Copyright (C) 2014-2016 Christian Pointner +// Markus Grüneis +// +// This file is part of sfive. +// +// sfive is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 +// as published by the Free Software Foundation. +// +// sfive is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with sfive. If not, see . +// + +package main + +import ( + "flag" + "log" + "os" + "os/signal" +) + +var s5l = log.New(os.Stderr, "[s5proxy]\t", log.LstdFlags) + +func main() { + cf := flag.String("config", "/etc/s5proxy/default.json", "path to the config file") + help := flag.Bool("help", false, "show usage") + + flag.Parse() + + s5l.Printf("Hello, world.\n") + if *help { + flag.Usage() + return + } + + conf, err := readConfig(*cf) + if err != nil { + s5l.Printf("Error reading config file: %v\n", err) + os.Exit(1) + } + + var stats *Stats + if conf.SFive.Sock != "" { + stats, err = NewStats(conf) + if err != nil { + s5l.Printf("Error creating sfive-stats-module: %v\n", err) + os.Exit(1) + } + } + + var proxy *Proxy + proxy, err = NewProxy(conf, stats) + if err != nil { + s5l.Printf("Error creating HTTP-Proxy: %v\n", err) + os.Exit(1) + } + + done := make(chan bool) + + if stats != nil { + go func() { + defer func() { done <- true }() + s5l.Println("running sfive-stats-module") + if err := stats.Run(); err != nil { + s5l.Printf("ERROR: sfive-stats-module: %v", err) + } + s5l.Println("ERROR: sfive-stats-module stopped") + }() + } + + go func() { + defer func() { done <- true }() + s5l.Println("running HTTP-Proxy") + if err := proxy.Run(); err != nil { + s5l.Printf("ERROR: HTTP-Proxy: %v", err) + } + s5l.Println("ERROR: HTTP-Proxy stopped") + }() + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + + select { + case <-c: + s5l.Println("received interrupt, shutdown") + return + case <-done: + return + } + +} diff --git a/src/daq/s5proxy/src/s5proxy/proxy.go b/src/daq/s5proxy/src/s5proxy/proxy.go index 19a6a1e..b5c7a72 100644 --- a/src/daq/s5proxy/src/s5proxy/proxy.go +++ b/src/daq/s5proxy/src/s5proxy/proxy.go @@ -53,7 +53,7 @@ func generateTime(input string) string { type s5proxyResponseWriter struct { wrapped http.ResponseWriter conf *Config - StatsCh chan<- ClientData + StatsCh chan<- *ClientData IP string UserAgent string } @@ -65,7 +65,7 @@ func (r s5proxyResponseWriter) Header() http.Header { func (r s5proxyResponseWriter) Write(data []byte) (int, error) { sent, err := r.wrapped.Write(data) if r.StatsCh != nil { - stats := ClientData{IP: r.IP, UserAgent: r.UserAgent, BytesSent: uint(sent)} + stats := &ClientData{IP: r.IP, UserAgent: r.UserAgent, BytesSent: uint(sent)} select { case r.StatsCh <- stats: default: diff --git a/src/daq/s5proxy/src/s5proxy/s5proxy.go b/src/daq/s5proxy/src/s5proxy/s5proxy.go deleted file mode 100644 index 0eb91cf..0000000 --- a/src/daq/s5proxy/src/s5proxy/s5proxy.go +++ /dev/null @@ -1,111 +0,0 @@ -// -// sfive -// -// sfive - spreadspace streaming statistics suite is a generic -// statistic collection tool for streaming server infrastuctures. -// The system collects and stores meta data like number of views -// and throughput from a number of streaming servers and stores -// it in a global data store. -// The data acquisition is designed to be generic and extensible in -// order to support different streaming software. -// sfive also contains tools and applications to filter and visualize -// live and recorded data. -// -// -// Copyright (C) 2014-2016 Christian Pointner -// Markus Grüneis -// -// This file is part of sfive. -// -// sfive is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License version 3 -// as published by the Free Software Foundation. -// -// sfive is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with sfive. If not, see . -// - -package main - -import ( - "flag" - "log" - "os" - "os/signal" -) - -var s5l = log.New(os.Stderr, "[s5proxy]\t", log.LstdFlags) - -func main() { - cf := flag.String("config", "/etc/s5proxy/default.json", "path to the config file") - help := flag.Bool("help", false, "show usage") - - flag.Parse() - - s5l.Printf("Hello, world.\n") - if *help { - flag.Usage() - return - } - - conf, err := readConfig(*cf) - if err != nil { - s5l.Printf("Error reading config file: %v\n", err) - os.Exit(1) - } - - var stats *Stats - if conf.SFive.Sock != "" { - stats, err = NewStats(conf) - if err != nil { - s5l.Printf("Error reading config file: %v\n", err) - os.Exit(1) - } - } - - var proxy *Proxy - proxy, err = NewProxy(conf, stats) - if err != nil { - s5l.Printf("Error reading config file: %v\n", err) - os.Exit(1) - } - - done := make(chan bool) - - if stats != nil { - go func() { - defer func() { done <- true }() - s5l.Println("running sfive-stats-module") - if err := stats.Run(); err != nil { - s5l.Printf("ERROR: sfive-stats-module: %v", err) - } - s5l.Println("ERROR: sfive-stats-module stopped") - }() - } - - go func() { - defer func() { done <- true }() - s5l.Println("running HTTP-Proxy") - if err := proxy.Run(); err != nil { - s5l.Printf("ERROR: HTTP-Proxy: %v", err) - } - s5l.Println("ERROR: HTTP-Proxy stopped") - }() - - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) - - select { - case <-c: - s5l.Println("received interrupt, shutdown") - return - case <-done: - return - } - -} diff --git a/src/daq/s5proxy/src/s5proxy/stats.go b/src/daq/s5proxy/src/s5proxy/stats.go index e3499d1..acee5ec 100644 --- a/src/daq/s5proxy/src/s5proxy/stats.go +++ b/src/daq/s5proxy/src/s5proxy/stats.go @@ -33,7 +33,10 @@ package main import ( + "errors" + "fmt" "net" + "os" "strings" "time" ) @@ -71,17 +74,18 @@ type DataUpdate struct { type StatsWorker struct { stream StreamID - current DataUpdate + current *DataUpdate trigger chan time.Time - output chan<- DataUpdate - input chan ClientData + output chan<- *DataUpdate + input chan *ClientData } -func NewStatsWorker(stream StreamID, updates chan<- DataUpdate) (sw StatsWorker) { +func NewStatsWorker(stream StreamID, updates chan<- *DataUpdate) (sw StatsWorker) { sw.stream = stream + sw.current = &DataUpdate{StreamID: sw.stream} sw.trigger = make(chan time.Time) sw.output = updates - sw.input = make(chan ClientData, 100) + sw.input = make(chan *ClientData, 100) return } @@ -89,8 +93,11 @@ func (sw StatsWorker) Run() { for { select { case t := <-sw.trigger: - s5l.Printf("STATS(%v): sending report @%v", sw.stream, t) - // TODO: send out `current` on `output` and reset it + if t.UnixNano() != 0 { + s5l.Printf("STATS(%v): sending report @%v", sw.stream, t) + // TODO: send out `current` on `output` + } + // TODO: reset `current` case upd := <-sw.input: s5l.Printf("STATS(%v): got client data: IP: %s, UA: %s, %d Bytes", sw.stream, upd.IP, upd.UserAgent, upd.BytesSent) // TODO: add `upd` to `current` @@ -101,11 +108,11 @@ func (sw StatsWorker) Run() { type Stats struct { conf *Config sock net.Conn - updates chan DataUpdate + updates chan *DataUpdate workers map[string]StatsWorker } -func (s *Stats) GetUpdateChannel(url string) chan<- ClientData { +func (s *Stats) GetUpdateChannel(url string) chan<- *ClientData { s5l.Printf("STATS: got new client for url: %s", url) for name, worker := range s.workers { if strings.Contains(url, name) { @@ -115,18 +122,79 @@ func (s *Stats) GetUpdateChannel(url string) chan<- ClientData { return nil } +func parseStreamerDescriptionElement(desc, desctype string) (e []string, err error) { + parts := strings.Split(desc, ",") + for _, p := range parts { + if p = strings.TrimSpace(p); p == "" { + err = fmt.Errorf("invalid streamer description %s: '%s'", desctype, desc) + return + } + e = append(e, p) + } + return +} + +func parseStreamerDescription(desc string) (c, f, q []string, err error) { + parts := strings.Split(desc, "/") + if len(parts) != 3 { + err = fmt.Errorf("invalid streamer description: '%s'", desc) + return + } + if c, err = parseStreamerDescriptionElement(parts[0], "content-id"); err != nil { + return + } + if f, err = parseStreamerDescriptionElement(parts[1], "format"); err != nil { + return + } + if q, err = parseStreamerDescriptionElement(parts[2], "quality"); err != nil { + return + } + return +} + +func generateStreamerName(format, c, f, q string) string { + return os.Expand(format, func(k string) string { + switch k { + case "content-id": + return c + case "format": + return f + case "quality": + return q + default: + return "" + } + }) +} + func NewStats(conf *Config) (s *Stats, err error) { s = &Stats{conf: conf} if s.sock, err = net.DialTimeout("unixgram", conf.SFive.Sock, time.Second); err != nil { return } s5l.Printf("STATS: connected to sfive-hub on '%s'", conf.SFive.Sock) - s.updates = make(chan DataUpdate, 100) + s.updates = make(chan *DataUpdate, 100) s.workers = make(map[string]StatsWorker) - // TODO: create stats-worker depending on conf.SFive.Streamer - // TODO: generate stats-worker name using conf.SFive.Format - s.workers["av-orig-webm-high"] = NewStatsWorker(StreamID{"av-orig", "webm", "high"}, s.updates) + var content, format, quality []string + for _, desc := range conf.SFive.Streamer { + if content, format, quality, err = parseStreamerDescription(desc); err != nil { + return + } + for _, c := range content { + for _, f := range format { + for _, q := range quality { + name := generateStreamerName(conf.SFive.Format, c, f, q) + s.workers[name] = NewStatsWorker(StreamID{c, f, q}, s.updates) + s5l.Printf("STATS: adding streamer '%s'", name) + } + } + } + } + if len(s.workers) < 1 { + err = errors.New("no streamers defined") + return + } return } @@ -134,14 +202,25 @@ func (s *Stats) Run() (err error) { for _, worker := range s.workers { go worker.Run() } - // TODO: create ticker + offset := s.conf.SFive.Duration.Value - (time.Duration(time.Now().UnixNano()) % (s.conf.SFive.Duration.Value)) + s5l.Printf("STATS: waiting %v for the next duration boundary", offset) + time.Sleep(offset) + s5l.Printf("STATS: starting ticker") + ticker := time.NewTicker(s.conf.SFive.Duration.Value) + for _, worker := range s.workers { + worker.trigger <- time.Unix(0, 0) // stats-worker will not send data but just reset + } for { select { - //TODO: wait for ticker + case t := <-ticker.C: + t = t.Add(-1 * s.conf.SFive.Duration.Value) + for _, worker := range s.workers { + worker.trigger <- t + } case upd := <-s.updates: s5l.Printf("STATS: got data update for '%v'", upd.StreamID) - // TODO: send it out on socket + // TODO: fill missing info and send it out on socket // TODO: if sock error ... try reconnect } } -- cgit v1.2.3