summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-05-06 20:03:51 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-05-06 20:03:51 +0200
commitd4db4c6876026a24bc420bb1ae3978310d3e71e0 (patch)
treef1c9501b8a012c2df6429566721665d73491da41
parentcleanup and minor improvements for pipe and pipegram interface (diff)
drop goji which never did much anyway
-rw-r--r--dat/sample-pipegram.json6
-rw-r--r--src/hub/Makefile1
-rw-r--r--src/hub/src/spreadspace.org/sfive-hub/s5hub.go15
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go372
-rwxr-xr-xsrc/hub/test-srv2
-rwxr-xr-xsrc/hub/test-srv-ro2
6 files changed, 229 insertions, 169 deletions
diff --git a/dat/sample-pipegram.json b/dat/sample-pipegram.json
index 09e5943..a9c919a 100644
--- a/dat/sample-pipegram.json
+++ b/dat/sample-pipegram.json
@@ -1,3 +1,3 @@
-{"streamer-id": {"quality": "high", "content-id": "av", "format": "webm"}, "hostname": "localhost", "tags": ["suppa", "toll"], "version": 1, "data": {"bytes-sent": 1519, "client-count": 0}, "start-time": "2013-10-21T12:30:00Z", "duration-ms": 300000}
-{"streamer-id": {"quality": "high", "content-id": "av", "format": "webm"}, "hostname": "localhost", "tags": ["suppa", "toll"], "version": 1, "data": {"bytes-sent": 22849, "client-count": 1}, "start-time": "2013-10-21T12:35:00Z", "duration-ms": 300000}
-{"streamer-id": {"quality": "high", "content-id": "av", "format": "webm"}, "hostname": "localhost", "tags": ["suppa", "toll"], "version": 1, "data": {"bytes-sent": 33100, "client-count": 1}, "start-time": "2013-10-21T12:40:00Z", "duration-ms": 300000}
+{"streamer-id": {"quality": "high", "content-id": "av", "format": "webm"}, "hostname": "localhost", "tags": ["other", "toll"], "version": 1, "data": {"bytes-sent": 1519, "client-count": 0}, "start-time": "2013-10-21T12:30:00Z", "duration-ms": 300000}
+{"streamer-id": {"quality": "high", "content-id": "av", "format": "webm"}, "hostname": "localhost", "tags": ["other", "toll"], "version": 1, "data": {"bytes-sent": 22849, "client-count": 1}, "start-time": "2013-10-21T12:35:00Z", "duration-ms": 300000}
+{"streamer-id": {"quality": "high", "content-id": "av", "format": "webm"}, "hostname": "localhost", "tags": ["other", "toll"], "version": 1, "data": {"bytes-sent": 33100, "client-count": 1}, "start-time": "2013-10-21T12:40:00Z", "duration-ms": 300000}
diff --git a/src/hub/Makefile b/src/hub/Makefile
index 87890a4..a65f145 100644
--- a/src/hub/Makefile
+++ b/src/hub/Makefile
@@ -39,7 +39,6 @@ endif
EXECUTEABLE := sfive-hub
LIBS := "github.com/boltdb/bolt" \
- "github.com/zenazn/goji" \
"github.com/pborman/uuid" \
"github.com/equinox0815/graphite-golang"
diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
index 4028f3f..68db727 100644
--- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
+++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
@@ -47,9 +47,10 @@ func main() {
db := flag.String("db", "/var/lib/sfive/db.bolt", "path to the database file")
readOnly := flag.Bool("read-only", false, "open database in read-only mode")
pipe := flag.String("pipe", "/var/run/sfive/pipe", "path to the unix pipe for the pipeserver")
- ppipe := flag.String("pipegram", "/var/run/sfive/pipegram", "path to the unix datagram pipe for the pipeserver")
startPipe := flag.Bool("start-pipe-server", true, "start a connection oriented pipe server; see option pipe")
- startGramPipe := flag.Bool("start-pipegram-server", true, "start a datagram oriented pipe server; see option pipegram")
+ pipegram := flag.String("pipegram", "/var/run/sfive/pipegram", "path to the unix datagram pipe for the pipeserver")
+ startPipegram := flag.Bool("start-pipegram-server", true, "start a datagram oriented pipe server; see option pipegram")
+ web := flag.String("web", ":8000", "port for the webserver to listen on")
startWeb := flag.Bool("start-web-server", true, "start a webserver")
forward := flag.String("forward-url", "", "forward to another sfive-server with http server at base-url")
forwardES := flag.String("forward-es-url", "", "forward to an ElasticSearch *index* via http")
@@ -87,12 +88,12 @@ func main() {
}()
}
- if *startGramPipe {
+ if *startPipegram {
wg.Add(1)
go func() {
defer wg.Done()
- s5hl.Printf("starting pipegram at %v\n", *ppipe)
- srv.ServePipegram(*ppipe)
+ s5hl.Printf("starting pipegram at %v\n", *pipegram)
+ srv.ServePipegram(*pipegram)
s5hl.Println("pipegram finished")
}()
}
@@ -101,8 +102,8 @@ func main() {
wg.Add(1)
go func() {
defer wg.Done()
- s5hl.Println("starting web")
- srv.ServeWeb()
+ s5hl.Printf("starting web at %v\n", *web)
+ srv.ServeWeb(*web)
s5hl.Println("web finished")
}()
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index 4f2276e..9b5542b 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -34,189 +34,249 @@ package sfive
import (
"encoding/json"
- "fmt"
- "io"
+ // "io"
+ "net"
"net/http"
- "strconv"
-
- "github.com/zenazn/goji"
- "github.com/zenazn/goji/web"
+ "time"
+ // "strconv"
)
-func (srv Server) webHealthz(c web.C, w http.ResponseWriter, r *http.Request) {
- // TODO: do a more sophisticated check
- fmt.Fprintf(w, "%s\n", srv.GetHubId())
+type webNotFoundResponse struct {
+ Error string `json:"error,omitempty"`
}
-func (srv Server) webGetHubsList(c web.C, w http.ResponseWriter, r *http.Request) {
- const resourceName = "hubs"
- values, err := srv.store.GetHubs()
- if err != nil {
- http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
- return
- }
- jsonString, err := json.Marshal(GenericDataContainer{values})
- if err != nil {
- http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError)
- return
- }
- fmt.Fprintf(w, "%s", jsonString)
+func webNotFound(srv *Server, w http.ResponseWriter, r *http.Request) {
+ sendWebResponse(w, http.StatusNotFound, webNotFoundResponse{"not found"})
}
-func (srv Server) webGetSourcesList(c web.C, w http.ResponseWriter, r *http.Request) {
- const resourceName = "sources"
- values, err := srv.store.GetSources()
- if err != nil {
- http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
- return
- }
- jsonString, err := json.Marshal(GenericDataContainer{values})
- if err != nil {
- http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError)
- return
- }
- fmt.Fprintf(w, "%s", jsonString)
+type webHealthzResponse struct {
+ Error string `json:"error,omitempty"`
+ Status string `json:"status"`
+ HubId string `json:"hub-uuid"`
}
-func (srv Server) webGetUpdateList(c web.C, w http.ResponseWriter, r *http.Request) {
- const resourceName = "updates"
- from := 0
- if fromStr := r.FormValue("from"); fromStr != "" {
- fromInt, err := strconv.Atoi(fromStr)
- if err != nil {
- http.Error(w, fmt.Sprintf("failed to parse from field: %v", err), http.StatusBadRequest)
- return
- }
- from = fromInt
- }
+func webHealthz(srv *Server, w http.ResponseWriter, r *http.Request) {
+ resp := webHealthzResponse{}
+ resp.HubId = srv.GetHubId()
+ resp.Status = "OK" // TODO: do a more sophisticated check
+ sendWebResponse(w, http.StatusOK, resp)
+}
- limit := -1
- if limitStr := r.FormValue("limit"); limitStr != "" {
- limitInt, err := strconv.Atoi(limitStr)
- if err != nil {
- http.Error(w, fmt.Sprintf("failed to parse limit field: %v", err), http.StatusBadRequest)
- return
- }
- limit = limitInt
- }
+// func (srv Server) webGetHubsList(c web.C, w http.ResponseWriter, r *http.Request) {
+// const resourceName = "hubs"
+// values, err := srv.store.GetHubs()
+// if err != nil {
+// http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
+// return
+// }
+// jsonString, err := json.Marshal(GenericDataContainer{values})
+// if err != nil {
+// http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError)
+// return
+// }
+// fmt.Fprintf(w, "%s", jsonString)
+// }
- values, err := srv.GetUpdatesAfter(from, limit)
- if err != nil {
- http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
- return
- }
- jsonString, err := json.Marshal(GenericDataContainer{values})
- if err != nil {
- http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError)
- return
- }
- fmt.Fprintf(w, "%s", jsonString)
-}
+// func (srv Server) webGetSourcesList(c web.C, w http.ResponseWriter, r *http.Request) {
+// const resourceName = "sources"
+// values, err := srv.store.GetSources()
+// if err != nil {
+// http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
+// return
+// }
+// jsonString, err := json.Marshal(GenericDataContainer{values})
+// if err != nil {
+// http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError)
+// return
+// }
+// fmt.Fprintf(w, "%s", jsonString)
+// }
-func (srv Server) webGetUpdate(c web.C, w http.ResponseWriter, r *http.Request) {
- const resourceName = "updates"
- id, err := strconv.ParseInt(c.URLParams["id"], 10, 64)
- if err != nil {
- http.Error(w, fmt.Sprintf("invalid id: %s: %v", resourceName, err), http.StatusBadRequest)
- return
- }
- value, err := srv.store.GetUpdate(int(id))
- if err != nil {
- if err == ErrNotFound {
- http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusNotFound)
- } else {
- http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
- }
- return
- }
- jsonString, err := json.Marshal(value)
- if err != nil {
- http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError)
- return
- }
- fmt.Fprintf(w, "%s", jsonString)
-}
+// func (srv Server) webGetUpdateList(c web.C, w http.ResponseWriter, r *http.Request) {
+// const resourceName = "updates"
+// from := 0
+// if fromStr := r.FormValue("from"); fromStr != "" {
+// fromInt, err := strconv.Atoi(fromStr)
+// if err != nil {
+// http.Error(w, fmt.Sprintf("failed to parse from field: %v", err), http.StatusBadRequest)
+// return
+// }
+// from = fromInt
+// }
-func (srv Server) webPostUpdateBulk(c web.C, w http.ResponseWriter, r *http.Request) {
- decoder, err := NewStatefulDecoder(r.Body)
- if err != nil {
- http.Error(w, fmt.Sprintf("failed to read/decode init message: %v", err), http.StatusBadRequest)
- return
- }
+// limit := -1
+// if limitStr := r.FormValue("limit"); limitStr != "" {
+// limitInt, err := strconv.Atoi(limitStr)
+// if err != nil {
+// http.Error(w, fmt.Sprintf("failed to parse limit field: %v", err), http.StatusBadRequest)
+// return
+// }
+// limit = limitInt
+// }
- data := []DataUpdateFull{}
- for {
- value, err := decoder.Decode()
- if err != nil {
- if err != io.EOF {
- http.Error(w, fmt.Sprintf("failed to read/decode update message: %v", err), http.StatusBadRequest)
- return
- }
- // TODO: check for temporary error?
- return
- }
- data = append(data, value)
- }
+// values, err := srv.GetUpdatesAfter(from, limit)
+// if err != nil {
+// http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
+// return
+// }
+// jsonString, err := json.Marshal(GenericDataContainer{values})
+// if err != nil {
+// http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError)
+// return
+// }
+// fmt.Fprintf(w, "%s", jsonString)
+// }
- if err = srv.AppendMany(data); err != nil {
- http.Error(w, fmt.Sprintf("failed to store data: %s", err), http.StatusInternalServerError)
- } else {
- fmt.Fprintf(w, "%d update(s) successfully stored.", len(data))
- }
-}
+// func (srv Server) webGetUpdate(c web.C, w http.ResponseWriter, r *http.Request) {
+// const resourceName = "updates"
+// id, err := strconv.Atoi(c.URLParams["id"])
+// if err != nil {
+// http.Error(w, fmt.Sprintf("invalid id: %s: %v", resourceName, err), http.StatusBadRequest)
+// return
+// }
+// value, err := srv.store.GetUpdate(int(id))
+// if err != nil {
+// if err == ErrNotFound {
+// http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusNotFound)
+// } else {
+// http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
+// }
+// return
+// }
+// jsonString, err := json.Marshal(value)
+// if err != nil {
+// http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError)
+// return
+// }
+// fmt.Fprintf(w, "%s", jsonString)
+// }
-func (srv Server) webPostUpdate(c web.C, w http.ResponseWriter, r *http.Request) {
- const resourceName = "updates"
+// func (srv Server) webPostUpdateBulk(c web.C, w http.ResponseWriter, r *http.Request) {
+// decoder, err := NewStatefulDecoder(r.Body)
+// if err != nil {
+// http.Error(w, fmt.Sprintf("failed to read/decode init message: %v", err), http.StatusBadRequest)
+// return
+// }
- if bulk := r.FormValue("bulk"); bulk != "" {
- srv.webPostUpdateBulk(c, w, r)
- return
- }
+// data := []DataUpdateFull{}
+// for {
+// value, err := decoder.Decode()
+// if err != nil {
+// if err != io.EOF {
+// http.Error(w, fmt.Sprintf("failed to read/decode update message: %v", err), http.StatusBadRequest)
+// return
+// }
+// // TODO: check for temporary error?
+// return
+// }
+// data = append(data, value)
+// }
- decoder := NewStatelessDecoder(r.Body)
- data, err := decoder.Decode()
- if err != nil && err != io.EOF {
- s5l.Printf("web: failed to decode: %v\n", err)
- http.Error(w, fmt.Sprintf("failed to read/decode update message: %v", err), http.StatusBadRequest)
- return
- }
+// if err = srv.AppendMany(data); err != nil {
+// http.Error(w, fmt.Sprintf("failed to store data: %s", err), http.StatusInternalServerError)
+// } else {
+// fmt.Fprintf(w, "%d update(s) successfully stored.", len(data))
+// }
+// }
- if err = srv.Append(data); err != nil {
- http.Error(w, fmt.Sprintf("failed to store data: %s", err), http.StatusInternalServerError)
- } else {
- fmt.Fprintf(w, "1 update successfully stored.")
- }
+// func (srv Server) webPostUpdate(c web.C, w http.ResponseWriter, r *http.Request) {
+// const resourceName = "updates"
+
+// if bulk := r.FormValue("bulk"); bulk != "" {
+// srv.webPostUpdateBulk(c, w, r)
+// return
+// }
+
+// decoder := NewStatelessDecoder(r.Body)
+// data, err := decoder.Decode()
+// if err != nil && err != io.EOF {
+// s5l.Printf("web: failed to decode: %v\n", err)
+// http.Error(w, fmt.Sprintf("failed to read/decode update message: %v", err), http.StatusBadRequest)
+// return
+// }
+
+// if err = srv.Append(data); err != nil {
+// http.Error(w, fmt.Sprintf("failed to store data: %s", err), http.StatusInternalServerError)
+// } else {
+// fmt.Fprintf(w, "1 update successfully stored.")
+// }
+// }
+
+// func (srv Server) webGetLastUpdateId(c web.C, w http.ResponseWriter, r *http.Request) {
+// const resourceName = "lastupdate"
+// value, err := srv.store.GetLastUpdateId()
+// if err != nil {
+// http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
+// return
+// }
+// fmt.Fprintf(w, "%d", value)
+// }
+
+// func (srv Server) webGetLastUpdateIdForUuid(c web.C, w http.ResponseWriter, r *http.Request) {
+// const resourceName = "lastupdate"
+// id := c.URLParams["id"]
+// value, err := srv.store.GetLastUpdateForUuid(id)
+// if err != nil {
+// http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
+// return
+// }
+// fmt.Fprintf(w, "%d", value)
+// }
+
+func sendWebResponse(w http.ResponseWriter, status int, respdata interface{}) {
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(status)
+ encoder := json.NewEncoder(w)
+ encoder.Encode(respdata)
+}
+
+type webHandler struct {
+ srv *Server
+ H func(*Server, http.ResponseWriter, *http.Request)
+}
+
+func (wh webHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ wh.H(wh.srv, w, r)
+}
+
+// This is from golang http package - why is this not exported?
+type tcpKeepAliveListener struct {
+ *net.TCPListener
}
-func (srv Server) webGetLastUpdateId(c web.C, w http.ResponseWriter, r *http.Request) {
- const resourceName = "lastupdate"
- value, err := srv.store.GetLastUpdateId()
+func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
+ tc, err := ln.AcceptTCP()
if err != nil {
- http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
return
}
- fmt.Fprintf(w, "%d", value)
+ tc.SetKeepAlive(true)
+ tc.SetKeepAlivePeriod(180 * time.Second)
+ return tc, nil
}
-func (srv Server) webGetLastUpdateIdForUuid(c web.C, w http.ResponseWriter, r *http.Request) {
- const resourceName = "lastupdate"
- id := c.URLParams["id"]
- value, err := srv.store.GetLastUpdateForUuid(id)
+func webRun(listener *net.TCPListener, srv *Server) (err error) {
+ mux := http.NewServeMux()
+ mux.Handle("/healthz", webHandler{srv, webHealthz})
+ // mux.Handle("/hubs", webHandler{srv, webGetHubsList})
+ // mux.Handle("/sources", webHandler{srv, webGetSourcesList})
+ // mux.Handle("/updates", webHandler{srv, webGetUpdateList})
+ // mux.Handle("/lastupdate", webHandler{srv, webGetLastUpdateId})
+
+ // mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir( ..staticDir.. ))))
+ mux.Handle("/", webHandler{srv, webNotFound})
+
+ server := &http.Server{Handler: mux, ReadTimeout: 60 * time.Second, WriteTimeout: 120 * time.Second}
+ return server.Serve(tcpKeepAliveListener{listener})
+}
+
+func (srv Server) ServeWeb(addr string) {
+ if addr == "" {
+ addr = ":http"
+ }
+ ln, err := net.Listen("tcp", addr)
if err != nil {
- http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
+ s5l.Printf("web: failed to listen: %v", err)
return
}
- fmt.Fprintf(w, "%d", value)
-}
-
-func (srv Server) ServeWeb() {
- goji.Get("/healthz", srv.webHealthz)
- goji.Get("/hubs", srv.webGetHubsList)
- goji.Get("/sources", srv.webGetSourcesList)
- goji.Get("/updates", srv.webGetUpdateList)
- goji.Get("/updates/:id", srv.webGetUpdate)
- goji.Post("/updates", srv.webPostUpdate)
- goji.Get("/lastupdate", srv.webGetLastUpdateId)
- goji.Get("/lastupdate/:id", srv.webGetLastUpdateIdForUuid)
- goji.Serve()
+ webRun(ln.(*net.TCPListener), &srv)
}
diff --git a/src/hub/test-srv b/src/hub/test-srv
index 064fa3a..4f223e0 100755
--- a/src/hub/test-srv
+++ b/src/hub/test-srv
@@ -10,4 +10,4 @@ TEST_DB="$TEST_D/$1.bolt"
mkdir -p "$TEST_D"
rm -f "$TEST_D/pipe" "$TEST_D/pipegram"
-exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server -bind=":8000"
+exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server -web ":8000"
diff --git a/src/hub/test-srv-ro b/src/hub/test-srv-ro
index 56fe440..0f79921 100755
--- a/src/hub/test-srv-ro
+++ b/src/hub/test-srv-ro
@@ -10,4 +10,4 @@ TEST_DB="$TEST_D/$1.bolt"
mkdir -p "$TEST_D"
rm -f "$TEST_D/pipe" "$TEST_D/pipegram"
-exec ./bin/sfive-hub -db "$TEST_DB" -read-only -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server -bind=":8000"
+exec ./bin/sfive-hub -db "$TEST_DB" -read-only -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server -web ":8000"