diff options
Diffstat (limited to 'src/daq/s5proxy')
-rw-r--r-- | src/daq/s5proxy/sample.json | 4 | ||||
-rw-r--r-- | src/daq/s5proxy/src/s5proxy/config.go | 2 | ||||
-rw-r--r-- | src/daq/s5proxy/src/s5proxy/stats.go | 52 |
3 files changed, 31 insertions, 27 deletions
diff --git a/src/daq/s5proxy/sample.json b/src/daq/s5proxy/sample.json index b13ec7d..b1abcdc 100644 --- a/src/daq/s5proxy/sample.json +++ b/src/daq/s5proxy/sample.json @@ -18,8 +18,8 @@ "hostname": "public1", "tags": [ "hello", "world" ], "duration": "15s", - "format" : "/${content-id}-${format}-${quality}", - "streamer": [ + "format" : "/${content}-${format}-${quality}", + "stream": [ "av-orig/flash,webm/high,medium,low,mini" ] } diff --git a/src/daq/s5proxy/src/s5proxy/config.go b/src/daq/s5proxy/src/s5proxy/config.go index 8a94c48..280b9fd 100644 --- a/src/daq/s5proxy/src/s5proxy/config.go +++ b/src/daq/s5proxy/src/s5proxy/config.go @@ -113,7 +113,7 @@ type SFiveConf struct { Tags []string `json:"tags"` Duration Duration `json:"duration"` Format string `json:"format"` - Streamer []string `json:"streamer"` + Stream []string `json:"stream"` } type Config struct { diff --git a/src/daq/s5proxy/src/s5proxy/stats.go b/src/daq/s5proxy/src/s5proxy/stats.go index 96ccd74..803dfa3 100644 --- a/src/daq/s5proxy/src/s5proxy/stats.go +++ b/src/daq/s5proxy/src/s5proxy/stats.go @@ -44,8 +44,12 @@ import ( // TODO: this is basically a copy from src/hub.... -type StreamID struct { - ContentID string `json:"content-id"` +const ( + ProtocolVersion = 2 +) + +type Stream struct { + ContentID string `json:"content"` Format string `json:"format"` Quality string `json:"quality"` } @@ -56,32 +60,32 @@ type ClientData struct { BytesSent uint `json:"bytes-sent"` } -type SourceData struct { +type UpdateData struct { ClientCount uint `json:"client-count"` BytesReceived uint `json:"bytes-received"` BytesSent uint `json:"bytes-sent"` Clients []ClientData `json:"clients,omitempty"` } -type DataUpdate struct { +type Update struct { Version uint `json:"version"` Hostname string `json:"hostname"` - StreamID StreamID `json:"streamer-id"` + Stream Stream `json:"stream"` Tags []string `json:"tags,omitempty"` StartTime time.Time `json:"start-time"` Duration int64 `json:"duration-ms"` - Data SourceData `json:"data"` + Data UpdateData `json:"data"` } type StatsWorker struct { - stream StreamID + stream Stream current map[string]*ClientData trigger chan time.Time - output chan<- *DataUpdate + output chan<- *Update input chan *ClientData } -func NewStatsWorker(stream StreamID, updates chan<- *DataUpdate) (sw StatsWorker) { +func NewStatsWorker(stream Stream, updates chan<- *Update) (sw StatsWorker) { sw.stream = stream sw.current = make(map[string]*ClientData) sw.trigger = make(chan time.Time) @@ -95,7 +99,7 @@ func (sw StatsWorker) Run() { select { case t := <-sw.trigger: if t.UnixNano() != 0 { - upd := &DataUpdate{StreamID: sw.stream, StartTime: t} + upd := &Update{Stream: sw.stream, StartTime: t} upd.Data.ClientCount = uint(len(sw.current)) for _, c := range sw.current { upd.Data.Clients = append(upd.Data.Clients, *c) @@ -122,7 +126,7 @@ type Stats struct { conf *Config sock net.Conn dataEncoder *json.Encoder - updates chan *DataUpdate + updates chan *Update workers map[string]StatsWorker } @@ -136,7 +140,7 @@ func (s *Stats) GetUpdateChannel(url string) chan<- *ClientData { return nil } -func parseStreamerDescriptionElement(desc, desctype string) (e []string, err error) { +func parseStreamDescriptionElement(desc, desctype string) (e []string, err error) { parts := strings.Split(desc, ",") for _, p := range parts { if p = strings.TrimSpace(p); p == "" { @@ -148,28 +152,28 @@ func parseStreamerDescriptionElement(desc, desctype string) (e []string, err err return } -func parseStreamerDescription(desc string) (c, f, q []string, err error) { +func parseStreamDescription(desc string) (c, f, q []string, err error) { parts := strings.Split(desc, "/") if len(parts) != 3 { err = fmt.Errorf("invalid streamer description: '%s'", desc) return } - if c, err = parseStreamerDescriptionElement(parts[0], "content-id"); err != nil { + if c, err = parseStreamDescriptionElement(parts[0], "content"); err != nil { return } - if f, err = parseStreamerDescriptionElement(parts[1], "format"); err != nil { + if f, err = parseStreamDescriptionElement(parts[1], "format"); err != nil { return } - if q, err = parseStreamerDescriptionElement(parts[2], "quality"); err != nil { + if q, err = parseStreamDescriptionElement(parts[2], "quality"); err != nil { return } return } -func generateStreamerName(format, c, f, q string) string { +func generateStreamName(format, c, f, q string) string { return os.Expand(format, func(k string) string { switch k { - case "content-id": + case "content": return c case "format": return f @@ -187,18 +191,18 @@ func NewStats(conf *Config) (s *Stats, err error) { return } - s.updates = make(chan *DataUpdate, 1000) + s.updates = make(chan *Update, 1000) s.workers = make(map[string]StatsWorker) var content, format, quality []string - for _, desc := range conf.SFive.Streamer { - if content, format, quality, err = parseStreamerDescription(desc); err != nil { + for _, desc := range conf.SFive.Stream { + if content, format, quality, err = parseStreamDescription(desc); err != nil { return } for _, c := range content { for _, f := range format { for _, q := range quality { - name := generateStreamerName(conf.SFive.Format, c, f, q) - s.workers[name] = NewStatsWorker(StreamID{c, f, q}, s.updates) + name := generateStreamName(conf.SFive.Format, c, f, q) + s.workers[name] = NewStatsWorker(Stream{c, f, q}, s.updates) s5l.Printf("STATS: adding streamer '%s'", name) } } @@ -223,7 +227,7 @@ func (s *Stats) connectToHub() (err error) { func (s *Stats) sendUpdates() { for { upd := <-s.updates - upd.Version = 1 + upd.Version = ProtocolVersion upd.Hostname = s.conf.SFive.Hostname upd.Tags = s.conf.SFive.Tags upd.Duration = int64(s.conf.SFive.Duration) |