diff --git a/etcdmain/serve.go b/etcdmain/serve.go index b525ceead..f0d634671 100644 --- a/etcdmain/serve.go +++ b/etcdmain/serve.go @@ -42,6 +42,9 @@ type serveCtx struct { func serve(sctx *serveCtx, s *etcdserver.EtcdServer, tlscfg *tls.Config, handler http.Handler) error { logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0) + <-s.ReadyNotify() + plog.Info("ready to serve client requests") + m := cmux.New(sctx.l) if sctx.insecure { diff --git a/etcdserver/server.go b/etcdserver/server.go index a55b48f92..cb1d32132 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -162,7 +162,9 @@ type EtcdServer struct { // count the number of inflight snapshots. // MUST use atomic operation to access this field. inflightSnapshots int64 - r raftNode + + readych chan struct{} + r raftNode cfg *ServerConfig snapCount uint64 @@ -366,6 +368,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { lstats := stats.NewLeaderStats(id.String()) srv = &EtcdServer{ + readych: make(chan struct{}), cfg: cfg, snapCount: cfg.SnapCount, errorc: make(chan error, 1), @@ -729,6 +732,10 @@ func (s *EtcdServer) Stop() { <-s.done } +// ReadyNotify returns a channel that will be closed when the server +// is ready to serve client requests +func (s *EtcdServer) ReadyNotify() <-chan struct{} { return s.readych } + func (s *EtcdServer) stopWithDelay(d time.Duration, err error) { select { case <-time.After(d): @@ -888,6 +895,7 @@ func (s *EtcdServer) publish(timeout time.Duration) { cancel() switch err { case nil: + close(s.readych) plog.Infof("published %+v to cluster %s", s.attributes, s.cluster.ID()) return case ErrStopped: diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 4509454a4..3295917cb 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -1155,6 +1155,7 @@ func TestPublish(t *testing.T) { ch <- Response{} w := wait.NewWithResponse(ch) srv := &EtcdServer{ + readych: make(chan struct{}), cfg: &ServerConfig{TickMs: 1}, id: 1, r: raftNode{Node: n}, diff --git a/integration/member_test.go b/integration/member_test.go index f705bc90c..e1a663ebc 100644 --- a/integration/member_test.go +++ b/integration/member_test.go @@ -61,6 +61,7 @@ func TestRestartMember(t *testing.T) { t.Fatal(err) } } + clusterMustProgress(t, c.Members) } @@ -105,6 +106,7 @@ func TestSnapshotAndRestartMember(t *testing.T) { m.Stop(t) m.Restart(t) + m.WaitOK(t) for i := 0; i < 120; i++ { cc := mustNewHTTPClient(t, []string{m.URL()}, nil) kapi := client.NewKeysAPI(cc)