From 4b2ce030720ea4bc4d532fbe5f6cd4e9de64899c Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Fri, 5 May 2017 23:30:52 +0200 Subject: remove now uneeded retry for append --- src/hub/src/spreadspace.org/sfive/s5srv.go | 66 +++++++++++------------ src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 13 +++-- src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 19 ++++--- src/hub/src/spreadspace.org/sfive/s5store.go | 22 +++++--- src/hub/src/spreadspace.org/sfive/s5typesStore.go | 1 + src/hub/test-srv-ro | 13 +++++ 6 files changed, 79 insertions(+), 55 deletions(-) create mode 100755 src/hub/test-srv-ro (limited to 'src') diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index e49c30d..76805c3 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -32,13 +32,14 @@ package sfive -import ( - "time" -) +type appendToken struct { + data DataUpdateFull + response chan error +} type appendManyToken struct { data []DataUpdateFull - response chan bool + response chan error } type getUpdatesResult struct { @@ -73,8 +74,8 @@ type Server struct { store Store quit chan bool done chan bool - appendData chan DataUpdateFull - appendManyData chan appendManyToken + appendChan chan appendToken + appendManyChan chan appendManyToken getUpdatesAfterChan chan getUpdatesAfterToken getHubIdChan chan getHubIdToken getLastUpdateIdChan chan getLastUpdateIdToken @@ -86,27 +87,10 @@ func (srv Server) appendActor() { select { case <-srv.quit: return - case value := <-srv.appendData: - var err error - for tryNum := 0; tryNum < 5; tryNum++ { - err = srv.store.Append(value) - if err != nil { - time.Sleep(1 * time.Second) - } else { - break - } - } - if err != nil { - s5l.Printf("server: failed to store data: %v\n", err) - } - case token := <-srv.appendManyData: - err := srv.store.AppendMany(token.data) - if err != nil { - s5l.Printf("server: failed to store many data: %v\n", err) - token.response <- false - } else { - token.response <- true - } + case token := <-srv.appendChan: + token.response <- srv.store.Append(token.data) + case token := <-srv.appendManyChan: + token.response <- srv.store.AppendMany(token.data) case token := <-srv.getUpdatesAfterChan: values, err := srv.store.GetUpdatesAfter(token.id, token.limit) token.response <- getUpdatesResult{values, err} @@ -119,6 +103,20 @@ func (srv Server) appendActor() { } } +func (srv Server) Append(data DataUpdateFull) error { + token := appendToken{data: data, response: make(chan error, 1)} + defer close(token.response) + srv.appendChan <- token + return <-token.response +} + +func (srv Server) AppendMany(data []DataUpdateFull) error { + token := appendManyToken{data: data, response: make(chan error, 1)} + defer close(token.response) + srv.appendManyChan <- token + return <-token.response +} + func (srv Server) GetUpdatesAfter(id, limit int) ([]DataUpdateFull, error) { token := getUpdatesAfterToken{id: id, limit: limit, response: make(chan getUpdatesResult, 1)} defer close(token.response) @@ -149,8 +147,8 @@ func (srv Server) Close() { <-srv.done close(srv.quit) close(srv.done) - close(srv.appendData) - close(srv.appendManyData) + close(srv.appendChan) + close(srv.appendManyChan) close(srv.getUpdatesAfterChan) close(srv.getHubIdChan) close(srv.getLastUpdateIdChan) @@ -168,11 +166,11 @@ func NewServer(dbPath string, readOnly bool) (server *Server, err error) { server.quit = make(chan bool) server.done = make(chan bool) - server.appendData = make(chan DataUpdateFull, 5) - server.appendManyData = make(chan appendManyToken, 5) - server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 1) - server.getHubIdChan = make(chan getHubIdToken, 1) - server.getLastUpdateIdChan = make(chan getLastUpdateIdToken, 1) + server.appendChan = make(chan appendToken, 32) + server.appendManyChan = make(chan appendManyToken, 32) + server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 32) + server.getHubIdChan = make(chan getHubIdToken, 32) + server.getLastUpdateIdChan = make(chan getLastUpdateIdToken, 32) go server.appendActor() s5l.Printf("server: started\n") return diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index 9aa5e49..4b48d99 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -68,7 +68,9 @@ func (srv Server) pipeHandle(conn net.Conn) { continue } - srv.appendData <- value + if err = srv.Append(value); err != nil { + s5l.Printf("pipe: failed to store data: %v\n", err) + } } } @@ -83,10 +85,13 @@ func (srv Server) pipegramHandle(pconn net.PacketConn) { } data := buffer[0:n] value, err := decoder.Decode(data) - if err == nil { - srv.appendData <- value - } else { + if err != nil { s5l.Printf("pipegram: failed to decode message: %v\n", err) + continue + } + + if err = srv.Append(value); err != nil { + s5l.Printf("pipegram: failed to store data: %v\n", err) } } } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index 657492f..80af8c5 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -156,14 +156,10 @@ func (srv Server) webPostUpdate(c web.C, w http.ResponseWriter, r *http.Request) container := DataUpdateFullContainer{} err = json.Unmarshal(buffer, &container) if err == nil { - token := appendManyToken{ - data: container.Data, - response: make(chan bool, 2)} - defer close(token.response) - srv.appendManyData <- token - success := <-token.response - if !success { - http.Error(w, "failed to store data", http.StatusInternalServerError) + if err = srv.AppendMany(container.Data); err != nil { + http.Error(w, fmt.Sprintf("failed to store data: %s", err), http.StatusInternalServerError) + } else { + fmt.Fprintf(w, "%d update(s) successfully stored.", len(container.Data)) } return } @@ -176,8 +172,11 @@ func (srv Server) webPostUpdate(c web.C, w http.ResponseWriter, r *http.Request) return } - srv.appendData <- data - // TODO send response channel, wait for OK + if err = srv.Append(data); err != nil { + http.Error(w, fmt.Sprintf("failed to store data: %s", err), http.StatusInternalServerError) + } else { + fmt.Fprintf(w, "1 update successfully stored.") + } } func (srv Server) webGetLastUpdateId(c web.C, w http.ResponseWriter, r *http.Request) { diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 9ef1355..64dd5a7 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -54,9 +54,10 @@ var ( ) type Store struct { - version int - hubUuid string - db *bolt.DB + version int + hubUuid string + db *bolt.DB + readOnly bool } // @@ -152,12 +153,16 @@ func NewStore(dbPath string, readOnly bool) (Store, error) { } if db != nil { - s5l.Printf("store: opened (UUID: %s)", hubid) - return Store{version, hubid, db}, nil + if readOnly { + s5l.Printf("store: opened read-only (UUID: %s)", hubid) + } else { + s5l.Printf("store: opened (UUID: %s)", hubid) + } + return Store{version, hubid, db, readOnly}, nil } if readOnly { - return Store{}, errors.New("store: failed to open, requested read-only mode but file does not exist.") + return Store{}, errors.New("store: failed to open, requested read-only mode but store file does not exist.") } db, version, hubid, err = createDb(dbPath) @@ -165,7 +170,7 @@ func NewStore(dbPath string, readOnly bool) (Store, error) { return Store{}, err } s5l.Printf("store: initialized (UUID: %s)", hubid) - return Store{version, hubid, db}, nil + return Store{version, hubid, db, readOnly}, nil } func (st Store) Close() { @@ -336,6 +341,9 @@ func (st Store) appendItem(tx *bolt.Tx, update DataUpdateFull) (duId int, err er // Public Append Interface func (st Store) AppendMany(updates []DataUpdateFull) (err error) { + if st.readOnly { + return ErrReadOnly + } return st.db.Update(func(tx *bolt.Tx) error { for _, update := range updates { if _, err := st.appendItem(tx, update); err != nil { diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go index 5f6dc6a..404505e 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go @@ -42,6 +42,7 @@ import ( var ( ErrNotFound = errors.New("not found") + ErrReadOnly = errors.New("store is in read-only mode") ) const ( diff --git a/src/hub/test-srv-ro b/src/hub/test-srv-ro new file mode 100755 index 0000000..56fe440 --- /dev/null +++ b/src/hub/test-srv-ro @@ -0,0 +1,13 @@ +#!/bin/sh + +if [ -z "$1" ]; then + echo "Usage: $0 " + exit 1 +fi + +TEST_D="./test" +TEST_DB="$TEST_D/$1.bolt" + +mkdir -p "$TEST_D" +rm -f "$TEST_D/pipe" "$TEST_D/pipegram" +exec ./bin/sfive-hub -db "$TEST_DB" -read-only -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server -bind=":8000" -- cgit v1.2.3