summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-07-05 01:47:01 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-07-05 01:47:01 +0200
commit694ce302dcae463a9077f21d50727fa249fd15d9 (patch)
tree5b37beaf913f7124d56e05993ae756ab72350467
parentserver interfaces now use config structs (diff)
some more logging cleanup
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go18
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForward.go22
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go28
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go12
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go14
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipe.go12
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go12
7 files changed, 59 insertions, 59 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 979cd83..2a312bd 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -67,7 +67,7 @@ func (srv Server) transform(update *UpdateFull) *UpdateFull {
if srv.geoip != nil {
if info, err := srv.geoip.Lookup(client.IP); err != nil {
- s5l.Printf("server|transform: Geo-IP lookup failed: %v", err)
+ s5l.Printf("srv|xfrm: Geo-IP lookup failed: %v", err)
} else {
client.GeoInfo = *info
}
@@ -75,7 +75,7 @@ func (srv Server) transform(update *UpdateFull) *UpdateFull {
if srv.anonymization != nil {
if aIP, err := srv.anonymization.Anonymize(client.IP); err != nil {
- s5l.Printf("server|transform: anonymization failed: %v", err)
+ s5l.Printf("srv|xfrm: anonymization failed: %v", err)
} else {
client.IP = aIP
}
@@ -87,13 +87,13 @@ func (srv Server) transform(update *UpdateFull) *UpdateFull {
if uint(len(update.Data.Clients)) > update.Data.ClientCount {
if update.Data.ClientCount > 0 {
- s5l.Printf("server|transform: fixing client-count: %d -> %d", update.Data.ClientCount, len(update.Data.Clients))
+ s5l.Printf("srv|xfrm: fixing client-count: %d -> %d", update.Data.ClientCount, len(update.Data.Clients))
}
update.Data.ClientCount = uint(len(update.Data.Clients))
}
if bytesSentTotal > update.Data.BytesSent {
if update.Data.BytesSent > 0 {
- s5l.Printf("server|transform: fixing bytes-sent: %d -> %d", update.Data.BytesSent, bytesSentTotal)
+ s5l.Printf("srv|xfrm: fixing bytes-sent: %d -> %d", update.Data.BytesSent, bytesSentTotal)
}
update.Data.BytesSent = bytesSentTotal
}
@@ -137,14 +137,14 @@ func (srv Server) IngestMany(updates []*UpdateFull) error {
}
func (srv Server) Close() {
- s5l.Printf("server: shutting down")
+ s5l.Printf("srv: shutting down")
close(srv.quit)
srv.done.Wait()
close(srv.ingestChan)
close(srv.ingestManyChan)
srv.store.Close()
- s5l.Printf("server: finished")
+ s5l.Printf("srv: finished")
}
func NewServer(cfg SrvConfig) (srv *Server, err error) {
@@ -159,7 +159,7 @@ func NewServer(cfg SrvConfig) (srv *Server, err error) {
err = errors.New("failed to initialize IP address anonymization: " + err.Error())
return
}
- s5l.Printf("server|transform: using IP address anonymization: %s", srv.anonymization)
+ s5l.Printf("srv|xfrm: using IP address anonymization: %s", srv.anonymization)
}
if cfg.Transform.GeoipDB != "" {
@@ -167,7 +167,7 @@ func NewServer(cfg SrvConfig) (srv *Server, err error) {
err = errors.New("failed to initialize Geo-IP Lookup: " + err.Error())
return
}
- s5l.Printf("server|transform: using Geo-IP Lookup: %s", srv.geoip)
+ s5l.Printf("srv|xfrm: using Geo-IP Lookup: %s", srv.geoip)
}
srv.numWorker = runtime.NumCPU() // TODO: make this configurable
@@ -182,6 +182,6 @@ func NewServer(cfg SrvConfig) (srv *Server, err error) {
srv.ingestWorker(idx)
}(i)
}
- s5l.Printf("server: started")
+ s5l.Printf("srv: started")
return
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
index 2d07844..a3e5c1f 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
@@ -56,19 +56,19 @@ func fwdGetLastUpdateID(baseUrl string, client *http.Client, hubUUID string) (la
var resp *http.Response
resp, err = client.Get(baseUrl + "/lastupdate/" + hubUUID)
if err != nil {
- s5l.Printf("fwd: failed to query for lastupdate: %v\n", err)
+ s5l.Printf("srv|fwd: querying for lastupdate failed: %v", err)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
- s5l.Printf("fwd: server failed to fulfill query for lastupdate: %v\n", resp.StatusCode)
+ s5l.Printf("srv|fwd: remote hub failed to fulfill query for lastupdate: %v", resp.StatusCode)
return
}
result := WebLastUpdateIDResponse{}
if err = json.NewDecoder(resp.Body).Decode(&result); err != nil {
- s5l.Printf("fwd: server failed to fulfill query for lastupdate: %v\n", err)
+ s5l.Printf("srv|fwd: remote hub failed to fulfill query for lastupdate: %v", err)
return
}
@@ -81,12 +81,12 @@ func fwdWriteUpdates(updates []*UpdateFull, pw *io.PipeWriter) {
enc, err := NewStatefulEncoder(pw)
if err != nil {
- s5l.Printf("fwd: failed encoding/sending init message: %v", err)
+ s5l.Printf("srv|fwd: encoding/sending init message failed: %v", err)
return
}
for _, upd := range updates {
if err := enc.Encode(upd); err != nil {
- s5l.Printf("fwd: failed encoding/sending updates: %v", err)
+ s5l.Printf("srv|fwd: encoding/sending updates failed: %v", err)
return
}
}
@@ -116,17 +116,17 @@ tryResync:
for {
lastID, err := fwdGetLastUpdateID(baseUrl, client, hubUUID)
if err != nil {
- s5l.Printf("fwd: lastupdate returned err: %v", err)
+ s5l.Printf("srv|fwd: fetching lastupdate failed: %v", err)
time.Sleep(5 * time.Second)
continue tryResync
}
- s5l.Printf("fwd: lastupdate: %d", lastID)
+ s5l.Printf("srv|fwd: new lastupdate: %d", lastID)
nextBatch:
for {
updates, err := srv.store.GetUpdatesAfter(lastID, 5000)
if err != nil {
- s5l.Printf("fwd: failed reading updates: %v", err)
+ s5l.Printf("srv|fwd: reading updates failed: %v", err)
time.Sleep(500 * time.Millisecond)
continue nextBatch
}
@@ -138,15 +138,15 @@ tryResync:
pr, pw := io.Pipe()
go fwdWriteUpdates(updates, pw)
if num, err := fwdPostUpdates(client, url, pr); err != nil {
- s5l.Printf("fwd: failed sending updates: %v", err)
+ s5l.Printf("srv|fwd: sending updates failed: %v", err)
continue tryResync
} else if num != len(updates) {
- s5l.Printf("fwd: server acknowledged wrong number of updates: expected %d, got: %d", len(updates), num)
+ s5l.Printf("srv|fwd: server acknowledged wrong number of updates: expected %d, got: %d", len(updates), num)
continue tryResync
}
lastID = findMaxID(updates)
- s5l.Printf("fwd: successfully forwarded %d updates, new lastid: %d", len(updates), lastID)
+ s5l.Printf("srv|fwd: successfully forwarded %d updates, new lastid: %d", len(updates), lastID)
}
}
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
index d61c5fc..71313aa 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
@@ -51,29 +51,29 @@ func fwdEsGetLastUpdateID(baseUrl string, client *http.Client, hubUUID string) (
url := baseUrl + "/dataupdate/_search?search_type=count"
queryJson := fmt.Sprintf(forwardEsLastUpdateIDJson, hubUUID)
- s5dl.Printf("fwd-es: query: %s", queryJson)
+ s5dl.Printf("srv|fwd-es: query: %s", queryJson)
var resp *http.Response
resp, err = client.Post(url, "application/json", strings.NewReader(queryJson))
if err != nil {
- s5l.Printf("fwd-es: failed to query for lastupdate: %v\n", err)
+ s5l.Printf("srv|fwd-es: querying for lastupdate failed: %v", err)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
- s5l.Printf("fwd-es: server failed to fulfill query for lastupdate: %v\n", resp.StatusCode)
+ s5l.Printf("srv|fwd-es: elasticsearch failed to fulfill query for lastupdate: %v", resp.StatusCode)
return
}
var body []byte
body, err = ioutil.ReadAll(resp.Body) // TODO: read from Body using json.Decoder
if err != nil {
- s5l.Printf("fwd-es: failed to read lastupdate response: %v\n", err)
+ s5l.Printf("srv|fwd-es: reading lastupdate response failed: %v", err)
return
}
- s5dl.Printf("fwd-es: lastupdate response: %s\n", body)
+ s5dl.Printf("srv|fwd-es: lastupdate response: %s", body)
if len(body) == 0 {
lastID = -1
@@ -81,13 +81,13 @@ func fwdEsGetLastUpdateID(baseUrl string, client *http.Client, hubUUID string) (
var value interface{}
err = json.Unmarshal(body, &value)
if err != nil {
- s5l.Printf("fwd-es: invalid lastupdate response: %v\n", err)
+ s5l.Printf("srv|fwd-es: invalid lastupdate response: %v", err)
return
}
idstrcntr := value.(map[string]interface{})["aggregations"].(map[string]interface{})["last-id"].(map[string]interface{})["value"]
if idstrcntr == nil {
- s5l.Printf("fwd-es: we are new here\n")
+ s5l.Printf("srv|fwd-es: we are new here")
lastID = -1
return
}
@@ -105,17 +105,17 @@ tryResync:
for {
lastID, err := fwdEsGetLastUpdateID(baseUrl, client, hubUUID)
if err != nil {
- s5l.Printf("fwd-es: lastupdate returned err: %v", err)
+ s5l.Printf("srv|fwd-es: fetching lastupdate failed: %v", err)
time.Sleep(5 * time.Second)
continue tryResync
}
- s5l.Printf("fwd-es: lastupdate: %d", lastID)
+ s5l.Printf("srv|fwd-es: new lastupdate: %d", lastID)
nextBatch:
for {
updates, err := srv.store.GetUpdatesAfter(lastID, 5000)
if err != nil {
- s5l.Printf("fwd-es: failed reading updates: %v\n", err)
+ s5l.Printf("srv|fwd-es: reading updates failed: %v", err)
time.Sleep(500 * time.Millisecond)
continue nextBatch
}
@@ -129,7 +129,7 @@ tryResync:
for _, update := range updates {
data, err := json.Marshal(update) // TODO: move to seperate function, use StatelessEncoder and write to io.PipeWriter
if err != nil {
- s5l.Panicf("fwd-es: encode failed: %v\n", err)
+ s5l.Panicf("srv|fwd-es: encoding updates failed: %v", err)
}
postData.WriteString(`{"index":{"_type":"dataupdate"}}`)
postData.WriteRune('\n')
@@ -140,21 +140,21 @@ tryResync:
// TODO: move to seperate function and use io.PipeReader as body
resp, err := client.Post(url, "application/json", bytes.NewReader(postData.Bytes()))
if err != nil {
- s5l.Printf("fwd-es: post failed: %v\n", err)
+ s5l.Printf("srv|fwd-es: posting updates failed: %v", err)
time.Sleep(1 * time.Second)
continue tryResync
}
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
body, _ := ioutil.ReadAll(resp.Body)
- s5l.Printf("fwd-es: post failed: %s\n%s\n", resp.Status, body)
+ s5l.Printf("srv|fwd-es: posting updates failed: %s\n%s", resp.Status, body)
time.Sleep(1 * time.Second)
continue tryResync
}
resp.Body.Close() // TODO: check result from elasticsearch
lastID = findMaxID(updates)
- s5l.Printf("fwd-es: successfully forwarded %d updates, new lastid: %d", len(updates), lastID)
+ s5l.Printf("srv|fwd-es: successfully forwarded %d updates, new lastid: %d", len(updates), lastID)
}
}
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
index 2972b91..90afb69 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
@@ -44,25 +44,25 @@ tryResync:
for {
client, err := graphite.NewGraphiteFromAddress(forwardHost)
if err != nil {
- s5l.Printf("fwd-graphite: connect returned err: %v", err)
+ s5l.Printf("srv|fwd-graphite: connecting failed: %v", err)
time.Sleep(5 * time.Second)
continue tryResync
}
lastID, err := srv.store.GetLastUpdateID()
if err != nil {
- s5l.Printf("fwd-graphite: lastupdate returned err: %v", err)
+ s5l.Printf("srv|fwd-graphite: fetching lastupdate failed: %v", err)
client.Disconnect()
time.Sleep(5 * time.Second)
continue tryResync
}
- s5l.Printf("fwd-graphite: lastupdate: %d", lastID)
+ s5l.Printf("srv|fwd-graphite: new lastupdate: %d", lastID)
nextBatch:
for {
updates, err := srv.store.GetUpdatesAfter(lastID, 5000)
if err != nil {
- s5l.Printf("fwd-graphite: failed reading updates: %v", err)
+ s5l.Printf("srv|fwd-graphite: reading updates failed: %v", err)
time.Sleep(500 * time.Millisecond)
continue nextBatch
}
@@ -85,13 +85,13 @@ tryResync:
err = client.SendMetrics(metrics)
if err != nil {
- s5l.Printf("fwd-graphite: sending metrics failed: %v", err)
+ s5l.Printf("srv|fwd-graphite: sending metrics failed: %v", err)
time.Sleep(1 * time.Second)
continue tryResync
}
lastID = findMaxID(updates)
- s5l.Printf("fwd-graphite: successfully forwarded %d updates, new lastid: %d", len(updates), lastID)
+ s5l.Printf("srv|fwd-graphite: successfully forwarded %d updates, new lastid: %d", len(updates), lastID)
}
}
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
index 7dc034e..198a53c 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
@@ -62,17 +62,17 @@ tryResync:
// lastID, err := srv.forwardPiwikGetLastUpdateID(piwikURL, siteURL, siteID, token, client, hubUuid)
lastID, err := srv.store.GetLastUpdateID()
if err != nil {
- s5l.Printf("fwd-piwik: lastupdate returned err: %v", err)
+ s5l.Printf("srv|fwd-piwik: fetching lastupdate failed: %v", err)
time.Sleep(5 * time.Second)
continue tryResync
}
- s5l.Printf("fwd-piwik: lastupdate: %d", lastID)
+ s5l.Printf("srv|fwd-piwik: new lastupdate: %d", lastID)
nextBatch:
for {
updates, err := srv.store.GetUpdatesAfter(lastID, 5000)
if err != nil {
- s5l.Printf("fwd-piwik: failed reading updates: %v\n", err)
+ s5l.Printf("srv|fwd-piwik: reading updates failed: %v", err)
time.Sleep(500 * time.Millisecond)
continue nextBatch
}
@@ -114,26 +114,26 @@ tryResync:
postData := bytes.Buffer{}
if err := json.NewEncoder(&postData).Encode(req); err != nil {
- s5l.Panicf("fwd-piwik: encode failed: %v\n", err)
+ s5l.Panicf("srv|fwd-piwik: encoding updates failed: %v", err)
}
// TODO: move this to seperate function and read from io.PipeReader
resp, err := client.Post(piwikURL, "application/json", &postData)
if err != nil {
- s5l.Printf("fwd-piwik: post failed: %v\n", err)
+ s5l.Printf("srv|fwd-piwik: posting updates failed: %v", err)
time.Sleep(1 * time.Second)
continue tryResync
}
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
body, _ := ioutil.ReadAll(resp.Body)
- s5l.Printf("fwd-piwik: post failed: %s\n%s\n", resp.Status, body)
+ s5l.Printf("srv|fwd-piwik: posting updates failed: %s\n%s", resp.Status, body)
time.Sleep(1 * time.Second)
continue tryResync
}
resp.Body.Close() // TODO: check result from Piwik
lastID = findMaxID(updates)
- s5l.Printf("fwd-piwik: successfully forwarded %d updates, new lastid: %d", len(updates), lastID)
+ s5l.Printf("srv|fwd-piwik: successfully forwarded %d updates, new lastid: %d", len(updates), lastID)
}
}
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
index 3335878..4755c35 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
@@ -117,19 +117,19 @@ func (srv Server) pipegramHandle(pconn net.PacketConn) {
for {
n, _, err := pconn.ReadFrom(buffer)
if err != nil {
- s5l.Printf("pipegram: read() failed: %v", err)
+ s5l.Printf("srv|pgram: read() failed: %v", err)
continue
}
data := buffer[0:n]
update, err := NewStatelessDecoder(bytes.NewReader(data)).Decode()
if err != nil {
- s5l.Printf("pipegram: decoding data message failed: %v\n", err)
+ s5l.Printf("srv|pgram: decoding data message failed: %v\n", err)
continue
}
if err = srv.Ingest(update); err != nil {
- s5l.Printf("pipegram: storing data failed: %v\n", err)
+ s5l.Printf("srv|pgram: storing data failed: %v\n", err)
}
}
}
@@ -137,13 +137,13 @@ func (srv Server) pipegramHandle(pconn net.PacketConn) {
func (srv Server) ServePipegram(cfg PipegramInterfaceConfig) {
pconn, err := net.ListenPacket("unixgram", cfg.ListenPath)
if err != nil {
- s5l.Printf("pipegram: listen() failed: %v", err)
+ s5l.Printf("srv|pgram: listen() failed: %v", err)
return
}
defer pconn.Close()
- s5l.Printf("server|pipegram: listening on '%s'", cfg.ListenPath)
- defer s5l.Println("server|pipegram: interface stopped")
+ s5l.Printf("server|pgram: listening on '%s'", cfg.ListenPath)
+ defer s5l.Println("server|pgram: interface stopped")
srv.pipegramHandle(pconn)
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index 9c9521a..9e0b474 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -224,12 +224,12 @@ func webUpdatesGetBulk(srv *Server, w http.ResponseWriter, r *http.Request) {
enc, err := NewStatefulEncoder(w)
if err != nil {
- s5l.Printf("server|web: sending response(init) failed: %v", err)
+ s5l.Printf("srv|web: sending response(init) failed: %v", err)
return
}
for _, update := range updates {
if err := enc.Encode(update); err != nil {
- s5l.Printf("server|web: sending response(data) failed: %v", err)
+ s5l.Printf("srv|web: sending response(data) failed: %v", err)
return
}
}
@@ -368,7 +368,7 @@ func sendWebResponse(w http.ResponseWriter, status int, respdata interface{}) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
if err := json.NewEncoder(w).Encode(respdata); err != nil {
- s5l.Printf("server|web: sending response failed: %v", err)
+ s5l.Printf("srv|web: sending response failed: %v", err)
}
}
@@ -417,12 +417,12 @@ func webRun(listener *net.TCPListener, srv *Server) (err error) {
func (srv Server) ServeWeb(cfg WebInterfaceConfig) {
ln, err := net.Listen("tcp", cfg.ListenAddr)
if err != nil {
- s5l.Printf("server|web: listen() failed: %v", err)
+ s5l.Printf("srv|web: listen() failed: %v", err)
return
}
- s5l.Printf("server|web: listening on '%s'", cfg.ListenAddr)
- defer s5l.Println("server|web: interface stopped")
+ s5l.Printf("srv|web: listening on '%s'", cfg.ListenAddr)
+ defer s5l.Println("srv|web: interface stopped")
webRun(ln.(*net.TCPListener), &srv)
}