summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-05-10 23:13:11 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-05-10 23:15:47 +0200
commitb53f49f8cbd3b7666c9267f0e2d88fce28ae6c1a (patch)
tree5da51d292af7c5ad16a7198552f785f8cdf872f6 /src
parentand one more variable refactoring (diff)
new protocol version with streamlined names for variables
Diffstat (limited to 'src')
-rwxr-xr-xsrc/daq/accesslog/s5-accesslog44
-rw-r--r--src/daq/flumotion-plug/s5.py12
-rwxr-xr-xsrc/daq/flumotion-rrd/s5-flumotion-rrd16
-rwxr-xr-xsrc/daq/nginx-lua/s5-nginx-lua-fetch44
-rw-r--r--src/daq/s5proxy/sample.json4
-rw-r--r--src/daq/s5proxy/src/s5proxy/config.go2
-rw-r--r--src/daq/s5proxy/src/s5proxy/stats.go52
-rw-r--r--src/es5/es-query.json2
-rw-r--r--src/es5/sfive-init.json4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt_test.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesApi.go8
-rwxr-xr-xsrc/hub/test-import12
14 files changed, 99 insertions, 107 deletions
diff --git a/src/daq/accesslog/s5-accesslog b/src/daq/accesslog/s5-accesslog
index e91c523..29ec9f3 100755
--- a/src/daq/accesslog/s5-accesslog
+++ b/src/daq/accesslog/s5-accesslog
@@ -138,7 +138,7 @@ class AccessLog():
self._duration = properties['duration']
self._tags = properties['tags']
self._logfile = properties['logfile']
- self._initStreamerIds(properties['streamer-ids'])
+ self._initStreamerIds(properties['streams'])
self._proto = None
self._conn = None
@@ -175,7 +175,7 @@ class AccessLog():
if len(cs.keys()) == 1:
self._content_id = cs.keys()[0]
- print 'SFive: only one content-id detected "%s" - will include it in init messages only' % self._content_id
+ print 'SFive: only one content detected "%s" - will include it in init messages only' % self._content_id
if len(fs.keys()) == 1:
self._format = fs.keys()[0]
@@ -345,9 +345,9 @@ class AccessLog():
def _sendDatasetFull(self, timestamp, duration, content_id, format, quality):
clients = self._streamer[content_id][format][quality]
- data = { "version": 1, "hostname": self._hostname, "tags": self._tags,
- "streamer-id": {
- "content-id": content_id,
+ data = { "version": 2, "hostname": self._hostname, "tags": self._tags,
+ "stream": {
+ "content": content_id,
"format": format,
"quality": quality
},
@@ -362,17 +362,17 @@ class AccessLog():
self._proto.sendDatagram('%s\n' % (json.dumps(data)))
def _sendInit(self):
- initdata = { "version": 1, "hostname": self._hostname,
- "streamer-id" : { },
+ initdata = { "version": 2, "hostname": self._hostname,
+ "stream" : { },
"tags": self._tags }
if self._content_id:
- initdata["streamer-id"]["content-id"] = self._content_id
+ initdata["stream"]["content"] = self._content_id
if self._format:
- initdata["streamer-id"]["format"] = self._format
+ initdata["stream"]["format"] = self._format
if self._quality:
- initdata["streamer-id"]["quality"] = self._quality
- if len(initdata["streamer-id"].keys()) == 0:
- del initdata["streamer-id"]
+ initdata["stream"]["quality"] = self._quality
+ if len(initdata["stream"].keys()) == 0:
+ del initdata["stream"]
self._proto.sendDatagram('%s\n' % (json.dumps(initdata)))
@@ -380,7 +380,7 @@ class AccessLog():
clients = self._streamer[content_id][format][quality]
data = { "start-time": timestamp.isoformat('T') + 'Z',
"duration-ms": duration * 1000,
- "streamer-id": { },
+ "stream": { },
"data": {
"clients": list(clients.values()),
"client-count": clients.getCnt(),
@@ -388,13 +388,13 @@ class AccessLog():
}
}
if not self._content_id:
- data["streamer-id"]["content-id"] = content_id
+ data["stream"]["content"] = content_id
if not self._format:
- data["streamer-id"]["format"] = format
+ data["stream"]["format"] = format
if not self._quality:
- data["streamer-id"]["quality"] = quality
- if len(data["streamer-id"].keys()) == 0:
- del data["streamer-id"]
+ data["stream"]["quality"] = quality
+ if len(data["stream"].keys()) == 0:
+ del data["stream"]
self._proto.sendDatagram('%s\n' % (json.dumps(data)))
@@ -411,8 +411,8 @@ if __name__ == '__main__':
help='time (in seconds) between updates; defaults to 15')
parser.add_argument('--tag', '-t', dest='tags', action='append',
help='tag to be added to the statistic data, can be invoked several times')
- parser.add_argument('--streamer-id', '-S', dest='streamer-ids', action='append',
- help='a streamer description like <content-id1>[,<content-id2>]/<format1>[,<format2>]/<quality>[,<quality>[,<quality]], can be invoked several times')
+ parser.add_argument('--stream', '-S', dest='streams', action='append',
+ help='a streamer description like <content1>[,<content2>]/<format1>[,<format2>]/<quality>[,<quality>[,<quality]], can be invoked several times')
parser.add_argument('--logfile', '-l', dest='logfile', required=True,
help='path to the logfile or \'-\' for standard input')
parser.add_argument('--nameformat', '-F', dest='nameformat', required=False,
@@ -426,8 +426,8 @@ if __name__ == '__main__':
args['duration'] = int(args['duration'])
if not args['nameformat']:
args['nameformat'] = '/[^/]+/(?P<format>hls|dash)/(?P<content>.+)-(?P<quality>[^-]+)/.*'
- if not args['streamer-ids']:
- print 'SFive: you have to specify at least one streamer-id!'
+ if not args['streams']:
+ print 'SFive: you have to specify at least one stream!'
exit(-1)
importer = AccessLog(args)
importer.run()
diff --git a/src/daq/flumotion-plug/s5.py b/src/daq/flumotion-plug/s5.py
index 0db01e4..58dc71a 100644
--- a/src/daq/flumotion-plug/s5.py
+++ b/src/daq/flumotion-plug/s5.py
@@ -126,7 +126,7 @@ class ComponentSFivePlug(base.ComponentPlug):
properties = self.args['properties']
self._socket = properties['socket']
self._hostname = properties['hostname']
- self._content_id = properties.get('content-id')
+ self._content_id = properties.get('content')
self._format = properties.get('format')
self._quality = properties.get('quality')
tagstring = properties.get('tags', '')
@@ -208,9 +208,9 @@ class ComponentSFivePlug(base.ComponentPlug):
def _sendDatasetFull(self, timestamp, duration, client_count, bytes_sent, bytes_received):
- data = { "version": 1, "hostname": self._hostname,
- "streamer-id": {
- "content-id": self._content_id,
+ data = { "version": 2, "hostname": self._hostname,
+ "stream": {
+ "content": self._content_id,
"format": self._format,
"quality": self._quality
},
@@ -226,8 +226,8 @@ class ComponentSFivePlug(base.ComponentPlug):
self._proto.sendDatagram('%s\n' % (json.dumps(data)))
def _sendInit(self):
- initdata = { "version": 1, "hostname": self._hostname,
- "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality },
+ initdata = { "version": 2, "hostname": self._hostname,
+ "stream": { "content": self._content_id, "format": self._format, "quality": self._quality },
"tags": self._tags }
self._proto.sendDatagram('%s\n' % (json.dumps(initdata)))
diff --git a/src/daq/flumotion-rrd/s5-flumotion-rrd b/src/daq/flumotion-rrd/s5-flumotion-rrd
index 197cb14..3170030 100755
--- a/src/daq/flumotion-rrd/s5-flumotion-rrd
+++ b/src/daq/flumotion-rrd/s5-flumotion-rrd
@@ -100,7 +100,7 @@ class FlumotionRRD():
self._socket = properties['socket']
self._hostname = properties['hostname']
- self._content_id = properties['content-id']
+ self._content_id = properties['content']
self._format = properties['format']
self._quality = properties['quality']
self._tags = properties['tags']
@@ -191,9 +191,9 @@ class FlumotionRRD():
def _sendDatasetFull(self, timestamp, duration, client_count, bytes_sent):
client_count = int(round(client_count)) if client_count else 0
bytes_sent = int(round(bytes_sent)) if bytes_sent else 0
- data = { "version": 1, "hostname": self._hostname,
- "streamer-id": {
- "content-id": self._content_id,
+ data = { "version": 2, "hostname": self._hostname,
+ "stream": {
+ "content": self._content_id,
"format": self._format,
"quality": self._quality
},
@@ -208,8 +208,8 @@ class FlumotionRRD():
self._proto.sendDatagram('%s\n' % (json.dumps(data)))
def _sendInit(self):
- initdata = { "version": 1, "hostname": self._hostname,
- "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality },
+ initdata = { "version": 2, "hostname": self._hostname,
+ "stream": { "content": self._content_id, "format": self._format, "quality": self._quality },
"tags": self._tags }
self._proto.sendDatagram('%s\n' % (json.dumps(initdata)))
@@ -234,8 +234,8 @@ if __name__ == '__main__':
help='the path to the data socket of the local SFive hub')
parser.add_argument('--hostname', '-n', dest='hostname', required=True,
help='the hostname of the machine')
- parser.add_argument('--content-id', '-c', dest='content-id', required=True,
- help='the content-id (i.e. av)')
+ parser.add_argument('--content', '-c', dest='content', required=True,
+ help='the content (i.e. av)')
parser.add_argument('--format', '-f', dest='format', required=True,
help='the format (i.e. webm)')
parser.add_argument('--quality', '-q', dest='quality', required=True,
diff --git a/src/daq/nginx-lua/s5-nginx-lua-fetch b/src/daq/nginx-lua/s5-nginx-lua-fetch
index 65e2c83..565a24f 100755
--- a/src/daq/nginx-lua/s5-nginx-lua-fetch
+++ b/src/daq/nginx-lua/s5-nginx-lua-fetch
@@ -169,7 +169,7 @@ class NGXLuaFetcher():
self._duration = properties['duration']
self._tags = properties['tags']
self._url = properties['url']
- self._initStreamerIds(properties['streamer-ids'])
+ self._initStreamerIds(properties['streams'])
self._proto = None
self._conn = None
@@ -207,7 +207,7 @@ class NGXLuaFetcher():
if len(cs.keys()) == 1:
self._content_id = cs.keys()[0]
-# print 'SFive: only one content-id detected "%s" - will include it in init messages only' % self._content_id
+# print 'SFive: only one content detected "%s" - will include it in init messages only' % self._content_id
if len(fs.keys()) == 1:
self._format = fs.keys()[0]
@@ -310,9 +310,9 @@ class NGXLuaFetcher():
def _sendDatasetFull(self, timestamp, duration, content_id, format, quality):
clients = self._streamer[content_id][format][quality]
- data = { "version": 1, "hostname": self._hostname, "tags": self._tags,
- "streamer-id": {
- "content-id": content_id,
+ data = { "version": 2, "hostname": self._hostname, "tags": self._tags,
+ "stream": {
+ "content": content_id,
"format": format,
"quality": quality
},
@@ -327,17 +327,17 @@ class NGXLuaFetcher():
self._proto.sendDatagram('%s\n' % (json.dumps(data)))
def _sendInit(self):
- initdata = { "version": 1, "hostname": self._hostname,
- "streamer-id" : { },
+ initdata = { "version": 2, "hostname": self._hostname,
+ "stream" : { },
"tags": self._tags }
if self._content_id:
- initdata["streamer-id"]["content-id"] = self._content_id
+ initdata["stream"]["content"] = self._content_id
if self._format:
- initdata["streamer-id"]["format"] = self._format
+ initdata["stream"]["format"] = self._format
if self._quality:
- initdata["streamer-id"]["quality"] = self._quality
- if len(initdata["streamer-id"].keys()) == 0:
- del initdata["streamer-id"]
+ initdata["stream"]["quality"] = self._quality
+ if len(initdata["stream"].keys()) == 0:
+ del initdata["stream"]
self._proto.sendDatagram('%s\n' % (json.dumps(initdata)))
@@ -345,7 +345,7 @@ class NGXLuaFetcher():
clients = self._streamer[content_id][format][quality]
data = { "start-time": timestamp.isoformat('T') + 'Z',
"duration-ms": duration * 1000,
- "streamer-id": { },
+ "stream": { },
"data": {
"clients": list(clients.values()),
"client-count": clients.getCnt(),
@@ -353,13 +353,13 @@ class NGXLuaFetcher():
}
}
if not self._content_id:
- data["streamer-id"]["content-id"] = content_id
+ data["stream"]["content"] = content_id
if not self._format:
- data["streamer-id"]["format"] = format
+ data["stream"]["format"] = format
if not self._quality:
- data["streamer-id"]["quality"] = quality
- if len(data["streamer-id"].keys()) == 0:
- del data["streamer-id"]
+ data["stream"]["quality"] = quality
+ if len(data["stream"].keys()) == 0:
+ del data["stream"]
self._proto.sendDatagram('%s\n' % (json.dumps(data)))
@@ -376,8 +376,8 @@ if __name__ == '__main__':
help='time (in seconds) between updates; defaults to 15')
parser.add_argument('--tag', '-t', dest='tags', action='append',
help='tag to be added to the statistic data, can be invoked several times')
- parser.add_argument('--streamer-id', '-S', dest='streamer-ids', action='append',
- help='a streamer description like <content-id1>[,<content-id2>]/<format1>[,<format2>]/<quality>[,<quality>[,<quality]], can be invoked several times')
+ parser.add_argument('--stream', '-S', dest='streams', action='append',
+ help='a streamer description like <content1>[,<content2>]/<format1>[,<format2>]/<quality>[,<quality>[,<quality]], can be invoked several times')
parser.add_argument('--url', '-u', dest='url', required=False,
help='the url of the nginx sfive exporert, default http://localhost/sfive')
parser.add_argument('--nameformat', '-F', dest='nameformat', required=False,
@@ -393,8 +393,8 @@ if __name__ == '__main__':
args['url'] = 'http://localhost/sfive'
if not args['nameformat']:
args['nameformat'] = '/[^/]+/(?P<format>hls|dash)/(?P<content>.+)-(?P<quality>[^-]+)/.*'
- if not args['streamer-ids']:
- print 'SFive: you have to specify at least one streamer-id!'
+ if not args['streams']:
+ print 'SFive: you have to specify at least one stream!'
exit(-1)
fetcher = NGXLuaFetcher(args)
diff --git a/src/daq/s5proxy/sample.json b/src/daq/s5proxy/sample.json
index b13ec7d..b1abcdc 100644
--- a/src/daq/s5proxy/sample.json
+++ b/src/daq/s5proxy/sample.json
@@ -18,8 +18,8 @@
"hostname": "public1",
"tags": [ "hello", "world" ],
"duration": "15s",
- "format" : "/${content-id}-${format}-${quality}",
- "streamer": [
+ "format" : "/${content}-${format}-${quality}",
+ "stream": [
"av-orig/flash,webm/high,medium,low,mini"
]
}
diff --git a/src/daq/s5proxy/src/s5proxy/config.go b/src/daq/s5proxy/src/s5proxy/config.go
index 8a94c48..280b9fd 100644
--- a/src/daq/s5proxy/src/s5proxy/config.go
+++ b/src/daq/s5proxy/src/s5proxy/config.go
@@ -113,7 +113,7 @@ type SFiveConf struct {
Tags []string `json:"tags"`
Duration Duration `json:"duration"`
Format string `json:"format"`
- Streamer []string `json:"streamer"`
+ Stream []string `json:"stream"`
}
type Config struct {
diff --git a/src/daq/s5proxy/src/s5proxy/stats.go b/src/daq/s5proxy/src/s5proxy/stats.go
index 96ccd74..803dfa3 100644
--- a/src/daq/s5proxy/src/s5proxy/stats.go
+++ b/src/daq/s5proxy/src/s5proxy/stats.go
@@ -44,8 +44,12 @@ import (
// TODO: this is basically a copy from src/hub....
-type StreamID struct {
- ContentID string `json:"content-id"`
+const (
+ ProtocolVersion = 2
+)
+
+type Stream struct {
+ ContentID string `json:"content"`
Format string `json:"format"`
Quality string `json:"quality"`
}
@@ -56,32 +60,32 @@ type ClientData struct {
BytesSent uint `json:"bytes-sent"`
}
-type SourceData struct {
+type UpdateData struct {
ClientCount uint `json:"client-count"`
BytesReceived uint `json:"bytes-received"`
BytesSent uint `json:"bytes-sent"`
Clients []ClientData `json:"clients,omitempty"`
}
-type DataUpdate struct {
+type Update struct {
Version uint `json:"version"`
Hostname string `json:"hostname"`
- StreamID StreamID `json:"streamer-id"`
+ Stream Stream `json:"stream"`
Tags []string `json:"tags,omitempty"`
StartTime time.Time `json:"start-time"`
Duration int64 `json:"duration-ms"`
- Data SourceData `json:"data"`
+ Data UpdateData `json:"data"`
}
type StatsWorker struct {
- stream StreamID
+ stream Stream
current map[string]*ClientData
trigger chan time.Time
- output chan<- *DataUpdate
+ output chan<- *Update
input chan *ClientData
}
-func NewStatsWorker(stream StreamID, updates chan<- *DataUpdate) (sw StatsWorker) {
+func NewStatsWorker(stream Stream, updates chan<- *Update) (sw StatsWorker) {
sw.stream = stream
sw.current = make(map[string]*ClientData)
sw.trigger = make(chan time.Time)
@@ -95,7 +99,7 @@ func (sw StatsWorker) Run() {
select {
case t := <-sw.trigger:
if t.UnixNano() != 0 {
- upd := &DataUpdate{StreamID: sw.stream, StartTime: t}
+ upd := &Update{Stream: sw.stream, StartTime: t}
upd.Data.ClientCount = uint(len(sw.current))
for _, c := range sw.current {
upd.Data.Clients = append(upd.Data.Clients, *c)
@@ -122,7 +126,7 @@ type Stats struct {
conf *Config
sock net.Conn
dataEncoder *json.Encoder
- updates chan *DataUpdate
+ updates chan *Update
workers map[string]StatsWorker
}
@@ -136,7 +140,7 @@ func (s *Stats) GetUpdateChannel(url string) chan<- *ClientData {
return nil
}
-func parseStreamerDescriptionElement(desc, desctype string) (e []string, err error) {
+func parseStreamDescriptionElement(desc, desctype string) (e []string, err error) {
parts := strings.Split(desc, ",")
for _, p := range parts {
if p = strings.TrimSpace(p); p == "" {
@@ -148,28 +152,28 @@ func parseStreamerDescriptionElement(desc, desctype string) (e []string, err err
return
}
-func parseStreamerDescription(desc string) (c, f, q []string, err error) {
+func parseStreamDescription(desc string) (c, f, q []string, err error) {
parts := strings.Split(desc, "/")
if len(parts) != 3 {
err = fmt.Errorf("invalid streamer description: '%s'", desc)
return
}
- if c, err = parseStreamerDescriptionElement(parts[0], "content-id"); err != nil {
+ if c, err = parseStreamDescriptionElement(parts[0], "content"); err != nil {
return
}
- if f, err = parseStreamerDescriptionElement(parts[1], "format"); err != nil {
+ if f, err = parseStreamDescriptionElement(parts[1], "format"); err != nil {
return
}
- if q, err = parseStreamerDescriptionElement(parts[2], "quality"); err != nil {
+ if q, err = parseStreamDescriptionElement(parts[2], "quality"); err != nil {
return
}
return
}
-func generateStreamerName(format, c, f, q string) string {
+func generateStreamName(format, c, f, q string) string {
return os.Expand(format, func(k string) string {
switch k {
- case "content-id":
+ case "content":
return c
case "format":
return f
@@ -187,18 +191,18 @@ func NewStats(conf *Config) (s *Stats, err error) {
return
}
- s.updates = make(chan *DataUpdate, 1000)
+ s.updates = make(chan *Update, 1000)
s.workers = make(map[string]StatsWorker)
var content, format, quality []string
- for _, desc := range conf.SFive.Streamer {
- if content, format, quality, err = parseStreamerDescription(desc); err != nil {
+ for _, desc := range conf.SFive.Stream {
+ if content, format, quality, err = parseStreamDescription(desc); err != nil {
return
}
for _, c := range content {
for _, f := range format {
for _, q := range quality {
- name := generateStreamerName(conf.SFive.Format, c, f, q)
- s.workers[name] = NewStatsWorker(StreamID{c, f, q}, s.updates)
+ name := generateStreamName(conf.SFive.Format, c, f, q)
+ s.workers[name] = NewStatsWorker(Stream{c, f, q}, s.updates)
s5l.Printf("STATS: adding streamer '%s'", name)
}
}
@@ -223,7 +227,7 @@ func (s *Stats) connectToHub() (err error) {
func (s *Stats) sendUpdates() {
for {
upd := <-s.updates
- upd.Version = 1
+ upd.Version = ProtocolVersion
upd.Hostname = s.conf.SFive.Hostname
upd.Tags = s.conf.SFive.Tags
upd.Duration = int64(s.conf.SFive.Duration)
diff --git a/src/es5/es-query.json b/src/es5/es-query.json
index b51f3f7..226a379 100644
--- a/src/es5/es-query.json
+++ b/src/es5/es-query.json
@@ -17,7 +17,7 @@
"aggregations": {
"by_source": {
"terms": {
- "script": "[ doc['hostname'].value, doc['streamer-id.content-id'].value, doc['streamer-id.format'].value, doc['streamer-id.quality'].value ].join('/') ",
+ "script": "[ doc['hostname'].value, doc['stream.content'].value, doc['stream.format'].value, doc['stream.quality'].value ].join('/') ",
"size": 0
},
"aggregations": {
diff --git a/src/es5/sfive-init.json b/src/es5/sfive-init.json
index bc89227..cf2a980 100644
--- a/src/es5/sfive-init.json
+++ b/src/es5/sfive-init.json
@@ -50,9 +50,9 @@
"type" : "date",
"format" : "dateOptionalTime"
},
- "streamer-id" : {
+ "stream" : {
"properties" : {
- "content-id" : {
+ "content" : {
"type" : "string",
"index" : "not_analyzed"
},
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go
index 2741a5a..0abbdef 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go
@@ -39,7 +39,7 @@ import (
)
const (
- ProtocolVersion = 1
+ ProtocolVersion = 2
)
//
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
index 46b5705..55d8415 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
@@ -42,7 +42,7 @@ import (
)
var (
- initEncoded = `"hostname": "localhost", "streamer-id": {"quality": "low", "content-id": "av", "format": "webm"}, "tags": ["elevate", "2014"]`
+ initEncoded = `"hostname": "localhost", "stream": {"quality": "low", "content": "av", "format": "webm"}, "tags": ["elevate", "2014"]`
initStruct = Source{Hostname: "localhost", Stream: Stream{Quality: "low", ContentId: "av", Format: "webm"}, Tags: []string{"elevate", "2014"}}
updateEncoded = `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z", "duration-ms": 5000`
updateStruct = Update{Data: UpdateData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
index 005e03d..2794a24 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
@@ -44,7 +44,7 @@ import (
const forwardEsLastUpdateIdJson = `{
"query": {"match": { "SourceHubUuid": "%s" } },
- "aggregations": { "last-id" : { "max" : { "field": "SourceHubDataUpdateId" } } }
+ "aggregations": { "last-id" : { "max" : { "field": "SourceHubUpdateId" } } }
}`
func fwdEsGetLastUpdateId(baseUrl string, client *http.Client, hubUuid string) (lastId int, err error) {
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
index 5d36e0d..0a30162 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
@@ -35,14 +35,14 @@ package sfive
import "time"
type Stream struct {
- ContentId string `json:"content-id"`
+ ContentId string `json:"content"`
Format string `json:"format"`
Quality string `json:"quality"`
}
type Source struct {
Hostname string `json:"hostname"`
- Stream Stream `json:"streamer-id"`
+ Stream Stream `json:"stream"`
Tags []string `json:"tags,omitempty"`
}
@@ -68,9 +68,9 @@ type Update struct {
type Header struct {
Version uint `json:"version,omitempty"` // omitempty is needed for data only messages and for REST API
SourceHubUuid string `json:"SourceHubUuid,omitempty"`
- SourceHubUpdateId int `json:"SourceHubDataUpdateId,omitempty"`
+ SourceHubUpdateId int `json:"SourceHubUpdateId,omitempty"`
ForwardHubUuid string `json:"ForwardHubUuid,omitempty"`
- ForwardHubUpdateId int `json:"ForwardHubDataUpdateId,omitempty"`
+ ForwardHubUpdateId int `json:"ForwardHubUpdateId,omitempty"`
}
type UpdateFull struct {
diff --git a/src/hub/test-import b/src/hub/test-import
deleted file mode 100755
index a6c2942..0000000
--- a/src/hub/test-import
+++ /dev/null
@@ -1,12 +0,0 @@
-#!/bin/sh
-
-TEST_D="./test"
-
-echo pipe: import sample.json
-echo ------------------------
-
-[ -f ../../dat/sample-access.json ] || zcat ../../dat/sample-accesslog.json.gz > ../../dat/sample-accesslog.json
-
-socat file:../../dat/sample-accesslog.json,rdonly "unix-client:$TEST_D/pipe"
-
-echo '\n\ndone'