2 files changed, 67 insertions(+), 10 deletions(-)

M cmd/kgpdemux/main.go
M kgp.go
M cmd/kgpdemux/main.go +6 -2
@@ 60,6 60,9 @@ const usage = `usage: kgp_client upstrea
 
 func main() {
 	var metafn = flag.String("meta", "", "metadata filename, stdin by default")
+	var keepalive = flag.Duration(
+		"keepalive", 0, "default 0 (off), time between keepalive",
+	)
 
 	flag.Parse()
 

          
@@ 110,8 113,8 @@ func main() {
 		var sigchan = make(chan os.Signal, 1)
 		signal.Notify(sigchan, os.Interrupt)
 		<-sigchan                  // wait for signal to come in
-		signal.Reset(os.Interrupt) // Let go handle further signals
-		logger.Print("stopping traffic")
+		signal.Reset(os.Interrupt) // Let go handle next signals
+		logger.Print("stopping")
 		cancel()
 	}()
 

          
@@ 123,6 126,7 @@ func main() {
 			)
 		},
 	)
+	demux.Keepalive = *keepalive
 	demux.Logger = logger
 	fmt.Println(demux.Run(ctx, lastSeenSeq+1))
 }

          
M kgp.go +61 -8
@@ 32,14 32,6 @@ type Demux struct {
 
 	// Time between keepalive packets, 0 == no keepalive
 	Keepalive time.Duration
-	// Where to write keepalive timings format:
-	//
-	// 		keepalive 1234ms
-	//
-	// or
-	//
-	// 		keepalive timeout
-	KeepaliveWriter io.Writer
 
 	Logger *log.Logger
 

          
@@ 281,6 273,67 @@ func keepaliveReply(
 	}
 }
 
+func (demux *Demux) keepalive(
+	done func() <-chan struct{},
+	output chan<- *Packet,
+) chan<- *Packet {
+	var input = make(chan *Packet)
+	// if keepalive == 0 we should not get any keepalive ack back, discard them if needed
+	if demux.Keepalive == 0 {
+		go func() {
+			for {
+				var p = <-input
+				demux.printf(
+					"keepalive ack %d %d while keepalive disabled",
+					p.header.Sequence, p.header.Ack,
+				)
+			}
+		}()
+		return input
+	}
+
+	var ticker = time.NewTicker(demux.Keepalive)
+	var seq uint32 = 1
+
+	go func() {
+		for {
+			select {
+			case <-done():
+				return
+			case <-ticker.C:
+				{
+					// send keep-alive packet
+					// and wait for response
+					var k = NewPacket()
+					k.header.Sequence = seq
+					// send keepalive packet
+					output <- k
+				}
+
+				// now wait for the reply
+				var p = <-input
+				if p.header.Sequence != seq {
+					demux.printf(
+						"invalid keepalive ack %d %d",
+						p.header.Sequence, p.header.Ack,
+					)
+				}
+				seq += 1
+				if seq == 0 {
+					seq = 1
+				}
+			case p := <-input:
+				demux.printf(
+					"keepalive ack while not waiting for one %d %d",
+					p.header.Sequence, p.header.Ack,
+				)
+			}
+		}
+	}()
+
+	return input
+}
+
 func (demux *Demux) Run(ctx context.Context, seq uint32) error {
 	// Keeps track of the last sequence number from the server
 	var ack atomic.Value