diff options
-rw-r--r-- | dat/sample-pipegram.json | 6 | ||||
-rw-r--r-- | src/hub/Makefile | 1 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive-hub/s5hub.go | 15 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 372 | ||||
-rwxr-xr-x | src/hub/test-srv | 2 | ||||
-rwxr-xr-x | src/hub/test-srv-ro | 2 |
6 files changed, 229 insertions, 169 deletions
diff --git a/dat/sample-pipegram.json b/dat/sample-pipegram.json index 09e5943..a9c919a 100644 --- a/dat/sample-pipegram.json +++ b/dat/sample-pipegram.json @@ -1,3 +1,3 @@ -{"streamer-id": {"quality": "high", "content-id": "av", "format": "webm"}, "hostname": "localhost", "tags": ["suppa", "toll"], "version": 1, "data": {"bytes-sent": 1519, "client-count": 0}, "start-time": "2013-10-21T12:30:00Z", "duration-ms": 300000} -{"streamer-id": {"quality": "high", "content-id": "av", "format": "webm"}, "hostname": "localhost", "tags": ["suppa", "toll"], "version": 1, "data": {"bytes-sent": 22849, "client-count": 1}, "start-time": "2013-10-21T12:35:00Z", "duration-ms": 300000} -{"streamer-id": {"quality": "high", "content-id": "av", "format": "webm"}, "hostname": "localhost", "tags": ["suppa", "toll"], "version": 1, "data": {"bytes-sent": 33100, "client-count": 1}, "start-time": "2013-10-21T12:40:00Z", "duration-ms": 300000} +{"streamer-id": {"quality": "high", "content-id": "av", "format": "webm"}, "hostname": "localhost", "tags": ["other", "toll"], "version": 1, "data": {"bytes-sent": 1519, "client-count": 0}, "start-time": "2013-10-21T12:30:00Z", "duration-ms": 300000} +{"streamer-id": {"quality": "high", "content-id": "av", "format": "webm"}, "hostname": "localhost", "tags": ["other", "toll"], "version": 1, "data": {"bytes-sent": 22849, "client-count": 1}, "start-time": "2013-10-21T12:35:00Z", "duration-ms": 300000} +{"streamer-id": {"quality": "high", "content-id": "av", "format": "webm"}, "hostname": "localhost", "tags": ["other", "toll"], "version": 1, "data": {"bytes-sent": 33100, "client-count": 1}, "start-time": "2013-10-21T12:40:00Z", "duration-ms": 300000} diff --git a/src/hub/Makefile b/src/hub/Makefile index 87890a4..a65f145 100644 --- a/src/hub/Makefile +++ b/src/hub/Makefile @@ -39,7 +39,6 @@ endif EXECUTEABLE := sfive-hub LIBS := "github.com/boltdb/bolt" \ - "github.com/zenazn/goji" \ "github.com/pborman/uuid" \ "github.com/equinox0815/graphite-golang" diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go index 4028f3f..68db727 100644 --- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go +++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go @@ -47,9 +47,10 @@ func main() { db := flag.String("db", "/var/lib/sfive/db.bolt", "path to the database file") readOnly := flag.Bool("read-only", false, "open database in read-only mode") pipe := flag.String("pipe", "/var/run/sfive/pipe", "path to the unix pipe for the pipeserver") - ppipe := flag.String("pipegram", "/var/run/sfive/pipegram", "path to the unix datagram pipe for the pipeserver") startPipe := flag.Bool("start-pipe-server", true, "start a connection oriented pipe server; see option pipe") - startGramPipe := flag.Bool("start-pipegram-server", true, "start a datagram oriented pipe server; see option pipegram") + pipegram := flag.String("pipegram", "/var/run/sfive/pipegram", "path to the unix datagram pipe for the pipeserver") + startPipegram := flag.Bool("start-pipegram-server", true, "start a datagram oriented pipe server; see option pipegram") + web := flag.String("web", ":8000", "port for the webserver to listen on") startWeb := flag.Bool("start-web-server", true, "start a webserver") forward := flag.String("forward-url", "", "forward to another sfive-server with http server at base-url") forwardES := flag.String("forward-es-url", "", "forward to an ElasticSearch *index* via http") @@ -87,12 +88,12 @@ func main() { }() } - if *startGramPipe { + if *startPipegram { wg.Add(1) go func() { defer wg.Done() - s5hl.Printf("starting pipegram at %v\n", *ppipe) - srv.ServePipegram(*ppipe) + s5hl.Printf("starting pipegram at %v\n", *pipegram) + srv.ServePipegram(*pipegram) s5hl.Println("pipegram finished") }() } @@ -101,8 +102,8 @@ func main() { wg.Add(1) go func() { defer wg.Done() - s5hl.Println("starting web") - srv.ServeWeb() + s5hl.Printf("starting web at %v\n", *web) + srv.ServeWeb(*web) s5hl.Println("web finished") }() } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index 4f2276e..9b5542b 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -34,189 +34,249 @@ package sfive import ( "encoding/json" - "fmt" - "io" + // "io" + "net" "net/http" - "strconv" - - "github.com/zenazn/goji" - "github.com/zenazn/goji/web" + "time" + // "strconv" ) -func (srv Server) webHealthz(c web.C, w http.ResponseWriter, r *http.Request) { - // TODO: do a more sophisticated check - fmt.Fprintf(w, "%s\n", srv.GetHubId()) +type webNotFoundResponse struct { + Error string `json:"error,omitempty"` } -func (srv Server) webGetHubsList(c web.C, w http.ResponseWriter, r *http.Request) { - const resourceName = "hubs" - values, err := srv.store.GetHubs() - if err != nil { - http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) - return - } - jsonString, err := json.Marshal(GenericDataContainer{values}) - if err != nil { - http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError) - return - } - fmt.Fprintf(w, "%s", jsonString) +func webNotFound(srv *Server, w http.ResponseWriter, r *http.Request) { + sendWebResponse(w, http.StatusNotFound, webNotFoundResponse{"not found"}) } -func (srv Server) webGetSourcesList(c web.C, w http.ResponseWriter, r *http.Request) { - const resourceName = "sources" - values, err := srv.store.GetSources() - if err != nil { - http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) - return - } - jsonString, err := json.Marshal(GenericDataContainer{values}) - if err != nil { - http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError) - return - } - fmt.Fprintf(w, "%s", jsonString) +type webHealthzResponse struct { + Error string `json:"error,omitempty"` + Status string `json:"status"` + HubId string `json:"hub-uuid"` } -func (srv Server) webGetUpdateList(c web.C, w http.ResponseWriter, r *http.Request) { - const resourceName = "updates" - from := 0 - if fromStr := r.FormValue("from"); fromStr != "" { - fromInt, err := strconv.Atoi(fromStr) - if err != nil { - http.Error(w, fmt.Sprintf("failed to parse from field: %v", err), http.StatusBadRequest) - return - } - from = fromInt - } +func webHealthz(srv *Server, w http.ResponseWriter, r *http.Request) { + resp := webHealthzResponse{} + resp.HubId = srv.GetHubId() + resp.Status = "OK" // TODO: do a more sophisticated check + sendWebResponse(w, http.StatusOK, resp) +} - limit := -1 - if limitStr := r.FormValue("limit"); limitStr != "" { - limitInt, err := strconv.Atoi(limitStr) - if err != nil { - http.Error(w, fmt.Sprintf("failed to parse limit field: %v", err), http.StatusBadRequest) - return - } - limit = limitInt - } +// func (srv Server) webGetHubsList(c web.C, w http.ResponseWriter, r *http.Request) { +// const resourceName = "hubs" +// values, err := srv.store.GetHubs() +// if err != nil { +// http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) +// return +// } +// jsonString, err := json.Marshal(GenericDataContainer{values}) +// if err != nil { +// http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError) +// return +// } +// fmt.Fprintf(w, "%s", jsonString) +// } - values, err := srv.GetUpdatesAfter(from, limit) - if err != nil { - http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) - return - } - jsonString, err := json.Marshal(GenericDataContainer{values}) - if err != nil { - http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError) - return - } - fmt.Fprintf(w, "%s", jsonString) -} +// func (srv Server) webGetSourcesList(c web.C, w http.ResponseWriter, r *http.Request) { +// const resourceName = "sources" +// values, err := srv.store.GetSources() +// if err != nil { +// http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) +// return +// } +// jsonString, err := json.Marshal(GenericDataContainer{values}) +// if err != nil { +// http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError) +// return +// } +// fmt.Fprintf(w, "%s", jsonString) +// } -func (srv Server) webGetUpdate(c web.C, w http.ResponseWriter, r *http.Request) { - const resourceName = "updates" - id, err := strconv.ParseInt(c.URLParams["id"], 10, 64) - if err != nil { - http.Error(w, fmt.Sprintf("invalid id: %s: %v", resourceName, err), http.StatusBadRequest) - return - } - value, err := srv.store.GetUpdate(int(id)) - if err != nil { - if err == ErrNotFound { - http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusNotFound) - } else { - http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) - } - return - } - jsonString, err := json.Marshal(value) - if err != nil { - http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError) - return - } - fmt.Fprintf(w, "%s", jsonString) -} +// func (srv Server) webGetUpdateList(c web.C, w http.ResponseWriter, r *http.Request) { +// const resourceName = "updates" +// from := 0 +// if fromStr := r.FormValue("from"); fromStr != "" { +// fromInt, err := strconv.Atoi(fromStr) +// if err != nil { +// http.Error(w, fmt.Sprintf("failed to parse from field: %v", err), http.StatusBadRequest) +// return +// } +// from = fromInt +// } -func (srv Server) webPostUpdateBulk(c web.C, w http.ResponseWriter, r *http.Request) { - decoder, err := NewStatefulDecoder(r.Body) - if err != nil { - http.Error(w, fmt.Sprintf("failed to read/decode init message: %v", err), http.StatusBadRequest) - return - } +// limit := -1 +// if limitStr := r.FormValue("limit"); limitStr != "" { +// limitInt, err := strconv.Atoi(limitStr) +// if err != nil { +// http.Error(w, fmt.Sprintf("failed to parse limit field: %v", err), http.StatusBadRequest) +// return +// } +// limit = limitInt +// } - data := []DataUpdateFull{} - for { - value, err := decoder.Decode() - if err != nil { - if err != io.EOF { - http.Error(w, fmt.Sprintf("failed to read/decode update message: %v", err), http.StatusBadRequest) - return - } - // TODO: check for temporary error? - return - } - data = append(data, value) - } +// values, err := srv.GetUpdatesAfter(from, limit) +// if err != nil { +// http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) +// return +// } +// jsonString, err := json.Marshal(GenericDataContainer{values}) +// if err != nil { +// http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError) +// return +// } +// fmt.Fprintf(w, "%s", jsonString) +// } - if err = srv.AppendMany(data); err != nil { - http.Error(w, fmt.Sprintf("failed to store data: %s", err), http.StatusInternalServerError) - } else { - fmt.Fprintf(w, "%d update(s) successfully stored.", len(data)) - } -} +// func (srv Server) webGetUpdate(c web.C, w http.ResponseWriter, r *http.Request) { +// const resourceName = "updates" +// id, err := strconv.Atoi(c.URLParams["id"]) +// if err != nil { +// http.Error(w, fmt.Sprintf("invalid id: %s: %v", resourceName, err), http.StatusBadRequest) +// return +// } +// value, err := srv.store.GetUpdate(int(id)) +// if err != nil { +// if err == ErrNotFound { +// http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusNotFound) +// } else { +// http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) +// } +// return +// } +// jsonString, err := json.Marshal(value) +// if err != nil { +// http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError) +// return +// } +// fmt.Fprintf(w, "%s", jsonString) +// } -func (srv Server) webPostUpdate(c web.C, w http.ResponseWriter, r *http.Request) { - const resourceName = "updates" +// func (srv Server) webPostUpdateBulk(c web.C, w http.ResponseWriter, r *http.Request) { +// decoder, err := NewStatefulDecoder(r.Body) +// if err != nil { +// http.Error(w, fmt.Sprintf("failed to read/decode init message: %v", err), http.StatusBadRequest) +// return +// } - if bulk := r.FormValue("bulk"); bulk != "" { - srv.webPostUpdateBulk(c, w, r) - return - } +// data := []DataUpdateFull{} +// for { +// value, err := decoder.Decode() +// if err != nil { +// if err != io.EOF { +// http.Error(w, fmt.Sprintf("failed to read/decode update message: %v", err), http.StatusBadRequest) +// return +// } +// // TODO: check for temporary error? +// return +// } +// data = append(data, value) +// } - decoder := NewStatelessDecoder(r.Body) - data, err := decoder.Decode() - if err != nil && err != io.EOF { - s5l.Printf("web: failed to decode: %v\n", err) - http.Error(w, fmt.Sprintf("failed to read/decode update message: %v", err), http.StatusBadRequest) - return - } +// if err = srv.AppendMany(data); err != nil { +// http.Error(w, fmt.Sprintf("failed to store data: %s", err), http.StatusInternalServerError) +// } else { +// fmt.Fprintf(w, "%d update(s) successfully stored.", len(data)) +// } +// } - if err = srv.Append(data); err != nil { - http.Error(w, fmt.Sprintf("failed to store data: %s", err), http.StatusInternalServerError) - } else { - fmt.Fprintf(w, "1 update successfully stored.") - } +// func (srv Server) webPostUpdate(c web.C, w http.ResponseWriter, r *http.Request) { +// const resourceName = "updates" + +// if bulk := r.FormValue("bulk"); bulk != "" { +// srv.webPostUpdateBulk(c, w, r) +// return +// } + +// decoder := NewStatelessDecoder(r.Body) +// data, err := decoder.Decode() +// if err != nil && err != io.EOF { +// s5l.Printf("web: failed to decode: %v\n", err) +// http.Error(w, fmt.Sprintf("failed to read/decode update message: %v", err), http.StatusBadRequest) +// return +// } + +// if err = srv.Append(data); err != nil { +// http.Error(w, fmt.Sprintf("failed to store data: %s", err), http.StatusInternalServerError) +// } else { +// fmt.Fprintf(w, "1 update successfully stored.") +// } +// } + +// func (srv Server) webGetLastUpdateId(c web.C, w http.ResponseWriter, r *http.Request) { +// const resourceName = "lastupdate" +// value, err := srv.store.GetLastUpdateId() +// if err != nil { +// http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) +// return +// } +// fmt.Fprintf(w, "%d", value) +// } + +// func (srv Server) webGetLastUpdateIdForUuid(c web.C, w http.ResponseWriter, r *http.Request) { +// const resourceName = "lastupdate" +// id := c.URLParams["id"] +// value, err := srv.store.GetLastUpdateForUuid(id) +// if err != nil { +// http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) +// return +// } +// fmt.Fprintf(w, "%d", value) +// } + +func sendWebResponse(w http.ResponseWriter, status int, respdata interface{}) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + encoder := json.NewEncoder(w) + encoder.Encode(respdata) +} + +type webHandler struct { + srv *Server + H func(*Server, http.ResponseWriter, *http.Request) +} + +func (wh webHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + wh.H(wh.srv, w, r) +} + +// This is from golang http package - why is this not exported? +type tcpKeepAliveListener struct { + *net.TCPListener } -func (srv Server) webGetLastUpdateId(c web.C, w http.ResponseWriter, r *http.Request) { - const resourceName = "lastupdate" - value, err := srv.store.GetLastUpdateId() +func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) { + tc, err := ln.AcceptTCP() if err != nil { - http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) return } - fmt.Fprintf(w, "%d", value) + tc.SetKeepAlive(true) + tc.SetKeepAlivePeriod(180 * time.Second) + return tc, nil } -func (srv Server) webGetLastUpdateIdForUuid(c web.C, w http.ResponseWriter, r *http.Request) { - const resourceName = "lastupdate" - id := c.URLParams["id"] - value, err := srv.store.GetLastUpdateForUuid(id) +func webRun(listener *net.TCPListener, srv *Server) (err error) { + mux := http.NewServeMux() + mux.Handle("/healthz", webHandler{srv, webHealthz}) + // mux.Handle("/hubs", webHandler{srv, webGetHubsList}) + // mux.Handle("/sources", webHandler{srv, webGetSourcesList}) + // mux.Handle("/updates", webHandler{srv, webGetUpdateList}) + // mux.Handle("/lastupdate", webHandler{srv, webGetLastUpdateId}) + + // mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir( ..staticDir.. )))) + mux.Handle("/", webHandler{srv, webNotFound}) + + server := &http.Server{Handler: mux, ReadTimeout: 60 * time.Second, WriteTimeout: 120 * time.Second} + return server.Serve(tcpKeepAliveListener{listener}) +} + +func (srv Server) ServeWeb(addr string) { + if addr == "" { + addr = ":http" + } + ln, err := net.Listen("tcp", addr) if err != nil { - http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) + s5l.Printf("web: failed to listen: %v", err) return } - fmt.Fprintf(w, "%d", value) -} - -func (srv Server) ServeWeb() { - goji.Get("/healthz", srv.webHealthz) - goji.Get("/hubs", srv.webGetHubsList) - goji.Get("/sources", srv.webGetSourcesList) - goji.Get("/updates", srv.webGetUpdateList) - goji.Get("/updates/:id", srv.webGetUpdate) - goji.Post("/updates", srv.webPostUpdate) - goji.Get("/lastupdate", srv.webGetLastUpdateId) - goji.Get("/lastupdate/:id", srv.webGetLastUpdateIdForUuid) - goji.Serve() + webRun(ln.(*net.TCPListener), &srv) } diff --git a/src/hub/test-srv b/src/hub/test-srv index 064fa3a..4f223e0 100755 --- a/src/hub/test-srv +++ b/src/hub/test-srv @@ -10,4 +10,4 @@ TEST_DB="$TEST_D/$1.bolt" mkdir -p "$TEST_D" rm -f "$TEST_D/pipe" "$TEST_D/pipegram" -exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server -bind=":8000" +exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server -web ":8000" diff --git a/src/hub/test-srv-ro b/src/hub/test-srv-ro index 56fe440..0f79921 100755 --- a/src/hub/test-srv-ro +++ b/src/hub/test-srv-ro @@ -10,4 +10,4 @@ TEST_DB="$TEST_D/$1.bolt" mkdir -p "$TEST_D" rm -f "$TEST_D/pipe" "$TEST_D/pipegram" -exec ./bin/sfive-hub -db "$TEST_DB" -read-only -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server -bind=":8000" +exec ./bin/sfive-hub -db "$TEST_DB" -read-only -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server -web ":8000" |