diff options
author | Christian Pointner <equinox@spreadspace.org> | 2017-05-19 05:54:06 +0200 |
---|---|---|
committer | Christian Pointner <equinox@spreadspace.org> | 2017-05-19 05:54:06 +0200 |
commit | aa1659587addb5bbf249e614047cbe913c5f2c8e (patch) | |
tree | b6685f5df684919eceebd902bbe7daa39234a9e7 /src | |
parent | added encoding for geo-ip info (diff) |
geo ip lookup implemented
Diffstat (limited to 'src')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srv.go | 5 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 20 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5store.go | 79 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5store_test.go | 61 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5typesApi.go | 9 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5typesStore.go | 67 |
6 files changed, 204 insertions, 37 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index d3ce519..6d07e8c 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -66,11 +66,10 @@ func (srv Server) transform(update *UpdateFull) *UpdateFull { bytesSentTotal += client.BytesSent if srv.geoip != nil { - if rec, err := srv.geoip.Lookup(client.IP); err != nil { + if info, err := srv.geoip.Lookup(client.IP); err != nil { s5l.Printf("transform: Geo-IP lookup failed: %v", err) } else { - // TODO: actually store the record - s5l.Printf("transform: Geo-IP lookup; %s -> %+v", client.IP, rec) + client.GeoInfo = *info } } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index cf08e72..9fe5b86 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -104,6 +104,25 @@ func webSources(srv *Server, w http.ResponseWriter, r *http.Request) { } // +// /clients +// + +func webClients(srv *Server, w http.ResponseWriter, r *http.Request) { + if r.Method != "GET" { + sendInvalidMethod(w, r.Method) + return + } + + var err error + resp := WebClientsResponse{} + if resp.Clients, err = srv.store.GetClients(); err != nil { + sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()}) + return + } + sendWebResponse(w, http.StatusOK, resp) +} + +// // /updates/(:ID|_bulk) // @@ -382,6 +401,7 @@ func webRun(listener *net.TCPListener, srv *Server) (err error) { mux.Handle("/healthz", webHandler{srv, webHealthz}) mux.Handle("/hubs", webHandler{srv, webHubs}) mux.Handle("/sources", webHandler{srv, webSources}) + mux.Handle("/clients", webHandler{srv, webClients}) mux.Handle("/updates/", webHandler{srv, webUpdatesWithParam}) mux.Handle("/updates", webHandler{srv, webUpdates}) mux.Handle("/lastupdate/", webHandler{srv, webLastUpdateIDForUUID}) diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index e8aebf5..ebe83ec 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -50,7 +50,7 @@ const ( var ( storeBuckets = []string{latestUpdatesBn, hubUUIDsFwdBn, hubUUIDsRevBn, updatesBn, - sourcesFwdBn, sourcesRevBn, clientDataBn, userAgentsFwdBn, userAgentsRevBn} + sourcesFwdBn, sourcesRevBn, clientsFwdBn, clientsRevBn, clientDataBn, userAgentsFwdBn, userAgentsRevBn} ) type Store struct { @@ -278,7 +278,36 @@ func (st *Store) insertNewUserAgent(tx *bolt.Tx, ua string) (uaID int, err error return uaID, err } -func (st *Store) insertClient(tx *bolt.Tx, uID int, cd []Client) error { +func (st *Store) insertNewClient(tx *bolt.Tx, client *clientDB) (clientID int, err error) { + bf := tx.Bucket([]byte(clientsFwdBn)) + // br.FillPercent = 1.0 // these appends are not ordered (the key is the slug and not an integer id) + br := tx.Bucket([]byte(clientsRevBn)) + br.FillPercent = 1.0 // we only do appends (with ever incrementing interger ids) + + slug := client.Slug() + bClientID := bf.Get([]byte(slug)) + if bClientID != nil { + return btoi(bClientID), nil + } + + var jsonData []byte + if jsonData, err = json.Marshal(client); err != nil { + return + } + + next, _ := bf.NextSequence() + clientID = int(next) + if err = bf.Put([]byte(slug), itob(clientID)); err != nil { + return + } + if err = br.Put(itob(clientID), jsonData); err != nil { + return + } + + return clientID, err +} + +func (st *Store) insertClients(tx *bolt.Tx, uID int, cd []Client) error { if len(cd) == 0 { return nil } @@ -292,7 +321,13 @@ func (st *Store) insertClient(tx *bolt.Tx, uID int, cd []Client) error { if err != nil { return err } - data = append(data, clientDataDB{c.IP, uaID, c.BytesSent}) + + clientID, err := st.insertNewClient(tx, NewClientDB(&c)) + if err != nil { + return err + } + + data = append(data, clientDataDB{clientID, uaID, c.BytesSent}) } jsonData, err := json.Marshal(data) @@ -328,7 +363,7 @@ func (st *Store) appendItem(tx *bolt.Tx, uf *UpdateFull) (uID int, err error) { if uID, err = st.insertUpdate(tx, u); err != nil { return } - if err = st.insertClient(tx, uID, uf.Data.Clients); err != nil { + if err = st.insertClients(tx, uID, uf.Data.Clients); err != nil { return } @@ -392,10 +427,11 @@ func (st *Store) getSource(tx *bolt.Tx, id int) (source *sourceDB, err error) { } func (st *Store) getClients(tx *bolt.Tx, id int) (clients []Client, err error) { - bc := tx.Bucket([]byte(clientDataBn)) + bcd := tx.Bucket([]byte(clientDataBn)) + bc := tx.Bucket([]byte(clientsRevBn)) bu := tx.Bucket([]byte(userAgentsRevBn)) - jsonData := bc.Get(itob(id)) + jsonData := bcd.Get(itob(id)) if jsonData == nil { return } @@ -404,7 +440,18 @@ func (st *Store) getClients(tx *bolt.Tx, id int) (clients []Client, err error) { return } for _, c := range data { - cd := Client{IP: c.IP, BytesSent: c.BytesSent} + cd := Client{BytesSent: c.BytesSent} + jsonData := bc.Get(itob(c.ClientID)) + if jsonData == nil { + err = ErrClientNotFound + return + } + client := &clientDB{} + if err = json.Unmarshal(jsonData, client); err != nil { + return + } + cd.CopyFromClientDB(client) + ua := bu.Get(itob(c.UserAgentID)) if ua != nil { cd.UserAgent = string(ua) @@ -556,3 +603,21 @@ func (st *Store) GetSources() (sources []*Source, err error) { }) return } + +func (st *Store) GetClients() (clients []*Client, err error) { + clients = []*Client{} + err = st.db.View(func(tx *bolt.Tx) error { + c := tx.Bucket([]byte(clientsRevBn)).Cursor() + for k, v := c.First(); k != nil; k, v = c.Next() { + var s *clientDB + if err := json.Unmarshal(v, &s); err != nil { + return err + } + client := &Client{} + client.CopyFromClientDB(s) + clients = append(clients, client) + } + return nil + }) + return +} diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go index a82c208..3f5cd37 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go @@ -54,14 +54,14 @@ var ( sourceData = Source{Hostname: "streamer", Tags: []string{"tag1", "master"}, Stream: streamIDData} updateData = Update{Data: UpdateData{ClientCount: 3, BytesReceived: 42, BytesSent: 136}, Duration: 5000} clientsData = []Client{ - Client{"127.0.0.1", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400}, - Client{"10.12.0.1", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400}, - Client{"127.0.0.1", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400}, - Client{"192.168.0.1", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400}, - Client{"172.16.0.2", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400}} + Client{"127.0.0.1", 51236, "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400, GeoInfo{}}, + Client{"10.12.0.1", 0, "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400, GeoInfo{}}, + Client{"127.0.0.1", 51202, "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400, GeoInfo{}}, + Client{"192.168.0.1", 49487, "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400, GeoInfo{}}, + Client{"172.16.0.2", 0, "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400, GeoInfo{}}} ) -func generateTestData(n int) ([]*UpdateFull, int) { +func generateTestData(n int) ([]*UpdateFull, int, int) { hostnames := []string{"streamer1", "streamer2"} contents := []string{"av", "audio"} formats := []string{"webm", "flash", "hls"} @@ -102,7 +102,7 @@ func generateTestData(n int) ([]*UpdateFull, int) { } starttime = starttime.Add(time.Duration(duration) * time.Millisecond) } - return data[:n], numSrcs + return data[:n], numSrcs, len(clientsData) } func TestMain(m *testing.M) { @@ -507,7 +507,7 @@ func TestForwardedUpdates(t *testing.T) { defer store.Close() // generate/append some local updates - data, _ := generateTestData(10) + data, _, _ := generateTestData(10) if err := store.AppendMany(data); err != nil { t.Fatalf("unexpected error: %v", err) } @@ -666,11 +666,11 @@ func TestForwardedUpdates2(t *testing.T) { defer finalStore.Close() // generate/append some updates to src - data, _ := generateTestData(10) + data, _, _ := generateTestData(10) if err := src1Store.AppendMany(data); err != nil { t.Fatalf("unexpected error: %v", err) } - data, _ = generateTestData(7) + data, _, _ = generateTestData(7) if err := src2Store.AppendMany(data); err != nil { t.Fatalf("unexpected error: %v", err) } @@ -759,7 +759,7 @@ func TestGetSources(t *testing.T) { } // generate/append some data - data, numSrcs := generateTestData(-1) + data, numSrcs, _ := generateTestData(-1) if err := store.AppendMany(data); err != nil { t.Fatalf("unexpected error: %v", err) } @@ -775,6 +775,41 @@ func TestGetSources(t *testing.T) { // the result will be orderd using the slug so doing a DeepEqual doesn't work here } +func TestGetClients(t *testing.T) { + // prepare a new store + os.Remove(testBoltPath) + store, err := NewStore(testBoltPath, false) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer store.Close() + + // check if client list is empty + clientList, err := store.GetClients() + if err != nil { + t.Fatalf("fetching clients failed: %v", err) + } + if len(clientList) != 0 { + t.Fatalf("wrong number of clients: %d, expected 0", len(clientList)) + } + + // generate/append some data + data, _, numClients := generateTestData(-1) + if err := store.AppendMany(data); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // fetch all the clients + clientList, err = store.GetClients() + if err != nil { + t.Fatalf("fetching clients failed: %v", err) + } + if len(clientList) != numClients { + t.Fatalf("wrong number of clients: %d, expected %d", len(clientList), numClients) + } + // the result will be orderd using the slug so doing a DeepEqual doesn't work here +} + // // Benchmarking // @@ -786,7 +821,7 @@ func BenchmarkAppendMany(b *testing.B) { b.Fatalf("unexpected error: %v", err) } defer store.Close() - data, _ := generateTestData(b.N) + data, _, _ := generateTestData(b.N) b.ResetTimer() @@ -802,7 +837,7 @@ func BenchmarkGetUpdatesAfter(b *testing.B) { b.Fatalf("unexpected error: %v", err) } defer store.Close() - data, _ := generateTestData(b.N) + data, _, _ := generateTestData(b.N) if err := store.AppendMany(data); err != nil { b.Fatalf("unexpected error: %v", err) } diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go index c1c425e..fa0d7a2 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go @@ -61,9 +61,10 @@ type GeoInfo struct { type Client struct { IP string `json:"ip"` + Port uint `json:"port,omitempty"` UserAgent string `json:"user-agent,omitempty"` - BytesSent uint `json:"bytes-sent"` - // GeoInfo + BytesSent uint `json:"bytes-sent,omitempty"` + GeoInfo } type UpdateData struct { @@ -140,6 +141,10 @@ type WebSourcesResponse struct { Sources []*Source `json:"sources"` } +type WebClientsResponse struct { + Clients []*Client `json:"clients"` +} + type WebUpdatesGetResponse struct { Updates []*UpdateFull `json:"updates"` } diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go index 92147e1..57e6776 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go @@ -43,6 +43,7 @@ import ( var ( ErrNotFound = errors.New("date-update entry not found") ErrSourceNotFound = errors.New("source entry not found") + ErrClientNotFound = errors.New("client entry not found") ErrReadOnly = errors.New("store is in read-only mode") ) @@ -55,6 +56,8 @@ const ( updatesBn = "Updates" sourcesFwdBn = "SourcesFwd" sourcesRevBn = "SourcesRev" + clientsFwdBn = "ClientsFwd" + clientsRevBn = "ClientsRev" clientDataBn = "ClientData" userAgentsFwdBn = "UserAgentsFwd" userAgentsRevBn = "UserAgentsRev" @@ -77,19 +80,19 @@ type sourceDB struct { Tags []string `json:"t"` } -func NewSourceDB(value *UpdateFull) *sourceDB { +func NewSourceDB(uf *UpdateFull) *sourceDB { return &sourceDB{ - Hostname: value.Source.Hostname, + Hostname: uf.Source.Hostname, Stream: streamDB{ - ContentID: value.Source.Stream.ContentID, - Format: value.Source.Stream.Format, - Quality: value.Source.Stream.Quality, + ContentID: uf.Source.Stream.ContentID, + Format: uf.Source.Stream.Format, + Quality: uf.Source.Stream.Quality, }, - Tags: value.Source.Tags, + Tags: uf.Source.Tags, } } -func (s sourceDB) Slug() string { +func (s *sourceDB) Slug() string { return fmt.Sprintf("%s/%s/%s/%s/%s", s.Hostname, s.Stream.ContentID, s.Stream.Format, s.Stream.Quality, strings.Join(s.Tags, ",")) } @@ -101,14 +104,54 @@ func (s *Source) CopyFromSourceDB(sdb *sourceDB) { s.Tags = sdb.Tags } -// stored in clientDataBn +type clientDB struct { + IP string `json:"ip"` + Port uint `json:"po,omitempty"` + CountryName string `json:"cn,omitempty"` + CountryCode2 string `json:"cc,omitempty"` + RegionName string `json:"rn,omitempty"` + RegionCode string `json:"rc,omitempty"` + CityName string `json:"ct,omitempty"` + Latitude float64 `json:"lat,omitempty"` + Longitude float64 `json:"lon,omitempty"` +} + +func NewClientDB(c *Client) *clientDB { + return &clientDB{ + IP: c.IP, + Port: c.Port, + CountryName: c.CountryName, + CountryCode2: c.CountryCode2, + RegionName: c.RegionName, + RegionCode: c.RegionCode, + CityName: c.CityName, + Latitude: c.Latitude, + Longitude: c.Longitude, + } +} + +func (c *clientDB) Slug() string { + return fmt.Sprintf("%s:%d/%s/%s/%s/%f,%f", c.IP, c.Port, c.CountryCode2, c.RegionCode, c.CityName, c.Latitude, c.Longitude) +} + +func (c *Client) CopyFromClientDB(cdb *clientDB) { + c.IP = cdb.IP + c.Port = cdb.Port + c.CountryName = cdb.CountryName + c.CountryCode2 = cdb.CountryCode2 + c.RegionName = cdb.RegionName + c.RegionCode = cdb.RegionCode + c.CityName = cdb.CityName + c.Latitude = cdb.Latitude + c.Longitude = cdb.Longitude +} + type clientDataDB struct { - IP string `json:"ip"` - UserAgentID int `json:"ua"` - BytesSent uint `json:"bs"` + ClientID int `json:"ci"` + UserAgentID int `json:"ua"` + BytesSent uint `json:"bs"` } -// stored in dataUpdatesBn type updateDB struct { SourceHubID int `json:"h,omitempty"` SourceHubUpdateID int `json:"hi,omitempty"` |