diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srv.go | 18 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForward.go | 22 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go | 28 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go | 12 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go | 14 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 12 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 12 |
7 files changed, 59 insertions, 59 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 979cd83..2a312bd 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -67,7 +67,7 @@ func (srv Server) transform(update *UpdateFull) *UpdateFull { if srv.geoip != nil { if info, err := srv.geoip.Lookup(client.IP); err != nil { - s5l.Printf("server|transform: Geo-IP lookup failed: %v", err) + s5l.Printf("srv|xfrm: Geo-IP lookup failed: %v", err) } else { client.GeoInfo = *info } @@ -75,7 +75,7 @@ func (srv Server) transform(update *UpdateFull) *UpdateFull { if srv.anonymization != nil { if aIP, err := srv.anonymization.Anonymize(client.IP); err != nil { - s5l.Printf("server|transform: anonymization failed: %v", err) + s5l.Printf("srv|xfrm: anonymization failed: %v", err) } else { client.IP = aIP } @@ -87,13 +87,13 @@ func (srv Server) transform(update *UpdateFull) *UpdateFull { if uint(len(update.Data.Clients)) > update.Data.ClientCount { if update.Data.ClientCount > 0 { - s5l.Printf("server|transform: fixing client-count: %d -> %d", update.Data.ClientCount, len(update.Data.Clients)) + s5l.Printf("srv|xfrm: fixing client-count: %d -> %d", update.Data.ClientCount, len(update.Data.Clients)) } update.Data.ClientCount = uint(len(update.Data.Clients)) } if bytesSentTotal > update.Data.BytesSent { if update.Data.BytesSent > 0 { - s5l.Printf("server|transform: fixing bytes-sent: %d -> %d", update.Data.BytesSent, bytesSentTotal) + s5l.Printf("srv|xfrm: fixing bytes-sent: %d -> %d", update.Data.BytesSent, bytesSentTotal) } update.Data.BytesSent = bytesSentTotal } @@ -137,14 +137,14 @@ func (srv Server) IngestMany(updates []*UpdateFull) error { } func (srv Server) Close() { - s5l.Printf("server: shutting down") + s5l.Printf("srv: shutting down") close(srv.quit) srv.done.Wait() close(srv.ingestChan) close(srv.ingestManyChan) srv.store.Close() - s5l.Printf("server: finished") + s5l.Printf("srv: finished") } func NewServer(cfg SrvConfig) (srv *Server, err error) { @@ -159,7 +159,7 @@ func NewServer(cfg SrvConfig) (srv *Server, err error) { err = errors.New("failed to initialize IP address anonymization: " + err.Error()) return } - s5l.Printf("server|transform: using IP address anonymization: %s", srv.anonymization) + s5l.Printf("srv|xfrm: using IP address anonymization: %s", srv.anonymization) } if cfg.Transform.GeoipDB != "" { @@ -167,7 +167,7 @@ func NewServer(cfg SrvConfig) (srv *Server, err error) { err = errors.New("failed to initialize Geo-IP Lookup: " + err.Error()) return } - s5l.Printf("server|transform: using Geo-IP Lookup: %s", srv.geoip) + s5l.Printf("srv|xfrm: using Geo-IP Lookup: %s", srv.geoip) } srv.numWorker = runtime.NumCPU() // TODO: make this configurable @@ -182,6 +182,6 @@ func NewServer(cfg SrvConfig) (srv *Server, err error) { srv.ingestWorker(idx) }(i) } - s5l.Printf("server: started") + s5l.Printf("srv: started") return } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index 2d07844..a3e5c1f 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -56,19 +56,19 @@ func fwdGetLastUpdateID(baseUrl string, client *http.Client, hubUUID string) (la var resp *http.Response resp, err = client.Get(baseUrl + "/lastupdate/" + hubUUID) if err != nil { - s5l.Printf("fwd: failed to query for lastupdate: %v\n", err) + s5l.Printf("srv|fwd: querying for lastupdate failed: %v", err) return } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - s5l.Printf("fwd: server failed to fulfill query for lastupdate: %v\n", resp.StatusCode) + s5l.Printf("srv|fwd: remote hub failed to fulfill query for lastupdate: %v", resp.StatusCode) return } result := WebLastUpdateIDResponse{} if err = json.NewDecoder(resp.Body).Decode(&result); err != nil { - s5l.Printf("fwd: server failed to fulfill query for lastupdate: %v\n", err) + s5l.Printf("srv|fwd: remote hub failed to fulfill query for lastupdate: %v", err) return } @@ -81,12 +81,12 @@ func fwdWriteUpdates(updates []*UpdateFull, pw *io.PipeWriter) { enc, err := NewStatefulEncoder(pw) if err != nil { - s5l.Printf("fwd: failed encoding/sending init message: %v", err) + s5l.Printf("srv|fwd: encoding/sending init message failed: %v", err) return } for _, upd := range updates { if err := enc.Encode(upd); err != nil { - s5l.Printf("fwd: failed encoding/sending updates: %v", err) + s5l.Printf("srv|fwd: encoding/sending updates failed: %v", err) return } } @@ -116,17 +116,17 @@ tryResync: for { lastID, err := fwdGetLastUpdateID(baseUrl, client, hubUUID) if err != nil { - s5l.Printf("fwd: lastupdate returned err: %v", err) + s5l.Printf("srv|fwd: fetching lastupdate failed: %v", err) time.Sleep(5 * time.Second) continue tryResync } - s5l.Printf("fwd: lastupdate: %d", lastID) + s5l.Printf("srv|fwd: new lastupdate: %d", lastID) nextBatch: for { updates, err := srv.store.GetUpdatesAfter(lastID, 5000) if err != nil { - s5l.Printf("fwd: failed reading updates: %v", err) + s5l.Printf("srv|fwd: reading updates failed: %v", err) time.Sleep(500 * time.Millisecond) continue nextBatch } @@ -138,15 +138,15 @@ tryResync: pr, pw := io.Pipe() go fwdWriteUpdates(updates, pw) if num, err := fwdPostUpdates(client, url, pr); err != nil { - s5l.Printf("fwd: failed sending updates: %v", err) + s5l.Printf("srv|fwd: sending updates failed: %v", err) continue tryResync } else if num != len(updates) { - s5l.Printf("fwd: server acknowledged wrong number of updates: expected %d, got: %d", len(updates), num) + s5l.Printf("srv|fwd: server acknowledged wrong number of updates: expected %d, got: %d", len(updates), num) continue tryResync } lastID = findMaxID(updates) - s5l.Printf("fwd: successfully forwarded %d updates, new lastid: %d", len(updates), lastID) + s5l.Printf("srv|fwd: successfully forwarded %d updates, new lastid: %d", len(updates), lastID) } } } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go index d61c5fc..71313aa 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go @@ -51,29 +51,29 @@ func fwdEsGetLastUpdateID(baseUrl string, client *http.Client, hubUUID string) ( url := baseUrl + "/dataupdate/_search?search_type=count" queryJson := fmt.Sprintf(forwardEsLastUpdateIDJson, hubUUID) - s5dl.Printf("fwd-es: query: %s", queryJson) + s5dl.Printf("srv|fwd-es: query: %s", queryJson) var resp *http.Response resp, err = client.Post(url, "application/json", strings.NewReader(queryJson)) if err != nil { - s5l.Printf("fwd-es: failed to query for lastupdate: %v\n", err) + s5l.Printf("srv|fwd-es: querying for lastupdate failed: %v", err) return } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - s5l.Printf("fwd-es: server failed to fulfill query for lastupdate: %v\n", resp.StatusCode) + s5l.Printf("srv|fwd-es: elasticsearch failed to fulfill query for lastupdate: %v", resp.StatusCode) return } var body []byte body, err = ioutil.ReadAll(resp.Body) // TODO: read from Body using json.Decoder if err != nil { - s5l.Printf("fwd-es: failed to read lastupdate response: %v\n", err) + s5l.Printf("srv|fwd-es: reading lastupdate response failed: %v", err) return } - s5dl.Printf("fwd-es: lastupdate response: %s\n", body) + s5dl.Printf("srv|fwd-es: lastupdate response: %s", body) if len(body) == 0 { lastID = -1 @@ -81,13 +81,13 @@ func fwdEsGetLastUpdateID(baseUrl string, client *http.Client, hubUUID string) ( var value interface{} err = json.Unmarshal(body, &value) if err != nil { - s5l.Printf("fwd-es: invalid lastupdate response: %v\n", err) + s5l.Printf("srv|fwd-es: invalid lastupdate response: %v", err) return } idstrcntr := value.(map[string]interface{})["aggregations"].(map[string]interface{})["last-id"].(map[string]interface{})["value"] if idstrcntr == nil { - s5l.Printf("fwd-es: we are new here\n") + s5l.Printf("srv|fwd-es: we are new here") lastID = -1 return } @@ -105,17 +105,17 @@ tryResync: for { lastID, err := fwdEsGetLastUpdateID(baseUrl, client, hubUUID) if err != nil { - s5l.Printf("fwd-es: lastupdate returned err: %v", err) + s5l.Printf("srv|fwd-es: fetching lastupdate failed: %v", err) time.Sleep(5 * time.Second) continue tryResync } - s5l.Printf("fwd-es: lastupdate: %d", lastID) + s5l.Printf("srv|fwd-es: new lastupdate: %d", lastID) nextBatch: for { updates, err := srv.store.GetUpdatesAfter(lastID, 5000) if err != nil { - s5l.Printf("fwd-es: failed reading updates: %v\n", err) + s5l.Printf("srv|fwd-es: reading updates failed: %v", err) time.Sleep(500 * time.Millisecond) continue nextBatch } @@ -129,7 +129,7 @@ tryResync: for _, update := range updates { data, err := json.Marshal(update) // TODO: move to seperate function, use StatelessEncoder and write to io.PipeWriter if err != nil { - s5l.Panicf("fwd-es: encode failed: %v\n", err) + s5l.Panicf("srv|fwd-es: encoding updates failed: %v", err) } postData.WriteString(`{"index":{"_type":"dataupdate"}}`) postData.WriteRune('\n') @@ -140,21 +140,21 @@ tryResync: // TODO: move to seperate function and use io.PipeReader as body resp, err := client.Post(url, "application/json", bytes.NewReader(postData.Bytes())) if err != nil { - s5l.Printf("fwd-es: post failed: %v\n", err) + s5l.Printf("srv|fwd-es: posting updates failed: %v", err) time.Sleep(1 * time.Second) continue tryResync } if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { body, _ := ioutil.ReadAll(resp.Body) - s5l.Printf("fwd-es: post failed: %s\n%s\n", resp.Status, body) + s5l.Printf("srv|fwd-es: posting updates failed: %s\n%s", resp.Status, body) time.Sleep(1 * time.Second) continue tryResync } resp.Body.Close() // TODO: check result from elasticsearch lastID = findMaxID(updates) - s5l.Printf("fwd-es: successfully forwarded %d updates, new lastid: %d", len(updates), lastID) + s5l.Printf("srv|fwd-es: successfully forwarded %d updates, new lastid: %d", len(updates), lastID) } } } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go index 2972b91..90afb69 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go @@ -44,25 +44,25 @@ tryResync: for { client, err := graphite.NewGraphiteFromAddress(forwardHost) if err != nil { - s5l.Printf("fwd-graphite: connect returned err: %v", err) + s5l.Printf("srv|fwd-graphite: connecting failed: %v", err) time.Sleep(5 * time.Second) continue tryResync } lastID, err := srv.store.GetLastUpdateID() if err != nil { - s5l.Printf("fwd-graphite: lastupdate returned err: %v", err) + s5l.Printf("srv|fwd-graphite: fetching lastupdate failed: %v", err) client.Disconnect() time.Sleep(5 * time.Second) continue tryResync } - s5l.Printf("fwd-graphite: lastupdate: %d", lastID) + s5l.Printf("srv|fwd-graphite: new lastupdate: %d", lastID) nextBatch: for { updates, err := srv.store.GetUpdatesAfter(lastID, 5000) if err != nil { - s5l.Printf("fwd-graphite: failed reading updates: %v", err) + s5l.Printf("srv|fwd-graphite: reading updates failed: %v", err) time.Sleep(500 * time.Millisecond) continue nextBatch } @@ -85,13 +85,13 @@ tryResync: err = client.SendMetrics(metrics) if err != nil { - s5l.Printf("fwd-graphite: sending metrics failed: %v", err) + s5l.Printf("srv|fwd-graphite: sending metrics failed: %v", err) time.Sleep(1 * time.Second) continue tryResync } lastID = findMaxID(updates) - s5l.Printf("fwd-graphite: successfully forwarded %d updates, new lastid: %d", len(updates), lastID) + s5l.Printf("srv|fwd-graphite: successfully forwarded %d updates, new lastid: %d", len(updates), lastID) } } } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go index 7dc034e..198a53c 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go @@ -62,17 +62,17 @@ tryResync: // lastID, err := srv.forwardPiwikGetLastUpdateID(piwikURL, siteURL, siteID, token, client, hubUuid) lastID, err := srv.store.GetLastUpdateID() if err != nil { - s5l.Printf("fwd-piwik: lastupdate returned err: %v", err) + s5l.Printf("srv|fwd-piwik: fetching lastupdate failed: %v", err) time.Sleep(5 * time.Second) continue tryResync } - s5l.Printf("fwd-piwik: lastupdate: %d", lastID) + s5l.Printf("srv|fwd-piwik: new lastupdate: %d", lastID) nextBatch: for { updates, err := srv.store.GetUpdatesAfter(lastID, 5000) if err != nil { - s5l.Printf("fwd-piwik: failed reading updates: %v\n", err) + s5l.Printf("srv|fwd-piwik: reading updates failed: %v", err) time.Sleep(500 * time.Millisecond) continue nextBatch } @@ -114,26 +114,26 @@ tryResync: postData := bytes.Buffer{} if err := json.NewEncoder(&postData).Encode(req); err != nil { - s5l.Panicf("fwd-piwik: encode failed: %v\n", err) + s5l.Panicf("srv|fwd-piwik: encoding updates failed: %v", err) } // TODO: move this to seperate function and read from io.PipeReader resp, err := client.Post(piwikURL, "application/json", &postData) if err != nil { - s5l.Printf("fwd-piwik: post failed: %v\n", err) + s5l.Printf("srv|fwd-piwik: posting updates failed: %v", err) time.Sleep(1 * time.Second) continue tryResync } if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { body, _ := ioutil.ReadAll(resp.Body) - s5l.Printf("fwd-piwik: post failed: %s\n%s\n", resp.Status, body) + s5l.Printf("srv|fwd-piwik: posting updates failed: %s\n%s", resp.Status, body) time.Sleep(1 * time.Second) continue tryResync } resp.Body.Close() // TODO: check result from Piwik lastID = findMaxID(updates) - s5l.Printf("fwd-piwik: successfully forwarded %d updates, new lastid: %d", len(updates), lastID) + s5l.Printf("srv|fwd-piwik: successfully forwarded %d updates, new lastid: %d", len(updates), lastID) } } } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index 3335878..4755c35 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -117,19 +117,19 @@ func (srv Server) pipegramHandle(pconn net.PacketConn) { for { n, _, err := pconn.ReadFrom(buffer) if err != nil { - s5l.Printf("pipegram: read() failed: %v", err) + s5l.Printf("srv|pgram: read() failed: %v", err) continue } data := buffer[0:n] update, err := NewStatelessDecoder(bytes.NewReader(data)).Decode() if err != nil { - s5l.Printf("pipegram: decoding data message failed: %v\n", err) + s5l.Printf("srv|pgram: decoding data message failed: %v\n", err) continue } if err = srv.Ingest(update); err != nil { - s5l.Printf("pipegram: storing data failed: %v\n", err) + s5l.Printf("srv|pgram: storing data failed: %v\n", err) } } } @@ -137,13 +137,13 @@ func (srv Server) pipegramHandle(pconn net.PacketConn) { func (srv Server) ServePipegram(cfg PipegramInterfaceConfig) { pconn, err := net.ListenPacket("unixgram", cfg.ListenPath) if err != nil { - s5l.Printf("pipegram: listen() failed: %v", err) + s5l.Printf("srv|pgram: listen() failed: %v", err) return } defer pconn.Close() - s5l.Printf("server|pipegram: listening on '%s'", cfg.ListenPath) - defer s5l.Println("server|pipegram: interface stopped") + s5l.Printf("server|pgram: listening on '%s'", cfg.ListenPath) + defer s5l.Println("server|pgram: interface stopped") srv.pipegramHandle(pconn) } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index 9c9521a..9e0b474 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -224,12 +224,12 @@ func webUpdatesGetBulk(srv *Server, w http.ResponseWriter, r *http.Request) { enc, err := NewStatefulEncoder(w) if err != nil { - s5l.Printf("server|web: sending response(init) failed: %v", err) + s5l.Printf("srv|web: sending response(init) failed: %v", err) return } for _, update := range updates { if err := enc.Encode(update); err != nil { - s5l.Printf("server|web: sending response(data) failed: %v", err) + s5l.Printf("srv|web: sending response(data) failed: %v", err) return } } @@ -368,7 +368,7 @@ func sendWebResponse(w http.ResponseWriter, status int, respdata interface{}) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) if err := json.NewEncoder(w).Encode(respdata); err != nil { - s5l.Printf("server|web: sending response failed: %v", err) + s5l.Printf("srv|web: sending response failed: %v", err) } } @@ -417,12 +417,12 @@ func webRun(listener *net.TCPListener, srv *Server) (err error) { func (srv Server) ServeWeb(cfg WebInterfaceConfig) { ln, err := net.Listen("tcp", cfg.ListenAddr) if err != nil { - s5l.Printf("server|web: listen() failed: %v", err) + s5l.Printf("srv|web: listen() failed: %v", err) return } - s5l.Printf("server|web: listening on '%s'", cfg.ListenAddr) - defer s5l.Println("server|web: interface stopped") + s5l.Printf("srv|web: listening on '%s'", cfg.ListenAddr) + defer s5l.Println("srv|web: interface stopped") webRun(ln.(*net.TCPListener), &srv) } |