summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-05-06 04:06:18 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-05-06 04:06:18 +0200
commit6de098088870144c54118a4eaa04d8c77dc480b0 (patch)
tree279a480a7960ddde797835793647bb988f864a0a /src
parentMerge branch 'bolt' (diff)
use json Encoder/Decoder and not marshal/unmarshal
Diffstat (limited to 'src')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt.go41
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt_test.go59
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipe.go36
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go37
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store_test.go12
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesApi.go6
6 files changed, 98 insertions, 93 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go
index 26a7c83..2021216 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go
@@ -35,6 +35,7 @@ package sfive
import (
"encoding/json"
"fmt"
+ "io"
)
const (
@@ -46,18 +47,20 @@ const (
//
type Decoder interface {
- Decode(jsonString []byte) (DataUpdateFull, error)
+ Decode() (DataUpdateFull, error)
}
// stateless protocol interfaces
-type StatelessDecoder struct{}
+type StatelessDecoder struct {
+ dec *json.Decoder
+}
-func NewStatelessDecoder() Decoder {
- return &StatelessDecoder{}
+func NewStatelessDecoder(r io.Reader) Decoder {
+ return &StatelessDecoder{json.NewDecoder(r)}
}
-func (pd *StatelessDecoder) Decode(jsonString []byte) (res DataUpdateFull, err error) {
- if err = json.Unmarshal(jsonString, &res); err != nil {
+func (pd *StatelessDecoder) Decode() (res DataUpdateFull, err error) {
+ if err = pd.dec.Decode(&res); err != nil {
return
}
if res.Version != ProtocolVersion {
@@ -68,13 +71,15 @@ func (pd *StatelessDecoder) Decode(jsonString []byte) (res DataUpdateFull, err e
// stateful protocol interfaces
type StatefulDecoder struct {
- Version uint
+ dec *json.Decoder
+ Header
SourceId
}
-func NewStatefulDecoder(jsonString []byte) (Decoder, error) {
+func NewStatefulDecoder(r io.Reader) (Decoder, error) {
res := &StatefulDecoder{}
- if err := json.Unmarshal(jsonString, &res); err != nil {
+ res.dec = json.NewDecoder(r)
+ if err := res.dec.Decode(&res); err != nil {
return nil, err
}
if res.Version != ProtocolVersion {
@@ -83,10 +88,10 @@ func NewStatefulDecoder(jsonString []byte) (Decoder, error) {
return res, nil
}
-func (sd *StatefulDecoder) Decode(jsonString []byte) (res DataUpdateFull, err error) {
+func (sd *StatefulDecoder) Decode() (res DataUpdateFull, err error) {
res.Version = sd.Version
res.CopyFromSourceId(&sd.SourceId)
- if err = json.Unmarshal(jsonString, &res); err != nil {
+ if err = sd.dec.Decode(&res); err != nil {
return
}
if res.Version != sd.Version {
@@ -100,17 +105,19 @@ func (sd *StatefulDecoder) Decode(jsonString []byte) (res DataUpdateFull, err er
//
type Encoder interface {
- Encode(data DataUpdateFull) ([]byte, error)
+ Encode(data DataUpdateFull) error
}
-type StatelessEncoder struct{}
+type StatelessEncoder struct {
+ enc *json.Encoder
+}
-func NewStatelessEncoder() Encoder {
- return &StatelessEncoder{}
+func NewStatelessEncoder(w io.Writer) Encoder {
+ return &StatelessEncoder{json.NewEncoder(w)}
}
-func (pe *StatelessEncoder) Encode(data DataUpdateFull) (res []byte, err error) {
+func (pe *StatelessEncoder) Encode(data DataUpdateFull) error {
data.Version = ProtocolVersion
data.StartTime = data.StartTime.UTC()
- return json.Marshal(data)
+ return pe.enc.Encode(data)
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
index 5a79d85..d1a3a4d 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
@@ -33,8 +33,10 @@
package sfive
import (
+ "bytes"
"fmt"
"reflect"
+ "strings"
"testing"
"time"
)
@@ -55,37 +57,52 @@ func GetExpected() (expected DataUpdateFull) {
func TestDecodeStateful(t *testing.T) {
// invalid init message
- if _, err := NewStatefulDecoder([]byte("this is not json")); err == nil {
+ if _, err := NewStatefulDecoder(strings.NewReader("this is not json")); err == nil {
t.Fatalf("creating decoder with invalid json should throw an error")
}
// wrong protocol version
wrongProtoVerMsg := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion+1, initDataEncoded)
- if _, err := NewStatefulDecoder([]byte(wrongProtoVerMsg)); err == nil {
+ if _, err := NewStatefulDecoder(strings.NewReader(wrongProtoVerMsg)); err == nil {
t.Fatalf("creating decoder with wrong protocol version should throw an error")
}
// valid init message
statefulInitMsg := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion, initDataEncoded)
- dec, err := NewStatefulDecoder([]byte(statefulInitMsg))
- if err != nil {
+ if _, err := NewStatefulDecoder(strings.NewReader(statefulInitMsg)); err != nil {
t.Fatalf("creating decoder failed: %v", err)
}
// invalid data-update message
- if _, err := dec.Decode([]byte("this isn't valid json either")); err == nil {
+ data := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion, initDataEncoded)
+ data = data + "\nthis isn't valid json either"
+ dec, err := NewStatefulDecoder(strings.NewReader(data))
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if _, err := dec.Decode(); err == nil {
t.Fatalf("decoding message with invalid json should throw an error")
}
// wrong protocol version
- wrongProtoVerMsg = fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion+1, updateDataEncoded)
- if _, err := dec.Decode([]byte(wrongProtoVerMsg)); err == nil {
- t.Fatalf("decoding message with wrong protocol version should throw an error")
+ data = fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion, initDataEncoded)
+ data = data + fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion+1, updateDataEncoded)
+ dec, err = NewStatefulDecoder(strings.NewReader(data))
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if _, err := dec.Decode(); err == nil {
+ t.Fatalf("decoding message with invalid json should throw an error")
}
// valid data-update message
- statefulUpdateMsg := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion, updateDataEncoded)
- decoded, err := dec.Decode([]byte(statefulUpdateMsg))
+ data = fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion, initDataEncoded)
+ data = data + fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion, updateDataEncoded)
+ dec, err = NewStatefulDecoder(strings.NewReader(data))
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ decoded, err := dec.Decode()
if err != nil {
t.Fatalf("decoding message failed: %v", err)
}
@@ -98,22 +115,21 @@ func TestDecodeStateful(t *testing.T) {
}
func TestDecodeStateless(t *testing.T) {
- dec := NewStatelessDecoder()
-
// invalid message
- if _, err := dec.Decode([]byte("this is still not json")); err == nil {
+ dec := NewStatelessDecoder(strings.NewReader("this is still not json"))
+ if _, err := dec.Decode(); err == nil {
t.Fatalf("decoding invalid json should throw an error")
}
// wrong protocol version
- wrongProtoVerMsg := fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion+1, initDataEncoded, updateDataEncoded)
- if _, err := dec.Decode([]byte(wrongProtoVerMsg)); err == nil {
+ dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion+1, initDataEncoded, updateDataEncoded)))
+ if _, err := dec.Decode(); err == nil {
t.Fatalf("decoding message with wrong protocol version should throw an error")
}
// valid message
- statelessDataMsg := fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initDataEncoded, updateDataEncoded)
- decoded, err := dec.Decode([]byte(statelessDataMsg))
+ dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initDataEncoded, updateDataEncoded)))
+ decoded, err := dec.Decode()
if err != nil {
t.Fatalf("decoding message failed: %v", err)
}
@@ -126,18 +142,17 @@ func TestDecodeStateless(t *testing.T) {
}
func TestEncodeStateless(t *testing.T) {
- enc := NewStatelessEncoder()
-
var td DataUpdateFull
td.CopyFromSourceId(&initDataStruct)
td.CopyFromUpdate(&updateDataStruct)
- encoded, err := enc.Encode(td)
- if err != nil {
+ encoded := &bytes.Buffer{}
+ enc := NewStatelessEncoder(encoded)
+ if err := enc.Encode(td); err != nil {
t.Fatalf("encoding message failed: %v", err)
}
- decoded, err := NewStatelessDecoder().Decode(encoded)
+ decoded, err := NewStatelessDecoder(encoded).Decode()
if err != nil {
t.Fatalf("decoding message failed: %v", err)
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
index 4b48d99..52e098b 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
@@ -33,39 +33,26 @@
package sfive
import (
- "bufio"
+ "bytes"
"io"
"net"
)
func (srv Server) pipeHandle(conn net.Conn) {
- reader := bufio.NewReader(conn)
- buffer, err := reader.ReadBytes('\n')
+ decoder, err := NewStatefulDecoder(conn)
if err != nil {
- if err != io.EOF {
- s5l.Printf("pipe: failed to read from connection: %v\n", err)
- }
- return
- }
- marshaller, err := NewStatefulDecoder(buffer)
- if err != nil {
- s5l.Printf("pipe: failed initializing decoder with init message: %v\n", err)
+ s5l.Printf("pipe: failed to read init message: %v\n", err)
return
}
for {
- buffer, err := reader.ReadBytes('\n')
+ value, err := decoder.Decode()
if err != nil {
if err != io.EOF {
- s5l.Printf("pipe: failed to read from connection: %v\n", err)
+ s5l.Printf("pipe: failed to read data message: %v\n", err)
}
- return
- }
-
- value, err := marshaller.Decode(buffer)
- if err != nil {
- s5l.Printf("pipe: failed to decode message: %v\n", err)
- continue
+ // TODO: check for temporary error?
+ break
}
if err = srv.Append(value); err != nil {
@@ -75,8 +62,8 @@ func (srv Server) pipeHandle(conn net.Conn) {
}
func (srv Server) pipegramHandle(pconn net.PacketConn) {
- decoder := NewStatelessDecoder()
- buffer := make([]byte, 64*1024)
+ buffer := make([]byte, 64*1024) // TODO: hardcoded size
+
for {
n, _, err := pconn.ReadFrom(buffer)
if err != nil {
@@ -84,9 +71,10 @@ func (srv Server) pipegramHandle(pconn net.PacketConn) {
continue
}
data := buffer[0:n]
- value, err := decoder.Decode(data)
+
+ value, err := NewStatelessDecoder(bytes.NewReader(data)).Decode()
if err != nil {
- s5l.Printf("pipegram: failed to decode message: %v\n", err)
+ s5l.Printf("pipegram: failed to decode data message: %v\n", err)
continue
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index 80af8c5..3b74041 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -35,7 +35,6 @@ package sfive
import (
"encoding/json"
"fmt"
- "io/ioutil"
"net/http"
"strconv"
@@ -141,31 +140,23 @@ func (srv Server) webGetUpdate(c web.C, w http.ResponseWriter, r *http.Request)
fmt.Fprintf(w, "%s", jsonString)
}
+// TODO: add different API endpoint called bulk
+// container := DataUpdateFullContainer{}
+// err = json.Unmarshal(buffer, &container)
+// if err == nil {
+// if err = srv.AppendMany(container.Data); err != nil {
+// http.Error(w, fmt.Sprintf("failed to store data: %s", err), http.StatusInternalServerError)
+// } else {
+// fmt.Fprintf(w, "%d update(s) successfully stored.", len(container.Data))
+// }
+// return
+// }
+
func (srv Server) webPostUpdate(c web.C, w http.ResponseWriter, r *http.Request) {
const resourceName = "update"
- decoder := NewStatelessDecoder()
-
- buffer, err := ioutil.ReadAll(r.Body)
- if err != nil {
- s5l.Printf("web: failed to read post value: %v\n", err)
- http.Error(w, fmt.Sprintf("failed reading : %s: %v", resourceName, err), http.StatusBadRequest)
- return
- }
-
- // TODO: add different API endpoint called bulk
- container := DataUpdateFullContainer{}
- err = json.Unmarshal(buffer, &container)
- if err == nil {
- if err = srv.AppendMany(container.Data); err != nil {
- http.Error(w, fmt.Sprintf("failed to store data: %s", err), http.StatusInternalServerError)
- } else {
- fmt.Fprintf(w, "%d update(s) successfully stored.", len(container.Data))
- }
- return
- }
+ decoder := NewStatelessDecoder(r.Body)
- // else try single value
- data, err := decoder.Decode(buffer)
+ data, err := decoder.Decode()
if err != nil {
s5l.Printf("web: failed to decode: %v\n", err)
http.Error(w, fmt.Sprintf("failed decoding %s: %v", resourceName, err), http.StatusBadRequest)
diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go
index 5fcae5e..07adc47 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go
@@ -309,7 +309,7 @@ func TestAppendAndFetch(t *testing.T) {
upd := updateData
upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC)
upd.Data.Clients = clientsData
- in := DataUpdateFull{0, "", -1, "", -1, sourceData, upd}
+ in := DataUpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}
if err = store.Append(in); err != nil {
t.Fatalf("failed to append update: %v", err)
@@ -334,10 +334,10 @@ func TestAppendAndFetch(t *testing.T) {
// append many
var ins []DataUpdateFull
upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond)
- ins = append(ins, DataUpdateFull{0, "", -1, "", -1, sourceData, upd})
+ ins = append(ins, DataUpdateFull{Header{0, "", -1, "", -1}, sourceData, upd})
upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond)
upd.Data.Clients = nil
- ins = append(ins, DataUpdateFull{0, "", -1, "", -1, sourceData, upd})
+ ins = append(ins, DataUpdateFull{Header{0, "", -1, "", -1}, sourceData, upd})
if err = store.AppendMany(ins); err != nil {
t.Fatalf("failed to append update: %v", err)
}
@@ -377,7 +377,7 @@ func TestReadOnly(t *testing.T) {
upd := updateData
upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC)
upd.Data.Clients = clientsData
- in := DataUpdateFull{0, "", -1, "", -1, sourceData, upd}
+ in := DataUpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}
if err = store.Append(in); err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -416,7 +416,7 @@ func TestGetUpdatesAfter(t *testing.T) {
expected := []DataUpdateFull{}
for i := 0; i < 3; i = i + 1 {
- in := DataUpdateFull{0, "", -1, "", -1, sourceData, upd}
+ in := DataUpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}
if err = store.Append(in); err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -534,7 +534,7 @@ func TestForwardedDataUpdates(t *testing.T) {
expected := []DataUpdateFull{}
for i := 0; i < 3; i = i + 1 {
- in := DataUpdateFull{0, "", -1, "", -1, sourceData, upd}
+ in := DataUpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}
in.SourceHubUuid = forwardedHub
in.SourceHubDataUpdateId = 3 - i // out of order
if err = store.Append(in); err != nil {
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
index 12e92c8..d028ffb 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
@@ -65,12 +65,16 @@ type DataUpdate struct {
Data SourceData `json:"data"`
}
-type DataUpdateFull struct {
+type Header struct {
Version uint `json:"version"`
SourceHubUuid string `json:"SourceHubUuid,omitempty"`
SourceHubDataUpdateId int `json:"SourceHubDataUpdateId,omitempty"`
ForwardHubUuid string `json:"ForwardHubUuid,omitempty"`
ForwardHubDataUpdateId int `json:"ForwardHubDataUpdateId,omitempty"`
+}
+
+type DataUpdateFull struct {
+ Header
SourceId
DataUpdate
}