summaryrefslogtreecommitdiff
path: root/src/hub/src/spreadspace.org/sfive/s5srv.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/hub/src/spreadspace.org/sfive/s5srv.go')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go87
1 files changed, 28 insertions, 59 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 8bd7904..7eab0eb 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -1,77 +1,46 @@
package sfive
-import (
- "bufio"
- "fmt"
- "net"
-)
-
type StatsSinkServer struct {
+ store sqliteStore
+ quit chan bool
+ done chan bool
+ appendData chan StatisticsData
}
-func handleConnection(store StatsContainer, conn net.Conn) {
- reader := bufio.NewReader(conn)
- buffer, err := reader.ReadBytes('\n')
- if err != nil {
- fmt.Printf("s5 failed to read from connection: %v\n", err)
- return
- }
- marshaller, err := NewStatefulDecoder(buffer)
- if err != nil {
- fmt.Printf("s5 failed initializing decoder with init message: %v\n", err)
- return
- }
-
+func (self StatsSinkServer) appendActor() {
+ defer func() { self.done <- true }()
for {
- buffer, err := reader.ReadBytes('\n')
- if err != nil {
- fmt.Printf("s5 failed to read from connection: %v\n", err)
- return
- }
-
- fmt.Printf("s5 msg: %v", string(buffer))
-
- value, err := marshaller.Decode(buffer)
- if err != nil {
- fmt.Printf("s5 failed to decode message: %v\n", err)
- continue
- }
-
- err = store.Append(value)
- if err != nil {
- fmt.Printf("s5 failed to store data: %v\n", err)
+ select {
+ case <-self.quit:
return
+ case value := <-self.appendData:
+ err := self.store.Append(value)
+ if err != nil {
+ s5l.Printf("failed to store data: %v\n", err)
+ }
}
}
}
-func (self StatsSinkServer) ListenAndServe() {
- store, err := NewStore()
- if err != nil {
- fmt.Printf("s5 failed to connect to persistence layer: %v\n", err)
- return
- }
- defer store.Close()
+func (self StatsSinkServer) Close() {
+ self.quit <- true
+ <-self.done
+ close(self.quit)
+ close(self.done)
+ close(self.appendData)
+ self.store.Close()
+}
- ln, err := net.Listen("unix", "/run/sfive/receiver")
+func NewServer(dbPath string) (server *StatsSinkServer, err error) {
+ // TODO read configuration and create instance with correct settings
+ server = new(StatsSinkServer)
+ server.store, err = NewStore(dbPath)
if err != nil {
- fmt.Printf("s5 failed to connect to unix pipe: %v", err)
return
}
- defer ln.Close()
- for {
- conn, err := ln.Accept()
- if err != nil {
- // ignore
- continue
- }
- go handleConnection(store, conn)
- }
-}
-
-func NewSfiveServer() (server *StatsSinkServer, err error) {
- // TODO read configuration and create instance with correct settings
- server = new(StatsSinkServer)
+ server.quit = make(chan bool)
+ server.done = make(chan bool)
+ server.appendData = make(chan StatisticsData)
return
}