summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go5
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go20
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go79
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store_test.go61
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesApi.go9
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesStore.go67
6 files changed, 204 insertions, 37 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index d3ce519..6d07e8c 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -66,11 +66,10 @@ func (srv Server) transform(update *UpdateFull) *UpdateFull {
bytesSentTotal += client.BytesSent
if srv.geoip != nil {
- if rec, err := srv.geoip.Lookup(client.IP); err != nil {
+ if info, err := srv.geoip.Lookup(client.IP); err != nil {
s5l.Printf("transform: Geo-IP lookup failed: %v", err)
} else {
- // TODO: actually store the record
- s5l.Printf("transform: Geo-IP lookup; %s -> %+v", client.IP, rec)
+ client.GeoInfo = *info
}
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index cf08e72..9fe5b86 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -104,6 +104,25 @@ func webSources(srv *Server, w http.ResponseWriter, r *http.Request) {
}
//
+// /clients
+//
+
+func webClients(srv *Server, w http.ResponseWriter, r *http.Request) {
+ if r.Method != "GET" {
+ sendInvalidMethod(w, r.Method)
+ return
+ }
+
+ var err error
+ resp := WebClientsResponse{}
+ if resp.Clients, err = srv.store.GetClients(); err != nil {
+ sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()})
+ return
+ }
+ sendWebResponse(w, http.StatusOK, resp)
+}
+
+//
// /updates/(:ID|_bulk)
//
@@ -382,6 +401,7 @@ func webRun(listener *net.TCPListener, srv *Server) (err error) {
mux.Handle("/healthz", webHandler{srv, webHealthz})
mux.Handle("/hubs", webHandler{srv, webHubs})
mux.Handle("/sources", webHandler{srv, webSources})
+ mux.Handle("/clients", webHandler{srv, webClients})
mux.Handle("/updates/", webHandler{srv, webUpdatesWithParam})
mux.Handle("/updates", webHandler{srv, webUpdates})
mux.Handle("/lastupdate/", webHandler{srv, webLastUpdateIDForUUID})
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index e8aebf5..ebe83ec 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -50,7 +50,7 @@ const (
var (
storeBuckets = []string{latestUpdatesBn, hubUUIDsFwdBn, hubUUIDsRevBn, updatesBn,
- sourcesFwdBn, sourcesRevBn, clientDataBn, userAgentsFwdBn, userAgentsRevBn}
+ sourcesFwdBn, sourcesRevBn, clientsFwdBn, clientsRevBn, clientDataBn, userAgentsFwdBn, userAgentsRevBn}
)
type Store struct {
@@ -278,7 +278,36 @@ func (st *Store) insertNewUserAgent(tx *bolt.Tx, ua string) (uaID int, err error
return uaID, err
}
-func (st *Store) insertClient(tx *bolt.Tx, uID int, cd []Client) error {
+func (st *Store) insertNewClient(tx *bolt.Tx, client *clientDB) (clientID int, err error) {
+ bf := tx.Bucket([]byte(clientsFwdBn))
+ // br.FillPercent = 1.0 // these appends are not ordered (the key is the slug and not an integer id)
+ br := tx.Bucket([]byte(clientsRevBn))
+ br.FillPercent = 1.0 // we only do appends (with ever incrementing interger ids)
+
+ slug := client.Slug()
+ bClientID := bf.Get([]byte(slug))
+ if bClientID != nil {
+ return btoi(bClientID), nil
+ }
+
+ var jsonData []byte
+ if jsonData, err = json.Marshal(client); err != nil {
+ return
+ }
+
+ next, _ := bf.NextSequence()
+ clientID = int(next)
+ if err = bf.Put([]byte(slug), itob(clientID)); err != nil {
+ return
+ }
+ if err = br.Put(itob(clientID), jsonData); err != nil {
+ return
+ }
+
+ return clientID, err
+}
+
+func (st *Store) insertClients(tx *bolt.Tx, uID int, cd []Client) error {
if len(cd) == 0 {
return nil
}
@@ -292,7 +321,13 @@ func (st *Store) insertClient(tx *bolt.Tx, uID int, cd []Client) error {
if err != nil {
return err
}
- data = append(data, clientDataDB{c.IP, uaID, c.BytesSent})
+
+ clientID, err := st.insertNewClient(tx, NewClientDB(&c))
+ if err != nil {
+ return err
+ }
+
+ data = append(data, clientDataDB{clientID, uaID, c.BytesSent})
}
jsonData, err := json.Marshal(data)
@@ -328,7 +363,7 @@ func (st *Store) appendItem(tx *bolt.Tx, uf *UpdateFull) (uID int, err error) {
if uID, err = st.insertUpdate(tx, u); err != nil {
return
}
- if err = st.insertClient(tx, uID, uf.Data.Clients); err != nil {
+ if err = st.insertClients(tx, uID, uf.Data.Clients); err != nil {
return
}
@@ -392,10 +427,11 @@ func (st *Store) getSource(tx *bolt.Tx, id int) (source *sourceDB, err error) {
}
func (st *Store) getClients(tx *bolt.Tx, id int) (clients []Client, err error) {
- bc := tx.Bucket([]byte(clientDataBn))
+ bcd := tx.Bucket([]byte(clientDataBn))
+ bc := tx.Bucket([]byte(clientsRevBn))
bu := tx.Bucket([]byte(userAgentsRevBn))
- jsonData := bc.Get(itob(id))
+ jsonData := bcd.Get(itob(id))
if jsonData == nil {
return
}
@@ -404,7 +440,18 @@ func (st *Store) getClients(tx *bolt.Tx, id int) (clients []Client, err error) {
return
}
for _, c := range data {
- cd := Client{IP: c.IP, BytesSent: c.BytesSent}
+ cd := Client{BytesSent: c.BytesSent}
+ jsonData := bc.Get(itob(c.ClientID))
+ if jsonData == nil {
+ err = ErrClientNotFound
+ return
+ }
+ client := &clientDB{}
+ if err = json.Unmarshal(jsonData, client); err != nil {
+ return
+ }
+ cd.CopyFromClientDB(client)
+
ua := bu.Get(itob(c.UserAgentID))
if ua != nil {
cd.UserAgent = string(ua)
@@ -556,3 +603,21 @@ func (st *Store) GetSources() (sources []*Source, err error) {
})
return
}
+
+func (st *Store) GetClients() (clients []*Client, err error) {
+ clients = []*Client{}
+ err = st.db.View(func(tx *bolt.Tx) error {
+ c := tx.Bucket([]byte(clientsRevBn)).Cursor()
+ for k, v := c.First(); k != nil; k, v = c.Next() {
+ var s *clientDB
+ if err := json.Unmarshal(v, &s); err != nil {
+ return err
+ }
+ client := &Client{}
+ client.CopyFromClientDB(s)
+ clients = append(clients, client)
+ }
+ return nil
+ })
+ return
+}
diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go
index a82c208..3f5cd37 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go
@@ -54,14 +54,14 @@ var (
sourceData = Source{Hostname: "streamer", Tags: []string{"tag1", "master"}, Stream: streamIDData}
updateData = Update{Data: UpdateData{ClientCount: 3, BytesReceived: 42, BytesSent: 136}, Duration: 5000}
clientsData = []Client{
- Client{"127.0.0.1", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400},
- Client{"10.12.0.1", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400},
- Client{"127.0.0.1", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400},
- Client{"192.168.0.1", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400},
- Client{"172.16.0.2", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400}}
+ Client{"127.0.0.1", 51236, "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400, GeoInfo{}},
+ Client{"10.12.0.1", 0, "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400, GeoInfo{}},
+ Client{"127.0.0.1", 51202, "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400, GeoInfo{}},
+ Client{"192.168.0.1", 49487, "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400, GeoInfo{}},
+ Client{"172.16.0.2", 0, "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400, GeoInfo{}}}
)
-func generateTestData(n int) ([]*UpdateFull, int) {
+func generateTestData(n int) ([]*UpdateFull, int, int) {
hostnames := []string{"streamer1", "streamer2"}
contents := []string{"av", "audio"}
formats := []string{"webm", "flash", "hls"}
@@ -102,7 +102,7 @@ func generateTestData(n int) ([]*UpdateFull, int) {
}
starttime = starttime.Add(time.Duration(duration) * time.Millisecond)
}
- return data[:n], numSrcs
+ return data[:n], numSrcs, len(clientsData)
}
func TestMain(m *testing.M) {
@@ -507,7 +507,7 @@ func TestForwardedUpdates(t *testing.T) {
defer store.Close()
// generate/append some local updates
- data, _ := generateTestData(10)
+ data, _, _ := generateTestData(10)
if err := store.AppendMany(data); err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -666,11 +666,11 @@ func TestForwardedUpdates2(t *testing.T) {
defer finalStore.Close()
// generate/append some updates to src
- data, _ := generateTestData(10)
+ data, _, _ := generateTestData(10)
if err := src1Store.AppendMany(data); err != nil {
t.Fatalf("unexpected error: %v", err)
}
- data, _ = generateTestData(7)
+ data, _, _ = generateTestData(7)
if err := src2Store.AppendMany(data); err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -759,7 +759,7 @@ func TestGetSources(t *testing.T) {
}
// generate/append some data
- data, numSrcs := generateTestData(-1)
+ data, numSrcs, _ := generateTestData(-1)
if err := store.AppendMany(data); err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -775,6 +775,41 @@ func TestGetSources(t *testing.T) {
// the result will be orderd using the slug so doing a DeepEqual doesn't work here
}
+func TestGetClients(t *testing.T) {
+ // prepare a new store
+ os.Remove(testBoltPath)
+ store, err := NewStore(testBoltPath, false)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ defer store.Close()
+
+ // check if client list is empty
+ clientList, err := store.GetClients()
+ if err != nil {
+ t.Fatalf("fetching clients failed: %v", err)
+ }
+ if len(clientList) != 0 {
+ t.Fatalf("wrong number of clients: %d, expected 0", len(clientList))
+ }
+
+ // generate/append some data
+ data, _, numClients := generateTestData(-1)
+ if err := store.AppendMany(data); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ // fetch all the clients
+ clientList, err = store.GetClients()
+ if err != nil {
+ t.Fatalf("fetching clients failed: %v", err)
+ }
+ if len(clientList) != numClients {
+ t.Fatalf("wrong number of clients: %d, expected %d", len(clientList), numClients)
+ }
+ // the result will be orderd using the slug so doing a DeepEqual doesn't work here
+}
+
//
// Benchmarking
//
@@ -786,7 +821,7 @@ func BenchmarkAppendMany(b *testing.B) {
b.Fatalf("unexpected error: %v", err)
}
defer store.Close()
- data, _ := generateTestData(b.N)
+ data, _, _ := generateTestData(b.N)
b.ResetTimer()
@@ -802,7 +837,7 @@ func BenchmarkGetUpdatesAfter(b *testing.B) {
b.Fatalf("unexpected error: %v", err)
}
defer store.Close()
- data, _ := generateTestData(b.N)
+ data, _, _ := generateTestData(b.N)
if err := store.AppendMany(data); err != nil {
b.Fatalf("unexpected error: %v", err)
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
index c1c425e..fa0d7a2 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
@@ -61,9 +61,10 @@ type GeoInfo struct {
type Client struct {
IP string `json:"ip"`
+ Port uint `json:"port,omitempty"`
UserAgent string `json:"user-agent,omitempty"`
- BytesSent uint `json:"bytes-sent"`
- // GeoInfo
+ BytesSent uint `json:"bytes-sent,omitempty"`
+ GeoInfo
}
type UpdateData struct {
@@ -140,6 +141,10 @@ type WebSourcesResponse struct {
Sources []*Source `json:"sources"`
}
+type WebClientsResponse struct {
+ Clients []*Client `json:"clients"`
+}
+
type WebUpdatesGetResponse struct {
Updates []*UpdateFull `json:"updates"`
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go
index 92147e1..57e6776 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go
@@ -43,6 +43,7 @@ import (
var (
ErrNotFound = errors.New("date-update entry not found")
ErrSourceNotFound = errors.New("source entry not found")
+ ErrClientNotFound = errors.New("client entry not found")
ErrReadOnly = errors.New("store is in read-only mode")
)
@@ -55,6 +56,8 @@ const (
updatesBn = "Updates"
sourcesFwdBn = "SourcesFwd"
sourcesRevBn = "SourcesRev"
+ clientsFwdBn = "ClientsFwd"
+ clientsRevBn = "ClientsRev"
clientDataBn = "ClientData"
userAgentsFwdBn = "UserAgentsFwd"
userAgentsRevBn = "UserAgentsRev"
@@ -77,19 +80,19 @@ type sourceDB struct {
Tags []string `json:"t"`
}
-func NewSourceDB(value *UpdateFull) *sourceDB {
+func NewSourceDB(uf *UpdateFull) *sourceDB {
return &sourceDB{
- Hostname: value.Source.Hostname,
+ Hostname: uf.Source.Hostname,
Stream: streamDB{
- ContentID: value.Source.Stream.ContentID,
- Format: value.Source.Stream.Format,
- Quality: value.Source.Stream.Quality,
+ ContentID: uf.Source.Stream.ContentID,
+ Format: uf.Source.Stream.Format,
+ Quality: uf.Source.Stream.Quality,
},
- Tags: value.Source.Tags,
+ Tags: uf.Source.Tags,
}
}
-func (s sourceDB) Slug() string {
+func (s *sourceDB) Slug() string {
return fmt.Sprintf("%s/%s/%s/%s/%s", s.Hostname, s.Stream.ContentID, s.Stream.Format, s.Stream.Quality, strings.Join(s.Tags, ","))
}
@@ -101,14 +104,54 @@ func (s *Source) CopyFromSourceDB(sdb *sourceDB) {
s.Tags = sdb.Tags
}
-// stored in clientDataBn
+type clientDB struct {
+ IP string `json:"ip"`
+ Port uint `json:"po,omitempty"`
+ CountryName string `json:"cn,omitempty"`
+ CountryCode2 string `json:"cc,omitempty"`
+ RegionName string `json:"rn,omitempty"`
+ RegionCode string `json:"rc,omitempty"`
+ CityName string `json:"ct,omitempty"`
+ Latitude float64 `json:"lat,omitempty"`
+ Longitude float64 `json:"lon,omitempty"`
+}
+
+func NewClientDB(c *Client) *clientDB {
+ return &clientDB{
+ IP: c.IP,
+ Port: c.Port,
+ CountryName: c.CountryName,
+ CountryCode2: c.CountryCode2,
+ RegionName: c.RegionName,
+ RegionCode: c.RegionCode,
+ CityName: c.CityName,
+ Latitude: c.Latitude,
+ Longitude: c.Longitude,
+ }
+}
+
+func (c *clientDB) Slug() string {
+ return fmt.Sprintf("%s:%d/%s/%s/%s/%f,%f", c.IP, c.Port, c.CountryCode2, c.RegionCode, c.CityName, c.Latitude, c.Longitude)
+}
+
+func (c *Client) CopyFromClientDB(cdb *clientDB) {
+ c.IP = cdb.IP
+ c.Port = cdb.Port
+ c.CountryName = cdb.CountryName
+ c.CountryCode2 = cdb.CountryCode2
+ c.RegionName = cdb.RegionName
+ c.RegionCode = cdb.RegionCode
+ c.CityName = cdb.CityName
+ c.Latitude = cdb.Latitude
+ c.Longitude = cdb.Longitude
+}
+
type clientDataDB struct {
- IP string `json:"ip"`
- UserAgentID int `json:"ua"`
- BytesSent uint `json:"bs"`
+ ClientID int `json:"ci"`
+ UserAgentID int `json:"ua"`
+ BytesSent uint `json:"bs"`
}
-// stored in dataUpdatesBn
type updateDB struct {
SourceHubID int `json:"h,omitempty"`
SourceHubUpdateID int `json:"hi,omitempty"`