From 04552015bf98cad1a29f45027955948627f5cc41 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Wed, 5 Jul 2017 00:02:54 +0200 Subject: server interfaces now use config structs cleanup at log messages --- src/hub/src/spreadspace.org/sfive-hub/s5hub.go | 20 +++++-------- src/hub/src/spreadspace.org/sfive/s5srv.go | 12 ++++---- src/hub/src/spreadspace.org/sfive/s5srvConf.go | 4 +-- src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 41 +++++++++++++++----------- src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 19 ++++++------ 5 files changed, 48 insertions(+), 48 deletions(-) (limited to 'src') diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go index f508dd2..016976d 100644 --- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go +++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go @@ -78,10 +78,10 @@ func main() { cfg := sfive.SrvConfig{} if *startPipe { - cfg.Interfaces.Pipe = sfive.PipeInterfaceConfig{ListenAddr: *pipe} + cfg.Interfaces.Pipe = sfive.PipeInterfaceConfig{ListenPath: *pipe} } if *startPipegram { - cfg.Interfaces.Pipegram = sfive.PipegramInterfaceConfig{ListenAddr: *pipegram} + cfg.Interfaces.Pipegram = sfive.PipegramInterfaceConfig{ListenPath: *pipegram} } if *startWeb { cfg.Interfaces.Web = sfive.WebInterfaceConfig{ListenAddr: *web} @@ -104,23 +104,19 @@ func main() { var wg sync.WaitGroup - if cfg.Interfaces.Pipe.ListenAddr != "" { + if cfg.Interfaces.Pipe.ListenPath != "" { wg.Add(1) go func() { defer wg.Done() - s5hl.Printf("start pipe at %v\n", cfg.Interfaces.Pipe.ListenAddr) - srv.ServePipe(cfg.Interfaces.Pipe.ListenAddr) - s5hl.Println("pipe finished") + srv.ServePipe(cfg.Interfaces.Pipe) }() } - if cfg.Interfaces.Pipegram.ListenAddr != "" { + if cfg.Interfaces.Pipegram.ListenPath != "" { wg.Add(1) go func() { defer wg.Done() - s5hl.Printf("starting pipegram at %v\n", cfg.Interfaces.Pipegram.ListenAddr) - srv.ServePipegram(cfg.Interfaces.Pipegram.ListenAddr) - s5hl.Println("pipegram finished") + srv.ServePipegram(cfg.Interfaces.Pipegram) }() } @@ -128,9 +124,7 @@ func main() { wg.Add(1) go func() { defer wg.Done() - s5hl.Printf("starting web at %v\n", cfg.Interfaces.Web.ListenAddr) - srv.ServeWeb(cfg.Interfaces.Web.ListenAddr) - s5hl.Println("web finished") + srv.ServeWeb(cfg.Interfaces.Web) }() } diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 579aec7..979cd83 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -67,7 +67,7 @@ func (srv Server) transform(update *UpdateFull) *UpdateFull { if srv.geoip != nil { if info, err := srv.geoip.Lookup(client.IP); err != nil { - s5l.Printf("transform: Geo-IP lookup failed: %v", err) + s5l.Printf("server|transform: Geo-IP lookup failed: %v", err) } else { client.GeoInfo = *info } @@ -75,7 +75,7 @@ func (srv Server) transform(update *UpdateFull) *UpdateFull { if srv.anonymization != nil { if aIP, err := srv.anonymization.Anonymize(client.IP); err != nil { - s5l.Printf("transform: anonymization failed: %v", err) + s5l.Printf("server|transform: anonymization failed: %v", err) } else { client.IP = aIP } @@ -87,13 +87,13 @@ func (srv Server) transform(update *UpdateFull) *UpdateFull { if uint(len(update.Data.Clients)) > update.Data.ClientCount { if update.Data.ClientCount > 0 { - s5l.Printf("transform: fixing client-count: %d -> %d", update.Data.ClientCount, len(update.Data.Clients)) + s5l.Printf("server|transform: fixing client-count: %d -> %d", update.Data.ClientCount, len(update.Data.Clients)) } update.Data.ClientCount = uint(len(update.Data.Clients)) } if bytesSentTotal > update.Data.BytesSent { if update.Data.BytesSent > 0 { - s5l.Printf("transform: fixing bytes-sent: %d -> %d", update.Data.BytesSent, bytesSentTotal) + s5l.Printf("server|transform: fixing bytes-sent: %d -> %d", update.Data.BytesSent, bytesSentTotal) } update.Data.BytesSent = bytesSentTotal } @@ -159,7 +159,7 @@ func NewServer(cfg SrvConfig) (srv *Server, err error) { err = errors.New("failed to initialize IP address anonymization: " + err.Error()) return } - s5l.Printf("using IP address anonymization: %s", srv.anonymization) + s5l.Printf("server|transform: using IP address anonymization: %s", srv.anonymization) } if cfg.Transform.GeoipDB != "" { @@ -167,7 +167,7 @@ func NewServer(cfg SrvConfig) (srv *Server, err error) { err = errors.New("failed to initialize Geo-IP Lookup: " + err.Error()) return } - s5l.Printf("using Geo-IP Lookup: %s", srv.geoip) + s5l.Printf("server|transform: using Geo-IP Lookup: %s", srv.geoip) } srv.numWorker = runtime.NumCPU() // TODO: make this configurable diff --git a/src/hub/src/spreadspace.org/sfive/s5srvConf.go b/src/hub/src/spreadspace.org/sfive/s5srvConf.go index 6826583..c4a6158 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvConf.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvConf.go @@ -33,11 +33,11 @@ package sfive type PipeInterfaceConfig struct { - ListenAddr string `json:"listen" yaml:"listen" toml:"listen"` + ListenPath string `json:"listen" yaml:"listen" toml:"listen"` } type PipegramInterfaceConfig struct { - ListenAddr string `json:"listen" yaml:"listen" toml:"listen"` + ListenPath string `json:"listen" yaml:"listen" toml:"listen"` } type WebInterfaceConfig struct { diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index 53a5068..3335878 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -51,15 +51,13 @@ func (srv Server) pipeHandle(conn net.Conn) { dec, err := NewStatefulDecoder(conn) if err != nil { - s5l.Printf("pipe: failed to read init message: %v\n", err) + s5l.Printf("server|pipe: read(init) failed: %v\n", err) return } slug := dec.Slug() - s5l.Printf("pipe: new connection: %s\n", slug) - defer func() { - s5l.Printf("pipe(%s): connection closed\n", slug) - }() + s5l.Printf("server|pipe: new connection: %s\n", slug) + defer s5l.Printf("server|pipe(%s): connection closed\n", slug) for { update, err := dec.Decode() @@ -71,33 +69,37 @@ func (srv Server) pipeHandle(conn net.Conn) { opErr, isOpErr := err.(*net.OpError) if isOpErr && opErr.Temporary() { - s5l.Printf("pipe(%s): failed to read data message: %v (temporary error)\n", slug, err) + s5l.Printf("server|pipe(%s): read(data) failed: %v (temporary error)\n", slug, err) } else { - s5l.Printf("pipe(%s): failed to read data message: %v\n", slug, err) + s5l.Printf("server|pipe(%s): read(data) failed: %v\n", slug, err) break } } if err = srv.Ingest(update); err != nil { - s5l.Printf("pipe(%s): failed to store data: %v\n", slug, err) + s5l.Printf("server|pipe(%s): storing data failed: %v\n", slug, err) // TODO: send NACK? break } // TODO: send ACK? } } -func (srv Server) ServePipe(pipePath string) { - ln, err := net.Listen("unix", pipePath) + +func (srv Server) ServePipe(cfg PipeInterfaceConfig) { + ln, err := net.Listen("unix", cfg.ListenPath) if err != nil { - s5l.Printf("pipe: failed to connect: %v", err) + s5l.Printf("server|pipe: listen() failed: %v", err) return } defer ln.Close() + s5l.Printf("server|pipe: listening on '%s'", cfg.ListenPath) + defer s5l.Println("server|pipe: interface stopped") + for { conn, err := ln.Accept() if err != nil { - s5l.Printf("pipe: failed accept: %v", err) + s5l.Printf("server|pipe: accept() failed: %v", err) // ignore continue } @@ -115,30 +117,33 @@ func (srv Server) pipegramHandle(pconn net.PacketConn) { for { n, _, err := pconn.ReadFrom(buffer) if err != nil { - s5l.Printf("pipegram: failed read: %v", err) + s5l.Printf("pipegram: read() failed: %v", err) continue } data := buffer[0:n] update, err := NewStatelessDecoder(bytes.NewReader(data)).Decode() if err != nil { - s5l.Printf("pipegram: failed to decode data message: %v\n", err) + s5l.Printf("pipegram: decoding data message failed: %v\n", err) continue } if err = srv.Ingest(update); err != nil { - s5l.Printf("pipegram: failed to store data: %v\n", err) + s5l.Printf("pipegram: storing data failed: %v\n", err) } } } -func (srv Server) ServePipegram(pipePath string) { - pconn, err := net.ListenPacket("unixgram", pipePath) +func (srv Server) ServePipegram(cfg PipegramInterfaceConfig) { + pconn, err := net.ListenPacket("unixgram", cfg.ListenPath) if err != nil { - s5l.Printf("pipegram: failed to listen: %v", err) + s5l.Printf("pipegram: listen() failed: %v", err) return } defer pconn.Close() + s5l.Printf("server|pipegram: listening on '%s'", cfg.ListenPath) + defer s5l.Println("server|pipegram: interface stopped") + srv.pipegramHandle(pconn) } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index 64c73e4..9c9521a 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -224,12 +224,12 @@ func webUpdatesGetBulk(srv *Server, w http.ResponseWriter, r *http.Request) { enc, err := NewStatefulEncoder(w) if err != nil { - s5l.Printf("Error while sending init: %v", err) + s5l.Printf("server|web: sending response(init) failed: %v", err) return } for _, update := range updates { if err := enc.Encode(update); err != nil { - s5l.Printf("Error while sending data: %v", err) + s5l.Printf("server|web: sending response(data) failed: %v", err) return } } @@ -368,7 +368,7 @@ func sendWebResponse(w http.ResponseWriter, status int, respdata interface{}) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) if err := json.NewEncoder(w).Encode(respdata); err != nil { - s5l.Printf("Error while sending data: %v", err) + s5l.Printf("server|web: sending response failed: %v", err) } } @@ -414,14 +414,15 @@ func webRun(listener *net.TCPListener, srv *Server) (err error) { return server.Serve(tcpKeepAliveListener{listener}) } -func (srv Server) ServeWeb(addr string) { - if addr == "" { - addr = ":http" - } - ln, err := net.Listen("tcp", addr) +func (srv Server) ServeWeb(cfg WebInterfaceConfig) { + ln, err := net.Listen("tcp", cfg.ListenAddr) if err != nil { - s5l.Printf("web: failed to listen: %v", err) + s5l.Printf("server|web: listen() failed: %v", err) return } + + s5l.Printf("server|web: listening on '%s'", cfg.ListenAddr) + defer s5l.Println("server|web: interface stopped") + webRun(ln.(*net.TCPListener), &srv) } -- cgit v1.2.3