From 6de098088870144c54118a4eaa04d8c77dc480b0 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sat, 6 May 2017 04:06:18 +0200 Subject: use json Encoder/Decoder and not marshal/unmarshal --- src/hub/src/spreadspace.org/sfive/s5cvt.go | 41 +++++++++------- src/hub/src/spreadspace.org/sfive/s5cvt_test.go | 59 ++++++++++++++--------- src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 36 +++++--------- src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 37 ++++++-------- src/hub/src/spreadspace.org/sfive/s5store_test.go | 12 ++--- src/hub/src/spreadspace.org/sfive/s5typesApi.go | 6 ++- 6 files changed, 98 insertions(+), 93 deletions(-) diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go index 26a7c83..2021216 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go @@ -35,6 +35,7 @@ package sfive import ( "encoding/json" "fmt" + "io" ) const ( @@ -46,18 +47,20 @@ const ( // type Decoder interface { - Decode(jsonString []byte) (DataUpdateFull, error) + Decode() (DataUpdateFull, error) } // stateless protocol interfaces -type StatelessDecoder struct{} +type StatelessDecoder struct { + dec *json.Decoder +} -func NewStatelessDecoder() Decoder { - return &StatelessDecoder{} +func NewStatelessDecoder(r io.Reader) Decoder { + return &StatelessDecoder{json.NewDecoder(r)} } -func (pd *StatelessDecoder) Decode(jsonString []byte) (res DataUpdateFull, err error) { - if err = json.Unmarshal(jsonString, &res); err != nil { +func (pd *StatelessDecoder) Decode() (res DataUpdateFull, err error) { + if err = pd.dec.Decode(&res); err != nil { return } if res.Version != ProtocolVersion { @@ -68,13 +71,15 @@ func (pd *StatelessDecoder) Decode(jsonString []byte) (res DataUpdateFull, err e // stateful protocol interfaces type StatefulDecoder struct { - Version uint + dec *json.Decoder + Header SourceId } -func NewStatefulDecoder(jsonString []byte) (Decoder, error) { +func NewStatefulDecoder(r io.Reader) (Decoder, error) { res := &StatefulDecoder{} - if err := json.Unmarshal(jsonString, &res); err != nil { + res.dec = json.NewDecoder(r) + if err := res.dec.Decode(&res); err != nil { return nil, err } if res.Version != ProtocolVersion { @@ -83,10 +88,10 @@ func NewStatefulDecoder(jsonString []byte) (Decoder, error) { return res, nil } -func (sd *StatefulDecoder) Decode(jsonString []byte) (res DataUpdateFull, err error) { +func (sd *StatefulDecoder) Decode() (res DataUpdateFull, err error) { res.Version = sd.Version res.CopyFromSourceId(&sd.SourceId) - if err = json.Unmarshal(jsonString, &res); err != nil { + if err = sd.dec.Decode(&res); err != nil { return } if res.Version != sd.Version { @@ -100,17 +105,19 @@ func (sd *StatefulDecoder) Decode(jsonString []byte) (res DataUpdateFull, err er // type Encoder interface { - Encode(data DataUpdateFull) ([]byte, error) + Encode(data DataUpdateFull) error } -type StatelessEncoder struct{} +type StatelessEncoder struct { + enc *json.Encoder +} -func NewStatelessEncoder() Encoder { - return &StatelessEncoder{} +func NewStatelessEncoder(w io.Writer) Encoder { + return &StatelessEncoder{json.NewEncoder(w)} } -func (pe *StatelessEncoder) Encode(data DataUpdateFull) (res []byte, err error) { +func (pe *StatelessEncoder) Encode(data DataUpdateFull) error { data.Version = ProtocolVersion data.StartTime = data.StartTime.UTC() - return json.Marshal(data) + return pe.enc.Encode(data) } diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go index 5a79d85..d1a3a4d 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go @@ -33,8 +33,10 @@ package sfive import ( + "bytes" "fmt" "reflect" + "strings" "testing" "time" ) @@ -55,37 +57,52 @@ func GetExpected() (expected DataUpdateFull) { func TestDecodeStateful(t *testing.T) { // invalid init message - if _, err := NewStatefulDecoder([]byte("this is not json")); err == nil { + if _, err := NewStatefulDecoder(strings.NewReader("this is not json")); err == nil { t.Fatalf("creating decoder with invalid json should throw an error") } // wrong protocol version wrongProtoVerMsg := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion+1, initDataEncoded) - if _, err := NewStatefulDecoder([]byte(wrongProtoVerMsg)); err == nil { + if _, err := NewStatefulDecoder(strings.NewReader(wrongProtoVerMsg)); err == nil { t.Fatalf("creating decoder with wrong protocol version should throw an error") } // valid init message statefulInitMsg := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion, initDataEncoded) - dec, err := NewStatefulDecoder([]byte(statefulInitMsg)) - if err != nil { + if _, err := NewStatefulDecoder(strings.NewReader(statefulInitMsg)); err != nil { t.Fatalf("creating decoder failed: %v", err) } // invalid data-update message - if _, err := dec.Decode([]byte("this isn't valid json either")); err == nil { + data := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion, initDataEncoded) + data = data + "\nthis isn't valid json either" + dec, err := NewStatefulDecoder(strings.NewReader(data)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if _, err := dec.Decode(); err == nil { t.Fatalf("decoding message with invalid json should throw an error") } // wrong protocol version - wrongProtoVerMsg = fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion+1, updateDataEncoded) - if _, err := dec.Decode([]byte(wrongProtoVerMsg)); err == nil { - t.Fatalf("decoding message with wrong protocol version should throw an error") + data = fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion, initDataEncoded) + data = data + fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion+1, updateDataEncoded) + dec, err = NewStatefulDecoder(strings.NewReader(data)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if _, err := dec.Decode(); err == nil { + t.Fatalf("decoding message with invalid json should throw an error") } // valid data-update message - statefulUpdateMsg := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion, updateDataEncoded) - decoded, err := dec.Decode([]byte(statefulUpdateMsg)) + data = fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion, initDataEncoded) + data = data + fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion, updateDataEncoded) + dec, err = NewStatefulDecoder(strings.NewReader(data)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + decoded, err := dec.Decode() if err != nil { t.Fatalf("decoding message failed: %v", err) } @@ -98,22 +115,21 @@ func TestDecodeStateful(t *testing.T) { } func TestDecodeStateless(t *testing.T) { - dec := NewStatelessDecoder() - // invalid message - if _, err := dec.Decode([]byte("this is still not json")); err == nil { + dec := NewStatelessDecoder(strings.NewReader("this is still not json")) + if _, err := dec.Decode(); err == nil { t.Fatalf("decoding invalid json should throw an error") } // wrong protocol version - wrongProtoVerMsg := fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion+1, initDataEncoded, updateDataEncoded) - if _, err := dec.Decode([]byte(wrongProtoVerMsg)); err == nil { + dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion+1, initDataEncoded, updateDataEncoded))) + if _, err := dec.Decode(); err == nil { t.Fatalf("decoding message with wrong protocol version should throw an error") } // valid message - statelessDataMsg := fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initDataEncoded, updateDataEncoded) - decoded, err := dec.Decode([]byte(statelessDataMsg)) + dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initDataEncoded, updateDataEncoded))) + decoded, err := dec.Decode() if err != nil { t.Fatalf("decoding message failed: %v", err) } @@ -126,18 +142,17 @@ func TestDecodeStateless(t *testing.T) { } func TestEncodeStateless(t *testing.T) { - enc := NewStatelessEncoder() - var td DataUpdateFull td.CopyFromSourceId(&initDataStruct) td.CopyFromUpdate(&updateDataStruct) - encoded, err := enc.Encode(td) - if err != nil { + encoded := &bytes.Buffer{} + enc := NewStatelessEncoder(encoded) + if err := enc.Encode(td); err != nil { t.Fatalf("encoding message failed: %v", err) } - decoded, err := NewStatelessDecoder().Decode(encoded) + decoded, err := NewStatelessDecoder(encoded).Decode() if err != nil { t.Fatalf("decoding message failed: %v", err) } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index 4b48d99..52e098b 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -33,39 +33,26 @@ package sfive import ( - "bufio" + "bytes" "io" "net" ) func (srv Server) pipeHandle(conn net.Conn) { - reader := bufio.NewReader(conn) - buffer, err := reader.ReadBytes('\n') + decoder, err := NewStatefulDecoder(conn) if err != nil { - if err != io.EOF { - s5l.Printf("pipe: failed to read from connection: %v\n", err) - } - return - } - marshaller, err := NewStatefulDecoder(buffer) - if err != nil { - s5l.Printf("pipe: failed initializing decoder with init message: %v\n", err) + s5l.Printf("pipe: failed to read init message: %v\n", err) return } for { - buffer, err := reader.ReadBytes('\n') + value, err := decoder.Decode() if err != nil { if err != io.EOF { - s5l.Printf("pipe: failed to read from connection: %v\n", err) + s5l.Printf("pipe: failed to read data message: %v\n", err) } - return - } - - value, err := marshaller.Decode(buffer) - if err != nil { - s5l.Printf("pipe: failed to decode message: %v\n", err) - continue + // TODO: check for temporary error? + break } if err = srv.Append(value); err != nil { @@ -75,8 +62,8 @@ func (srv Server) pipeHandle(conn net.Conn) { } func (srv Server) pipegramHandle(pconn net.PacketConn) { - decoder := NewStatelessDecoder() - buffer := make([]byte, 64*1024) + buffer := make([]byte, 64*1024) // TODO: hardcoded size + for { n, _, err := pconn.ReadFrom(buffer) if err != nil { @@ -84,9 +71,10 @@ func (srv Server) pipegramHandle(pconn net.PacketConn) { continue } data := buffer[0:n] - value, err := decoder.Decode(data) + + value, err := NewStatelessDecoder(bytes.NewReader(data)).Decode() if err != nil { - s5l.Printf("pipegram: failed to decode message: %v\n", err) + s5l.Printf("pipegram: failed to decode data message: %v\n", err) continue } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index 80af8c5..3b74041 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -35,7 +35,6 @@ package sfive import ( "encoding/json" "fmt" - "io/ioutil" "net/http" "strconv" @@ -141,31 +140,23 @@ func (srv Server) webGetUpdate(c web.C, w http.ResponseWriter, r *http.Request) fmt.Fprintf(w, "%s", jsonString) } +// TODO: add different API endpoint called bulk +// container := DataUpdateFullContainer{} +// err = json.Unmarshal(buffer, &container) +// if err == nil { +// if err = srv.AppendMany(container.Data); err != nil { +// http.Error(w, fmt.Sprintf("failed to store data: %s", err), http.StatusInternalServerError) +// } else { +// fmt.Fprintf(w, "%d update(s) successfully stored.", len(container.Data)) +// } +// return +// } + func (srv Server) webPostUpdate(c web.C, w http.ResponseWriter, r *http.Request) { const resourceName = "update" - decoder := NewStatelessDecoder() - - buffer, err := ioutil.ReadAll(r.Body) - if err != nil { - s5l.Printf("web: failed to read post value: %v\n", err) - http.Error(w, fmt.Sprintf("failed reading : %s: %v", resourceName, err), http.StatusBadRequest) - return - } - - // TODO: add different API endpoint called bulk - container := DataUpdateFullContainer{} - err = json.Unmarshal(buffer, &container) - if err == nil { - if err = srv.AppendMany(container.Data); err != nil { - http.Error(w, fmt.Sprintf("failed to store data: %s", err), http.StatusInternalServerError) - } else { - fmt.Fprintf(w, "%d update(s) successfully stored.", len(container.Data)) - } - return - } + decoder := NewStatelessDecoder(r.Body) - // else try single value - data, err := decoder.Decode(buffer) + data, err := decoder.Decode() if err != nil { s5l.Printf("web: failed to decode: %v\n", err) http.Error(w, fmt.Sprintf("failed decoding %s: %v", resourceName, err), http.StatusBadRequest) diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go index 5fcae5e..07adc47 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go @@ -309,7 +309,7 @@ func TestAppendAndFetch(t *testing.T) { upd := updateData upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC) upd.Data.Clients = clientsData - in := DataUpdateFull{0, "", -1, "", -1, sourceData, upd} + in := DataUpdateFull{Header{0, "", -1, "", -1}, sourceData, upd} if err = store.Append(in); err != nil { t.Fatalf("failed to append update: %v", err) @@ -334,10 +334,10 @@ func TestAppendAndFetch(t *testing.T) { // append many var ins []DataUpdateFull upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond) - ins = append(ins, DataUpdateFull{0, "", -1, "", -1, sourceData, upd}) + ins = append(ins, DataUpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}) upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond) upd.Data.Clients = nil - ins = append(ins, DataUpdateFull{0, "", -1, "", -1, sourceData, upd}) + ins = append(ins, DataUpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}) if err = store.AppendMany(ins); err != nil { t.Fatalf("failed to append update: %v", err) } @@ -377,7 +377,7 @@ func TestReadOnly(t *testing.T) { upd := updateData upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC) upd.Data.Clients = clientsData - in := DataUpdateFull{0, "", -1, "", -1, sourceData, upd} + in := DataUpdateFull{Header{0, "", -1, "", -1}, sourceData, upd} if err = store.Append(in); err != nil { t.Fatalf("unexpected error: %v", err) } @@ -416,7 +416,7 @@ func TestGetUpdatesAfter(t *testing.T) { expected := []DataUpdateFull{} for i := 0; i < 3; i = i + 1 { - in := DataUpdateFull{0, "", -1, "", -1, sourceData, upd} + in := DataUpdateFull{Header{0, "", -1, "", -1}, sourceData, upd} if err = store.Append(in); err != nil { t.Fatalf("unexpected error: %v", err) } @@ -534,7 +534,7 @@ func TestForwardedDataUpdates(t *testing.T) { expected := []DataUpdateFull{} for i := 0; i < 3; i = i + 1 { - in := DataUpdateFull{0, "", -1, "", -1, sourceData, upd} + in := DataUpdateFull{Header{0, "", -1, "", -1}, sourceData, upd} in.SourceHubUuid = forwardedHub in.SourceHubDataUpdateId = 3 - i // out of order if err = store.Append(in); err != nil { diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go index 12e92c8..d028ffb 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go @@ -65,12 +65,16 @@ type DataUpdate struct { Data SourceData `json:"data"` } -type DataUpdateFull struct { +type Header struct { Version uint `json:"version"` SourceHubUuid string `json:"SourceHubUuid,omitempty"` SourceHubDataUpdateId int `json:"SourceHubDataUpdateId,omitempty"` ForwardHubUuid string `json:"ForwardHubUuid,omitempty"` ForwardHubDataUpdateId int `json:"ForwardHubDataUpdateId,omitempty"` +} + +type DataUpdateFull struct { + Header SourceId DataUpdate } -- cgit v1.2.3