M cmd/kgpdemux/main.go +6 -2
@@ 60,6 60,9 @@ const usage = `usage: kgp_client upstrea
func main() {
var metafn = flag.String("meta", "", "metadata filename, stdin by default")
+ var keepalive = flag.Duration(
+ "keepalive", 0, "default 0 (off), time between keepalive",
+ )
flag.Parse()
@@ 110,8 113,8 @@ func main() {
var sigchan = make(chan os.Signal, 1)
signal.Notify(sigchan, os.Interrupt)
<-sigchan // wait for signal to come in
- signal.Reset(os.Interrupt) // Let go handle further signals
- logger.Print("stopping traffic")
+ signal.Reset(os.Interrupt) // Let go handle next signals
+ logger.Print("stopping")
cancel()
}()
@@ 123,6 126,7 @@ func main() {
)
},
)
+ demux.Keepalive = *keepalive
demux.Logger = logger
fmt.Println(demux.Run(ctx, lastSeenSeq+1))
}
M kgp.go +61 -8
@@ 32,14 32,6 @@ type Demux struct {
// Time between keepalive packets, 0 == no keepalive
Keepalive time.Duration
- // Where to write keepalive timings format:
- //
- // keepalive 1234ms
- //
- // or
- //
- // keepalive timeout
- KeepaliveWriter io.Writer
Logger *log.Logger
@@ 281,6 273,67 @@ func keepaliveReply(
}
}
+func (demux *Demux) keepalive(
+ done func() <-chan struct{},
+ output chan<- *Packet,
+) chan<- *Packet {
+ var input = make(chan *Packet)
+ // if keepalive == 0 we should not get any keepalive ack back, discard them if needed
+ if demux.Keepalive == 0 {
+ go func() {
+ for {
+ var p = <-input
+ demux.printf(
+ "keepalive ack %d %d while keepalive disabled",
+ p.header.Sequence, p.header.Ack,
+ )
+ }
+ }()
+ return input
+ }
+
+ var ticker = time.NewTicker(demux.Keepalive)
+ var seq uint32 = 1
+
+ go func() {
+ for {
+ select {
+ case <-done():
+ return
+ case <-ticker.C:
+ {
+ // send keep-alive packet
+ // and wait for response
+ var k = NewPacket()
+ k.header.Sequence = seq
+ // send keepalive packet
+ output <- k
+ }
+
+ // now wait for the reply
+ var p = <-input
+ if p.header.Sequence != seq {
+ demux.printf(
+ "invalid keepalive ack %d %d",
+ p.header.Sequence, p.header.Ack,
+ )
+ }
+ seq += 1
+ if seq == 0 {
+ seq = 1
+ }
+ case p := <-input:
+ demux.printf(
+ "keepalive ack while not waiting for one %d %d",
+ p.header.Sequence, p.header.Ack,
+ )
+ }
+ }
+ }()
+
+ return input
+}
+
func (demux *Demux) Run(ctx context.Context, seq uint32) error {
// Keeps track of the last sequence number from the server
var ack atomic.Value