summaryrefslogtreecommitdiff
path: root/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/hub/src/spreadspace.org/sfive/s5srvPipe.go')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipe.go41
1 files changed, 23 insertions, 18 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
index 53a5068..3335878 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
@@ -51,15 +51,13 @@ func (srv Server) pipeHandle(conn net.Conn) {
dec, err := NewStatefulDecoder(conn)
if err != nil {
- s5l.Printf("pipe: failed to read init message: %v\n", err)
+ s5l.Printf("server|pipe: read(init) failed: %v\n", err)
return
}
slug := dec.Slug()
- s5l.Printf("pipe: new connection: %s\n", slug)
- defer func() {
- s5l.Printf("pipe(%s): connection closed\n", slug)
- }()
+ s5l.Printf("server|pipe: new connection: %s\n", slug)
+ defer s5l.Printf("server|pipe(%s): connection closed\n", slug)
for {
update, err := dec.Decode()
@@ -71,33 +69,37 @@ func (srv Server) pipeHandle(conn net.Conn) {
opErr, isOpErr := err.(*net.OpError)
if isOpErr && opErr.Temporary() {
- s5l.Printf("pipe(%s): failed to read data message: %v (temporary error)\n", slug, err)
+ s5l.Printf("server|pipe(%s): read(data) failed: %v (temporary error)\n", slug, err)
} else {
- s5l.Printf("pipe(%s): failed to read data message: %v\n", slug, err)
+ s5l.Printf("server|pipe(%s): read(data) failed: %v\n", slug, err)
break
}
}
if err = srv.Ingest(update); err != nil {
- s5l.Printf("pipe(%s): failed to store data: %v\n", slug, err)
+ s5l.Printf("server|pipe(%s): storing data failed: %v\n", slug, err)
// TODO: send NACK?
break
}
// TODO: send ACK?
}
}
-func (srv Server) ServePipe(pipePath string) {
- ln, err := net.Listen("unix", pipePath)
+
+func (srv Server) ServePipe(cfg PipeInterfaceConfig) {
+ ln, err := net.Listen("unix", cfg.ListenPath)
if err != nil {
- s5l.Printf("pipe: failed to connect: %v", err)
+ s5l.Printf("server|pipe: listen() failed: %v", err)
return
}
defer ln.Close()
+ s5l.Printf("server|pipe: listening on '%s'", cfg.ListenPath)
+ defer s5l.Println("server|pipe: interface stopped")
+
for {
conn, err := ln.Accept()
if err != nil {
- s5l.Printf("pipe: failed accept: %v", err)
+ s5l.Printf("server|pipe: accept() failed: %v", err)
// ignore
continue
}
@@ -115,30 +117,33 @@ func (srv Server) pipegramHandle(pconn net.PacketConn) {
for {
n, _, err := pconn.ReadFrom(buffer)
if err != nil {
- s5l.Printf("pipegram: failed read: %v", err)
+ s5l.Printf("pipegram: read() failed: %v", err)
continue
}
data := buffer[0:n]
update, err := NewStatelessDecoder(bytes.NewReader(data)).Decode()
if err != nil {
- s5l.Printf("pipegram: failed to decode data message: %v\n", err)
+ s5l.Printf("pipegram: decoding data message failed: %v\n", err)
continue
}
if err = srv.Ingest(update); err != nil {
- s5l.Printf("pipegram: failed to store data: %v\n", err)
+ s5l.Printf("pipegram: storing data failed: %v\n", err)
}
}
}
-func (srv Server) ServePipegram(pipePath string) {
- pconn, err := net.ListenPacket("unixgram", pipePath)
+func (srv Server) ServePipegram(cfg PipegramInterfaceConfig) {
+ pconn, err := net.ListenPacket("unixgram", cfg.ListenPath)
if err != nil {
- s5l.Printf("pipegram: failed to listen: %v", err)
+ s5l.Printf("pipegram: listen() failed: %v", err)
return
}
defer pconn.Close()
+ s5l.Printf("server|pipegram: listening on '%s'", cfg.ListenPath)
+ defer s5l.Println("server|pipegram: interface stopped")
+
srv.pipegramHandle(pconn)
}