diff options
Diffstat (limited to 'src/hub/src/spreadspace.org/sfive/s5srv.go')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srv.go | 87 |
1 files changed, 28 insertions, 59 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 8bd7904..7eab0eb 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -1,77 +1,46 @@ package sfive -import ( - "bufio" - "fmt" - "net" -) - type StatsSinkServer struct { + store sqliteStore + quit chan bool + done chan bool + appendData chan StatisticsData } -func handleConnection(store StatsContainer, conn net.Conn) { - reader := bufio.NewReader(conn) - buffer, err := reader.ReadBytes('\n') - if err != nil { - fmt.Printf("s5 failed to read from connection: %v\n", err) - return - } - marshaller, err := NewStatefulDecoder(buffer) - if err != nil { - fmt.Printf("s5 failed initializing decoder with init message: %v\n", err) - return - } - +func (self StatsSinkServer) appendActor() { + defer func() { self.done <- true }() for { - buffer, err := reader.ReadBytes('\n') - if err != nil { - fmt.Printf("s5 failed to read from connection: %v\n", err) - return - } - - fmt.Printf("s5 msg: %v", string(buffer)) - - value, err := marshaller.Decode(buffer) - if err != nil { - fmt.Printf("s5 failed to decode message: %v\n", err) - continue - } - - err = store.Append(value) - if err != nil { - fmt.Printf("s5 failed to store data: %v\n", err) + select { + case <-self.quit: return + case value := <-self.appendData: + err := self.store.Append(value) + if err != nil { + s5l.Printf("failed to store data: %v\n", err) + } } } } -func (self StatsSinkServer) ListenAndServe() { - store, err := NewStore() - if err != nil { - fmt.Printf("s5 failed to connect to persistence layer: %v\n", err) - return - } - defer store.Close() +func (self StatsSinkServer) Close() { + self.quit <- true + <-self.done + close(self.quit) + close(self.done) + close(self.appendData) + self.store.Close() +} - ln, err := net.Listen("unix", "/run/sfive/receiver") +func NewServer(dbPath string) (server *StatsSinkServer, err error) { + // TODO read configuration and create instance with correct settings + server = new(StatsSinkServer) + server.store, err = NewStore(dbPath) if err != nil { - fmt.Printf("s5 failed to connect to unix pipe: %v", err) return } - defer ln.Close() - for { - conn, err := ln.Accept() - if err != nil { - // ignore - continue - } - go handleConnection(store, conn) - } -} - -func NewSfiveServer() (server *StatsSinkServer, err error) { - // TODO read configuration and create instance with correct settings - server = new(StatsSinkServer) + server.quit = make(chan bool) + server.done = make(chan bool) + server.appendData = make(chan StatisticsData) return } |