1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
package sfive
import (
"bytes"
"encoding/json"
"io/ioutil"
"net/http"
"strconv"
"time"
)
// findMaxId scans values and returns the largest SourceHubDataUpdateId it
// finds. Entries whose id pointer is nil are skipped. The result is -1 when
// values is empty or no entry carries an id greater than -1.
func findMaxId(values []StatisticsData) int {
	highest := -1
	for idx := 0; idx < len(values); idx++ {
		ptr := values[idx].SourceHubDataUpdateId
		if ptr == nil {
			continue
		}
		if candidate := *ptr; candidate > highest {
			highest = candidate
		}
	}
	return highest
}
// getLastUpdate asks the server at baseurl for the id of the last update it
// holds from this store.
//
// The local store id is obtained from self.store.GetStoreId and a GET request
// is sent to baseurl + "/lastupdate/" + storeId. An empty response body yields
// latestId == -1; otherwise the body is parsed as a base-10 integer that must
// fit into 32 bits. The store id is returned alongside the parsed id.
//
// Every failure (store id lookup, request error, non-200 status, body read
// error, unparsable body) is reported through s5l.Panicf.
func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) (latestId int, storeId string) {
	storeId, err := self.store.GetStoreId()
	if err != nil {
		s5l.Panicf("fwd: failed to get own hubid: %v\n", err)
	}

	queryURL := baseurl + "/lastupdate/" + storeId
	resp, err := client.Get(queryURL)
	if err != nil {
		s5l.Panicf("fwd: failed to query for lastupdate: %v\n", err)
	}
	defer resp.Body.Close()

	if status := resp.StatusCode; status != http.StatusOK {
		s5l.Panicf("fwd: server failed to fulfill query for lastupdate: %v\n", status)
	}

	payload, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		s5l.Panicf("fwd: failed to read lastupdate response: %v\n", err)
	}

	// An empty body means the server has not seen any update from this store.
	if len(payload) == 0 {
		return -1, storeId
	}

	parsed, err := strconv.ParseInt(string(payload), 10, 32)
	if err != nil {
		s5l.Panicf("fwd: invalid lastupdate response: %v\n", err)
	}
	return int(parsed), storeId
}
// handleForwarding continuously pushes locally stored updates to the server
// at baseurl. It loops forever and does not return normally.
//
// Outer loop (tryResync): asks the server, via getLastUpdate, for the id of
// the last update it has received from this store.
//
// Inner loop (nextBatch): reads the updates after that id from self.store,
// posts them as a JSON-encoded StatisticsDataContainer to baseurl + "/updates"
// and, once the server accepted the batch, advances lastId to the highest id
// contained in it.
//
// Error handling:
//   - store read failure: logged, retried after 500ms
//   - no pending updates: polled again after 1s
//   - JSON encoding failure: reported through s5l.Panicf
//   - POST transport error: logged, then resync with the server
//   - POST answered with a non-2xx status: logged, short back-off, then resync
func (self StatsSinkServer) handleForwarding(baseurl string, client *http.Client) {
	url := baseurl + "/updates"
tryResync:
	for {
		lastId, _ := self.getLastUpdate(baseurl, client)
	nextBatch:
		for {
			updates, err := self.store.GetUpdatesAfter(lastId)
			if err != nil {
				s5l.Printf("fwd: failed reading updates: %v\n", err)
				time.Sleep(500 * time.Millisecond)
				continue nextBatch
			}
			if len(updates) == 0 {
				time.Sleep(1 * time.Second)
				continue nextBatch
			}

			data, err := json.Marshal(StatisticsDataContainer{updates})
			if err != nil {
				s5l.Panicf("fwd: encode failed: %v\n", err)
			}

			resp, err := client.Post(url, "application/json", bytes.NewBuffer(data))
			if err != nil {
				s5l.Printf("fwd: post failed: %v\n", err)
				continue tryResync
				// TODO retry etc.
			}
			// The body must be read to EOF and closed, otherwise the
			// connection is leaked and cannot be reused by the transport.
			// Its content is not needed, so a read error is irrelevant here.
			_, _ = ioutil.ReadAll(resp.Body)
			resp.Body.Close()

			// Only advance lastId when the server actually accepted the
			// batch; otherwise these updates would be skipped silently.
			if resp.StatusCode < 200 || resp.StatusCode >= 300 {
				s5l.Printf("fwd: post failed: server returned status %d\n", resp.StatusCode)
				// Back off briefly so a persistently failing server is not hammered.
				time.Sleep(500 * time.Millisecond)
				continue tryResync
			}

			lastId = findMaxId(updates)
			s5tl.Printf("fwd: new lastid: %d", lastId)
			time.Sleep(100 * time.Millisecond)
		}
	}
}
// RunForwarding forwards this store's updates to the server at forwardBaseUrl
// by delegating to handleForwarding with http.DefaultClient.
//
// NOTE(review): http.DefaultClient has no request timeout, so a stalled
// server can block forwarding indefinitely — confirm this is intended.
func (self StatsSinkServer) RunForwarding(forwardBaseUrl string) {
	client := http.DefaultClient
	self.handleForwarding(forwardBaseUrl, client)
}
|