summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-07-13 22:19:36 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-07-13 22:20:37 +0200
commit6e69590369f2410127a0061902361025b7aa7c12 (patch)
tree3493f0422d622878552afd6c91d05828dc2ae09e
parentcleaner shutdown for pipgram interface (diff)
move pipgram into separate file
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipe.go63
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipegram.go92
2 files changed, 92 insertions, 63 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
index 6cf2bc4..20e3085 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
@@ -33,21 +33,10 @@
package sfive
import (
- "bytes"
- "context"
"io"
"net"
- "strings"
)
-const (
- PipegramMessageSizeLimit = 1024 * 1024 // TODO: is this really needed?
-)
-
-//
-// Unix Socket Interface (streams)
-//
-
func (srv *Server) pipeHandle(conn net.Conn) {
defer conn.Close()
@@ -108,55 +97,3 @@ func (srv *Server) ServePipe(cfg PipeInterfaceConfig) {
go srv.pipeHandle(conn)
}
}
-
-//
-// Unix Socket Interface (datagrams)
-//
-
-func (srv *Server) pipegramRun() {
- buffer := make([]byte, PipegramMessageSizeLimit)
-
- for {
- n, _, err := srv.interfaces.pipegram.ReadFrom(buffer)
- if err != nil {
- if strings.Contains(err.Error(), "use of closed network connection") { // TODO: is this really the best way to do this?
- return
- }
- s5l.Printf("srv|pgram: read() failed: %v", err)
- continue
- }
- data := buffer[0:n]
-
- update, err := NewStatelessDecoder(bytes.NewReader(data)).Decode()
- if err != nil {
- s5l.Printf("srv|pgram: decoding data message failed: %v\n", err)
- continue
- }
-
- if err = srv.Ingest(update); err != nil {
- s5l.Printf("srv|pgram: storing data failed: %v\n", err)
- }
- }
-}
-
-func (srv *Server) pipegramStop(ctx context.Context) (err error) {
- // TODO: this is a race condition between a call to webRun and webStop...
- if srv.interfaces.pipegram == nil {
- return nil
- }
- s5l.Printf("srv|pgram: shutting down")
- return srv.interfaces.pipegram.Close()
-}
-
-func (srv *Server) ServePipegram(cfg PipegramInterfaceConfig) (err error) {
- if srv.interfaces.pipegram, err = net.ListenPacket("unixgram", cfg.ListenPath); err != nil {
- s5l.Printf("srv|pgram: listen() failed: %v", err)
- return
- }
-
- s5l.Printf("srv|pgram: listening on '%s'", cfg.ListenPath)
- defer s5l.Println("srv|pgram: interface stopped")
-
- srv.pipegramRun()
- return
-}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go b/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go
new file mode 100644
index 0000000..b2807ed
--- /dev/null
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go
@@ -0,0 +1,92 @@
+//
+// sfive
+//
+// sfive - spreadspace streaming statistics suite is a generic
+// statistic collection tool for streaming server infrastuctures.
+// The system collects and stores meta data like number of views
+// and throughput from a number of streaming servers and stores
+// it in a global data store.
+// The data acquisition is designed to be generic and extensible in
+// order to support different streaming software.
+// sfive also contains tools and applications to filter and visualize
+// live and recorded data.
+//
+//
+// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org>
+// Markus Grüneis <gimpf@gimpf.org>
+//
+// This file is part of sfive.
+//
+// sfive is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3
+// as published by the Free Software Foundation.
+//
+// sfive is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with sfive. If not, see <http://www.gnu.org/licenses/>.
+//
+
+package sfive
+
+import (
+ "bytes"
+ "context"
+ "net"
+ "strings"
+)
+
+const (
+ PipegramMessageSizeLimit = 1024 * 1024 // TODO: is this really needed?
+)
+
+func (srv *Server) pipegramRun() {
+ buffer := make([]byte, PipegramMessageSizeLimit)
+
+ for {
+ n, _, err := srv.interfaces.pipegram.ReadFrom(buffer)
+ if err != nil {
+ if strings.Contains(err.Error(), "use of closed network connection") { // TODO: is this really the best way to do this?
+ return
+ }
+ s5l.Printf("srv|pgram: read() failed: %v", err)
+ continue
+ }
+ data := buffer[0:n]
+
+ update, err := NewStatelessDecoder(bytes.NewReader(data)).Decode()
+ if err != nil {
+ s5l.Printf("srv|pgram: decoding data message failed: %v\n", err)
+ continue
+ }
+
+ if err = srv.Ingest(update); err != nil {
+ s5l.Printf("srv|pgram: storing data failed: %v\n", err)
+ }
+ }
+}
+
+func (srv *Server) pipegramStop(ctx context.Context) (err error) {
+ // TODO: this is a race condition between a call to pipegramRun and pipegramStop...
+ if srv.interfaces.pipegram == nil {
+ return nil
+ }
+ s5l.Printf("srv|pgram: shutting down")
+ return srv.interfaces.pipegram.Close()
+}
+
+func (srv *Server) ServePipegram(cfg PipegramInterfaceConfig) (err error) {
+ if srv.interfaces.pipegram, err = net.ListenPacket("unixgram", cfg.ListenPath); err != nil {
+ s5l.Printf("srv|pgram: listen() failed: %v", err)
+ return
+ }
+
+ s5l.Printf("srv|pgram: listening on '%s'", cfg.ListenPath)
+ defer s5l.Println("srv|pgram: interface stopped")
+
+ srv.pipegramRun()
+ return
+}