summaryrefslogtreecommitdiff
path: root/src/hub
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-04-27 06:04:45 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-04-27 06:04:45 +0200
commit86773a81c7e609cb04c77da71175247a7ef2ae3a (patch)
tree7aaa80e0abbd4d4e37733c77718b093f73b7b256 /src/hub
parentdisable sleep during forward (diff)
remove vizualization
minor name refactoring
Diffstat (limited to 'src/hub')
-rw-r--r--src/hub/src/spreadspace.org/sfive-hub/s5hub.go31
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go7
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipe.go18
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go45
-rwxr-xr-xsrc/hub/test-srv2
5 files changed, 46 insertions, 57 deletions
diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
index 9589812..6c6463b 100644
--- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
+++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
@@ -26,7 +26,6 @@ func main() {
piwikSiteURL := flag.String("piwik-site-url", "", "use this base url for the site")
piwikSiteID := flag.Uint("piwik-site-id", 1, "use this site-id for piwik")
piwikToken := flag.String("piwik-token", "", "the auth token for piwik")
- vizAppDir := flag.String("viz-dir", "/usr/share/sfive/viz", "base-path to the viz application")
help := flag.Bool("help", false, "show usage")
flag.Parse()
@@ -37,11 +36,11 @@ func main() {
return
}
- server, err := sfive.NewServer(*db)
+ srv, err := sfive.NewServer(*db)
if err != nil {
s5hl.Fatalf("failed to initialize: %v", err)
}
- defer server.Close()
+ defer srv.Close()
var wg sync.WaitGroup
@@ -50,7 +49,7 @@ func main() {
go func() {
defer wg.Done()
s5hl.Printf("start pipe at %v\n", *pipe)
- server.ServePipe(*pipe)
+ srv.ServePipe(*pipe)
s5hl.Println("pipe finished")
}()
}
@@ -59,8 +58,8 @@ func main() {
wg.Add(1)
go func() {
defer wg.Done()
- s5hl.Printf("start pipegram at %v\n", *ppipe)
- server.ServeGramPipe(*ppipe)
+ s5hl.Printf("starting pipegram at %v\n", *ppipe)
+ srv.ServePipegram(*ppipe)
s5hl.Println("pipegram finished")
}()
}
@@ -69,8 +68,8 @@ func main() {
wg.Add(1)
go func() {
defer wg.Done()
- s5hl.Println("start web-srv")
- server.ServeWeb(*vizAppDir)
+ s5hl.Println("starting web")
+ srv.ServeWeb()
s5hl.Println("web finished")
}()
}
@@ -79,8 +78,8 @@ func main() {
wg.Add(1)
go func() {
defer wg.Done()
- s5hl.Println("start forward")
- server.RunForwarding(*forward)
+ s5hl.Println("starting forward")
+ srv.RunForwarding(*forward)
s5hl.Println("forward finished")
}()
}
@@ -89,8 +88,8 @@ func main() {
wg.Add(1)
go func() {
defer wg.Done()
- s5hl.Println("start elastic-search forward")
- server.RunForwardingToElasticSearch(*forwardES)
+ s5hl.Println("starting elastic-search forward")
+ srv.RunForwardingToElasticSearch(*forwardES)
s5hl.Println("elastic-search forward finished")
}()
}
@@ -99,8 +98,8 @@ func main() {
wg.Add(1)
go func() {
defer wg.Done()
- s5hl.Println("start graphite forward")
- server.RunForwardingToGraphite(*forwardGraphite, *graphiteBasePath)
+ s5hl.Println("starting graphite forward")
+ srv.RunForwardingToGraphite(*forwardGraphite, *graphiteBasePath)
s5hl.Println("graphite forward finished")
}()
}
@@ -109,8 +108,8 @@ func main() {
wg.Add(1)
go func() {
defer wg.Done()
- s5hl.Println("start piwik forward")
- server.RunForwardingToPiwik(*forwardPiwik, *piwikSiteURL, *piwikSiteID, *piwikToken)
+ s5hl.Println("starting piwik forward")
+ srv.RunForwardingToPiwik(*forwardPiwik, *piwikSiteURL, *piwikSiteID, *piwikToken)
s5hl.Println("piwik forward finished")
}()
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 26eb0ac..c197dad 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -65,12 +65,12 @@ func (srv Server) appendActor() {
}
}
if err != nil {
- s5l.Printf("failed to store data: %v\n", err)
+ s5l.Printf("server: failed to store data: %v\n", err)
}
case token := <-srv.appendManyData:
err := srv.store.AppendMany(token.data)
if err != nil {
- s5l.Printf("failed to store many data: %v\n", err)
+ s5l.Printf("server: failed to store many data: %v\n", err)
token.response <- false
} else {
token.response <- true
@@ -112,6 +112,7 @@ func (srv Server) getLastUpdateIdInvoke() (int, error) {
}
func (srv Server) Close() {
+ s5l.Printf("server: shutting down\n")
srv.quit <- true
<-srv.done
close(srv.quit)
@@ -122,6 +123,7 @@ func (srv Server) Close() {
close(srv.getHubIdChan)
close(srv.getLastUpdateIdChan)
srv.store.Close()
+ s5l.Printf("server: finished\n")
}
func NewServer(dbPath string) (server *Server, err error) {
@@ -140,5 +142,6 @@ func NewServer(dbPath string) (server *Server, err error) {
server.getHubIdChan = make(chan getHubIdToken, 1)
server.getLastUpdateIdChan = make(chan getLastUpdateIdToken, 1)
go server.appendActor()
+ s5l.Printf("server: initialized\n")
return
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
index 9744e38..95904a7 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
@@ -6,7 +6,7 @@ import (
"net"
)
-func (srv Server) handleConnection(conn net.Conn) {
+func (srv Server) pipeHandle(conn net.Conn) {
reader := bufio.NewReader(conn)
buffer, err := reader.ReadBytes('\n')
if err != nil {
@@ -30,8 +30,6 @@ func (srv Server) handleConnection(conn net.Conn) {
return
}
- // s5l.Printf("msg: %v", string(buffer))
-
value, err := marshaller.Decode(buffer)
if err != nil {
s5l.Printf("pipe: failed to decode message: %v\n", err)
@@ -42,13 +40,13 @@ func (srv Server) handleConnection(conn net.Conn) {
}
}
-func (srv Server) handlePacketConn(pconn net.PacketConn) {
+func (srv Server) pipegramHandle(pconn net.PacketConn) {
decoder := NewPlainDecoder()
buffer := make([]byte, 64*1024)
for {
n, _, err := pconn.ReadFrom(buffer)
if err != nil {
- s5l.Printf("p-pipe: failed read: %v", err)
+ s5l.Printf("pipegram: failed read: %v", err)
continue
}
data := buffer[0:n]
@@ -56,7 +54,7 @@ func (srv Server) handlePacketConn(pconn net.PacketConn) {
if err == nil {
srv.appendData <- value
} else {
- s5l.Printf("p-pipe: failed to decode message: %v\n", err)
+ s5l.Printf("pipegram: failed to decode message: %v\n", err)
}
}
}
@@ -76,17 +74,17 @@ func (srv Server) ServePipe(pipePath string) {
// ignore
continue
}
- go srv.handleConnection(conn)
+ go srv.pipeHandle(conn)
}
}
-func (srv Server) ServeGramPipe(pipePath string) {
+func (srv Server) ServePipegram(pipePath string) {
pconn, err := net.ListenPacket("unixgram", pipePath)
if err != nil {
- s5l.Printf("p-pipe: failed to listen: %v", err)
+ s5l.Printf("pipegram: failed to listen: %v", err)
return
}
defer pconn.Close()
- srv.handlePacketConn(pconn)
+ srv.pipegramHandle(pconn)
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index 12e6ccb..0aad5ba 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -5,19 +5,18 @@ import (
"fmt"
"io/ioutil"
"net/http"
- "os"
"strconv"
"github.com/zenazn/goji"
"github.com/zenazn/goji/web"
)
-func (srv Server) healthz(c web.C, w http.ResponseWriter, r *http.Request) {
+func (srv Server) webHealthz(c web.C, w http.ResponseWriter, r *http.Request) {
// TODO: do a more sophisticated check
fmt.Fprintf(w, "%s\n", srv.store.GetStoreId())
}
-func (srv Server) getSourcesList(c web.C, w http.ResponseWriter, r *http.Request) {
+func (srv Server) webGetSourcesList(c web.C, w http.ResponseWriter, r *http.Request) {
const resourceName = "sources"
values, err := srv.store.GetSources()
if err != nil {
@@ -32,7 +31,7 @@ func (srv Server) getSourcesList(c web.C, w http.ResponseWriter, r *http.Request
fmt.Fprintf(w, "%s", jsonString)
}
-func (srv Server) getSource(c web.C, w http.ResponseWriter, r *http.Request) {
+func (srv Server) webGetSource(c web.C, w http.ResponseWriter, r *http.Request) {
const resourceName = "source"
id, err := strconv.ParseInt(c.URLParams["id"], 10, 64)
if err != nil {
@@ -56,7 +55,7 @@ func (srv Server) getSource(c web.C, w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "%s", jsonString)
}
-func (srv Server) getUpdateList(c web.C, w http.ResponseWriter, r *http.Request) {
+func (srv Server) webGetUpdateList(c web.C, w http.ResponseWriter, r *http.Request) {
const resourceName = "updates"
values, err := srv.getUpdatesAfterInvoke(-1, 3) // TODO: get start and limit from param
if err != nil {
@@ -71,7 +70,7 @@ func (srv Server) getUpdateList(c web.C, w http.ResponseWriter, r *http.Request)
fmt.Fprintf(w, "%s", jsonString)
}
-func (srv Server) getUpdate(c web.C, w http.ResponseWriter, r *http.Request) {
+func (srv Server) webGetUpdate(c web.C, w http.ResponseWriter, r *http.Request) {
const resourceName = "update"
id, err := strconv.ParseInt(c.URLParams["id"], 10, 64)
if err != nil {
@@ -95,7 +94,7 @@ func (srv Server) getUpdate(c web.C, w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "%s", jsonString)
}
-func (srv Server) postUpdate(c web.C, w http.ResponseWriter, r *http.Request) {
+func (srv Server) webPostUpdate(c web.C, w http.ResponseWriter, r *http.Request) {
const resourceName = "update"
decoder := NewPlainDecoder()
@@ -133,7 +132,7 @@ func (srv Server) postUpdate(c web.C, w http.ResponseWriter, r *http.Request) {
// TODO send response channel, wait for OK
}
-func (srv Server) getLastUpdateId(c web.C, w http.ResponseWriter, r *http.Request) {
+func (srv Server) webGetLastUpdateId(c web.C, w http.ResponseWriter, r *http.Request) {
const resourceName = "lastupdateid"
value, err := srv.store.GetLastUpdateId()
if err != nil {
@@ -143,7 +142,7 @@ func (srv Server) getLastUpdateId(c web.C, w http.ResponseWriter, r *http.Reques
fmt.Fprintf(w, "%d", value)
}
-func (srv Server) getLastUpdateIdForUuid(c web.C, w http.ResponseWriter, r *http.Request) {
+func (srv Server) webGetLastUpdateIdForUuid(c web.C, w http.ResponseWriter, r *http.Request) {
const resourceName = "lastupdateid"
id := c.URLParams["id"]
value, err := srv.store.GetLastUpdateForUuid(id)
@@ -154,24 +153,14 @@ func (srv Server) getLastUpdateIdForUuid(c web.C, w http.ResponseWriter, r *http
fmt.Fprintf(w, "%d", value)
}
-func (srv Server) ServeWeb(vizAppLocation string) {
- if _, err := os.Stat(vizAppLocation); err != nil {
- if os.IsNotExist(err) {
- s5l.Panicf("web: viz-app at %s does not exist.", vizAppLocation)
- } else {
- s5l.Printf("web: failed to stat %s: %v", vizAppLocation, err)
- }
- }
-
- goji.Get("/healthz", srv.healthz)
- goji.Get("/sources", srv.getSourcesList)
- goji.Get("/sources/:id", srv.getSource)
- goji.Get("/updates", srv.getUpdateList)
- goji.Get("/updates/:id", srv.getUpdate)
- goji.Post("/updates", srv.postUpdate)
- goji.Get("/lastupdate", srv.getLastUpdateId)
- goji.Get("/lastupdate/:id", srv.getLastUpdateIdForUuid)
- goji.Handle("/viz/*", http.StripPrefix("/viz/", http.FileServer(http.Dir(vizAppLocation))))
-
+func (srv Server) ServeWeb() {
+ goji.Get("/healthz", srv.webHealthz)
+ goji.Get("/sources", srv.webGetSourcesList)
+ goji.Get("/sources/:id", srv.webGetSource)
+ goji.Get("/updates", srv.webGetUpdateList)
+ goji.Get("/updates/:id", srv.webGetUpdate)
+ goji.Post("/updates", srv.webPostUpdate)
+ goji.Get("/lastupdate", srv.webGetLastUpdateId)
+ goji.Get("/lastupdate/:id", srv.webGetLastUpdateIdForUuid)
goji.Serve()
}
diff --git a/src/hub/test-srv b/src/hub/test-srv
index 803ac37..19a6539 100755
--- a/src/hub/test-srv
+++ b/src/hub/test-srv
@@ -5,4 +5,4 @@ TEST_DB="$TEST_D/db.bolt"
mkdir -p "$TEST_D"
rm -f "$TEST_D/pipe" "$TEST_D/pipegram"
-exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server -viz-dir "$(pwd)/../viz" -bind=":8000"
+exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server -bind=":8000"