package sfive

import (
	"bufio"
	"errors"
	"net"
)

// handleConnection serves a single stream connection until a read fails.
//
// The first newline-terminated message on the connection is passed to
// NewStatefulDecoder to build the decoder used for the rest of the stream.
// Every following newline-terminated message is decoded and the result is
// sent on self.appendData. A message that fails to decode is logged and
// skipped; a read error (including the peer closing the connection) ends the
// handler. The connection is always closed when the handler returns.
func (self StatsSinkServer) handleConnection(conn net.Conn) {
	// Release the socket on every exit path; without this each finished or
	// failed client would leak a file descriptor.
	defer conn.Close()

	reader := bufio.NewReader(conn)

	initMsg, err := reader.ReadBytes('\n')
	if err != nil {
		s5l.Printf("pipe: failed to read from connection: %v\n", err)
		return
	}

	marshaller, err := NewStatefulDecoder(initMsg)
	if err != nil {
		s5l.Printf("pipe: failed initializing decoder with init message: %v\n", err)
		return
	}

	for {
		line, err := reader.ReadBytes('\n')
		if err != nil {
			s5l.Printf("pipe: failed to read from connection: %v\n", err)
			return
		}

		// s5l.Printf("msg: %v", string(line))
		value, err := marshaller.Decode(line)
		if err != nil {
			// One bad message must not tear down the whole stream.
			s5l.Printf("pipe: failed to decode message: %v\n", err)
			continue
		}
		self.appendData <- value
	}
}

// handlePacketConn reads datagrams from pconn, decodes each one with a
// decoder obtained from NewPlainDecoder and sends the decoded value on
// self.appendData.
//
// Read and decode failures are logged and the loop carries on, except when
// the read fails because pconn has been closed: that condition is permanent,
// so the function returns instead of spinning on the same error.
func (self StatsSinkServer) handlePacketConn(pconn net.PacketConn) {
	decoder := NewPlainDecoder()

	// A single receive buffer is reused for every datagram.
	// NOTE(review): data handed to Decode aliases this buffer; this assumes
	// the decoder does not keep a reference to it after returning — confirm.
	buffer := make([]byte, 64*1024)

	for {
		n, _, err := pconn.ReadFrom(buffer)
		if err != nil {
			s5l.Printf("p-pipe: failed read: %v", err)
			if errors.Is(err, net.ErrClosed) {
				return
			}
			continue
		}

		value, err := decoder.Decode(buffer[:n])
		if err != nil {
			s5l.Printf("p-pipe: failed to decode message: %v\n", err)
			continue
		}
		self.appendData <- value
	}
}

// ServePipe listens on the unix stream socket at pipePath and starts one
// handleConnection goroutine per accepted connection.
//
// If listening fails the error is logged and the function returns. Accept
// errors are logged and otherwise ignored, except that the loop stops once
// the listener itself has been closed.
func (self StatsSinkServer) ServePipe(pipePath string) {
	ln, err := net.Listen("unix", pipePath)
	if err != nil {
		s5l.Printf("pipe: failed to connect: %v", err)
		return
	}
	defer ln.Close()

	for {
		conn, err := ln.Accept()
		if err != nil {
			s5l.Printf("pipe: failed accept: %v", err)
			if errors.Is(err, net.ErrClosed) {
				// A closed listener never recovers; retrying would busy-loop.
				return
			}
			// ignore
			continue
		}
		go self.handleConnection(conn)
	}
}

// ServeGramPipe listens on the unix datagram socket at pipePath and processes
// incoming packets with handlePacketConn on the calling goroutine.
//
// If listening fails the error is logged and the function returns. The socket
// is closed when handlePacketConn returns.
func (self StatsSinkServer) ServeGramPipe(pipePath string) {
	pconn, err := net.ListenPacket("unixgram", pipePath)
	if err != nil {
		s5l.Printf("p-pipe: failed to listen: %v", err)
		return
	}
	defer pconn.Close()

	self.handlePacketConn(pconn)
}