diff options
-rw-r--r-- | dat/sample-web.json | 10 | ||||
-rw-r--r-- | doc/TODO | 1 | ||||
-rw-r--r-- | src/hub/Makefile | 1 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive-hub/s5hub.go | 4 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srv.go | 44 |
5 files changed, 57 insertions, 3 deletions
diff --git a/dat/sample-web.json b/dat/sample-web.json index c6cc573..4497496 100644 --- a/dat/sample-web.json +++ b/dat/sample-web.json @@ -3,7 +3,15 @@ "hostname": "localhost", "tags": ["suppa", "toll"], "version": 2, - "data": {"bytes-sent": 1519, "client-count": 0}, + "data": { + "bytes-sent": 1519, + "client-count": 0, + "clients": [ + {"ip": "85.238.166.201", "bytes-sent": 413535, "user-agent": "blah"}, + {"ip": "85.238.166.205", "bytes-sent": 1234, "user-agent": "foo"}, + {"ip": "192.168.0.1", "bytes-sent": 325345, "user-agent": "bar"} + ] + }, "start-time": "2013-10-21T12:30:00Z", "duration-ms": 300000 } @@ -10,3 +10,4 @@ * hub: add crpyto-pan support for IP address anonymization * hub: add support for Elasticsearch 5.x * hub: add additional checks at protocol parser +* hub: sanity check len(clients) and clients field in date update diff --git a/src/hub/Makefile b/src/hub/Makefile index a65f145..b7640f7 100644 --- a/src/hub/Makefile +++ b/src/hub/Makefile @@ -40,6 +40,7 @@ EXECUTEABLE := sfive-hub LIBS := "github.com/boltdb/bolt" \ "github.com/pborman/uuid" \ + "github.com/Yawning/cryptopan" \ "github.com/equinox0815/graphite-golang" diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go index 68db727..9c58481 100644 --- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go +++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go @@ -46,6 +46,8 @@ var s5hl = log.New(os.Stderr, "[s5hub]\t", log.LstdFlags) func main() { db := flag.String("db", "/var/lib/sfive/db.bolt", "path to the database file") readOnly := flag.Bool("read-only", false, "open database in read-only mode") + anonymize := flag.Bool("anonymize", false, "anonymize clients IP addresses using crypto-pan") + anonKeyFile := flag.String("anonymization-key-file", "", "path to the file containing the key to be used for crypto-pan anonymization (default: use randomly generated key)") pipe := flag.String("pipe", "/var/run/sfive/pipe", "path to the unix pipe for the pipeserver") startPipe := flag.Bool("start-pipe-server", true, "start a connection oriented pipe server; see option pipe") pipegram := flag.String("pipegram", "/var/run/sfive/pipegram", "path to the unix datagram pipe for the pipeserver") @@ -70,7 +72,7 @@ func main() { return } - srv, err := sfive.NewServer(*db, *readOnly) + srv, err := sfive.NewServer(*db, *readOnly, *anonymize, *anonKeyFile) if err != nil { s5hl.Fatalf(err.Error()) } diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index f99bffb..87b2c0d 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -32,6 +32,10 @@ package sfive +import ( + "errors" +) + type appendToken struct { update UpdateFull response chan error @@ -72,6 +76,7 @@ type getLastUpdateIdToken struct { type Server struct { store Store + anonymization AnonymizationAlgo quit chan bool done chan bool appendChan chan appendToken @@ -81,6 +86,29 @@ type Server struct { getLastUpdateIdChan chan getLastUpdateIdToken } +func (srv Server) Anonymize(update UpdateFull) UpdateFull { + anonymized := []Client{} + for _, client := range update.Data.Clients { + aIp, err := srv.anonymization.Anonymize(client.Ip) + if err != nil { + s5l.Printf("anonymization: failed: %v", err) + } else { + client.Ip = aIp + } + anonymized = append(anonymized, client) + } + update.Data.Clients = anonymized + return update +} + +func (srv Server) AnonymizeMany(updates []UpdateFull) []UpdateFull { + anonymized := []UpdateFull{} + for _, update := range updates { + anonymized = append(anonymized, srv.Anonymize(update)) + } + return anonymized +} + func (srv Server) appendActor() { defer func() { srv.done <- true }() for { @@ -88,8 +116,14 @@ func (srv Server) appendActor() { case <-srv.quit: return case token := <-srv.appendChan: + if srv.anonymization != nil { + token.update = srv.Anonymize(token.update) + } token.response <- srv.store.Append(token.update) case token := <-srv.appendManyChan: + if srv.anonymization != nil { + token.updates = srv.AnonymizeMany(token.updates) + } token.response <- srv.store.AppendMany(token.updates) case token := <-srv.getUpdatesAfterChan: values, err := srv.store.GetUpdatesAfter(token.id, token.limit) @@ -156,7 +190,7 @@ func (srv Server) Close() { s5l.Printf("server: finished\n") } -func NewServer(dbPath string, readOnly bool) (server *Server, err error) { +func NewServer(dbPath string, readOnly, anonymize bool, anonKeyfile string) (server *Server, err error) { // TODO read configuration and create instance with correct settings server = &Server{} server.store, err = NewStore(dbPath, readOnly) @@ -164,6 +198,14 @@ func NewServer(dbPath string, readOnly bool) (server *Server, err error) { return } + if anonymize { + if server.anonymization, err = NewCryptopanAnonymization(anonKeyfile); err != nil { + err = errors.New("failed to initialize IP address anonymization: " + err.Error()) + return + } + s5l.Printf("using IP address anonymization: %s", server.anonymization) + } + server.quit = make(chan bool) server.done = make(chan bool) server.appendChan = make(chan appendToken, 32) |