summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-07-05 00:02:54 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-07-05 00:02:57 +0200
commit04552015bf98cad1a29f45027955948627f5cc41 (patch)
tree10d7cb21601034f3ffda26d195c0dc0e50a1a060
parenthub uses config structs now (diff)
server interfaces now use config structs
cleanup at log messages
-rw-r--r--src/hub/src/spreadspace.org/sfive-hub/s5hub.go20
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go12
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvConf.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipe.go41
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go19
5 files changed, 48 insertions, 48 deletions
diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
index f508dd2..016976d 100644
--- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
+++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
@@ -78,10 +78,10 @@ func main() {
cfg := sfive.SrvConfig{}
if *startPipe {
- cfg.Interfaces.Pipe = sfive.PipeInterfaceConfig{ListenAddr: *pipe}
+ cfg.Interfaces.Pipe = sfive.PipeInterfaceConfig{ListenPath: *pipe}
}
if *startPipegram {
- cfg.Interfaces.Pipegram = sfive.PipegramInterfaceConfig{ListenAddr: *pipegram}
+ cfg.Interfaces.Pipegram = sfive.PipegramInterfaceConfig{ListenPath: *pipegram}
}
if *startWeb {
cfg.Interfaces.Web = sfive.WebInterfaceConfig{ListenAddr: *web}
@@ -104,23 +104,19 @@ func main() {
var wg sync.WaitGroup
- if cfg.Interfaces.Pipe.ListenAddr != "" {
+ if cfg.Interfaces.Pipe.ListenPath != "" {
wg.Add(1)
go func() {
defer wg.Done()
- s5hl.Printf("start pipe at %v\n", cfg.Interfaces.Pipe.ListenAddr)
- srv.ServePipe(cfg.Interfaces.Pipe.ListenAddr)
- s5hl.Println("pipe finished")
+ srv.ServePipe(cfg.Interfaces.Pipe)
}()
}
- if cfg.Interfaces.Pipegram.ListenAddr != "" {
+ if cfg.Interfaces.Pipegram.ListenPath != "" {
wg.Add(1)
go func() {
defer wg.Done()
- s5hl.Printf("starting pipegram at %v\n", cfg.Interfaces.Pipegram.ListenAddr)
- srv.ServePipegram(cfg.Interfaces.Pipegram.ListenAddr)
- s5hl.Println("pipegram finished")
+ srv.ServePipegram(cfg.Interfaces.Pipegram)
}()
}
@@ -128,9 +124,7 @@ func main() {
wg.Add(1)
go func() {
defer wg.Done()
- s5hl.Printf("starting web at %v\n", cfg.Interfaces.Web.ListenAddr)
- srv.ServeWeb(cfg.Interfaces.Web.ListenAddr)
- s5hl.Println("web finished")
+ srv.ServeWeb(cfg.Interfaces.Web)
}()
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 579aec7..979cd83 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -67,7 +67,7 @@ func (srv Server) transform(update *UpdateFull) *UpdateFull {
if srv.geoip != nil {
if info, err := srv.geoip.Lookup(client.IP); err != nil {
- s5l.Printf("transform: Geo-IP lookup failed: %v", err)
+ s5l.Printf("server|transform: Geo-IP lookup failed: %v", err)
} else {
client.GeoInfo = *info
}
@@ -75,7 +75,7 @@ func (srv Server) transform(update *UpdateFull) *UpdateFull {
if srv.anonymization != nil {
if aIP, err := srv.anonymization.Anonymize(client.IP); err != nil {
- s5l.Printf("transform: anonymization failed: %v", err)
+ s5l.Printf("server|transform: anonymization failed: %v", err)
} else {
client.IP = aIP
}
@@ -87,13 +87,13 @@ func (srv Server) transform(update *UpdateFull) *UpdateFull {
if uint(len(update.Data.Clients)) > update.Data.ClientCount {
if update.Data.ClientCount > 0 {
- s5l.Printf("transform: fixing client-count: %d -> %d", update.Data.ClientCount, len(update.Data.Clients))
+ s5l.Printf("server|transform: fixing client-count: %d -> %d", update.Data.ClientCount, len(update.Data.Clients))
}
update.Data.ClientCount = uint(len(update.Data.Clients))
}
if bytesSentTotal > update.Data.BytesSent {
if update.Data.BytesSent > 0 {
- s5l.Printf("transform: fixing bytes-sent: %d -> %d", update.Data.BytesSent, bytesSentTotal)
+ s5l.Printf("server|transform: fixing bytes-sent: %d -> %d", update.Data.BytesSent, bytesSentTotal)
}
update.Data.BytesSent = bytesSentTotal
}
@@ -159,7 +159,7 @@ func NewServer(cfg SrvConfig) (srv *Server, err error) {
err = errors.New("failed to initialize IP address anonymization: " + err.Error())
return
}
- s5l.Printf("using IP address anonymization: %s", srv.anonymization)
+ s5l.Printf("server|transform: using IP address anonymization: %s", srv.anonymization)
}
if cfg.Transform.GeoipDB != "" {
@@ -167,7 +167,7 @@ func NewServer(cfg SrvConfig) (srv *Server, err error) {
err = errors.New("failed to initialize Geo-IP Lookup: " + err.Error())
return
}
- s5l.Printf("using Geo-IP Lookup: %s", srv.geoip)
+ s5l.Printf("server|transform: using Geo-IP Lookup: %s", srv.geoip)
}
srv.numWorker = runtime.NumCPU() // TODO: make this configurable
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvConf.go b/src/hub/src/spreadspace.org/sfive/s5srvConf.go
index 6826583..c4a6158 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvConf.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvConf.go
@@ -33,11 +33,11 @@
package sfive
type PipeInterfaceConfig struct {
- ListenAddr string `json:"listen" yaml:"listen" toml:"listen"`
+ ListenPath string `json:"listen" yaml:"listen" toml:"listen"`
}
type PipegramInterfaceConfig struct {
- ListenAddr string `json:"listen" yaml:"listen" toml:"listen"`
+ ListenPath string `json:"listen" yaml:"listen" toml:"listen"`
}
type WebInterfaceConfig struct {
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
index 53a5068..3335878 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
@@ -51,15 +51,13 @@ func (srv Server) pipeHandle(conn net.Conn) {
dec, err := NewStatefulDecoder(conn)
if err != nil {
- s5l.Printf("pipe: failed to read init message: %v\n", err)
+ s5l.Printf("server|pipe: read(init) failed: %v\n", err)
return
}
slug := dec.Slug()
- s5l.Printf("pipe: new connection: %s\n", slug)
- defer func() {
- s5l.Printf("pipe(%s): connection closed\n", slug)
- }()
+ s5l.Printf("server|pipe: new connection: %s\n", slug)
+ defer s5l.Printf("server|pipe(%s): connection closed\n", slug)
for {
update, err := dec.Decode()
@@ -71,33 +69,37 @@ func (srv Server) pipeHandle(conn net.Conn) {
opErr, isOpErr := err.(*net.OpError)
if isOpErr && opErr.Temporary() {
- s5l.Printf("pipe(%s): failed to read data message: %v (temporary error)\n", slug, err)
+ s5l.Printf("server|pipe(%s): read(data) failed: %v (temporary error)\n", slug, err)
} else {
- s5l.Printf("pipe(%s): failed to read data message: %v\n", slug, err)
+ s5l.Printf("server|pipe(%s): read(data) failed: %v\n", slug, err)
break
}
}
if err = srv.Ingest(update); err != nil {
- s5l.Printf("pipe(%s): failed to store data: %v\n", slug, err)
+ s5l.Printf("server|pipe(%s): storing data failed: %v\n", slug, err)
// TODO: send NACK?
break
}
// TODO: send ACK?
}
}
-func (srv Server) ServePipe(pipePath string) {
- ln, err := net.Listen("unix", pipePath)
+
+func (srv Server) ServePipe(cfg PipeInterfaceConfig) {
+ ln, err := net.Listen("unix", cfg.ListenPath)
if err != nil {
- s5l.Printf("pipe: failed to connect: %v", err)
+ s5l.Printf("server|pipe: listen() failed: %v", err)
return
}
defer ln.Close()
+ s5l.Printf("server|pipe: listening on '%s'", cfg.ListenPath)
+ defer s5l.Println("server|pipe: interface stopped")
+
for {
conn, err := ln.Accept()
if err != nil {
- s5l.Printf("pipe: failed accept: %v", err)
+ s5l.Printf("server|pipe: accept() failed: %v", err)
// ignore
continue
}
@@ -115,30 +117,33 @@ func (srv Server) pipegramHandle(pconn net.PacketConn) {
for {
n, _, err := pconn.ReadFrom(buffer)
if err != nil {
- s5l.Printf("pipegram: failed read: %v", err)
+ s5l.Printf("pipegram: read() failed: %v", err)
continue
}
data := buffer[0:n]
update, err := NewStatelessDecoder(bytes.NewReader(data)).Decode()
if err != nil {
- s5l.Printf("pipegram: failed to decode data message: %v\n", err)
+ s5l.Printf("pipegram: decoding data message failed: %v\n", err)
continue
}
if err = srv.Ingest(update); err != nil {
- s5l.Printf("pipegram: failed to store data: %v\n", err)
+ s5l.Printf("pipegram: storing data failed: %v\n", err)
}
}
}
-func (srv Server) ServePipegram(pipePath string) {
- pconn, err := net.ListenPacket("unixgram", pipePath)
+func (srv Server) ServePipegram(cfg PipegramInterfaceConfig) {
+ pconn, err := net.ListenPacket("unixgram", cfg.ListenPath)
if err != nil {
- s5l.Printf("pipegram: failed to listen: %v", err)
+ s5l.Printf("pipegram: listen() failed: %v", err)
return
}
defer pconn.Close()
+ s5l.Printf("server|pipegram: listening on '%s'", cfg.ListenPath)
+ defer s5l.Println("server|pipegram: interface stopped")
+
srv.pipegramHandle(pconn)
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index 64c73e4..9c9521a 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -224,12 +224,12 @@ func webUpdatesGetBulk(srv *Server, w http.ResponseWriter, r *http.Request) {
enc, err := NewStatefulEncoder(w)
if err != nil {
- s5l.Printf("Error while sending init: %v", err)
+ s5l.Printf("server|web: sending response(init) failed: %v", err)
return
}
for _, update := range updates {
if err := enc.Encode(update); err != nil {
- s5l.Printf("Error while sending data: %v", err)
+ s5l.Printf("server|web: sending response(data) failed: %v", err)
return
}
}
@@ -368,7 +368,7 @@ func sendWebResponse(w http.ResponseWriter, status int, respdata interface{}) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
if err := json.NewEncoder(w).Encode(respdata); err != nil {
- s5l.Printf("Error while sending data: %v", err)
+ s5l.Printf("server|web: sending response failed: %v", err)
}
}
@@ -414,14 +414,15 @@ func webRun(listener *net.TCPListener, srv *Server) (err error) {
return server.Serve(tcpKeepAliveListener{listener})
}
-func (srv Server) ServeWeb(addr string) {
- if addr == "" {
- addr = ":http"
- }
- ln, err := net.Listen("tcp", addr)
+func (srv Server) ServeWeb(cfg WebInterfaceConfig) {
+ ln, err := net.Listen("tcp", cfg.ListenAddr)
if err != nil {
- s5l.Printf("web: failed to listen: %v", err)
+ s5l.Printf("server|web: listen() failed: %v", err)
return
}
+
+ s5l.Printf("server|web: listening on '%s'", cfg.ListenAddr)
+ defer s5l.Println("server|web: interface stopped")
+
webRun(ln.(*net.TCPListener), &srv)
}