summaryrefslogtreecommitdiff
path: root/src/daq/s5proxy
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2016-10-16 17:38:02 +0200
committerChristian Pointner <equinox@spreadspace.org>2016-10-16 17:38:02 +0200
commit7d04d05d11dd293ac4a29d450d8bd9159cb5680e (patch)
tree5a416810f575173dc680722c77c4824d1d1869de /src/daq/s5proxy
parentstats connecto to hub works now, improved logging and error handling (diff)
introduced statsworker
Diffstat (limited to 'src/daq/s5proxy')
-rw-r--r--src/daq/s5proxy/sample.json7
-rw-r--r--src/daq/s5proxy/src/s5proxy/config.go4
-rw-r--r--src/daq/s5proxy/src/s5proxy/stats.go74
3 files changed, 65 insertions, 20 deletions
diff --git a/src/daq/s5proxy/sample.json b/src/daq/s5proxy/sample.json
index b501382..06a3604 100644
--- a/src/daq/s5proxy/sample.json
+++ b/src/daq/s5proxy/sample.json
@@ -14,6 +14,11 @@
{ "op": "time", "header": "Expires", "value": "-1s" }
],
"sfive": {
- "socket": "/var/run/sfive/pipegram"
+ "socket": "/var/run/sfive/pipegram",
+ "format" : "/${content-id}-${format}-${quality}",
+ "streamer": [
+ "av-dt,av-en/flash,webm/high,medium,low,mini",
+ "audio-orig/mp3,ogg/high,medium,low"
+ }
}
}
diff --git a/src/daq/s5proxy/src/s5proxy/config.go b/src/daq/s5proxy/src/s5proxy/config.go
index da7360e..5f8dd0a 100644
--- a/src/daq/s5proxy/src/s5proxy/config.go
+++ b/src/daq/s5proxy/src/s5proxy/config.go
@@ -93,7 +93,9 @@ type HeaderOperation struct {
}
type SFiveConf struct {
- Sock string `json:"socket"`
+ Sock string `json:"socket"`
+ Format string `json:"format"`
+ Streamer []string `json:"streamer"`
}
type Config struct {
diff --git a/src/daq/s5proxy/src/s5proxy/stats.go b/src/daq/s5proxy/src/s5proxy/stats.go
index 1d12f74..e3499d1 100644
--- a/src/daq/s5proxy/src/s5proxy/stats.go
+++ b/src/daq/s5proxy/src/s5proxy/stats.go
@@ -33,15 +33,15 @@
package main
import (
- "encoding/json"
"net"
+ "strings"
"time"
)
// TODO: this is basically a copy from src/hub....
-type StreamId struct {
- ContentId string `json:"content-id"`
+type StreamID struct {
+ ContentID string `json:"content-id"`
Format string `json:"format"`
Quality string `json:"quality"`
}
@@ -62,24 +62,57 @@ type SourceData struct {
type DataUpdate struct {
Version uint `json:"version"`
Hostname string `json:"hostname"`
- StreamId StreamId `json:"streamer-id"`
+ StreamID StreamID `json:"streamer-id"`
Tags []string `json:"tags,omitempty"`
StartTime time.Time `json:"start-time"`
Duration int64 `json:"duration-ms"`
Data SourceData `json:"data"`
}
+type StatsWorker struct {
+ stream StreamID
+ current DataUpdate
+ trigger chan time.Time
+ output chan<- DataUpdate
+ input chan ClientData
+}
+
+func NewStatsWorker(stream StreamID, updates chan<- DataUpdate) (sw StatsWorker) {
+ sw.stream = stream
+ sw.trigger = make(chan time.Time)
+ sw.output = updates
+ sw.input = make(chan ClientData, 100)
+ return
+}
+
+func (sw StatsWorker) Run() {
+ for {
+ select {
+ case t := <-sw.trigger:
+ s5l.Printf("STATS(%v): sending report @%v", sw.stream, t)
+ // TODO: send out `current` on `output` and reset it
+ case upd := <-sw.input:
+ s5l.Printf("STATS(%v): got client data: IP: %s, UA: %s, %d Bytes", sw.stream, upd.IP, upd.UserAgent, upd.BytesSent)
+ // TODO: add `upd` to `current`
+ }
+ }
+}
+
type Stats struct {
- conf *Config
- sock net.Conn
- // TODO: create datastructure to hold update channels
- updateCH chan ClientData
+ conf *Config
+ sock net.Conn
+ updates chan DataUpdate
+ workers map[string]StatsWorker
}
func (s *Stats) GetUpdateChannel(url string) chan<- ClientData {
s5l.Printf("STATS: got new client for url: %s", url)
-
- return s.updateCH
+ for name, worker := range s.workers {
+ if strings.Contains(url, name) {
+ return worker.input
+ }
+ }
+ return nil
}
func NewStats(conf *Config) (s *Stats, err error) {
@@ -88,23 +121,28 @@ func NewStats(conf *Config) (s *Stats, err error) {
return
}
s5l.Printf("STATS: connected to sfive-hub on '%s'", conf.SFive.Sock)
+ s.updates = make(chan DataUpdate, 100)
- // TODO: create update-channels according to conf
- s.updateCH = make(chan ClientData, 512)
+ s.workers = make(map[string]StatsWorker)
+ // TODO: create stats-worker depending on conf.SFive.Streamer
+ // TODO: generate stats-worker name using conf.SFive.Format
+ s.workers["av-orig-webm-high"] = NewStatsWorker(StreamID{"av-orig", "webm", "high"}, s.updates)
return
}
func (s *Stats) Run() (err error) {
- update := DataUpdate{Version: 1, Hostname: "hugo", StartTime: time.Now()}
- update.Tags = append(update.Tags, "hello", "world")
- if err = json.NewEncoder(s.sock).Encode(update); err != nil {
- return
+ for _, worker := range s.workers {
+ go worker.Run()
}
+ // TODO: create ticker
for {
select {
- case upd := <-s.updateCH:
- s5l.Printf("STATS: got client data: IP: %s, UA: %s, %d Bytes", upd.IP, upd.UserAgent, upd.BytesSent)
+ //TODO: wait for ticker
+ case upd := <-s.updates:
+ s5l.Printf("STATS: got data update for '%v'", upd.StreamID)
+ // TODO: send it out on socket
+ // TODO: if sock error ... try reconnect
}
}
return