summaryrefslogtreecommitdiff
path: root/src/daq
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-06-05 15:42:55 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-06-05 15:42:55 +0200
commitf17897ee7d4d57af4ecdd4f13cdf666011fddb74 (patch)
treeac4376afaa826b25e72a35bee73313df9d157ad6 /src/daq
parentgeo ip lookup implemented (diff)
updated s5proxy to new type names
Diffstat (limited to 'src/daq')
-rw-r--r--src/daq/s5proxy/sample.json2
-rw-r--r--src/daq/s5proxy/src/s5proxy/config.go2
-rw-r--r--src/daq/s5proxy/src/s5proxy/main.go2
-rw-r--r--src/daq/s5proxy/src/s5proxy/proxy.go22
-rw-r--r--src/daq/s5proxy/src/s5proxy/stats.go74
-rwxr-xr-xsrc/daq/s5proxy/test3
6 files changed, 72 insertions, 33 deletions
diff --git a/src/daq/s5proxy/sample.json b/src/daq/s5proxy/sample.json
index b1abcdc..d45a4d5 100644
--- a/src/daq/s5proxy/sample.json
+++ b/src/daq/s5proxy/sample.json
@@ -14,7 +14,7 @@
{ "op": "time", "header": "Expires", "value": "-1s" }
],
"sfive": {
- "socket": "/run/sfive/pipegram",
+ "socket": "../../hub/test/pipegram",
"hostname": "public1",
"tags": [ "hello", "world" ],
"duration": "15s",
diff --git a/src/daq/s5proxy/src/s5proxy/config.go b/src/daq/s5proxy/src/s5proxy/config.go
index 280b9fd..1fc721d 100644
--- a/src/daq/s5proxy/src/s5proxy/config.go
+++ b/src/daq/s5proxy/src/s5proxy/config.go
@@ -12,7 +12,7 @@
// live and recorded data.
//
//
-// Copyright (C) 2014-2016 Christian Pointner <equinox@spreadspace.org>
+// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org>
// Markus Grüneis <gimpf@gimpf.org>
//
// This file is part of sfive.
diff --git a/src/daq/s5proxy/src/s5proxy/main.go b/src/daq/s5proxy/src/s5proxy/main.go
index fd57e42..8871c71 100644
--- a/src/daq/s5proxy/src/s5proxy/main.go
+++ b/src/daq/s5proxy/src/s5proxy/main.go
@@ -12,7 +12,7 @@
// live and recorded data.
//
//
-// Copyright (C) 2014-2016 Christian Pointner <equinox@spreadspace.org>
+// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org>
// Markus Grüneis <gimpf@gimpf.org>
//
// This file is part of sfive.
diff --git a/src/daq/s5proxy/src/s5proxy/proxy.go b/src/daq/s5proxy/src/s5proxy/proxy.go
index b5c7a72..6ef1742 100644
--- a/src/daq/s5proxy/src/s5proxy/proxy.go
+++ b/src/daq/s5proxy/src/s5proxy/proxy.go
@@ -12,7 +12,7 @@
// live and recorded data.
//
//
-// Copyright (C) 2014-2016 Christian Pointner <equinox@spreadspace.org>
+// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org>
// Markus Grüneis <gimpf@gimpf.org>
//
// This file is part of sfive.
@@ -34,6 +34,7 @@ package main
import (
"crypto/tls"
+ "net"
"net/http"
"net/http/httputil"
"net/url"
@@ -53,8 +54,9 @@ func generateTime(input string) string {
type s5proxyResponseWriter struct {
wrapped http.ResponseWriter
conf *Config
- StatsCh chan<- *ClientData
+ StatsCh chan<- *Client
IP string
+ Port int
UserAgent string
}
@@ -65,7 +67,7 @@ func (r s5proxyResponseWriter) Header() http.Header {
func (r s5proxyResponseWriter) Write(data []byte) (int, error) {
sent, err := r.wrapped.Write(data)
if r.StatsCh != nil {
- stats := &ClientData{IP: r.IP, UserAgent: r.UserAgent, BytesSent: uint(sent)}
+ stats := &Client{IP: r.IP, Port: uint(r.Port), UserAgent: r.UserAgent, BytesSent: uint(sent)}
select {
case r.StatsCh <- stats:
default:
@@ -95,10 +97,16 @@ func (r s5proxyResponseWriter) WriteHeader(status int) {
func proxyHandler(p *Proxy, w http.ResponseWriter, r *http.Request) {
pw := s5proxyResponseWriter{wrapped: w, conf: p.conf}
if p.stats != nil {
- //pw.IP, _, _ = net.SplitHostPort(r.RemoteAddr)
- pw.IP = r.RemoteAddr // Use IP:Port for now since IP alone might not be unique
- pw.UserAgent = r.UserAgent()
- pw.StatsCh = p.stats.GetUpdateChannel(r.URL.String())
+ if ip, port, err := net.SplitHostPort(r.RemoteAddr); err != nil {
+ s5l.Printf("PROXY: client '%s' error: invalid address/port info: %v, no statistics will be gathered", r.RemoteAddr, err)
+ } else {
+ pw.IP = ip
+ if pw.Port, err = net.LookupPort("tcp", port); err != nil {
+ s5l.Printf("PROXY: client '%s' warning: can't parse port or service name: %v", r.RemoteAddr, err)
+ }
+ pw.UserAgent = r.UserAgent()
+ pw.StatsCh = p.stats.GetUpdateChannel(r.URL.String())
+ }
}
p.proxy.ServeHTTP(pw, r)
diff --git a/src/daq/s5proxy/src/s5proxy/stats.go b/src/daq/s5proxy/src/s5proxy/stats.go
index 803dfa3..078c1ad 100644
--- a/src/daq/s5proxy/src/s5proxy/stats.go
+++ b/src/daq/s5proxy/src/s5proxy/stats.go
@@ -12,7 +12,7 @@
// live and recorded data.
//
//
-// Copyright (C) 2014-2016 Christian Pointner <equinox@spreadspace.org>
+// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org>
// Markus Grüneis <gimpf@gimpf.org>
//
// This file is part of sfive.
@@ -54,43 +54,71 @@ type Stream struct {
Quality string `json:"quality"`
}
-type ClientData struct {
+type Source struct {
+ Hostname string `json:"hostname"`
+ Stream Stream `json:"stream"`
+ Tags []string `json:"tags,omitempty"`
+}
+
+type GeoInfo struct {
+ CountryName string `json:"country,omitempty"`
+ CountryCode2 string `json:"country-code2,omitempty"`
+ RegionName string `json:"region,omitempty"`
+ RegionCode string `json:"region-code,omitempty"`
+ CityName string `json:"city,omitempty"`
+ Latitude float64 `json:"latitude,omitempty"`
+ Longitude float64 `json:"longitude,omitempty"`
+}
+
+type Client struct {
IP string `json:"ip"`
- UserAgent string `json:"user-agent"`
- BytesSent uint `json:"bytes-sent"`
+ Port uint `json:"port,omitempty"`
+ UserAgent string `json:"user-agent,omitempty"`
+ BytesSent uint `json:"bytes-sent,omitempty"`
+ GeoInfo
}
type UpdateData struct {
- ClientCount uint `json:"client-count"`
- BytesReceived uint `json:"bytes-received"`
- BytesSent uint `json:"bytes-sent"`
- Clients []ClientData `json:"clients,omitempty"`
+ ClientCount uint `json:"client-count"`
+ BytesReceived uint `json:"bytes-received,omitempty"`
+ BytesSent uint `json:"bytes-sent"`
+ Clients []Client `json:"clients,omitempty"`
}
type Update struct {
- Version uint `json:"version"`
- Hostname string `json:"hostname"`
- Stream Stream `json:"stream"`
- Tags []string `json:"tags,omitempty"`
StartTime time.Time `json:"start-time"`
Duration int64 `json:"duration-ms"`
Data UpdateData `json:"data"`
}
+type Header struct {
+ Version uint `json:"version,omitempty"` // omitempty is needed for data only messages and for REST API
+ SourceHubUUID string `json:"SourceHubUuid,omitempty"`
+ SourceHubUpdateID int `json:"SourceHubUpdateId,omitempty"`
+ ForwardHubUUID string `json:"ForwardHubUuid,omitempty"`
+ ForwardHubUpdateID int `json:"ForwardHubUpdateId,omitempty"`
+}
+
+type UpdateFull struct {
+ Header
+ Source
+ Update
+}
+
type StatsWorker struct {
stream Stream
- current map[string]*ClientData
+ current map[string]*Client
trigger chan time.Time
- output chan<- *Update
- input chan *ClientData
+ output chan<- *UpdateFull
+ input chan *Client
}
-func NewStatsWorker(stream Stream, updates chan<- *Update) (sw StatsWorker) {
+func NewStatsWorker(stream Stream, updates chan<- *UpdateFull) (sw StatsWorker) {
sw.stream = stream
- sw.current = make(map[string]*ClientData)
+ sw.current = make(map[string]*Client)
sw.trigger = make(chan time.Time)
sw.output = updates
- sw.input = make(chan *ClientData, 100)
+ sw.input = make(chan *Client, 100)
return
}
@@ -99,7 +127,7 @@ func (sw StatsWorker) Run() {
select {
case t := <-sw.trigger:
if t.UnixNano() != 0 {
- upd := &Update{Stream: sw.stream, StartTime: t}
+ upd := &UpdateFull{Source: Source{Stream: sw.stream}, Update: Update{StartTime: t}}
upd.Data.ClientCount = uint(len(sw.current))
for _, c := range sw.current {
upd.Data.Clients = append(upd.Data.Clients, *c)
@@ -111,7 +139,7 @@ func (sw StatsWorker) Run() {
s5l.Printf("STATS: worker(%v): writing to output channel would block, dropping data update...", sw.stream)
}
}
- sw.current = make(map[string]*ClientData)
+ sw.current = make(map[string]*Client)
case upd := <-sw.input:
if data, exists := sw.current[upd.IP]; exists {
data.BytesSent += upd.BytesSent
@@ -126,11 +154,11 @@ type Stats struct {
conf *Config
sock net.Conn
dataEncoder *json.Encoder
- updates chan *Update
+ updates chan *UpdateFull
workers map[string]StatsWorker
}
-func (s *Stats) GetUpdateChannel(url string) chan<- *ClientData {
+func (s *Stats) GetUpdateChannel(url string) chan<- *Client {
s5l.Printf("STATS: got new client for url: %s", url)
for name, worker := range s.workers {
if strings.Contains(url, name) {
@@ -191,7 +219,7 @@ func NewStats(conf *Config) (s *Stats, err error) {
return
}
- s.updates = make(chan *Update, 1000)
+ s.updates = make(chan *UpdateFull, 1000)
s.workers = make(map[string]StatsWorker)
var content, format, quality []string
for _, desc := range conf.SFive.Stream {
diff --git a/src/daq/s5proxy/test b/src/daq/s5proxy/test
new file mode 100755
index 0000000..f27f060
--- /dev/null
+++ b/src/daq/s5proxy/test
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+exec bin/s5proxy -config sample.json