From 8ad935ef2ceb1400c84a9950ccd843711678235f Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Fri, 31 Mar 2017 17:59:37 -0700
Subject: [PATCH] etcdserver: use cancelable context for server initiated
 requests

---
 etcdserver/apply.go       |  2 +-
 etcdserver/server.go      | 19 +++++++++++++------
 etcdserver/server_test.go | 21 +++++++++++++++++++++
 3 files changed, 35 insertions(+), 7 deletions(-)

diff --git a/etcdserver/apply.go b/etcdserver/apply.go
index b90e0a099..cfb7dfcb5 100644
--- a/etcdserver/apply.go
+++ b/etcdserver/apply.go
@@ -584,7 +584,7 @@ func (a *applierV3backend) AuthDisable() (*pb.AuthDisableResponse, error) {
 }
 
 func (a *applierV3backend) Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error) {
-	ctx := context.WithValue(context.WithValue(context.TODO(), "index", a.s.consistIndex.ConsistentIndex()), "simpleToken", r.SimpleToken)
+	ctx := context.WithValue(context.WithValue(a.s.ctx, "index", a.s.consistIndex.ConsistentIndex()), "simpleToken", r.SimpleToken)
 	return a.s.AuthStore().Authenticate(ctx, r.Name, r.Password)
 }
 
diff --git a/etcdserver/server.go b/etcdserver/server.go
index 70e14924b..f5665ccc4 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -238,6 +238,11 @@ type EtcdServer struct {
 	// wg is used to wait for the go routines that depends on the server state
 	// to exit when stopping the server.
 	wg sync.WaitGroup
+
+	// ctx is used for etcd-initiated requests that may need to be canceled
+	// on etcd server shutdown.
+	ctx    context.Context
+	cancel context.CancelFunc
 }
 
 // NewServer creates a new EtcdServer from the supplied configuration. The
@@ -536,6 +541,7 @@ func (s *EtcdServer) start() {
 	s.done = make(chan struct{})
 	s.stop = make(chan struct{})
 	s.stopping = make(chan struct{})
+	s.ctx, s.cancel = context.WithCancel(context.Background())
 	s.readwaitc = make(chan struct{}, 1)
 	s.readNotifier = newNotifier()
 	if s.ClusterVersion() != nil {
@@ -686,6 +692,7 @@ func (s *EtcdServer) run() {
 		s.wgMu.Lock() // block concurrent waitgroup adds in goAttach while stopping
 		close(s.stopping)
 		s.wgMu.Unlock()
+		s.cancel()
 
 		sched.Stop()
 
@@ -740,7 +747,7 @@ func (s *EtcdServer) run() {
 					}
 					lid := lease.ID
 					s.goAttach(func() {
-						s.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: int64(lid)})
+						s.LeaseRevoke(s.ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
 						<-c
 					})
 				}
@@ -967,7 +974,7 @@ func (s *EtcdServer) TransferLeadership() error {
 	}
 
 	tm := s.Cfg.ReqTimeout()
-	ctx, cancel := context.WithTimeout(context.TODO(), tm)
+	ctx, cancel := context.WithTimeout(s.ctx, tm)
 	err := s.transferLeadership(ctx, s.Lead(), uint64(transferee))
 	cancel()
 	return err
@@ -1181,7 +1188,6 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error
 // This makes no guarantee that the request will be proposed or performed.
 // The request will be canceled after the given timeout.
 func (s *EtcdServer) sync(timeout time.Duration) {
-	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	req := pb.Request{
 		Method: "SYNC",
 		ID:     s.reqIDGen.Next(),
@@ -1190,6 +1196,7 @@
 	data := pbutil.MustMarshal(&req)
 	// There is no promise that node has leader when do SYNC request,
 	// so it uses goroutine to propose.
+	ctx, cancel := context.WithTimeout(s.ctx, timeout)
 	s.goAttach(func() {
 		s.r.Propose(ctx, data)
 		cancel()
@@ -1214,7 +1221,7 @@ func (s *EtcdServer) publish(timeout time.Duration) {
 	}
 
 	for {
-		ctx, cancel := context.WithTimeout(context.Background(), timeout)
+		ctx, cancel := context.WithTimeout(s.ctx, timeout)
 		_, err := s.Do(ctx, req)
 		cancel()
 		switch err {
@@ -1355,7 +1362,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 				Alarm:    pb.AlarmType_NOSPACE,
 			}
 			r := pb.InternalRaftRequest{Alarm: a}
-			s.processInternalRaftRequest(context.TODO(), r)
+			s.processInternalRaftRequest(s.ctx, r)
 			s.w.Trigger(id, ar)
 		})
 	}
@@ -1551,7 +1558,7 @@ func (s *EtcdServer) updateClusterVersion(ver string) {
 		Path:   membership.StoreClusterVersionKey(),
 		Val:    ver,
 	}
-	ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
+	ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
 	_, err := s.Do(ctx, req)
 	cancel()
 	switch err {
diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go
index 57b1aebd8..bde16a503 100644
--- a/etcdserver/server_test.go
+++ b/etcdserver/server_test.go
@@ -721,9 +721,12 @@ func TestDoProposalStopped(t *testing.T) {
 // TestSync tests sync 1. is nonblocking 2. proposes SYNC request.
 func TestSync(t *testing.T) {
 	n := newNodeRecorder()
+	ctx, cancel := context.WithCancel(context.TODO())
 	srv := &EtcdServer{
 		r:        raftNode{Node: n},
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
+		ctx:      ctx,
+		cancel:   cancel,
 	}
 	srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
 
@@ -761,9 +764,12 @@ func TestSync(t *testing.T) {
 // after timeout
 func TestSyncTimeout(t *testing.T) {
 	n := newProposalBlockerRecorder()
+	ctx, cancel := context.WithCancel(context.TODO())
 	srv := &EtcdServer{
 		r:        raftNode{Node: n},
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
+		ctx:      ctx,
+		cancel:   cancel,
 	}
 	srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
 
@@ -1185,6 +1191,7 @@ func TestPublish(t *testing.T) {
 	// simulate that request has gone through consensus
 	ch <- Response{}
 	w := wait.NewWithResponse(ch)
+	ctx, cancel := context.WithCancel(context.TODO())
 	srv := &EtcdServer{
 		readych: make(chan struct{}),
 		Cfg:     &ServerConfig{TickMs: 1},
@@ -1195,6 +1202,9 @@
 		w:          w,
 		reqIDGen:   idutil.NewGenerator(0, time.Time{}),
 		SyncTicker: &time.Ticker{},
+
+		ctx:    ctx,
+		cancel: cancel,
 	}
 	srv.publish(time.Hour)
 
@@ -1228,6 +1238,7 @@
 
 // TestPublishStopped tests that publish will be stopped if server is stopped.
 func TestPublishStopped(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.TODO())
 	srv := &EtcdServer{
 		Cfg: &ServerConfig{TickMs: 1},
 		r: raftNode{
@@ -1242,6 +1253,9 @@
 		stop:       make(chan struct{}),
 		reqIDGen:   idutil.NewGenerator(0, time.Time{}),
 		SyncTicker: &time.Ticker{},
+
+		ctx:    ctx,
+		cancel: cancel,
 	}
 	close(srv.stopping)
 	srv.publish(time.Hour)
@@ -1248,6 +1262,7 @@
 }
 // TestPublishRetry tests that publish will keep retry until success.
 func TestPublishRetry(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.TODO())
 	n := newNodeRecorderStream()
 	srv := &EtcdServer{
 		Cfg:      &ServerConfig{TickMs: 1},
@@ -1257,6 +1272,8 @@
 		stopping:   make(chan struct{}),
 		reqIDGen:   idutil.NewGenerator(0, time.Time{}),
 		SyncTicker: &time.Ticker{},
+		ctx:        ctx,
+		cancel:     cancel,
 	}
 	// expect multiple proposals from retrying
 	ch := make(chan struct{})
@@ -1287,6 +1304,7 @@ func TestUpdateVersion(t *testing.T) {
 	// simulate that request has gone through consensus
 	ch <- Response{}
 	w := wait.NewWithResponse(ch)
+	ctx, cancel := context.WithCancel(context.TODO())
 	srv := &EtcdServer{
 		id:  1,
 		Cfg: &ServerConfig{TickMs: 1},
@@ -1296,6 +1314,9 @@
 		w:          w,
 		reqIDGen:   idutil.NewGenerator(0, time.Time{}),
 		SyncTicker: &time.Ticker{},
+
+		ctx:    ctx,
+		cancel: cancel,
 	}
 	srv.updateClusterVersion("2.0.0")
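
Illustration (not part of the patch): a minimal, self-contained sketch of the pattern the commit adopts, using hypothetical names (server, start, stop, propose) rather than etcd's real types. A long-lived parent context is created when the server starts and canceled when it shuts down; every server-initiated request derives its timeout context from that parent instead of context.Background() or context.TODO(), so an in-flight request unblocks promptly on shutdown rather than waiting out its full timeout. This mirrors the patch canceling s.ctx in run() right after close(s.stopping).

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// server stands in for EtcdServer; the ctx/cancel pair mirrors the fields
// added by the patch (created on start, canceled on shutdown).
type server struct {
	ctx    context.Context
	cancel context.CancelFunc
}

func (s *server) start() {
	// Parent context for every server-initiated request.
	s.ctx, s.cancel = context.WithCancel(context.Background())
}

func (s *server) stop() {
	// Canceling the parent aborts all requests derived from s.ctx.
	s.cancel()
}

// propose simulates a server-initiated request that would otherwise hang.
func (s *server) propose(timeout time.Duration) error {
	// Derive the per-request deadline from s.ctx, not context.Background(),
	// so shutdown can interrupt it early.
	ctx, cancel := context.WithTimeout(s.ctx, timeout)
	defer cancel()

	select {
	case <-time.After(10 * time.Second): // pretend consensus never completes
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	s := &server{}
	s.start()

	done := make(chan error, 1)
	go func() { done <- s.propose(time.Hour) }()

	s.stop() // shutdown: the in-flight propose returns right away
	fmt.Println(errors.Is(<-done, context.Canceled)) // true
}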