From 08d29755d741920e71ef347e2fa3d5fdee0fd2b2 Mon Sep 17 00:00:00 2001 From: Markus Grüneis Date: Wed, 22 Oct 2014 22:58:21 +0200 Subject: hub: add support for datagram unix-sockets --- src/hub/src/spreadspace.org/sfive-hub/s5hub.go | 14 +++++++++- src/hub/src/spreadspace.org/sfive/s5log.go | 2 +- src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 38 +++++++++++++++++++++++--- src/hub/test-client | 5 +++- src/hub/test-srv | 2 +- 5 files changed, 53 insertions(+), 8 deletions(-) (limited to 'src') diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go index 40cfcc6..3f41fea 100644 --- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go +++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go @@ -9,12 +9,14 @@ import ( "sync" ) -var s5hl = log.New(os.Stderr, "s5hub", log.LstdFlags) +var s5hl = log.New(os.Stderr, "[s5hub]\t", log.LstdFlags) func main() { db := flag.String("db", "/var/lib/sfive/db.sqlite", "path to the sqlite3 database file") pipe := flag.String("pipe", "/var/run/sfive/pipe", "path to the unix pipe for the pipeserver") + ppipe := flag.String("pipegram", "/var/run/sfive/pipegram", "path to the unix datagram pipe for the pipeserver") startPipe := flag.Bool("start-pipe-server", true, "start a connection oriented pipe server; see option pipe") + startGramPipe := flag.Bool("start-pipegram-server", true, "start a datagram oriented pipe server; see option pipegram") startWeb := flag.Bool("start-web-server", true, "start a webserver") forward := flag.String("forward-url", "", "forward to another sfive-server with http server at base-url") @@ -39,6 +41,16 @@ func main() { }() } + if *startGramPipe { + wg.Add(1) + go func() { + defer wg.Done() + s5hl.Printf("start pipegram at %v\n", *ppipe) + server.ServeGramPipe(*ppipe) + s5hl.Println("pipegram finished") + }() + } + if *startWeb { wg.Add(1) go func() { diff --git a/src/hub/src/spreadspace.org/sfive/s5log.go b/src/hub/src/spreadspace.org/sfive/s5log.go index fea0b10..679718b 100644 --- a/src/hub/src/spreadspace.org/sfive/s5log.go +++ b/src/hub/src/spreadspace.org/sfive/s5log.go @@ -5,4 +5,4 @@ import ( "os" ) -var s5l = log.New(os.Stderr, "s5", log.LstdFlags) +var s5l = log.New(os.Stderr, "[s5]\t", log.LstdFlags) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index 0bce7a6..8084461 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -9,19 +9,19 @@ func (self StatsSinkServer) handleConnection(conn net.Conn) { reader := bufio.NewReader(conn) buffer, err := reader.ReadBytes('\n') if err != nil { - s5l.Printf("failed to read from connection: %v\n", err) + s5l.Printf("pipe: failed to read from connection: %v\n", err) return } marshaller, err := NewStatefulDecoder(buffer) if err != nil { - s5l.Printf("failed initializing decoder with init message: %v\n", err) + s5l.Printf("pipe: failed initializing decoder with init message: %v\n", err) return } for { buffer, err := reader.ReadBytes('\n') if err != nil { - s5l.Printf("failed to read from connection: %v\n", err) + s5l.Printf("pipe: failed to read from connection: %v\n", err) return } @@ -29,7 +29,7 @@ func (self StatsSinkServer) handleConnection(conn net.Conn) { value, err := marshaller.Decode(buffer) if err != nil { - s5l.Printf("failed to decode message: %v\n", err) + s5l.Printf("pipe: failed to decode message: %v\n", err) continue } @@ -37,6 +37,25 @@ func (self StatsSinkServer) handleConnection(conn net.Conn) { } } +func (self StatsSinkServer) handlePacketConn(pconn net.PacketConn) { + decoder := NewPlainDecoder() + buffer := make([]byte, 64*1024) + for { + n, _, err := pconn.ReadFrom(buffer) + if err != nil { + s5l.Printf("p-pipe: failed read: %v", err) + continue + } + data := buffer[0:n] + value, err := decoder.Decode(data) + if err == nil { + self.appendData <- value + } else { + s5l.Printf("p-pipe: failed to decode message: %v\n", err) + } + } +} + func (self StatsSinkServer) ServePipe(pipePath string) { ln, err := net.Listen("unix", pipePath) if err != nil { @@ -55,3 +74,14 @@ func (self StatsSinkServer) ServePipe(pipePath string) { go self.handleConnection(conn) } } + +func (self StatsSinkServer) ServeGramPipe(pipePath string) { + pconn, err := net.ListenPacket("unixgram", pipePath) + if err != nil { + s5l.Printf("p-pipe: failed to listen: %v", err) + return + } + defer pconn.Close() + + self.handlePacketConn(pconn) +} diff --git a/src/hub/test-client b/src/hub/test-client index 0696bc5..a26ce75 100755 --- a/src/hub/test-client +++ b/src/hub/test-client @@ -1,7 +1,10 @@ #!/bin/sh -echo import sample.json +echo pipe: import sample.json socat file:../../dat/sample.json,rdonly unix-client:/run/sfive/pipe +echo pipe-gram: import sample-gram.json +while read x; do echo "$x" | socat stdio unix-sendto:/run/sfive/pipegram; done < ../../dat/sample-gram.json + echo show query result wget -q -S -O - 'http://localhost:8000/updates?from=2013-10-24T05:00:00Z&to=2013-10-24T00:05:20Z' diff --git a/src/hub/test-srv b/src/hub/test-srv index ff1369d..8240868 100755 --- a/src/hub/test-srv +++ b/src/hub/test-srv @@ -1,4 +1,4 @@ #!/bin/sh -rm -f /run/sfive/pipe +rm -f /run/sfive/pipe /run/sfive/pipegram ./bin/sfive-hub -db /var/lib/sfive/db.sqlite -start-pipe-server -pipe /var/run/sfive/pipe -start-web-server -- cgit v1.2.3