summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-05-06 18:49:49 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-05-06 18:49:49 +0200
commit584490b339b5160d93e3224cc5450efbde8be2aa (patch)
treebda122ef13c0bdd1659af5f6a4fd9cb1c70fad05
parentrevamp test client handling (diff)
cleanup and minor improvements for pipe and pipegram interface
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt.go23
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt_test.go67
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipe.go79
-rwxr-xr-xsrc/hub/test-client4
4 files changed, 118 insertions, 55 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go
index 2021216..1a8e1f3 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go
@@ -48,6 +48,7 @@ const (
type Decoder interface {
Decode() (DataUpdateFull, error)
+ Slug() string
}
// stateless protocol interfaces
@@ -59,8 +60,12 @@ func NewStatelessDecoder(r io.Reader) Decoder {
return &StatelessDecoder{json.NewDecoder(r)}
}
-func (pd *StatelessDecoder) Decode() (res DataUpdateFull, err error) {
- if err = pd.dec.Decode(&res); err != nil {
+func (sd *StatelessDecoder) Slug() string {
+ return ""
+}
+
+func (sd *StatelessDecoder) Decode() (res DataUpdateFull, err error) {
+ if err = sd.dec.Decode(&res); err != nil {
return
}
if res.Version != ProtocolVersion {
@@ -88,6 +93,11 @@ func NewStatefulDecoder(r io.Reader) (Decoder, error) {
return res, nil
}
+func (sd *StatefulDecoder) Slug() string {
+ s := sd.SourceId
+ return fmt.Sprintf("%s/%s/%s/%s", s.Hostname, s.StreamId.ContentId, s.StreamId.Format, s.StreamId.Quality)
+}
+
func (sd *StatefulDecoder) Decode() (res DataUpdateFull, err error) {
res.Version = sd.Version
res.CopyFromSourceId(&sd.SourceId)
@@ -106,6 +116,7 @@ func (sd *StatefulDecoder) Decode() (res DataUpdateFull, err error) {
type Encoder interface {
Encode(data DataUpdateFull) error
+ Slug() string
}
type StatelessEncoder struct {
@@ -116,8 +127,12 @@ func NewStatelessEncoder(w io.Writer) Encoder {
return &StatelessEncoder{json.NewEncoder(w)}
}
-func (pe *StatelessEncoder) Encode(data DataUpdateFull) error {
+func (se *StatelessEncoder) Slug() string {
+ return ""
+}
+
+func (se *StatelessEncoder) Encode(data DataUpdateFull) error {
data.Version = ProtocolVersion
data.StartTime = data.StartTime.UTC()
- return pe.enc.Encode(data)
+ return se.enc.Encode(data)
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
index d1a3a4d..2b102d5 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
@@ -55,6 +55,38 @@ func GetExpected() (expected DataUpdateFull) {
return
}
+func TestDecodeStateless(t *testing.T) {
+ // invalid message
+ dec := NewStatelessDecoder(strings.NewReader("this is still not json"))
+ if _, err := dec.Decode(); err == nil {
+ t.Fatalf("decoding invalid json should throw an error")
+ }
+
+ // check the slug
+ if dec.Slug() != "" {
+ t.Fatalf("the slug of a stateless decoder should be the empty string")
+ }
+
+ // wrong protocol version
+ dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion+1, initDataEncoded, updateDataEncoded)))
+ if _, err := dec.Decode(); err == nil {
+ t.Fatalf("decoding message with wrong protocol version should throw an error")
+ }
+
+ // valid message
+ dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initDataEncoded, updateDataEncoded)))
+ decoded, err := dec.Decode()
+ if err != nil {
+ t.Fatalf("decoding message failed: %v", err)
+ }
+
+ // compare with expected result
+ expected := GetExpected()
+ if !reflect.DeepEqual(decoded, expected) {
+ t.Fatalf("decoding failed:\n actual: %v\n expected: %v\n", decoded, expected)
+ }
+}
+
func TestDecodeStateful(t *testing.T) {
// invalid init message
if _, err := NewStatefulDecoder(strings.NewReader("this is not json")); err == nil {
@@ -112,33 +144,14 @@ func TestDecodeStateful(t *testing.T) {
if !reflect.DeepEqual(decoded, expected) {
t.Fatalf("decoding failed:\n actual: %v\n expected: %v\n", decoded, expected)
}
-}
-
-func TestDecodeStateless(t *testing.T) {
- // invalid message
- dec := NewStatelessDecoder(strings.NewReader("this is still not json"))
- if _, err := dec.Decode(); err == nil {
- t.Fatalf("decoding invalid json should throw an error")
- }
- // wrong protocol version
- dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion+1, initDataEncoded, updateDataEncoded)))
- if _, err := dec.Decode(); err == nil {
- t.Fatalf("decoding message with wrong protocol version should throw an error")
+ slug := dec.Slug()
+ expectedSlug := strings.Join([]string{initDataStruct.Hostname, initDataStruct.StreamId.ContentId,
+ initDataStruct.StreamId.Format, initDataStruct.StreamId.Quality}, "/")
+ if slug != expectedSlug {
+ t.Fatalf("slug failed:\n actual: %v\n expected: %v\n", slug, expectedSlug)
}
- // valid message
- dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initDataEncoded, updateDataEncoded)))
- decoded, err := dec.Decode()
- if err != nil {
- t.Fatalf("decoding message failed: %v", err)
- }
-
- // compare with expected result
- expected := GetExpected()
- if !reflect.DeepEqual(decoded, expected) {
- t.Fatalf("decoding failed:\n actual: %v\n expected: %v\n", decoded, expected)
- }
}
func TestEncodeStateless(t *testing.T) {
@@ -152,6 +165,12 @@ func TestEncodeStateless(t *testing.T) {
t.Fatalf("encoding message failed: %v", err)
}
+ // check the slug
+ if enc.Slug() != "" {
+ t.Fatalf("the slug of a stateless encoder should be the empty string")
+ }
+
+ // try to decode encoded message
decoded, err := NewStatelessDecoder(encoded).Decode()
if err != nil {
t.Fatalf("decoding message failed: %v", err)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
index 52e098b..f94b2e7 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
@@ -38,31 +38,79 @@ import (
"net"
)
+const (
+ PipegramMessageSizeLimit = 1024 * 1024 // TODO: is this really needed?
+)
+
+//
+// Unix Socket Interface (streams)
+//
+
func (srv Server) pipeHandle(conn net.Conn) {
+ defer conn.Close()
+
decoder, err := NewStatefulDecoder(conn)
if err != nil {
s5l.Printf("pipe: failed to read init message: %v\n", err)
return
}
+ slug := decoder.Slug()
+ s5l.Printf("pipe: new connection: %s\n", slug)
+ defer func() {
+ s5l.Printf("pipe(%s): connection closed\n", slug)
+ }()
+
for {
value, err := decoder.Decode()
if err != nil {
- if err != io.EOF {
- s5l.Printf("pipe: failed to read data message: %v\n", err)
+ if err == io.EOF {
+ break
+ }
+ // TODO: send NACK?
+
+ opErr, isOpErr := err.(*net.OpError)
+ if isOpErr && opErr.Temporary() {
+ s5l.Printf("pipe(%s): failed to read data message: %v (temporary error)\n", slug, err)
+ } else {
+ s5l.Printf("pipe(%s): failed to read data message: %v\n", slug, err)
+ break
}
- // TODO: check for temporary error?
- break
}
if err = srv.Append(value); err != nil {
- s5l.Printf("pipe: failed to store data: %v\n", err)
+ s5l.Printf("pipe(%s): failed to store data: %v\n", slug, err)
+ // TODO: send NACK?
+ break
}
+ // TODO: send ACK?
}
}
+func (srv Server) ServePipe(pipePath string) {
+ ln, err := net.Listen("unix", pipePath)
+ if err != nil {
+ s5l.Printf("pipe: failed to connect: %v", err)
+ return
+ }
+ defer ln.Close()
+
+ for {
+ conn, err := ln.Accept()
+ if err != nil {
+ s5l.Printf("pipe: failed accept: %v", err)
+ // ignore
+ continue
+ }
+ go srv.pipeHandle(conn)
+ }
+}
+
+//
+// Unix Socket Interface (datagrams)
+//
func (srv Server) pipegramHandle(pconn net.PacketConn) {
- buffer := make([]byte, 64*1024) // TODO: hardcoded size
+ buffer := make([]byte, PipegramMessageSizeLimit)
for {
n, _, err := pconn.ReadFrom(buffer)
@@ -84,25 +132,6 @@ func (srv Server) pipegramHandle(pconn net.PacketConn) {
}
}
-func (srv Server) ServePipe(pipePath string) {
- ln, err := net.Listen("unix", pipePath)
- if err != nil {
- s5l.Printf("pipe: failed to connect: %v", err)
- return
- }
- defer ln.Close()
-
- for {
- conn, err := ln.Accept()
- if err != nil {
- s5l.Printf("pipe: failed accept: %v", err)
- // ignore
- continue
- }
- go srv.pipeHandle(conn)
- }
-}
-
func (srv Server) ServePipegram(pipePath string) {
pconn, err := net.ListenPacket("unixgram", pipePath)
if err != nil {
diff --git a/src/hub/test-client b/src/hub/test-client
index 5858969..31e7a46 100755
--- a/src/hub/test-client
+++ b/src/hub/test-client
@@ -29,7 +29,7 @@ case $1 in
;;
esac
-echo "store contents:"
-echo "---------------"
+echo "store contents"
+echo "--------------"
curl 'http://localhost:8000/updates' | jq .
echo ""