package sfive import ( "bytes" "encoding/json" "io/ioutil" "net/http" "strconv" "time" ) func findMaxId(values []StatisticsData) int { maxId := -1 for i := range values { id := values[i].SourceHubDataUpdateId if id != nil && *id > maxId { maxId = *id } } return maxId } func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) (latestId int, storeId string, err error) { storeId, err = self.store.GetStoreId() if err != nil { s5l.Printf("fwd: failed to get own hubid: %v\n", err) return } resp, err := client.Get(baseurl + "/lastupdate/" + storeId) if err != nil { s5l.Printf("fwd: failed to query for lastupdate: %v\n", err) return } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { s5l.Printf("fwd: server failed to fulfill query for lastupdate: %v\n", resp.StatusCode) return } body, err := ioutil.ReadAll(resp.Body) if err != nil { s5l.Printf("fwd: failed to read lastupdate response: %v\n", err) return } if len(body) == 0 { latestId = -1 } else { tid, errl := strconv.ParseInt(string(body), 10, 32) if errl != nil { s5l.Printf("fwd: invalid lastupdate response: %v\n", err) err = errl return } latestId = int(tid) } return } func (self StatsSinkServer) getUpdatesAfterInvoke(id int) (values []StatisticsData, err error) { token := getUpdatesAfterToken{id: id, response: make(chan getUpdatesResult, 1)} defer close(token.response) self.getUpdatesAfterChan <- token res := <-token.response values = res.values err = res.err return } func (self StatsSinkServer) handleForwarding(baseurl string, client *http.Client) { url := baseurl + "/updates" tryResync: for { lastId, _, err := self.getLastUpdate(baseurl, client) if err != nil { s5l.Printf("fwd: lastupdate returned err: %v", err) time.Sleep(5 * time.Second) continue tryResync } s5l.Printf("fwd: lastupdate: %d", lastId) nextBatch: for { updates, err := self.getUpdatesAfterInvoke(lastId) if err != nil { s5l.Printf("fwd: failed reading updates: %v\n", err) time.Sleep(500 * time.Millisecond) continue nextBatch } s5l.Printf("fwd: got %d updates", len(updates)) if len(updates) == 0 { time.Sleep(1 * time.Second) continue nextBatch } data, err := json.Marshal(StatisticsDataContainer{updates}) if err != nil { s5l.Panicf("fwd: encode failed: %v\n", err) } s5l.Printf("fwd: marshal OK") resp, err := client.Post(url, "application/json", bytes.NewBuffer(data)) if err != nil { s5l.Printf("fwd: post failed: %v\n", err) continue tryResync // TODO retry etc. } resp.Body.Close() if resp.StatusCode != http.StatusOK { s5l.Printf("fwd: post failed: %s\n", resp.Status) continue tryResync } // } s5l.Printf("fwd: post OK") lastId = findMaxId(updates) s5l.Printf("fwd: new lastid: %d", lastId) time.Sleep(1 * time.Second) } } } func (self StatsSinkServer) RunForwarding(forwardBaseUrl string) { self.handleForwarding(forwardBaseUrl, http.DefaultClient) }