From 5f2c46fe5252a51cb565e6e6227b984b65556d88 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sat, 15 Oct 2016 04:27:48 +0200 Subject: external interface for statistics module done. --- src/daq/s5proxy/src/s5proxy/proxy.go | 29 ++++++++++++++++++++++++----- src/daq/s5proxy/src/s5proxy/s5proxy.go | 19 ++++++++++++++++--- src/daq/s5proxy/src/s5proxy/stats.go | 31 +++++++++++++++++++++++++------ 3 files changed, 65 insertions(+), 14 deletions(-) diff --git a/src/daq/s5proxy/src/s5proxy/proxy.go b/src/daq/s5proxy/src/s5proxy/proxy.go index 33cbc14..26ce8f7 100644 --- a/src/daq/s5proxy/src/s5proxy/proxy.go +++ b/src/daq/s5proxy/src/s5proxy/proxy.go @@ -51,8 +51,11 @@ func generateTime(input string) string { } type s5proxyResponseWriter struct { - wrapped http.ResponseWriter - conf *Config + wrapped http.ResponseWriter + conf *Config + StatsCh chan<- ClientData + IP string + UserAgent string } func (r s5proxyResponseWriter) Header() http.Header { @@ -60,7 +63,15 @@ func (r s5proxyResponseWriter) Header() http.Header { } func (r s5proxyResponseWriter) Write(data []byte) (int, error) { - return r.wrapped.Write(data) + sent, err := r.wrapped.Write(data) + if r.StatsCh != nil { + stats := ClientData{IP: r.IP, UserAgent: r.UserAgent, BytesSent: uint(sent)} + select { + case r.StatsCh <- stats: + default: + } + } + return sent, err } func (r s5proxyResponseWriter) WriteHeader(status int) { @@ -83,19 +94,27 @@ func (r s5proxyResponseWriter) WriteHeader(status int) { func proxyHandler(p *Proxy, w http.ResponseWriter, r *http.Request) { pw := s5proxyResponseWriter{wrapped: w, conf: p.conf} + if p.stats != nil { + //pw.IP, _, _ = net.SplitHostPort(r.RemoteAddr) + pw.IP = r.RemoteAddr // Use IP:Port for now since IP alone might not be unique + pw.UserAgent = r.UserAgent() + pw.StatsCh = p.stats.GetUpdateChannel(r.URL.String()) + } + p.proxy.ServeHTTP(pw, r) } type Proxy struct { conf *Config + stats *Stats proxy *httputil.ReverseProxy mux *http.ServeMux srv *http.Server } -func NewProxy(conf *Config) (p *Proxy, err error) { +func NewProxy(conf *Config, stats *Stats) (p *Proxy, err error) { - p = &Proxy{conf: conf} + p = &Proxy{conf: conf, stats: stats} var remote *url.URL remote, err = url.Parse(conf.ConnectAddr) diff --git a/src/daq/s5proxy/src/s5proxy/s5proxy.go b/src/daq/s5proxy/src/s5proxy/s5proxy.go index 47ba8b7..6d267c8 100644 --- a/src/daq/s5proxy/src/s5proxy/s5proxy.go +++ b/src/daq/s5proxy/src/s5proxy/s5proxy.go @@ -60,16 +60,29 @@ func main() { os.Exit(1) } + var stats *Stats + // TODO: make this configurable + stats, err = NewStats(conf) + var proxy *Proxy - proxy, err = NewProxy(conf) + proxy, err = NewProxy(conf, stats) done := make(chan bool) + if stats != nil { + go func() { + defer func() { done <- true }() + s5l.Println("starting sfive-stats-module") + stats.Run() + s5l.Println("ERROR: sfive-stats-module stopped") + }() + } + go func() { defer func() { done <- true }() - s5l.Println("start proxy") + s5l.Println("starting HTTP-Proxy") proxy.Run() - s5l.Println("ERROR: proxy stopped") + s5l.Println("ERROR: HTTP-Proxy stopped") }() c := make(chan os.Signal, 1) diff --git a/src/daq/s5proxy/src/s5proxy/stats.go b/src/daq/s5proxy/src/s5proxy/stats.go index 4a9c775..8c16c77 100644 --- a/src/daq/s5proxy/src/s5proxy/stats.go +++ b/src/daq/s5proxy/src/s5proxy/stats.go @@ -33,8 +33,8 @@ package main import ( - "encoding/json" - "os" + // "encoding/json" + // "os" "time" ) @@ -47,7 +47,7 @@ type StreamId struct { } type ClientData struct { - Ip string `json:"ip"` + IP string `json:"ip"` UserAgent string `json:"user-agent"` BytesSent uint `json:"bytes-sent"` } @@ -71,16 +71,35 @@ type DataUpdate struct { type Stats struct { conf *Config + + // TODO: create datastructure to hold update channels + updateCH chan ClientData } func NewStats(conf *Config) (s *Stats, err error) { s = &Stats{conf: conf} + + // TODO: create update-channels according to conf + s.updateCH = make(chan ClientData, 512) return } func (s *Stats) Run() (err error) { - update := DataUpdate{Version: 1, Hostname: "hugo"} - update.Tags = append(update.Tags, "hello", "world") - err = json.NewEncoder(os.Stdout).Encode(update) + // update := DataUpdate{Version: 1, Hostname: "hugo"} + // update.Tags = append(update.Tags, "hello", "world") + // err = json.NewEncoder(os.Stdout).Encode(update) + + for { + select { + case upd := <-s.updateCH: + s5l.Printf("STATS: got client data: IP: %s, UA: %s, %d Bytes", upd.IP, upd.UserAgent, upd.BytesSent) + } + } return } + +func (s *Stats) GetUpdateChannel(url string) chan<- ClientData { + s5l.Printf("STATS: got new client for url: %s", url) + + return s.updateCH +} -- cgit v1.2.3