From 584490b339b5160d93e3224cc5450efbde8be2aa Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sat, 6 May 2017 18:49:49 +0200 Subject: cleanup and minor improvements for pipe and pipegram interface --- src/hub/src/spreadspace.org/sfive/s5cvt.go | 23 +++++-- src/hub/src/spreadspace.org/sfive/s5cvt_test.go | 67 +++++++++++++-------- src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 79 +++++++++++++++++-------- src/hub/test-client | 4 +- 4 files changed, 118 insertions(+), 55 deletions(-) diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go index 2021216..1a8e1f3 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go @@ -48,6 +48,7 @@ const ( type Decoder interface { Decode() (DataUpdateFull, error) + Slug() string } // stateless protocol interfaces @@ -59,8 +60,12 @@ func NewStatelessDecoder(r io.Reader) Decoder { return &StatelessDecoder{json.NewDecoder(r)} } -func (pd *StatelessDecoder) Decode() (res DataUpdateFull, err error) { - if err = pd.dec.Decode(&res); err != nil { +func (sd *StatelessDecoder) Slug() string { + return "" +} + +func (sd *StatelessDecoder) Decode() (res DataUpdateFull, err error) { + if err = sd.dec.Decode(&res); err != nil { return } if res.Version != ProtocolVersion { @@ -88,6 +93,11 @@ func NewStatefulDecoder(r io.Reader) (Decoder, error) { return res, nil } +func (sd *StatefulDecoder) Slug() string { + s := sd.SourceId + return fmt.Sprintf("%s/%s/%s/%s", s.Hostname, s.StreamId.ContentId, s.StreamId.Format, s.StreamId.Quality) +} + func (sd *StatefulDecoder) Decode() (res DataUpdateFull, err error) { res.Version = sd.Version res.CopyFromSourceId(&sd.SourceId) @@ -106,6 +116,7 @@ func (sd *StatefulDecoder) Decode() (res DataUpdateFull, err error) { type Encoder interface { Encode(data DataUpdateFull) error + Slug() string } type StatelessEncoder struct { @@ -116,8 +127,12 @@ func NewStatelessEncoder(w io.Writer) Encoder { return &StatelessEncoder{json.NewEncoder(w)} } -func (pe *StatelessEncoder) Encode(data DataUpdateFull) error { +func (se *StatelessEncoder) Slug() string { + return "" +} + +func (se *StatelessEncoder) Encode(data DataUpdateFull) error { data.Version = ProtocolVersion data.StartTime = data.StartTime.UTC() - return pe.enc.Encode(data) + return se.enc.Encode(data) } diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go index d1a3a4d..2b102d5 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go @@ -55,6 +55,38 @@ func GetExpected() (expected DataUpdateFull) { return } +func TestDecodeStateless(t *testing.T) { + // invalid message + dec := NewStatelessDecoder(strings.NewReader("this is still not json")) + if _, err := dec.Decode(); err == nil { + t.Fatalf("decoding invalid json should throw an error") + } + + // check the slug + if dec.Slug() != "" { + t.Fatalf("the slug of a stateless decoder should be the empty string") + } + + // wrong protocol version + dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion+1, initDataEncoded, updateDataEncoded))) + if _, err := dec.Decode(); err == nil { + t.Fatalf("decoding message with wrong protocol version should throw an error") + } + + // valid message + dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initDataEncoded, updateDataEncoded))) + decoded, err := dec.Decode() + if err != nil { + t.Fatalf("decoding message failed: %v", err) + } + + // compare with expected result + expected := GetExpected() + if !reflect.DeepEqual(decoded, expected) { + t.Fatalf("decoding failed:\n actual: %v\n expected: %v\n", decoded, expected) + } +} + func TestDecodeStateful(t *testing.T) { // invalid init message if _, err := NewStatefulDecoder(strings.NewReader("this is not json")); err == nil { @@ -112,33 +144,14 @@ func TestDecodeStateful(t *testing.T) { if !reflect.DeepEqual(decoded, expected) { t.Fatalf("decoding failed:\n actual: %v\n expected: %v\n", decoded, expected) } -} - -func TestDecodeStateless(t *testing.T) { - // invalid message - dec := NewStatelessDecoder(strings.NewReader("this is still not json")) - if _, err := dec.Decode(); err == nil { - t.Fatalf("decoding invalid json should throw an error") - } - // wrong protocol version - dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion+1, initDataEncoded, updateDataEncoded))) - if _, err := dec.Decode(); err == nil { - t.Fatalf("decoding message with wrong protocol version should throw an error") + slug := dec.Slug() + expectedSlug := strings.Join([]string{initDataStruct.Hostname, initDataStruct.StreamId.ContentId, + initDataStruct.StreamId.Format, initDataStruct.StreamId.Quality}, "/") + if slug != expectedSlug { + t.Fatalf("slug failed:\n actual: %v\n expected: %v\n", slug, expectedSlug) } - // valid message - dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initDataEncoded, updateDataEncoded))) - decoded, err := dec.Decode() - if err != nil { - t.Fatalf("decoding message failed: %v", err) - } - - // compare with expected result - expected := GetExpected() - if !reflect.DeepEqual(decoded, expected) { - t.Fatalf("decoding failed:\n actual: %v\n expected: %v\n", decoded, expected) - } } func TestEncodeStateless(t *testing.T) { @@ -152,6 +165,12 @@ func TestEncodeStateless(t *testing.T) { t.Fatalf("encoding message failed: %v", err) } + // check the slug + if enc.Slug() != "" { + t.Fatalf("the slug of a stateless encoder should be the empty string") + } + + // try to decode encoded message decoded, err := NewStatelessDecoder(encoded).Decode() if err != nil { t.Fatalf("decoding message failed: %v", err) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index 52e098b..f94b2e7 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -38,31 +38,79 @@ import ( "net" ) +const ( + PipegramMessageSizeLimit = 1024 * 1024 // TODO: is this really needed? +) + +// +// Unix Socket Interface (streams) +// + func (srv Server) pipeHandle(conn net.Conn) { + defer conn.Close() + decoder, err := NewStatefulDecoder(conn) if err != nil { s5l.Printf("pipe: failed to read init message: %v\n", err) return } + slug := decoder.Slug() + s5l.Printf("pipe: new connection: %s\n", slug) + defer func() { + s5l.Printf("pipe(%s): connection closed\n", slug) + }() + for { value, err := decoder.Decode() if err != nil { - if err != io.EOF { - s5l.Printf("pipe: failed to read data message: %v\n", err) + if err == io.EOF { + break + } + // TODO: send NACK? + + opErr, isOpErr := err.(*net.OpError) + if isOpErr && opErr.Temporary() { + s5l.Printf("pipe(%s): failed to read data message: %v (temporary error)\n", slug, err) + } else { + s5l.Printf("pipe(%s): failed to read data message: %v\n", slug, err) + break } - // TODO: check for temporary error? - break } if err = srv.Append(value); err != nil { - s5l.Printf("pipe: failed to store data: %v\n", err) + s5l.Printf("pipe(%s): failed to store data: %v\n", slug, err) + // TODO: send NACK? + break } + // TODO: send ACK? } } +func (srv Server) ServePipe(pipePath string) { + ln, err := net.Listen("unix", pipePath) + if err != nil { + s5l.Printf("pipe: failed to connect: %v", err) + return + } + defer ln.Close() + + for { + conn, err := ln.Accept() + if err != nil { + s5l.Printf("pipe: failed accept: %v", err) + // ignore + continue + } + go srv.pipeHandle(conn) + } +} + +// +// Unix Socket Interface (datagrams) +// func (srv Server) pipegramHandle(pconn net.PacketConn) { - buffer := make([]byte, 64*1024) // TODO: hardcoded size + buffer := make([]byte, PipegramMessageSizeLimit) for { n, _, err := pconn.ReadFrom(buffer) @@ -84,25 +132,6 @@ func (srv Server) pipegramHandle(pconn net.PacketConn) { } } -func (srv Server) ServePipe(pipePath string) { - ln, err := net.Listen("unix", pipePath) - if err != nil { - s5l.Printf("pipe: failed to connect: %v", err) - return - } - defer ln.Close() - - for { - conn, err := ln.Accept() - if err != nil { - s5l.Printf("pipe: failed accept: %v", err) - // ignore - continue - } - go srv.pipeHandle(conn) - } -} - func (srv Server) ServePipegram(pipePath string) { pconn, err := net.ListenPacket("unixgram", pipePath) if err != nil { diff --git a/src/hub/test-client b/src/hub/test-client index 5858969..31e7a46 100755 --- a/src/hub/test-client +++ b/src/hub/test-client @@ -29,7 +29,7 @@ case $1 in ;; esac -echo "store contents:" -echo "---------------" +echo "store contents" +echo "--------------" curl 'http://localhost:8000/updates' | jq . echo "" -- cgit v1.2.3