integration: test embed.Etcd.Close with watch

Ensure 'Close' returns in time when there are open
connections (watch streams).

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
This commit is contained in:
Gyu-Ho Lee 2017-07-14 14:33:48 -07:00
parent e8f3cbf1c6
commit a5d94fe229

View File

@ -15,13 +15,16 @@
package integration
import (
"context"
"fmt"
"net/url"
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/embed"
)
@ -102,6 +105,47 @@ func TestEmbedEtcd(t *testing.T) {
}
}
// TestEmbedEtcdGracefulStop ensures embedded server stops
// cutting existing transports.
func TestEmbedEtcdGracefulStop(t *testing.T) {
cfg := embed.NewConfig()
urls := newEmbedURLs(2)
setupEmbedCfg(cfg, []url.URL{urls[0]}, []url.URL{urls[1]})
cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprintf("embed-etcd"))
os.RemoveAll(cfg.Dir)
defer os.RemoveAll(cfg.Dir)
e, err := embed.StartEtcd(cfg)
if err != nil {
t.Fatal(err)
}
<-e.Server.ReadyNotify() // wait for e.Server to join the cluster
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{urls[0].String()}})
if err != nil {
t.Fatal(err)
}
defer cli.Close()
// open watch connection
cli.Watch(context.Background(), "foo")
donec := make(chan struct{})
go func() {
e.Close()
close(donec)
}()
select {
case err := <-e.Err():
t.Fatal(err)
case <-donec:
case <-time.After(2*time.Second + e.Server.Cfg.ReqTimeout()):
t.Fatalf("took too long to close server")
}
}
func newEmbedURLs(n int) (urls []url.URL) {
for i := 0; i < n; i++ {
u, _ := url.Parse(fmt.Sprintf("unix://localhost:%d%06d", os.Getpid(), i))