summaryrefslogtreecommitdiff
path: root/src/hub
diff options
context:
space:
mode:
authorMarkus Grüneis <gimpf@gimpf.org>2014-10-22 22:58:21 +0200
committerMarkus Grüneis <gimpf@gimpf.org>2014-10-22 22:58:57 +0200
commit08d29755d741920e71ef347e2fa3d5fdee0fd2b2 (patch)
tree8555e95b9ca2a2445b5a271efaf2e6e0a0eb369d /src/hub
parenthub: add stub for forwarding (diff)
hub: add support for datagram unix-sockets
Diffstat (limited to 'src/hub')
-rw-r--r--src/hub/src/spreadspace.org/sfive-hub/s5hub.go14
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5log.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipe.go38
-rwxr-xr-xsrc/hub/test-client5
-rwxr-xr-xsrc/hub/test-srv2
5 files changed, 53 insertions, 8 deletions
diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
index 40cfcc6..3f41fea 100644
--- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
+++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
@@ -9,12 +9,14 @@ import (
"sync"
)
-var s5hl = log.New(os.Stderr, "s5hub", log.LstdFlags)
+var s5hl = log.New(os.Stderr, "[s5hub]\t", log.LstdFlags)
func main() {
db := flag.String("db", "/var/lib/sfive/db.sqlite", "path to the sqlite3 database file")
pipe := flag.String("pipe", "/var/run/sfive/pipe", "path to the unix pipe for the pipeserver")
+ ppipe := flag.String("pipegram", "/var/run/sfive/pipegram", "path to the unix datagram pipe for the pipeserver")
startPipe := flag.Bool("start-pipe-server", true, "start a connection oriented pipe server; see option pipe")
+ startGramPipe := flag.Bool("start-pipegram-server", true, "start a datagram oriented pipe server; see option pipegram")
startWeb := flag.Bool("start-web-server", true, "start a webserver")
forward := flag.String("forward-url", "", "forward to another sfive-server with http server at base-url")
@@ -39,6 +41,16 @@ func main() {
}()
}
+ if *startGramPipe {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ s5hl.Printf("start pipegram at %v\n", *ppipe)
+ server.ServeGramPipe(*ppipe)
+ s5hl.Println("pipegram finished")
+ }()
+ }
+
if *startWeb {
wg.Add(1)
go func() {
diff --git a/src/hub/src/spreadspace.org/sfive/s5log.go b/src/hub/src/spreadspace.org/sfive/s5log.go
index fea0b10..679718b 100644
--- a/src/hub/src/spreadspace.org/sfive/s5log.go
+++ b/src/hub/src/spreadspace.org/sfive/s5log.go
@@ -5,4 +5,4 @@ import (
"os"
)
-var s5l = log.New(os.Stderr, "s5", log.LstdFlags)
+var s5l = log.New(os.Stderr, "[s5]\t", log.LstdFlags)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
index 0bce7a6..8084461 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
@@ -9,19 +9,19 @@ func (self StatsSinkServer) handleConnection(conn net.Conn) {
reader := bufio.NewReader(conn)
buffer, err := reader.ReadBytes('\n')
if err != nil {
- s5l.Printf("failed to read from connection: %v\n", err)
+ s5l.Printf("pipe: failed to read from connection: %v\n", err)
return
}
marshaller, err := NewStatefulDecoder(buffer)
if err != nil {
- s5l.Printf("failed initializing decoder with init message: %v\n", err)
+ s5l.Printf("pipe: failed initializing decoder with init message: %v\n", err)
return
}
for {
buffer, err := reader.ReadBytes('\n')
if err != nil {
- s5l.Printf("failed to read from connection: %v\n", err)
+ s5l.Printf("pipe: failed to read from connection: %v\n", err)
return
}
@@ -29,7 +29,7 @@ func (self StatsSinkServer) handleConnection(conn net.Conn) {
value, err := marshaller.Decode(buffer)
if err != nil {
- s5l.Printf("failed to decode message: %v\n", err)
+ s5l.Printf("pipe: failed to decode message: %v\n", err)
continue
}
@@ -37,6 +37,25 @@ func (self StatsSinkServer) handleConnection(conn net.Conn) {
}
}
+func (self StatsSinkServer) handlePacketConn(pconn net.PacketConn) {
+ decoder := NewPlainDecoder()
+ buffer := make([]byte, 64*1024)
+ for {
+ n, _, err := pconn.ReadFrom(buffer)
+ if err != nil {
+ s5l.Printf("p-pipe: failed read: %v", err)
+ continue
+ }
+ data := buffer[0:n]
+ value, err := decoder.Decode(data)
+ if err == nil {
+ self.appendData <- value
+ } else {
+ s5l.Printf("p-pipe: failed to decode message: %v\n", err)
+ }
+ }
+}
+
func (self StatsSinkServer) ServePipe(pipePath string) {
ln, err := net.Listen("unix", pipePath)
if err != nil {
@@ -55,3 +74,14 @@ func (self StatsSinkServer) ServePipe(pipePath string) {
go self.handleConnection(conn)
}
}
+
+func (self StatsSinkServer) ServeGramPipe(pipePath string) {
+ pconn, err := net.ListenPacket("unixgram", pipePath)
+ if err != nil {
+ s5l.Printf("p-pipe: failed to listen: %v", err)
+ return
+ }
+ defer pconn.Close()
+
+ self.handlePacketConn(pconn)
+}
diff --git a/src/hub/test-client b/src/hub/test-client
index 0696bc5..a26ce75 100755
--- a/src/hub/test-client
+++ b/src/hub/test-client
@@ -1,7 +1,10 @@
#!/bin/sh
-echo import sample.json
+echo pipe: import sample.json
socat file:../../dat/sample.json,rdonly unix-client:/run/sfive/pipe
+echo pipe-gram: import sample-gram.json
+while read x; do echo "$x" | socat stdio unix-sendto:/run/sfive/pipegram; done < ../../dat/sample-gram.json
+
echo show query result
wget -q -S -O - 'http://localhost:8000/updates?from=2013-10-24T05:00:00Z&to=2013-10-24T00:05:20Z'
diff --git a/src/hub/test-srv b/src/hub/test-srv
index ff1369d..8240868 100755
--- a/src/hub/test-srv
+++ b/src/hub/test-srv
@@ -1,4 +1,4 @@
#!/bin/sh
-rm -f /run/sfive/pipe
+rm -f /run/sfive/pipe /run/sfive/pipegram
./bin/sfive-hub -db /var/lib/sfive/db.sqlite -start-pipe-server -pipe /var/run/sfive/pipe -start-web-server