package sfive import ( "bytes" "encoding/json" "fmt" "io/ioutil" "net/http" "strings" "time" ) const lastUpdateJson = `{ "query": {"match": { "SourceHubUuid": "%s" } }, "aggregations": { "last-id" : { "max" : { "field": "SourceHubDataUpdateId" } } } }` func (self StatsSinkServer) getLastUpdateEs(baseurl string, client *http.Client) (latestId int, storeId string, err error) { url := baseurl + "/dataupdate/_search?pretty" storeId, err = self.getHubIdInvoke() if err != nil { s5l.Printf("fwd-es: failed to get own hubid: %v\n", err) return } queryJson := fmt.Sprintf(lastUpdateJson, storeId) s5tl.Printf("fwd-es: query: %s", queryJson) resp, err := client.Post(url, "application/json", strings.NewReader(queryJson)) if err != nil { s5l.Printf("fwd-es: failed to query for lastupdate: %v\n", err) return } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { s5l.Printf("fwd-es: server failed to fulfill query for lastupdate: %v\n", resp.StatusCode) return } body, err := ioutil.ReadAll(resp.Body) if err != nil { s5l.Printf("fwd-es: failed to read lastupdate response: %v\n", err) return } s5tl.Printf("fwd-es: lastupdate response: %s\n", body) if len(body) == 0 { latestId = -1 } else { var value interface{} errl := json.Unmarshal(body, &value) if errl != nil { s5l.Printf("fwd-es: invalid lastupdate response: %v\n", errl) err = errl return } idstrcntr := value.(map[string]interface{})["aggregations"].(map[string]interface{})["last-id"].(map[string]interface{})["value"] if idstrcntr == nil { s5l.Printf("fwd-es: we are new here\n") latestId = -1 return } tid := idstrcntr.(float64) latestId = int(tid) } return } func (self StatsSinkServer) handleForwardingToElasticSearch(baseurl string, client *http.Client) { url := baseurl + "/_bulk" tryResync: for { lastId, _, err := self.getLastUpdateEs(baseurl, client) if err != nil { s5l.Printf("fwd-es: lastupdate returned err: %v", err) time.Sleep(5 * time.Second) continue tryResync } s5l.Printf("fwd-es: lastupdate: %d", lastId) nextBatch: for { updates, err := self.getUpdatesAfterInvoke(lastId) if err != nil { s5l.Printf("fwd-es: failed reading updates: %v\n", err) time.Sleep(500 * time.Millisecond) continue nextBatch } s5l.Printf("fwd-es: got %d updates", len(updates)) if len(updates) == 0 { time.Sleep(1 * time.Second) continue nextBatch } postData := bytes.Buffer{} for _, update := range updates { data, err := json.Marshal(update) if err != nil { s5l.Panicf("fwd-es: encode failed: %v\n", err) } postData.WriteString(`{"index":{"_type":"dataupdate"}}`) postData.WriteRune('\n') postData.Write(data) postData.WriteRune('\n') } //s5tl.Printf("fwd-es: marshalled:\n%v\n", (string)(postData.Bytes())) s5l.Printf("fwd-es: marshal OK") resp, err := client.Post(url, "application/json", bytes.NewReader(postData.Bytes())) if err != nil { s5l.Printf("fwd-es: post failed: %v\n", err) time.Sleep(1 * time.Second) continue tryResync } if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { body, _ := ioutil.ReadAll(resp.Body) s5l.Printf("fwd-es: post failed: %s\n%s\n", resp.Status, body) time.Sleep(1 * time.Second) continue tryResync } resp.Body.Close() s5l.Printf("fwd-es: all posts OK") lastId = findMaxId(updates) s5l.Printf("fwd-es: new lastid: %d", lastId) //time.Sleep(1 * time.Second) } } } func (self StatsSinkServer) RunForwardingToElasticSearch(forwardBaseUrl string) { self.handleForwardingToElasticSearch(forwardBaseUrl, http.DefaultClient) }