Fix test + move some tests to handshake_test.go
2 files changed, 75 insertions(+), 63 deletions(-)

A => handshake_test.go
M kgp_test.go
A => handshake_test.go +71 -0
@@ 0,0 1,71 @@ 
+package kgp
+
+import (
+	"bytes"
+	"io"
+	"io/ioutil"
+	"testing"
+)
+
+const authMeta = `{"username": "u", "access_key": "k"}`
+
+func TestAnnouncement(t *testing.T) {
+	var buf bytes.Buffer
+	var announcement = Announcement{
+		Metadata: bytes.NewBufferString(authMeta),
+	}
+
+	if _, err := announcement.Write(&buf); err != nil {
+		t.Errorf("Announcement.Write failed: %s", err)
+	}
+
+	var a, err = ReadAnnouncement(&buf)
+	if err != nil {
+		t.Errorf("ReadAnnouncement failed: %s", err)
+	}
+
+	var empty UUID
+	if a.UUID != empty {
+		t.Errorf("invalid uuid: %q", a.UUID)
+	}
+	meta, err := ioutil.ReadAll(a.Metadata)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !bytes.Equal(meta, []byte(authMeta)) {
+		t.Fatal(meta, "!=", authMeta)
+	}
+}
+
+func TestAnnouncementPipe(t *testing.T) {
+	var announcement = Announcement{
+		Metadata: bytes.NewBufferString(authMeta),
+	}
+	var server, client = io.Pipe()
+
+	go func() {
+		if _, err := announcement.Write(client); err != nil {
+			t.Fatalf("Announcement.Write failed: %s", err)
+		}
+		client.Close()
+	}()
+
+	var a, err = ReadAnnouncement(server)
+	if err != nil {
+		t.Fatalf("ReadAnnouncement failed: %s", err)
+	}
+
+	var empty UUID
+	if a.UUID != empty {
+		t.Fatalf("invalid uuid: %q", a.UUID)
+	}
+	meta, err := ioutil.ReadAll(a.Metadata)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !bytes.Equal(meta, []byte(authMeta)) {
+		t.Fatal(meta, "!=", authMeta)
+	}
+
+	io.Copy(ioutil.Discard, server)
+}

          
M kgp_test.go +4 -63
@@ 5,74 5,11 @@ import (
 	"context"
 	"io"
 	"io/ioutil"
-	"net"
 	"strings"
 	"testing"
 	"time"
 )
 
-const authMeta = `{"username": "u", "access_key": "k"}`
-
-func TestAnnouncement(t *testing.T) {
-	var buf bytes.Buffer
-	var announcement = Announcement{
-		Metadata: bytes.NewBufferString(authMeta),
-	}
-
-	if _, err := announcement.Write(&buf); err != nil {
-		t.Errorf("Announcement.Write failed: %s", err)
-	}
-
-	var a, err = ReadAnnouncement(&buf)
-	if err != nil {
-		t.Errorf("ReadAnnouncement failed: %s", err)
-	}
-
-	var empty UUID
-	if a.UUID != empty {
-		t.Errorf("invalid uuid: %q", a.UUID)
-	}
-	meta, err := ioutil.ReadAll(a.Metadata)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if !bytes.Equal(meta, []byte(authMeta)) {
-		t.Fatal(meta, "!=", authMeta)
-	}
-}
-
-func TestPipe(t *testing.T) {
-	var announcement = Announcement{
-		Metadata: bytes.NewBufferString(authMeta),
-	}
-	var client, server = net.Pipe()
-
-	go func() {
-		var a, err = ReadAnnouncement(server)
-		if err != nil {
-			t.Errorf("ReadAnnouncement failed: %s", err)
-		}
-
-		var empty UUID
-		if a.UUID != empty {
-			t.Errorf("invalid uuid: %q", a.UUID)
-		}
-		meta, err := ioutil.ReadAll(a.Metadata)
-		if err != nil {
-			t.Fatal(err)
-		}
-		if !bytes.Equal(meta, []byte(authMeta)) {
-			t.Fatal(meta, "!=", authMeta)
-		}
-
-		io.Copy(ioutil.Discard, server)
-	}()
-
-	if _, err := announcement.Write(client); err != nil {
-		t.Errorf("Announcement.Write failed: %s", err)
-	}
-}
-
 type recorder struct {
 	Input  io.Reader
 	Output io.Writer

          
@@ 270,6 207,10 @@ func TestRelayRequestResponse(t *testing
 	p.SetSeqAck(1, 0)
 	p.WriteTo(requestWriter)
 
+	// Wait for the channel to open. This is so we know for sure that seq = 1
+	// and ack = 1 when we write "hello". This avoids a race condition.
+	waitChannelCountEqual(1, demux, ctx.Done)
+
 	// Write "hello"
 	p.Payload = []byte("hello")
 	p.SetSeqAck(2, 0)