diff --git a/etcdserver/server.go b/etcdserver/server.go index 5eca50f9e..ed325a324 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -396,6 +396,10 @@ func (s *EtcdServer) Stop() { <-s.done } +// StopNotify returns a channel that receives a empty struct +// when the server is stopped. +func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done } + // Do interprets r and performs an operation on s.store according to r.Method // and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET" with // Quorum == true, r will be sent through consensus before performing its @@ -452,18 +456,14 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { } } -func (s *EtcdServer) SelfStats() []byte { - return s.stats.JSON() -} +func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() } func (s *EtcdServer) LeaderStats() []byte { // TODO(jonboulle): need to lock access to lstats, set it to nil when not leader, ... return s.lstats.JSON() } -func (s *EtcdServer) StoreStats() []byte { - return s.store.JsonStats() -} +func (s *EtcdServer) StoreStats() []byte { return s.store.JsonStats() } func (s *EtcdServer) UpdateRecvApp(from types.ID, length int64) { s.stats.RecvAppendReq(from.String(), int(length)) @@ -508,13 +508,9 @@ func (s *EtcdServer) UpdateMember(ctx context.Context, memb Member) error { } // Implement the RaftTimer interface -func (s *EtcdServer) Index() uint64 { - return atomic.LoadUint64(&s.raftIndex) -} +func (s *EtcdServer) Index() uint64 { return atomic.LoadUint64(&s.raftIndex) } -func (s *EtcdServer) Term() uint64 { - return atomic.LoadUint64(&s.raftTerm) -} +func (s *EtcdServer) Term() uint64 { return atomic.LoadUint64(&s.raftTerm) } // configure sends a configuration change through consensus and // then waits for it to be applied to the server. It diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 162a8b38a..8dd653b90 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -677,6 +677,8 @@ func TestDoProposalStopped(t *testing.T) { tk := make(chan time.Time) // this makes <-tk always successful, which accelarates internal clock close(tk) + cl := newCluster("abc") + cl.SetStore(store.New()) srv := &EtcdServer{ // TODO: use fake node for better testability node: n, @@ -684,6 +686,7 @@ func TestDoProposalStopped(t *testing.T) { sender: &nopSender{}, storage: &storageRecorder{}, Ticker: tk, + Cluster: cl, } srv.start() @@ -1132,6 +1135,30 @@ func TestPublishRetry(t *testing.T) { } } +func TestStopNotify(t *testing.T) { + s := &EtcdServer{ + stop: make(chan struct{}), + done: make(chan struct{}), + } + go func() { + <-s.stop + close(s.done) + }() + + notifier := s.StopNotify() + select { + case <-notifier: + t.Fatalf("received unexpected stop notification") + default: + } + s.Stop() + select { + case <-notifier: + default: + t.Fatalf("cannot receive stop notification") + } +} + func TestGetOtherPeerURLs(t *testing.T) { tests := []struct { membs []*Member