diff options
Diffstat (limited to 'src/hub/src/spreadspace.org/sfive/s5srvPipe.go')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 41 |
1 files changed, 23 insertions, 18 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index 53a5068..3335878 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -51,15 +51,13 @@ func (srv Server) pipeHandle(conn net.Conn) { dec, err := NewStatefulDecoder(conn) if err != nil { - s5l.Printf("pipe: failed to read init message: %v\n", err) + s5l.Printf("server|pipe: read(init) failed: %v\n", err) return } slug := dec.Slug() - s5l.Printf("pipe: new connection: %s\n", slug) - defer func() { - s5l.Printf("pipe(%s): connection closed\n", slug) - }() + s5l.Printf("server|pipe: new connection: %s\n", slug) + defer s5l.Printf("server|pipe(%s): connection closed\n", slug) for { update, err := dec.Decode() @@ -71,33 +69,37 @@ func (srv Server) pipeHandle(conn net.Conn) { opErr, isOpErr := err.(*net.OpError) if isOpErr && opErr.Temporary() { - s5l.Printf("pipe(%s): failed to read data message: %v (temporary error)\n", slug, err) + s5l.Printf("server|pipe(%s): read(data) failed: %v (temporary error)\n", slug, err) } else { - s5l.Printf("pipe(%s): failed to read data message: %v\n", slug, err) + s5l.Printf("server|pipe(%s): read(data) failed: %v\n", slug, err) break } } if err = srv.Ingest(update); err != nil { - s5l.Printf("pipe(%s): failed to store data: %v\n", slug, err) + s5l.Printf("server|pipe(%s): storing data failed: %v\n", slug, err) // TODO: send NACK? break } // TODO: send ACK? } } -func (srv Server) ServePipe(pipePath string) { - ln, err := net.Listen("unix", pipePath) + +func (srv Server) ServePipe(cfg PipeInterfaceConfig) { + ln, err := net.Listen("unix", cfg.ListenPath) if err != nil { - s5l.Printf("pipe: failed to connect: %v", err) + s5l.Printf("server|pipe: listen() failed: %v", err) return } defer ln.Close() + s5l.Printf("server|pipe: listening on '%s'", cfg.ListenPath) + defer s5l.Println("server|pipe: interface stopped") + for { conn, err := ln.Accept() if err != nil { - s5l.Printf("pipe: failed accept: %v", err) + s5l.Printf("server|pipe: accept() failed: %v", err) // ignore continue } @@ -115,30 +117,33 @@ func (srv Server) pipegramHandle(pconn net.PacketConn) { for { n, _, err := pconn.ReadFrom(buffer) if err != nil { - s5l.Printf("pipegram: failed read: %v", err) + s5l.Printf("pipegram: read() failed: %v", err) continue } data := buffer[0:n] update, err := NewStatelessDecoder(bytes.NewReader(data)).Decode() if err != nil { - s5l.Printf("pipegram: failed to decode data message: %v\n", err) + s5l.Printf("pipegram: decoding data message failed: %v\n", err) continue } if err = srv.Ingest(update); err != nil { - s5l.Printf("pipegram: failed to store data: %v\n", err) + s5l.Printf("pipegram: storing data failed: %v\n", err) } } } -func (srv Server) ServePipegram(pipePath string) { - pconn, err := net.ListenPacket("unixgram", pipePath) +func (srv Server) ServePipegram(cfg PipegramInterfaceConfig) { + pconn, err := net.ListenPacket("unixgram", cfg.ListenPath) if err != nil { - s5l.Printf("pipegram: failed to listen: %v", err) + s5l.Printf("pipegram: listen() failed: %v", err) return } defer pconn.Close() + s5l.Printf("server|pipegram: listening on '%s'", cfg.ListenPath) + defer s5l.Println("server|pipegram: interface stopped") + srv.pipegramHandle(pconn) } |