From 32a7ba9e97d58a12ba63b7ce124695b4eb863311 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sun, 16 Oct 2016 01:23:53 +0200 Subject: stats connecto to hub works now, improved logging and error handling --- src/daq/s5proxy/.gitignore | 2 ++ src/daq/s5proxy/sample.json | 5 ++++- src/daq/s5proxy/src/s5proxy/config.go | 5 +++++ src/daq/s5proxy/src/s5proxy/proxy.go | 3 +++ src/daq/s5proxy/src/s5proxy/s5proxy.go | 26 +++++++++++++++++++------- src/daq/s5proxy/src/s5proxy/stats.go | 30 ++++++++++++++++++------------ 6 files changed, 51 insertions(+), 20 deletions(-) (limited to 'src') diff --git a/src/daq/s5proxy/.gitignore b/src/daq/s5proxy/.gitignore index c63ff13..aad55e0 100644 --- a/src/daq/s5proxy/.gitignore +++ b/src/daq/s5proxy/.gitignore @@ -2,3 +2,5 @@ /pkg *.a *.o +/*.key +/*.pem diff --git a/src/daq/s5proxy/sample.json b/src/daq/s5proxy/sample.json index 65dc768..b501382 100644 --- a/src/daq/s5proxy/sample.json +++ b/src/daq/s5proxy/sample.json @@ -12,5 +12,8 @@ { "op": "add", "header": "Cache-Control", "value": "must-revalidate" }, { "op": "add", "header": "Cache-Control", "value": "max-age=0" }, { "op": "time", "header": "Expires", "value": "-1s" } - ] + ], + "sfive": { + "socket": "/var/run/sfive/pipegram" + } } diff --git a/src/daq/s5proxy/src/s5proxy/config.go b/src/daq/s5proxy/src/s5proxy/config.go index c61a152..da7360e 100644 --- a/src/daq/s5proxy/src/s5proxy/config.go +++ b/src/daq/s5proxy/src/s5proxy/config.go @@ -92,6 +92,10 @@ type HeaderOperation struct { Value string `json:"value,omitempty"` } +type SFiveConf struct { + Sock string `json:"socket"` +} + type Config struct { ListenAddr string `json:"listen"` ConnectAddr string `json:"connect"` @@ -99,6 +103,7 @@ type Config struct { KeyFile string `json:"key"` RequestHeader []HeaderOperation `json:"request_header"` ResponseHeader []HeaderOperation `json:"response_header"` + SFive SFiveConf `json:"sfive"` } func readConfig(configfile string) (conf *Config, err error) { diff --git a/src/daq/s5proxy/src/s5proxy/proxy.go b/src/daq/s5proxy/src/s5proxy/proxy.go index 23fac28..19a6a1e 100644 --- a/src/daq/s5proxy/src/s5proxy/proxy.go +++ b/src/daq/s5proxy/src/s5proxy/proxy.go @@ -121,6 +121,8 @@ func NewProxy(conf *Config, stats *Stats) (p *Proxy, err error) { if err != nil { return } + s5l.Printf("PROXY: forwarding traffic to '%s'", remote.String()) + p.proxy = httputil.NewSingleHostReverseProxy(remote) origDir := p.proxy.Director @@ -170,5 +172,6 @@ func NewProxy(conf *Config, stats *Stats) (p *Proxy, err error) { } func (p *Proxy) Run() error { + s5l.Printf("PROXY: listening on '%s'", p.srv.Addr) return p.srv.ListenAndServeTLS(p.conf.CertFile, p.conf.KeyFile) } diff --git a/src/daq/s5proxy/src/s5proxy/s5proxy.go b/src/daq/s5proxy/src/s5proxy/s5proxy.go index 6d267c8..0eb91cf 100644 --- a/src/daq/s5proxy/src/s5proxy/s5proxy.go +++ b/src/daq/s5proxy/src/s5proxy/s5proxy.go @@ -43,7 +43,6 @@ var s5l = log.New(os.Stderr, "[s5proxy]\t", log.LstdFlags) func main() { cf := flag.String("config", "/etc/s5proxy/default.json", "path to the config file") - help := flag.Bool("help", false, "show usage") flag.Parse() @@ -61,27 +60,40 @@ func main() { } var stats *Stats - // TODO: make this configurable - stats, err = NewStats(conf) + if conf.SFive.Sock != "" { + stats, err = NewStats(conf) + if err != nil { + s5l.Printf("Error reading config file: %v\n", err) + os.Exit(1) + } + } var proxy *Proxy proxy, err = NewProxy(conf, stats) + if err != nil { + s5l.Printf("Error reading config file: %v\n", err) + os.Exit(1) + } done := make(chan bool) if stats != nil { go func() { defer func() { done <- true }() - s5l.Println("starting sfive-stats-module") - stats.Run() + s5l.Println("running sfive-stats-module") + if err := stats.Run(); err != nil { + s5l.Printf("ERROR: sfive-stats-module: %v", err) + } s5l.Println("ERROR: sfive-stats-module stopped") }() } go func() { defer func() { done <- true }() - s5l.Println("starting HTTP-Proxy") - proxy.Run() + s5l.Println("running HTTP-Proxy") + if err := proxy.Run(); err != nil { + s5l.Printf("ERROR: HTTP-Proxy: %v", err) + } s5l.Println("ERROR: HTTP-Proxy stopped") }() diff --git a/src/daq/s5proxy/src/s5proxy/stats.go b/src/daq/s5proxy/src/s5proxy/stats.go index 8c16c77..1d12f74 100644 --- a/src/daq/s5proxy/src/s5proxy/stats.go +++ b/src/daq/s5proxy/src/s5proxy/stats.go @@ -33,8 +33,8 @@ package main import ( - // "encoding/json" - // "os" + "encoding/json" + "net" "time" ) @@ -71,13 +71,23 @@ type DataUpdate struct { type Stats struct { conf *Config - + sock net.Conn // TODO: create datastructure to hold update channels updateCH chan ClientData } +func (s *Stats) GetUpdateChannel(url string) chan<- ClientData { + s5l.Printf("STATS: got new client for url: %s", url) + + return s.updateCH +} + func NewStats(conf *Config) (s *Stats, err error) { s = &Stats{conf: conf} + if s.sock, err = net.DialTimeout("unixgram", conf.SFive.Sock, time.Second); err != nil { + return + } + s5l.Printf("STATS: connected to sfive-hub on '%s'", conf.SFive.Sock) // TODO: create update-channels according to conf s.updateCH = make(chan ClientData, 512) @@ -85,9 +95,11 @@ func NewStats(conf *Config) (s *Stats, err error) { } func (s *Stats) Run() (err error) { - // update := DataUpdate{Version: 1, Hostname: "hugo"} - // update.Tags = append(update.Tags, "hello", "world") - // err = json.NewEncoder(os.Stdout).Encode(update) + update := DataUpdate{Version: 1, Hostname: "hugo", StartTime: time.Now()} + update.Tags = append(update.Tags, "hello", "world") + if err = json.NewEncoder(s.sock).Encode(update); err != nil { + return + } for { select { @@ -97,9 +109,3 @@ func (s *Stats) Run() (err error) { } return } - -func (s *Stats) GetUpdateChannel(url string) chan<- ClientData { - s5l.Printf("STATS: got new client for url: %s", url) - - return s.updateCH -} -- cgit v1.2.3