summaryrefslogtreecommitdiff
path: root/src/hub/src/spreadspace.org/sfive/s5srv.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/hub/src/spreadspace.org/sfive/s5srv.go')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go59
1 files changed, 36 insertions, 23 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 220a7ae..b91d5c5 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -33,16 +33,14 @@
package sfive
import (
+ "context"
"errors"
+ "net/http"
"runtime"
"sync"
"time"
)
-var (
- ErrShutdownInProgress = errors.New("shutdown in progress")
-)
-
type ingestToken struct {
updates []*UpdateFull
response chan error
@@ -53,12 +51,14 @@ type Server struct {
numWorker int
anonymization AnonymizationAlgo
geoip GeoIPLookup
- quit chan bool
wgWorker *sync.WaitGroup
ingestChan chan ingestToken
+ interfaces struct {
+ web *http.Server
+ }
}
-func (srv Server) transform(update *UpdateFull) *UpdateFull {
+func (srv *Server) transform(update *UpdateFull) *UpdateFull {
bytesSentTotal := uint(0)
clients := []Client{}
for _, client := range update.Data.Clients {
@@ -100,13 +100,13 @@ func (srv Server) transform(update *UpdateFull) *UpdateFull {
return update
}
-func (srv Server) transformMany(updates []*UpdateFull) {
+func (srv *Server) transformMany(updates []*UpdateFull) {
for _, update := range updates {
srv.transform(update)
}
}
-func (srv Server) ingestWorker(idx int) {
+func (srv *Server) ingestWorker(idx int) {
for {
select {
case token, ok := <-srv.ingestChan:
@@ -119,31 +119,44 @@ func (srv Server) ingestWorker(idx int) {
}
}
-func (srv Server) Ingest(update *UpdateFull) error {
+func (srv *Server) Ingest(update *UpdateFull) error {
return srv.IngestMany([]*UpdateFull{update})
}
-func (srv Server) IngestMany(updates []*UpdateFull) error {
- if len(srv.quit) > 0 { // check if there is at least one element on the channel without consuming it
- return ErrShutdownInProgress
- }
-
+func (srv *Server) IngestMany(updates []*UpdateFull) error {
token := ingestToken{updates: updates, response: make(chan error, 1)}
defer close(token.response)
srv.ingestChan <- token
return <-token.response
}
-func (srv Server) Close() {
- s5l.Printf("srv: shutting down")
+func (srv *Server) shutdownInterfaces() (errors int) {
+ ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
+ c := make(chan error)
+ go func() { c <- srv.webStop(ctx) }()
+ //go func() { c <- srv.pipeStope(ctx) }() // TODO: add this as soon as pipe interface can be stopped
+ //go func() { c <- srv.pipegramStop(ctx) }() // TODO: add this as soon as pipegram interface can be stopped
+
+ errors = 0
+ for i := 0; i < 1; i++ { // TODO: set limit to 3 when the above has been enabled
+ if err := <-c; err != nil {
+ s5l.Printf("srv: interface shutdown failed failed: %v", err)
+ errors++
+ }
+ }
+ close(c) // closing channel here in the hopes that this leads to a panic
+ // in case the number of channel reads (for loop above) is) doesn't match the
+ // number of interfaces
+ cancel()
+ return
+}
- // TODO: enable this as soon as interfaces can be told to stop accepting data
- // and yield wgInterfaces once the last client connection is done
- // close(srv.quit) // tell interfaces to don't accept new data
- // srv.wgInterfaces.Wait() // wait for interfaces to finish up
+func (srv *Server) Close() {
+ s5l.Printf("srv: shutting down")
- srv.quit <- true // this will checked by Ingest() and IngestMany without consuming it
- time.Sleep(time.Second) // this is quite ugly but must be good enough for now
+ if errors := srv.shutdownInterfaces(); errors != 0 {
+ s5l.Printf("srv: shutdown of at least one interface failed, this is an unclean shutdown!!!")
+ }
close(srv.ingestChan) // close ingest channel to tell worker to stop
srv.wgWorker.Wait() // wait for worker to finish up
@@ -178,7 +191,7 @@ func NewServer(cfg SrvConfig) (srv *Server, err error) {
if cfg.Workers > 0 {
srv.numWorker = cfg.Workers
}
- srv.quit = make(chan bool, 1) // this will never be consumed (until interface cleanup actually works)
+
srv.wgWorker = &sync.WaitGroup{}
srv.ingestChan = make(chan ingestToken, srv.numWorker)
for i := 0; i < srv.numWorker; i = i + 1 {