summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2016-10-16 19:02:05 +0200
committerChristian Pointner <equinox@spreadspace.org>2016-10-16 19:02:05 +0200
commit9c33cf9bf5bbc9eaed2cbd9c66d897c099a948cf (patch)
tree51c3c9b5ce949da8521654fd9e45b64e9b532f25
parentintroduced statsworker (diff)
generating streamer according to config works now
-rw-r--r--src/daq/s5proxy/sample.json5
-rw-r--r--src/daq/s5proxy/src/s5proxy/config.go15
-rw-r--r--src/daq/s5proxy/src/s5proxy/main.go (renamed from src/daq/s5proxy/src/s5proxy/s5proxy.go)4
-rw-r--r--src/daq/s5proxy/src/s5proxy/proxy.go4
-rw-r--r--src/daq/s5proxy/src/s5proxy/stats.go111
5 files changed, 117 insertions, 22 deletions
diff --git a/src/daq/s5proxy/sample.json b/src/daq/s5proxy/sample.json
index 06a3604..b746f03 100644
--- a/src/daq/s5proxy/sample.json
+++ b/src/daq/s5proxy/sample.json
@@ -15,10 +15,11 @@
],
"sfive": {
"socket": "/var/run/sfive/pipegram",
+ "duration": "15s",
"format" : "/${content-id}-${format}-${quality}",
"streamer": [
- "av-dt,av-en/flash,webm/high,medium,low,mini",
+ "av-dt, av-en / flash, webm / high,medium ,low,mini",
"audio-orig/mp3,ogg/high,medium,low"
- }
+ ]
}
}
diff --git a/src/daq/s5proxy/src/s5proxy/config.go b/src/daq/s5proxy/src/s5proxy/config.go
index 5f8dd0a..cf03bbc 100644
--- a/src/daq/s5proxy/src/s5proxy/config.go
+++ b/src/daq/s5proxy/src/s5proxy/config.go
@@ -36,6 +36,7 @@ import (
"encoding/json"
"errors"
"os"
+ "time"
)
type Operation uint
@@ -92,8 +93,21 @@ type HeaderOperation struct {
Value string `json:"value,omitempty"`
}
+type Duration struct {
+ Value time.Duration
+}
+
+func (d *Duration) UnmarshalText(data []byte) (err error) {
+ d.Value, err = time.ParseDuration(string(data))
+ if err == nil && d.Value < 0 {
+ err = errors.New("negative durations are not allowed")
+ }
+ return
+}
+
type SFiveConf struct {
Sock string `json:"socket"`
+ Duration Duration `json:"duration"`
Format string `json:"format"`
Streamer []string `json:"streamer"`
}
@@ -118,6 +132,7 @@ func readConfig(configfile string) (conf *Config, err error) {
defer f.Close()
conf = &Config{}
+ conf.SFive.Duration.Value = 10 * time.Second
err = json.NewDecoder(f).Decode(conf)
return
}
diff --git a/src/daq/s5proxy/src/s5proxy/s5proxy.go b/src/daq/s5proxy/src/s5proxy/main.go
index 0eb91cf..fd57e42 100644
--- a/src/daq/s5proxy/src/s5proxy/s5proxy.go
+++ b/src/daq/s5proxy/src/s5proxy/main.go
@@ -63,7 +63,7 @@ func main() {
if conf.SFive.Sock != "" {
stats, err = NewStats(conf)
if err != nil {
- s5l.Printf("Error reading config file: %v\n", err)
+ s5l.Printf("Error creating sfive-stats-module: %v\n", err)
os.Exit(1)
}
}
@@ -71,7 +71,7 @@ func main() {
var proxy *Proxy
proxy, err = NewProxy(conf, stats)
if err != nil {
- s5l.Printf("Error reading config file: %v\n", err)
+ s5l.Printf("Error creating HTTP-Proxy: %v\n", err)
os.Exit(1)
}
diff --git a/src/daq/s5proxy/src/s5proxy/proxy.go b/src/daq/s5proxy/src/s5proxy/proxy.go
index 19a6a1e..b5c7a72 100644
--- a/src/daq/s5proxy/src/s5proxy/proxy.go
+++ b/src/daq/s5proxy/src/s5proxy/proxy.go
@@ -53,7 +53,7 @@ func generateTime(input string) string {
type s5proxyResponseWriter struct {
wrapped http.ResponseWriter
conf *Config
- StatsCh chan<- ClientData
+ StatsCh chan<- *ClientData
IP string
UserAgent string
}
@@ -65,7 +65,7 @@ func (r s5proxyResponseWriter) Header() http.Header {
func (r s5proxyResponseWriter) Write(data []byte) (int, error) {
sent, err := r.wrapped.Write(data)
if r.StatsCh != nil {
- stats := ClientData{IP: r.IP, UserAgent: r.UserAgent, BytesSent: uint(sent)}
+ stats := &ClientData{IP: r.IP, UserAgent: r.UserAgent, BytesSent: uint(sent)}
select {
case r.StatsCh <- stats:
default:
diff --git a/src/daq/s5proxy/src/s5proxy/stats.go b/src/daq/s5proxy/src/s5proxy/stats.go
index e3499d1..acee5ec 100644
--- a/src/daq/s5proxy/src/s5proxy/stats.go
+++ b/src/daq/s5proxy/src/s5proxy/stats.go
@@ -33,7 +33,10 @@
package main
import (
+ "errors"
+ "fmt"
"net"
+ "os"
"strings"
"time"
)
@@ -71,17 +74,18 @@ type DataUpdate struct {
type StatsWorker struct {
stream StreamID
- current DataUpdate
+ current *DataUpdate
trigger chan time.Time
- output chan<- DataUpdate
- input chan ClientData
+ output chan<- *DataUpdate
+ input chan *ClientData
}
-func NewStatsWorker(stream StreamID, updates chan<- DataUpdate) (sw StatsWorker) {
+func NewStatsWorker(stream StreamID, updates chan<- *DataUpdate) (sw StatsWorker) {
sw.stream = stream
+ sw.current = &DataUpdate{StreamID: sw.stream}
sw.trigger = make(chan time.Time)
sw.output = updates
- sw.input = make(chan ClientData, 100)
+ sw.input = make(chan *ClientData, 100)
return
}
@@ -89,8 +93,11 @@ func (sw StatsWorker) Run() {
for {
select {
case t := <-sw.trigger:
- s5l.Printf("STATS(%v): sending report @%v", sw.stream, t)
- // TODO: send out `current` on `output` and reset it
+ if t.UnixNano() != 0 {
+ s5l.Printf("STATS(%v): sending report @%v", sw.stream, t)
+ // TODO: send out `current` on `output`
+ }
+ // TODO: reset `current`
case upd := <-sw.input:
s5l.Printf("STATS(%v): got client data: IP: %s, UA: %s, %d Bytes", sw.stream, upd.IP, upd.UserAgent, upd.BytesSent)
// TODO: add `upd` to `current`
@@ -101,11 +108,11 @@ func (sw StatsWorker) Run() {
type Stats struct {
conf *Config
sock net.Conn
- updates chan DataUpdate
+ updates chan *DataUpdate
workers map[string]StatsWorker
}
-func (s *Stats) GetUpdateChannel(url string) chan<- ClientData {
+func (s *Stats) GetUpdateChannel(url string) chan<- *ClientData {
s5l.Printf("STATS: got new client for url: %s", url)
for name, worker := range s.workers {
if strings.Contains(url, name) {
@@ -115,18 +122,79 @@ func (s *Stats) GetUpdateChannel(url string) chan<- ClientData {
return nil
}
+func parseStreamerDescriptionElement(desc, desctype string) (e []string, err error) {
+ parts := strings.Split(desc, ",")
+ for _, p := range parts {
+ if p = strings.TrimSpace(p); p == "" {
+ err = fmt.Errorf("invalid streamer description %s: '%s'", desctype, desc)
+ return
+ }
+ e = append(e, p)
+ }
+ return
+}
+
+func parseStreamerDescription(desc string) (c, f, q []string, err error) {
+ parts := strings.Split(desc, "/")
+ if len(parts) != 3 {
+ err = fmt.Errorf("invalid streamer description: '%s'", desc)
+ return
+ }
+ if c, err = parseStreamerDescriptionElement(parts[0], "content-id"); err != nil {
+ return
+ }
+ if f, err = parseStreamerDescriptionElement(parts[1], "format"); err != nil {
+ return
+ }
+ if q, err = parseStreamerDescriptionElement(parts[2], "quality"); err != nil {
+ return
+ }
+ return
+}
+
+func generateStreamerName(format, c, f, q string) string {
+ return os.Expand(format, func(k string) string {
+ switch k {
+ case "content-id":
+ return c
+ case "format":
+ return f
+ case "quality":
+ return q
+ default:
+ return ""
+ }
+ })
+}
+
func NewStats(conf *Config) (s *Stats, err error) {
s = &Stats{conf: conf}
if s.sock, err = net.DialTimeout("unixgram", conf.SFive.Sock, time.Second); err != nil {
return
}
s5l.Printf("STATS: connected to sfive-hub on '%s'", conf.SFive.Sock)
- s.updates = make(chan DataUpdate, 100)
+ s.updates = make(chan *DataUpdate, 100)
s.workers = make(map[string]StatsWorker)
- // TODO: create stats-worker depending on conf.SFive.Streamer
- // TODO: generate stats-worker name using conf.SFive.Format
- s.workers["av-orig-webm-high"] = NewStatsWorker(StreamID{"av-orig", "webm", "high"}, s.updates)
+ var content, format, quality []string
+ for _, desc := range conf.SFive.Streamer {
+ if content, format, quality, err = parseStreamerDescription(desc); err != nil {
+ return
+ }
+ for _, c := range content {
+ for _, f := range format {
+ for _, q := range quality {
+ name := generateStreamerName(conf.SFive.Format, c, f, q)
+ s.workers[name] = NewStatsWorker(StreamID{c, f, q}, s.updates)
+ s5l.Printf("STATS: adding streamer '%s'", name)
+ }
+ }
+ }
+ }
+ if len(s.workers) < 1 {
+ err = errors.New("no streamers defined")
+ return
+ }
return
}
@@ -134,14 +202,25 @@ func (s *Stats) Run() (err error) {
for _, worker := range s.workers {
go worker.Run()
}
- // TODO: create ticker
+ offset := s.conf.SFive.Duration.Value - (time.Duration(time.Now().UnixNano()) % (s.conf.SFive.Duration.Value))
+ s5l.Printf("STATS: waiting %v for the next duration boundary", offset)
+ time.Sleep(offset)
+ s5l.Printf("STATS: starting ticker")
+ ticker := time.NewTicker(s.conf.SFive.Duration.Value)
+ for _, worker := range s.workers {
+ worker.trigger <- time.Unix(0, 0) // stats-worker will not send data but just reset
+ }
for {
select {
- //TODO: wait for ticker
+ case t := <-ticker.C:
+ t = t.Add(-1 * s.conf.SFive.Duration.Value)
+ for _, worker := range s.workers {
+ worker.trigger <- t
+ }
case upd := <-s.updates:
s5l.Printf("STATS: got data update for '%v'", upd.StreamID)
- // TODO: send it out on socket
+ // TODO: fill missing info and send it out on socket
// TODO: if sock error ... try reconnect
}
}