diff options
author | Christian Pointner <equinox@spreadspace.org> | 2017-07-13 22:19:36 +0200 |
---|---|---|
committer | Christian Pointner <equinox@spreadspace.org> | 2017-07-13 22:20:37 +0200 |
commit | 6e69590369f2410127a0061902361025b7aa7c12 (patch) | |
tree | 3493f0422d622878552afd6c91d05828dc2ae09e /src/hub | |
parent | cleaner shutdown for pipgram interface (diff) |
move pipgram into separate file
Diffstat (limited to 'src/hub')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 63 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvPipegram.go | 92 |
2 files changed, 92 insertions, 63 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index 6cf2bc4..20e3085 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -33,21 +33,10 @@ package sfive import ( - "bytes" - "context" "io" "net" - "strings" ) -const ( - PipegramMessageSizeLimit = 1024 * 1024 // TODO: is this really needed? -) - -// -// Unix Socket Interface (streams) -// - func (srv *Server) pipeHandle(conn net.Conn) { defer conn.Close() @@ -108,55 +97,3 @@ func (srv *Server) ServePipe(cfg PipeInterfaceConfig) { go srv.pipeHandle(conn) } } - -// -// Unix Socket Interface (datagrams) -// - -func (srv *Server) pipegramRun() { - buffer := make([]byte, PipegramMessageSizeLimit) - - for { - n, _, err := srv.interfaces.pipegram.ReadFrom(buffer) - if err != nil { - if strings.Contains(err.Error(), "use of closed network connection") { // TODO: is this really the best way to do this? - return - } - s5l.Printf("srv|pgram: read() failed: %v", err) - continue - } - data := buffer[0:n] - - update, err := NewStatelessDecoder(bytes.NewReader(data)).Decode() - if err != nil { - s5l.Printf("srv|pgram: decoding data message failed: %v\n", err) - continue - } - - if err = srv.Ingest(update); err != nil { - s5l.Printf("srv|pgram: storing data failed: %v\n", err) - } - } -} - -func (srv *Server) pipegramStop(ctx context.Context) (err error) { - // TODO: this is a race condition between a call to webRun and webStop... - if srv.interfaces.pipegram == nil { - return nil - } - s5l.Printf("srv|pgram: shutting down") - return srv.interfaces.pipegram.Close() -} - -func (srv *Server) ServePipegram(cfg PipegramInterfaceConfig) (err error) { - if srv.interfaces.pipegram, err = net.ListenPacket("unixgram", cfg.ListenPath); err != nil { - s5l.Printf("srv|pgram: listen() failed: %v", err) - return - } - - s5l.Printf("srv|pgram: listening on '%s'", cfg.ListenPath) - defer s5l.Println("srv|pgram: interface stopped") - - srv.pipegramRun() - return -} diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go b/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go new file mode 100644 index 0000000..b2807ed --- /dev/null +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go @@ -0,0 +1,92 @@ +// +// sfive +// +// sfive - spreadspace streaming statistics suite is a generic +// statistic collection tool for streaming server infrastuctures. +// The system collects and stores meta data like number of views +// and throughput from a number of streaming servers and stores +// it in a global data store. +// The data acquisition is designed to be generic and extensible in +// order to support different streaming software. +// sfive also contains tools and applications to filter and visualize +// live and recorded data. +// +// +// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org> +// Markus Grüneis <gimpf@gimpf.org> +// +// This file is part of sfive. +// +// sfive is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 +// as published by the Free Software Foundation. +// +// sfive is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with sfive. If not, see <http://www.gnu.org/licenses/>. +// + +package sfive + +import ( + "bytes" + "context" + "net" + "strings" +) + +const ( + PipegramMessageSizeLimit = 1024 * 1024 // TODO: is this really needed? +) + +func (srv *Server) pipegramRun() { + buffer := make([]byte, PipegramMessageSizeLimit) + + for { + n, _, err := srv.interfaces.pipegram.ReadFrom(buffer) + if err != nil { + if strings.Contains(err.Error(), "use of closed network connection") { // TODO: is this really the best way to do this? + return + } + s5l.Printf("srv|pgram: read() failed: %v", err) + continue + } + data := buffer[0:n] + + update, err := NewStatelessDecoder(bytes.NewReader(data)).Decode() + if err != nil { + s5l.Printf("srv|pgram: decoding data message failed: %v\n", err) + continue + } + + if err = srv.Ingest(update); err != nil { + s5l.Printf("srv|pgram: storing data failed: %v\n", err) + } + } +} + +func (srv *Server) pipegramStop(ctx context.Context) (err error) { + // TODO: this is a race condition between a call to pipegramRun and pipegramStop... + if srv.interfaces.pipegram == nil { + return nil + } + s5l.Printf("srv|pgram: shutting down") + return srv.interfaces.pipegram.Close() +} + +func (srv *Server) ServePipegram(cfg PipegramInterfaceConfig) (err error) { + if srv.interfaces.pipegram, err = net.ListenPacket("unixgram", cfg.ListenPath); err != nil { + s5l.Printf("srv|pgram: listen() failed: %v", err) + return + } + + s5l.Printf("srv|pgram: listening on '%s'", cfg.ListenPath) + defer s5l.Println("srv|pgram: interface stopped") + + srv.pipegramRun() + return +} |