blob: 6647518813a4d21a40a9e4bea80a81ab45bcb2ee (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
package sfive
import "time"
// appendManyToken is one batch request travelling over
// StatsSinkServer.appendManyData: the data to store plus a channel on which
// appendActor reports the outcome.
type appendManyToken struct {
// data is the batch that appendActor passes to the store's AppendMany.
data []StatisticsData
// response receives true when AppendMany succeeded and false when it
// returned an error.
response chan bool
}
// StatsSinkServer routes store writes through the appendActor goroutine, which
// receives work on the appendData and appendManyData channels. Instances are
// created by NewServer and shut down with Close.
type StatsSinkServer struct {
// store is written to by appendActor and closed by Close.
store sqliteStore
// quit tells appendActor to return; Close sends on it.
quit chan bool
// done receives a value from appendActor when it returns; Close waits on it.
done chan bool
// appendData carries single values for the store's Append
// (NewServer gives it a buffer of 100).
appendData chan StatisticsData
// appendManyData carries batches for the store's AppendMany together with a
// reply channel (NewServer gives it a buffer of 5).
appendManyData chan appendManyToken // chan []StatisticsData
}
// appendActor is the loop that performs the store writes requested through the
// server's channels. It runs until a value arrives on quit and sends on done
// when it returns.
//
// A value received from appendData is passed to the store's Append up to
// maxAppendTries times, pausing appendRetryDelay between attempts; if every
// attempt fails the last error is logged and the value is dropped. A batch
// received from appendManyData is passed to AppendMany once, and the outcome
// (true on success, false on failure) is sent on the token's response channel.
func (self StatsSinkServer) appendActor() {
	const (
		maxAppendTries   = 5
		appendRetryDelay = 1 * time.Second
	)

	// Signal the exit on done no matter how the loop is left.
	defer func() { self.done <- true }()

	for {
		select {
		case <-self.quit:
			return

		case value := <-self.appendData:
			var err error
			for tryNum := 1; tryNum <= maxAppendTries; tryNum++ {
				if err = self.store.Append(value); err == nil {
					break
				}
				// Pause only when another attempt follows: sleeping after the
				// final failure would stall the actor without any retry.
				if tryNum < maxAppendTries {
					time.Sleep(appendRetryDelay)
				}
			}
			if err != nil {
				s5l.Printf("failed to store data: %v\n", err)
			}

		case token := <-self.appendManyData:
			err := self.store.AppendMany(token.data)
			if err != nil {
				s5l.Printf("failed to store many data: %v\n", err)
			}
			// Report the outcome to whoever queued the batch.
			token.response <- err == nil
		}
	}
}
// Close stops the appendActor goroutine, closes all of the server's channels
// and finally closes the store.
//
// It blocks until appendActor has received the quit signal and reported back
// on done, so a write that is still in progress delays the shutdown. Values
// still buffered in appendData or appendManyData at that point are not handed
// to appendActor any more.
//
// Close must be called only once: a second call panics when it sends on the
// already closed quit channel. Likewise, sending on appendData or
// appendManyData after Close panics because those channels are closed here.
func (self StatsSinkServer) Close() {
// Ask the actor to stop and wait until it has actually returned, so that
// nothing receives from the channels or uses the store while they are
// being closed below.
self.quit <- true
<-self.done
close(self.quit)
close(self.done)
close(self.appendData)
close(self.appendManyData)
// The store is closed last, after the actor can no longer write to it.
self.store.Close()
}
// NewServer opens the store at dbPath, allocates the server's channels and
// starts the appendActor goroutine that services them.
//
// On success the returned server is ready for use and is shut down with Close.
// If the store cannot be opened, the error from NewStore is returned together
// with a nil server, so that a half-initialized instance with nil channels and
// no running actor can never be used by mistake.
func NewServer(dbPath string) (server *StatsSinkServer, err error) {
	// TODO read configuration and create instance with correct settings
	store, err := NewStore(dbPath)
	if err != nil {
		return nil, err
	}

	server = &StatsSinkServer{
		store: store,
		// quit and done are unbuffered: they are used as a handshake between
		// Close and appendActor.
		quit: make(chan bool),
		done: make(chan bool),
		// The data channels are buffered so that producers can queue a few
		// requests without waiting for the store.
		appendData:     make(chan StatisticsData, 100),
		appendManyData: make(chan appendManyToken, 5),
	}

	go server.appendActor()
	return server, nil
}
|