summaryrefslogtreecommitdiff
path: root/src/daq/s5proxy
diff options
context:
space:
mode:
Diffstat (limited to 'src/daq/s5proxy')
-rw-r--r--src/daq/s5proxy/sample.json4
-rw-r--r--src/daq/s5proxy/src/s5proxy/config.go2
-rw-r--r--src/daq/s5proxy/src/s5proxy/stats.go52
3 files changed, 31 insertions, 27 deletions
diff --git a/src/daq/s5proxy/sample.json b/src/daq/s5proxy/sample.json
index b13ec7d..b1abcdc 100644
--- a/src/daq/s5proxy/sample.json
+++ b/src/daq/s5proxy/sample.json
@@ -18,8 +18,8 @@
"hostname": "public1",
"tags": [ "hello", "world" ],
"duration": "15s",
- "format" : "/${content-id}-${format}-${quality}",
- "streamer": [
+ "format" : "/${content}-${format}-${quality}",
+ "stream": [
"av-orig/flash,webm/high,medium,low,mini"
]
}
diff --git a/src/daq/s5proxy/src/s5proxy/config.go b/src/daq/s5proxy/src/s5proxy/config.go
index 8a94c48..280b9fd 100644
--- a/src/daq/s5proxy/src/s5proxy/config.go
+++ b/src/daq/s5proxy/src/s5proxy/config.go
@@ -113,7 +113,7 @@ type SFiveConf struct {
Tags []string `json:"tags"`
Duration Duration `json:"duration"`
Format string `json:"format"`
- Streamer []string `json:"streamer"`
+ Stream []string `json:"stream"`
}
type Config struct {
diff --git a/src/daq/s5proxy/src/s5proxy/stats.go b/src/daq/s5proxy/src/s5proxy/stats.go
index 96ccd74..803dfa3 100644
--- a/src/daq/s5proxy/src/s5proxy/stats.go
+++ b/src/daq/s5proxy/src/s5proxy/stats.go
@@ -44,8 +44,12 @@ import (
// TODO: this is basically a copy from src/hub....
-type StreamID struct {
- ContentID string `json:"content-id"`
+const (
+ ProtocolVersion = 2
+)
+
+type Stream struct {
+ ContentID string `json:"content"`
Format string `json:"format"`
Quality string `json:"quality"`
}
@@ -56,32 +60,32 @@ type ClientData struct {
BytesSent uint `json:"bytes-sent"`
}
-type SourceData struct {
+type UpdateData struct {
ClientCount uint `json:"client-count"`
BytesReceived uint `json:"bytes-received"`
BytesSent uint `json:"bytes-sent"`
Clients []ClientData `json:"clients,omitempty"`
}
-type DataUpdate struct {
+type Update struct {
Version uint `json:"version"`
Hostname string `json:"hostname"`
- StreamID StreamID `json:"streamer-id"`
+ Stream Stream `json:"stream"`
Tags []string `json:"tags,omitempty"`
StartTime time.Time `json:"start-time"`
Duration int64 `json:"duration-ms"`
- Data SourceData `json:"data"`
+ Data UpdateData `json:"data"`
}
type StatsWorker struct {
- stream StreamID
+ stream Stream
current map[string]*ClientData
trigger chan time.Time
- output chan<- *DataUpdate
+ output chan<- *Update
input chan *ClientData
}
-func NewStatsWorker(stream StreamID, updates chan<- *DataUpdate) (sw StatsWorker) {
+func NewStatsWorker(stream Stream, updates chan<- *Update) (sw StatsWorker) {
sw.stream = stream
sw.current = make(map[string]*ClientData)
sw.trigger = make(chan time.Time)
@@ -95,7 +99,7 @@ func (sw StatsWorker) Run() {
select {
case t := <-sw.trigger:
if t.UnixNano() != 0 {
- upd := &DataUpdate{StreamID: sw.stream, StartTime: t}
+ upd := &Update{Stream: sw.stream, StartTime: t}
upd.Data.ClientCount = uint(len(sw.current))
for _, c := range sw.current {
upd.Data.Clients = append(upd.Data.Clients, *c)
@@ -122,7 +126,7 @@ type Stats struct {
conf *Config
sock net.Conn
dataEncoder *json.Encoder
- updates chan *DataUpdate
+ updates chan *Update
workers map[string]StatsWorker
}
@@ -136,7 +140,7 @@ func (s *Stats) GetUpdateChannel(url string) chan<- *ClientData {
return nil
}
-func parseStreamerDescriptionElement(desc, desctype string) (e []string, err error) {
+func parseStreamDescriptionElement(desc, desctype string) (e []string, err error) {
parts := strings.Split(desc, ",")
for _, p := range parts {
if p = strings.TrimSpace(p); p == "" {
@@ -148,28 +152,28 @@ func parseStreamerDescriptionElement(desc, desctype string) (e []string, err err
return
}
-func parseStreamerDescription(desc string) (c, f, q []string, err error) {
+func parseStreamDescription(desc string) (c, f, q []string, err error) {
parts := strings.Split(desc, "/")
if len(parts) != 3 {
err = fmt.Errorf("invalid streamer description: '%s'", desc)
return
}
- if c, err = parseStreamerDescriptionElement(parts[0], "content-id"); err != nil {
+ if c, err = parseStreamDescriptionElement(parts[0], "content"); err != nil {
return
}
- if f, err = parseStreamerDescriptionElement(parts[1], "format"); err != nil {
+ if f, err = parseStreamDescriptionElement(parts[1], "format"); err != nil {
return
}
- if q, err = parseStreamerDescriptionElement(parts[2], "quality"); err != nil {
+ if q, err = parseStreamDescriptionElement(parts[2], "quality"); err != nil {
return
}
return
}
-func generateStreamerName(format, c, f, q string) string {
+func generateStreamName(format, c, f, q string) string {
return os.Expand(format, func(k string) string {
switch k {
- case "content-id":
+ case "content":
return c
case "format":
return f
@@ -187,18 +191,18 @@ func NewStats(conf *Config) (s *Stats, err error) {
return
}
- s.updates = make(chan *DataUpdate, 1000)
+ s.updates = make(chan *Update, 1000)
s.workers = make(map[string]StatsWorker)
var content, format, quality []string
- for _, desc := range conf.SFive.Streamer {
- if content, format, quality, err = parseStreamerDescription(desc); err != nil {
+ for _, desc := range conf.SFive.Stream {
+ if content, format, quality, err = parseStreamDescription(desc); err != nil {
return
}
for _, c := range content {
for _, f := range format {
for _, q := range quality {
- name := generateStreamerName(conf.SFive.Format, c, f, q)
- s.workers[name] = NewStatsWorker(StreamID{c, f, q}, s.updates)
+ name := generateStreamName(conf.SFive.Format, c, f, q)
+ s.workers[name] = NewStatsWorker(Stream{c, f, q}, s.updates)
s5l.Printf("STATS: adding streamer '%s'", name)
}
}
@@ -223,7 +227,7 @@ func (s *Stats) connectToHub() (err error) {
func (s *Stats) sendUpdates() {
for {
upd := <-s.updates
- upd.Version = 1
+ upd.Version = ProtocolVersion
upd.Hostname = s.conf.SFive.Hostname
upd.Tags = s.conf.SFive.Tags
upd.Duration = int64(s.conf.SFive.Duration)