summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--dat/sample-web.json10
-rw-r--r--doc/TODO1
-rw-r--r--src/hub/Makefile1
-rw-r--r--src/hub/src/spreadspace.org/sfive-hub/s5hub.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go44
5 files changed, 57 insertions, 3 deletions
diff --git a/dat/sample-web.json b/dat/sample-web.json
index c6cc573..4497496 100644
--- a/dat/sample-web.json
+++ b/dat/sample-web.json
@@ -3,7 +3,15 @@
"hostname": "localhost",
"tags": ["suppa", "toll"],
"version": 2,
- "data": {"bytes-sent": 1519, "client-count": 0},
+ "data": {
+ "bytes-sent": 1519,
+ "client-count": 0,
+ "clients": [
+ {"ip": "85.238.166.201", "bytes-sent": 413535, "user-agent": "blah"},
+ {"ip": "85.238.166.205", "bytes-sent": 1234, "user-agent": "foo"},
+ {"ip": "192.168.0.1", "bytes-sent": 325345, "user-agent": "bar"}
+ ]
+ },
"start-time": "2013-10-21T12:30:00Z",
"duration-ms": 300000
}
diff --git a/doc/TODO b/doc/TODO
index 4ef2c48..7b6308c 100644
--- a/doc/TODO
+++ b/doc/TODO
@@ -10,3 +10,4 @@
* hub: add crypto-pan support for IP address anonymization
* hub: add support for Elasticsearch 5.x
* hub: add additional checks at protocol parser
+* hub: sanity check len(clients) and clients field in data update
diff --git a/src/hub/Makefile b/src/hub/Makefile
index a65f145..b7640f7 100644
--- a/src/hub/Makefile
+++ b/src/hub/Makefile
@@ -40,6 +40,7 @@ EXECUTEABLE := sfive-hub
LIBS := "github.com/boltdb/bolt" \
"github.com/pborman/uuid" \
+ "github.com/Yawning/cryptopan" \
"github.com/equinox0815/graphite-golang"
diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
index 68db727..9c58481 100644
--- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
+++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
@@ -46,6 +46,8 @@ var s5hl = log.New(os.Stderr, "[s5hub]\t", log.LstdFlags)
func main() {
db := flag.String("db", "/var/lib/sfive/db.bolt", "path to the database file")
readOnly := flag.Bool("read-only", false, "open database in read-only mode")
+ anonymize := flag.Bool("anonymize", false, "anonymize clients IP addresses using crypto-pan")
+ anonKeyFile := flag.String("anonymization-key-file", "", "path to the file containing the key to be used for crypto-pan anonymization (default: use randomly generated key)")
pipe := flag.String("pipe", "/var/run/sfive/pipe", "path to the unix pipe for the pipeserver")
startPipe := flag.Bool("start-pipe-server", true, "start a connection oriented pipe server; see option pipe")
pipegram := flag.String("pipegram", "/var/run/sfive/pipegram", "path to the unix datagram pipe for the pipeserver")
@@ -70,7 +72,7 @@ func main() {
return
}
- srv, err := sfive.NewServer(*db, *readOnly)
+ srv, err := sfive.NewServer(*db, *readOnly, *anonymize, *anonKeyFile)
if err != nil {
s5hl.Fatalf(err.Error())
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index f99bffb..87b2c0d 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -32,6 +32,10 @@
package sfive
+import (
+ "errors"
+)
+
type appendToken struct {
update UpdateFull
response chan error
@@ -72,6 +76,7 @@ type getLastUpdateIdToken struct {
type Server struct {
store Store
+ anonymization AnonymizationAlgo
quit chan bool
done chan bool
appendChan chan appendToken
@@ -81,6 +86,29 @@ type Server struct {
getLastUpdateIdChan chan getLastUpdateIdToken
}
+// Anonymize returns a copy of update in which every client's IP address has
+// been replaced by the output of the server's configured anonymization
+// algorithm. The Clients slice of the caller's update is left untouched; a
+// new slice is built and attached to the returned copy.
+//
+// If anonymizing a client's address fails, the error is logged and the
+// address is cleared to its zero value instead of being kept, so that a raw
+// address is never passed on while anonymization is enabled. All other
+// fields of that client record are preserved.
+//
+// srv.anonymization must be non-nil when this method is called.
+func (srv Server) Anonymize(update UpdateFull) UpdateFull {
+	anonymized := make([]Client, 0, len(update.Data.Clients))
+	for _, client := range update.Data.Clients {
+		aIp, err := srv.anonymization.Anonymize(client.Ip)
+		if err != nil {
+			s5l.Printf("anonymization: failed: %v", err)
+			// Fail closed: drop the raw address rather than storing it
+			// un-anonymized. Client{}.Ip is the zero value of the field,
+			// whatever its type.
+			client.Ip = Client{}.Ip
+		} else {
+			client.Ip = aIp
+		}
+		anonymized = append(anonymized, client)
+	}
+	update.Data.Clients = anonymized
+	return update
+}
+
+// AnonymizeMany runs Anonymize over each element of updates, in order, and
+// returns the results in a newly allocated slice. The returned slice is
+// non-nil even when updates is empty.
+func (srv Server) AnonymizeMany(updates []UpdateFull) []UpdateFull {
+	result := make([]UpdateFull, 0, len(updates))
+	for i := range updates {
+		result = append(result, srv.Anonymize(updates[i]))
+	}
+	return result
+}
+
func (srv Server) appendActor() {
defer func() { srv.done <- true }()
for {
@@ -88,8 +116,14 @@ func (srv Server) appendActor() {
case <-srv.quit:
return
case token := <-srv.appendChan:
+ if srv.anonymization != nil {
+ token.update = srv.Anonymize(token.update)
+ }
token.response <- srv.store.Append(token.update)
case token := <-srv.appendManyChan:
+ if srv.anonymization != nil {
+ token.updates = srv.AnonymizeMany(token.updates)
+ }
token.response <- srv.store.AppendMany(token.updates)
case token := <-srv.getUpdatesAfterChan:
values, err := srv.store.GetUpdatesAfter(token.id, token.limit)
@@ -156,7 +190,7 @@ func (srv Server) Close() {
s5l.Printf("server: finished\n")
}
-func NewServer(dbPath string, readOnly bool) (server *Server, err error) {
+func NewServer(dbPath string, readOnly, anonymize bool, anonKeyfile string) (server *Server, err error) {
// TODO read configuration and create instance with correct settings
server = &Server{}
server.store, err = NewStore(dbPath, readOnly)
@@ -164,6 +198,14 @@ func NewServer(dbPath string, readOnly bool) (server *Server, err error) {
return
}
+ if anonymize {
+ if server.anonymization, err = NewCryptopanAnonymization(anonKeyfile); err != nil {
+ err = errors.New("failed to initialize IP address anonymization: " + err.Error())
+ return
+ }
+ s5l.Printf("using IP address anonymization: %s", server.anonymization)
+ }
+
server.quit = make(chan bool)
server.done = make(chan bool)
server.appendChan = make(chan appendToken, 32)