From f17897ee7d4d57af4ecdd4f13cdf666011fddb74 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Mon, 5 Jun 2017 15:42:55 +0200 Subject: updated s5proxy to new type names --- doc/TODO | 1 - src/daq/s5proxy/sample.json | 2 +- src/daq/s5proxy/src/s5proxy/config.go | 2 +- src/daq/s5proxy/src/s5proxy/main.go | 2 +- src/daq/s5proxy/src/s5proxy/proxy.go | 22 +++++++---- src/daq/s5proxy/src/s5proxy/stats.go | 74 ++++++++++++++++++++++++----------- src/daq/s5proxy/test | 3 ++ src/hub/test-srv-geoip-anon | 15 +++++++ 8 files changed, 87 insertions(+), 34 deletions(-) create mode 100755 src/daq/s5proxy/test create mode 100755 src/hub/test-srv-geoip-anon diff --git a/doc/TODO b/doc/TODO index 2785c6f..80c8711 100644 --- a/doc/TODO +++ b/doc/TODO @@ -3,5 +3,4 @@ * address issues with python-twisted dropping messages when transmit buffer is full * move common code of python-twisted based importer to seperate module * fix wrong usage of twisted -* hub: add geo-ip lookups * hub: add support for Elasticsearch 5.x diff --git a/src/daq/s5proxy/sample.json b/src/daq/s5proxy/sample.json index b1abcdc..d45a4d5 100644 --- a/src/daq/s5proxy/sample.json +++ b/src/daq/s5proxy/sample.json @@ -14,7 +14,7 @@ { "op": "time", "header": "Expires", "value": "-1s" } ], "sfive": { - "socket": "/run/sfive/pipegram", + "socket": "../../hub/test/pipegram", "hostname": "public1", "tags": [ "hello", "world" ], "duration": "15s", diff --git a/src/daq/s5proxy/src/s5proxy/config.go b/src/daq/s5proxy/src/s5proxy/config.go index 280b9fd..1fc721d 100644 --- a/src/daq/s5proxy/src/s5proxy/config.go +++ b/src/daq/s5proxy/src/s5proxy/config.go @@ -12,7 +12,7 @@ // live and recorded data. // // -// Copyright (C) 2014-2016 Christian Pointner +// Copyright (C) 2014-2017 Christian Pointner // Markus Grüneis // // This file is part of sfive. diff --git a/src/daq/s5proxy/src/s5proxy/main.go b/src/daq/s5proxy/src/s5proxy/main.go index fd57e42..8871c71 100644 --- a/src/daq/s5proxy/src/s5proxy/main.go +++ b/src/daq/s5proxy/src/s5proxy/main.go @@ -12,7 +12,7 @@ // live and recorded data. // // -// Copyright (C) 2014-2016 Christian Pointner +// Copyright (C) 2014-2017 Christian Pointner // Markus Grüneis // // This file is part of sfive. diff --git a/src/daq/s5proxy/src/s5proxy/proxy.go b/src/daq/s5proxy/src/s5proxy/proxy.go index b5c7a72..6ef1742 100644 --- a/src/daq/s5proxy/src/s5proxy/proxy.go +++ b/src/daq/s5proxy/src/s5proxy/proxy.go @@ -12,7 +12,7 @@ // live and recorded data. // // -// Copyright (C) 2014-2016 Christian Pointner +// Copyright (C) 2014-2017 Christian Pointner // Markus Grüneis // // This file is part of sfive. @@ -34,6 +34,7 @@ package main import ( "crypto/tls" + "net" "net/http" "net/http/httputil" "net/url" @@ -53,8 +54,9 @@ func generateTime(input string) string { type s5proxyResponseWriter struct { wrapped http.ResponseWriter conf *Config - StatsCh chan<- *ClientData + StatsCh chan<- *Client IP string + Port int UserAgent string } @@ -65,7 +67,7 @@ func (r s5proxyResponseWriter) Header() http.Header { func (r s5proxyResponseWriter) Write(data []byte) (int, error) { sent, err := r.wrapped.Write(data) if r.StatsCh != nil { - stats := &ClientData{IP: r.IP, UserAgent: r.UserAgent, BytesSent: uint(sent)} + stats := &Client{IP: r.IP, Port: uint(r.Port), UserAgent: r.UserAgent, BytesSent: uint(sent)} select { case r.StatsCh <- stats: default: @@ -95,10 +97,16 @@ func (r s5proxyResponseWriter) WriteHeader(status int) { func proxyHandler(p *Proxy, w http.ResponseWriter, r *http.Request) { pw := s5proxyResponseWriter{wrapped: w, conf: p.conf} if p.stats != nil { - //pw.IP, _, _ = net.SplitHostPort(r.RemoteAddr) - pw.IP = r.RemoteAddr // Use IP:Port for now since IP alone might not be unique - pw.UserAgent = r.UserAgent() - pw.StatsCh = p.stats.GetUpdateChannel(r.URL.String()) + if ip, port, err := net.SplitHostPort(r.RemoteAddr); err != nil { + s5l.Printf("PROXY: client '%s' error: invalid address/port info: %v, no statistics will be gathered", r.RemoteAddr, err) + } else { + pw.IP = ip + if pw.Port, err = net.LookupPort("tcp", port); err != nil { + s5l.Printf("PROXY: client '%s' warning: can't parse port or service name: %v", r.RemoteAddr, err) + } + pw.UserAgent = r.UserAgent() + pw.StatsCh = p.stats.GetUpdateChannel(r.URL.String()) + } } p.proxy.ServeHTTP(pw, r) diff --git a/src/daq/s5proxy/src/s5proxy/stats.go b/src/daq/s5proxy/src/s5proxy/stats.go index 803dfa3..078c1ad 100644 --- a/src/daq/s5proxy/src/s5proxy/stats.go +++ b/src/daq/s5proxy/src/s5proxy/stats.go @@ -12,7 +12,7 @@ // live and recorded data. // // -// Copyright (C) 2014-2016 Christian Pointner +// Copyright (C) 2014-2017 Christian Pointner // Markus Grüneis // // This file is part of sfive. @@ -54,43 +54,71 @@ type Stream struct { Quality string `json:"quality"` } -type ClientData struct { +type Source struct { + Hostname string `json:"hostname"` + Stream Stream `json:"stream"` + Tags []string `json:"tags,omitempty"` +} + +type GeoInfo struct { + CountryName string `json:"country,omitempty"` + CountryCode2 string `json:"country-code2,omitempty"` + RegionName string `json:"region,omitempty"` + RegionCode string `json:"region-code,omitempty"` + CityName string `json:"city,omitempty"` + Latitude float64 `json:"latitude,omitempty"` + Longitude float64 `json:"longitude,omitempty"` +} + +type Client struct { IP string `json:"ip"` - UserAgent string `json:"user-agent"` - BytesSent uint `json:"bytes-sent"` + Port uint `json:"port,omitempty"` + UserAgent string `json:"user-agent,omitempty"` + BytesSent uint `json:"bytes-sent,omitempty"` + GeoInfo } type UpdateData struct { - ClientCount uint `json:"client-count"` - BytesReceived uint `json:"bytes-received"` - BytesSent uint `json:"bytes-sent"` - Clients []ClientData `json:"clients,omitempty"` + ClientCount uint `json:"client-count"` + BytesReceived uint `json:"bytes-received,omitempty"` + BytesSent uint `json:"bytes-sent"` + Clients []Client `json:"clients,omitempty"` } type Update struct { - Version uint `json:"version"` - Hostname string `json:"hostname"` - Stream Stream `json:"stream"` - Tags []string `json:"tags,omitempty"` StartTime time.Time `json:"start-time"` Duration int64 `json:"duration-ms"` Data UpdateData `json:"data"` } +type Header struct { + Version uint `json:"version,omitempty"` // omitempty is needed for data only messages and for REST API + SourceHubUUID string `json:"SourceHubUuid,omitempty"` + SourceHubUpdateID int `json:"SourceHubUpdateId,omitempty"` + ForwardHubUUID string `json:"ForwardHubUuid,omitempty"` + ForwardHubUpdateID int `json:"ForwardHubUpdateId,omitempty"` +} + +type UpdateFull struct { + Header + Source + Update +} + type StatsWorker struct { stream Stream - current map[string]*ClientData + current map[string]*Client trigger chan time.Time - output chan<- *Update - input chan *ClientData + output chan<- *UpdateFull + input chan *Client } -func NewStatsWorker(stream Stream, updates chan<- *Update) (sw StatsWorker) { +func NewStatsWorker(stream Stream, updates chan<- *UpdateFull) (sw StatsWorker) { sw.stream = stream - sw.current = make(map[string]*ClientData) + sw.current = make(map[string]*Client) sw.trigger = make(chan time.Time) sw.output = updates - sw.input = make(chan *ClientData, 100) + sw.input = make(chan *Client, 100) return } @@ -99,7 +127,7 @@ func (sw StatsWorker) Run() { select { case t := <-sw.trigger: if t.UnixNano() != 0 { - upd := &Update{Stream: sw.stream, StartTime: t} + upd := &UpdateFull{Source: Source{Stream: sw.stream}, Update: Update{StartTime: t}} upd.Data.ClientCount = uint(len(sw.current)) for _, c := range sw.current { upd.Data.Clients = append(upd.Data.Clients, *c) @@ -111,7 +139,7 @@ func (sw StatsWorker) Run() { s5l.Printf("STATS: worker(%v): writing to output channel would block, dropping data update...", sw.stream) } } - sw.current = make(map[string]*ClientData) + sw.current = make(map[string]*Client) case upd := <-sw.input: if data, exists := sw.current[upd.IP]; exists { data.BytesSent += upd.BytesSent @@ -126,11 +154,11 @@ type Stats struct { conf *Config sock net.Conn dataEncoder *json.Encoder - updates chan *Update + updates chan *UpdateFull workers map[string]StatsWorker } -func (s *Stats) GetUpdateChannel(url string) chan<- *ClientData { +func (s *Stats) GetUpdateChannel(url string) chan<- *Client { s5l.Printf("STATS: got new client for url: %s", url) for name, worker := range s.workers { if strings.Contains(url, name) { @@ -191,7 +219,7 @@ func NewStats(conf *Config) (s *Stats, err error) { return } - s.updates = make(chan *Update, 1000) + s.updates = make(chan *UpdateFull, 1000) s.workers = make(map[string]StatsWorker) var content, format, quality []string for _, desc := range conf.SFive.Stream { diff --git a/src/daq/s5proxy/test b/src/daq/s5proxy/test new file mode 100755 index 0000000..f27f060 --- /dev/null +++ b/src/daq/s5proxy/test @@ -0,0 +1,3 @@ +#!/bin/bash + +exec bin/s5proxy -config sample.json diff --git a/src/hub/test-srv-geoip-anon b/src/hub/test-srv-geoip-anon new file mode 100755 index 0000000..558dc94 --- /dev/null +++ b/src/hub/test-srv-geoip-anon @@ -0,0 +1,15 @@ +#!/bin/sh + +if [ -z "$1" ]; then + echo "Usage: $0 " + exit 1 +fi + +TEST_D="./test" +TEST_DB="$TEST_D/$1.bolt" +EXTRA_OPTS="" + + +mkdir -p "$TEST_D" +rm -f "$TEST_D/pipe" "$TEST_D/pipegram" +exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server -web ":8000" -geoip-db $2 -anonymize -- cgit v1.2.3