summaryrefslogtreecommitdiff
path: root/src/daq/s5proxy
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2016-10-15 04:27:48 +0200
committerChristian Pointner <equinox@spreadspace.org>2016-10-15 04:27:48 +0200
commit5f2c46fe5252a51cb565e6e6227b984b65556d88 (patch)
tree717d7283b9ed9060c345510195bac06043f5b7bb /src/daq/s5proxy
parentadded s5stats (diff)
external interface for statistics module done.
Diffstat (limited to 'src/daq/s5proxy')
-rw-r--r--src/daq/s5proxy/src/s5proxy/proxy.go29
-rw-r--r--src/daq/s5proxy/src/s5proxy/s5proxy.go19
-rw-r--r--src/daq/s5proxy/src/s5proxy/stats.go31
3 files changed, 65 insertions, 14 deletions
diff --git a/src/daq/s5proxy/src/s5proxy/proxy.go b/src/daq/s5proxy/src/s5proxy/proxy.go
index 33cbc14..26ce8f7 100644
--- a/src/daq/s5proxy/src/s5proxy/proxy.go
+++ b/src/daq/s5proxy/src/s5proxy/proxy.go
@@ -51,8 +51,11 @@ func generateTime(input string) string {
}
type s5proxyResponseWriter struct {
- wrapped http.ResponseWriter
- conf *Config
+ wrapped http.ResponseWriter
+ conf *Config
+ StatsCh chan<- ClientData
+ IP string
+ UserAgent string
}
func (r s5proxyResponseWriter) Header() http.Header {
@@ -60,7 +63,15 @@ func (r s5proxyResponseWriter) Header() http.Header {
}
func (r s5proxyResponseWriter) Write(data []byte) (int, error) {
- return r.wrapped.Write(data)
+ sent, err := r.wrapped.Write(data)
+ if r.StatsCh != nil {
+ stats := ClientData{IP: r.IP, UserAgent: r.UserAgent, BytesSent: uint(sent)}
+ select {
+ case r.StatsCh <- stats:
+ default:
+ }
+ }
+ return sent, err
}
func (r s5proxyResponseWriter) WriteHeader(status int) {
@@ -83,19 +94,27 @@ func (r s5proxyResponseWriter) WriteHeader(status int) {
func proxyHandler(p *Proxy, w http.ResponseWriter, r *http.Request) {
pw := s5proxyResponseWriter{wrapped: w, conf: p.conf}
+ if p.stats != nil {
+ //pw.IP, _, _ = net.SplitHostPort(r.RemoteAddr)
+ pw.IP = r.RemoteAddr // Use IP:Port for now since IP alone might not be unique
+ pw.UserAgent = r.UserAgent()
+ pw.StatsCh = p.stats.GetUpdateChannel(r.URL.String())
+ }
+
p.proxy.ServeHTTP(pw, r)
}
type Proxy struct {
conf *Config
+ stats *Stats
proxy *httputil.ReverseProxy
mux *http.ServeMux
srv *http.Server
}
-func NewProxy(conf *Config) (p *Proxy, err error) {
+func NewProxy(conf *Config, stats *Stats) (p *Proxy, err error) {
- p = &Proxy{conf: conf}
+ p = &Proxy{conf: conf, stats: stats}
var remote *url.URL
remote, err = url.Parse(conf.ConnectAddr)
diff --git a/src/daq/s5proxy/src/s5proxy/s5proxy.go b/src/daq/s5proxy/src/s5proxy/s5proxy.go
index 47ba8b7..6d267c8 100644
--- a/src/daq/s5proxy/src/s5proxy/s5proxy.go
+++ b/src/daq/s5proxy/src/s5proxy/s5proxy.go
@@ -60,16 +60,29 @@ func main() {
os.Exit(1)
}
+ var stats *Stats
+ // TODO: make this configurable
+ stats, err = NewStats(conf)
+
var proxy *Proxy
- proxy, err = NewProxy(conf)
+ proxy, err = NewProxy(conf, stats)
done := make(chan bool)
+ if stats != nil {
+ go func() {
+ defer func() { done <- true }()
+ s5l.Println("starting sfive-stats-module")
+ stats.Run()
+ s5l.Println("ERROR: sfive-stats-module stopped")
+ }()
+ }
+
go func() {
defer func() { done <- true }()
- s5l.Println("start proxy")
+ s5l.Println("starting HTTP-Proxy")
proxy.Run()
- s5l.Println("ERROR: proxy stopped")
+ s5l.Println("ERROR: HTTP-Proxy stopped")
}()
c := make(chan os.Signal, 1)
diff --git a/src/daq/s5proxy/src/s5proxy/stats.go b/src/daq/s5proxy/src/s5proxy/stats.go
index 4a9c775..8c16c77 100644
--- a/src/daq/s5proxy/src/s5proxy/stats.go
+++ b/src/daq/s5proxy/src/s5proxy/stats.go
@@ -33,8 +33,8 @@
package main
import (
- "encoding/json"
- "os"
+ // "encoding/json"
+ // "os"
"time"
)
@@ -47,7 +47,7 @@ type StreamId struct {
}
type ClientData struct {
- Ip string `json:"ip"`
+ IP string `json:"ip"`
UserAgent string `json:"user-agent"`
BytesSent uint `json:"bytes-sent"`
}
@@ -71,16 +71,35 @@ type DataUpdate struct {
type Stats struct {
conf *Config
+
+ // TODO: create datastructure to hold update channels
+ updateCH chan ClientData
}
func NewStats(conf *Config) (s *Stats, err error) {
s = &Stats{conf: conf}
+
+ // TODO: create update-channels according to conf
+ s.updateCH = make(chan ClientData, 512)
return
}
func (s *Stats) Run() (err error) {
- update := DataUpdate{Version: 1, Hostname: "hugo"}
- update.Tags = append(update.Tags, "hello", "world")
- err = json.NewEncoder(os.Stdout).Encode(update)
+ // update := DataUpdate{Version: 1, Hostname: "hugo"}
+ // update.Tags = append(update.Tags, "hello", "world")
+ // err = json.NewEncoder(os.Stdout).Encode(update)
+
+ for {
+ select {
+ case upd := <-s.updateCH:
+ s5l.Printf("STATS: got client data: IP: %s, UA: %s, %d Bytes", upd.IP, upd.UserAgent, upd.BytesSent)
+ }
+ }
return
}
+
+func (s *Stats) GetUpdateChannel(url string) chan<- ClientData {
+ s5l.Printf("STATS: got new client for url: %s", url)
+
+ return s.updateCH
+}