summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-05-05 23:30:52 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-05-05 23:30:52 +0200
commit4b2ce030720ea4bc4d532fbe5f6cd4e9de64899c (patch)
tree146bdad81a4a92ce0e58c66d110c56bea9fe7ab8
parentswitch back to binary indeces (diff)
remove now uneeded retry for append
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go66
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipe.go13
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go19
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go22
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesStore.go1
-rwxr-xr-xsrc/hub/test-srv-ro13
6 files changed, 79 insertions, 55 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index e49c30d..76805c3 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -32,13 +32,14 @@
package sfive
-import (
- "time"
-)
+type appendToken struct {
+ data DataUpdateFull
+ response chan error
+}
type appendManyToken struct {
data []DataUpdateFull
- response chan bool
+ response chan error
}
type getUpdatesResult struct {
@@ -73,8 +74,8 @@ type Server struct {
store Store
quit chan bool
done chan bool
- appendData chan DataUpdateFull
- appendManyData chan appendManyToken
+ appendChan chan appendToken
+ appendManyChan chan appendManyToken
getUpdatesAfterChan chan getUpdatesAfterToken
getHubIdChan chan getHubIdToken
getLastUpdateIdChan chan getLastUpdateIdToken
@@ -86,27 +87,10 @@ func (srv Server) appendActor() {
select {
case <-srv.quit:
return
- case value := <-srv.appendData:
- var err error
- for tryNum := 0; tryNum < 5; tryNum++ {
- err = srv.store.Append(value)
- if err != nil {
- time.Sleep(1 * time.Second)
- } else {
- break
- }
- }
- if err != nil {
- s5l.Printf("server: failed to store data: %v\n", err)
- }
- case token := <-srv.appendManyData:
- err := srv.store.AppendMany(token.data)
- if err != nil {
- s5l.Printf("server: failed to store many data: %v\n", err)
- token.response <- false
- } else {
- token.response <- true
- }
+ case token := <-srv.appendChan:
+ token.response <- srv.store.Append(token.data)
+ case token := <-srv.appendManyChan:
+ token.response <- srv.store.AppendMany(token.data)
case token := <-srv.getUpdatesAfterChan:
values, err := srv.store.GetUpdatesAfter(token.id, token.limit)
token.response <- getUpdatesResult{values, err}
@@ -119,6 +103,20 @@ func (srv Server) appendActor() {
}
}
+func (srv Server) Append(data DataUpdateFull) error {
+ token := appendToken{data: data, response: make(chan error, 1)}
+ defer close(token.response)
+ srv.appendChan <- token
+ return <-token.response
+}
+
+func (srv Server) AppendMany(data []DataUpdateFull) error {
+ token := appendManyToken{data: data, response: make(chan error, 1)}
+ defer close(token.response)
+ srv.appendManyChan <- token
+ return <-token.response
+}
+
func (srv Server) GetUpdatesAfter(id, limit int) ([]DataUpdateFull, error) {
token := getUpdatesAfterToken{id: id, limit: limit, response: make(chan getUpdatesResult, 1)}
defer close(token.response)
@@ -149,8 +147,8 @@ func (srv Server) Close() {
<-srv.done
close(srv.quit)
close(srv.done)
- close(srv.appendData)
- close(srv.appendManyData)
+ close(srv.appendChan)
+ close(srv.appendManyChan)
close(srv.getUpdatesAfterChan)
close(srv.getHubIdChan)
close(srv.getLastUpdateIdChan)
@@ -168,11 +166,11 @@ func NewServer(dbPath string, readOnly bool) (server *Server, err error) {
server.quit = make(chan bool)
server.done = make(chan bool)
- server.appendData = make(chan DataUpdateFull, 5)
- server.appendManyData = make(chan appendManyToken, 5)
- server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 1)
- server.getHubIdChan = make(chan getHubIdToken, 1)
- server.getLastUpdateIdChan = make(chan getLastUpdateIdToken, 1)
+ server.appendChan = make(chan appendToken, 32)
+ server.appendManyChan = make(chan appendManyToken, 32)
+ server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 32)
+ server.getHubIdChan = make(chan getHubIdToken, 32)
+ server.getLastUpdateIdChan = make(chan getLastUpdateIdToken, 32)
go server.appendActor()
s5l.Printf("server: started\n")
return
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
index 9aa5e49..4b48d99 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
@@ -68,7 +68,9 @@ func (srv Server) pipeHandle(conn net.Conn) {
continue
}
- srv.appendData <- value
+ if err = srv.Append(value); err != nil {
+ s5l.Printf("pipe: failed to store data: %v\n", err)
+ }
}
}
@@ -83,10 +85,13 @@ func (srv Server) pipegramHandle(pconn net.PacketConn) {
}
data := buffer[0:n]
value, err := decoder.Decode(data)
- if err == nil {
- srv.appendData <- value
- } else {
+ if err != nil {
s5l.Printf("pipegram: failed to decode message: %v\n", err)
+ continue
+ }
+
+ if err = srv.Append(value); err != nil {
+ s5l.Printf("pipegram: failed to store data: %v\n", err)
}
}
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index 657492f..80af8c5 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -156,14 +156,10 @@ func (srv Server) webPostUpdate(c web.C, w http.ResponseWriter, r *http.Request)
container := DataUpdateFullContainer{}
err = json.Unmarshal(buffer, &container)
if err == nil {
- token := appendManyToken{
- data: container.Data,
- response: make(chan bool, 2)}
- defer close(token.response)
- srv.appendManyData <- token
- success := <-token.response
- if !success {
- http.Error(w, "failed to store data", http.StatusInternalServerError)
+ if err = srv.AppendMany(container.Data); err != nil {
+ http.Error(w, fmt.Sprintf("failed to store data: %s", err), http.StatusInternalServerError)
+ } else {
+ fmt.Fprintf(w, "%d update(s) successfully stored.", len(container.Data))
}
return
}
@@ -176,8 +172,11 @@ func (srv Server) webPostUpdate(c web.C, w http.ResponseWriter, r *http.Request)
return
}
- srv.appendData <- data
- // TODO send response channel, wait for OK
+ if err = srv.Append(data); err != nil {
+ http.Error(w, fmt.Sprintf("failed to store data: %s", err), http.StatusInternalServerError)
+ } else {
+ fmt.Fprintf(w, "1 update successfully stored.")
+ }
}
func (srv Server) webGetLastUpdateId(c web.C, w http.ResponseWriter, r *http.Request) {
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index 9ef1355..64dd5a7 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -54,9 +54,10 @@ var (
)
type Store struct {
- version int
- hubUuid string
- db *bolt.DB
+ version int
+ hubUuid string
+ db *bolt.DB
+ readOnly bool
}
//
@@ -152,12 +153,16 @@ func NewStore(dbPath string, readOnly bool) (Store, error) {
}
if db != nil {
- s5l.Printf("store: opened (UUID: %s)", hubid)
- return Store{version, hubid, db}, nil
+ if readOnly {
+ s5l.Printf("store: opened read-only (UUID: %s)", hubid)
+ } else {
+ s5l.Printf("store: opened (UUID: %s)", hubid)
+ }
+ return Store{version, hubid, db, readOnly}, nil
}
if readOnly {
- return Store{}, errors.New("store: failed to open, requested read-only mode but file does not exist.")
+ return Store{}, errors.New("store: failed to open, requested read-only mode but store file does not exist.")
}
db, version, hubid, err = createDb(dbPath)
@@ -165,7 +170,7 @@ func NewStore(dbPath string, readOnly bool) (Store, error) {
return Store{}, err
}
s5l.Printf("store: initialized (UUID: %s)", hubid)
- return Store{version, hubid, db}, nil
+ return Store{version, hubid, db, readOnly}, nil
}
func (st Store) Close() {
@@ -336,6 +341,9 @@ func (st Store) appendItem(tx *bolt.Tx, update DataUpdateFull) (duId int, err er
// Public Append Interface
func (st Store) AppendMany(updates []DataUpdateFull) (err error) {
+ if st.readOnly {
+ return ErrReadOnly
+ }
return st.db.Update(func(tx *bolt.Tx) error {
for _, update := range updates {
if _, err := st.appendItem(tx, update); err != nil {
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go
index 5f6dc6a..404505e 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go
@@ -42,6 +42,7 @@ import (
var (
ErrNotFound = errors.New("not found")
+ ErrReadOnly = errors.New("store is in read-only mode")
)
const (
diff --git a/src/hub/test-srv-ro b/src/hub/test-srv-ro
new file mode 100755
index 0000000..56fe440
--- /dev/null
+++ b/src/hub/test-srv-ro
@@ -0,0 +1,13 @@
+#!/bin/sh
+
+if [ -z "$1" ]; then
+ echo "Usage: $0 <db-name>"
+ exit 1
+fi
+
+TEST_D="./test"
+TEST_DB="$TEST_D/$1.bolt"
+
+mkdir -p "$TEST_D"
+rm -f "$TEST_D/pipe" "$TEST_D/pipegram"
+exec ./bin/sfive-hub -db "$TEST_DB" -read-only -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server -bind=":8000"