etcdserver: use cancelable context for server initiated requests

Anthony Romano 2017-03-31 17:59:37 -07:00
parent 9ca7f22e84
commit 8ad935ef2c
3 changed files with 35 additions and 7 deletions
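
The idea of the change: EtcdServer gains its own cancelable context, created in start() and canceled in run()'s shutdown path, and every request the server initiates on its own behalf (publish, sync, lease revocation, alarm activation, cluster-version updates, auth) now derives its context from it instead of context.TODO() or context.Background(), so those requests abort promptly on shutdown. Below is a minimal standalone sketch of that pattern, using hypothetical names (server, internalRequest, Stop) rather than etcd's actual types:

package main

import (
	"context"
	"fmt"
	"time"
)

// server stands in for EtcdServer: it owns a cancelable context that all
// server-initiated requests derive from.
type server struct {
	ctx    context.Context
	cancel context.CancelFunc
}

func newServer() *server {
	s := &server{}
	// mirrors start(): the server-wide context is created once at startup.
	s.ctx, s.cancel = context.WithCancel(context.Background())
	return s
}

// internalRequest stands in for publish/sync/LeaseRevoke: the per-request
// timeout is derived from s.ctx rather than context.Background().
func (s *server) internalRequest(timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(s.ctx, timeout)
	defer cancel()
	select {
	case <-time.After(10 * time.Second): // pretend the request is slow
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Stop mirrors run()'s shutdown path: canceling s.ctx aborts in-flight requests.
func (s *server) Stop() { s.cancel() }

func main() {
	s := newServer()
	go func() {
		time.Sleep(100 * time.Millisecond)
		s.Stop()
	}()
	// Prints "context canceled" shortly after Stop instead of waiting an hour.
	fmt.Println(s.internalRequest(time.Hour))
}

Without the cancelable parent, internalRequest would block for its full timeout even after Stop; with it, shutdown unblocks every in-flight internal request at once.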

etcdserver/apply.go

@@ -584,7 +584,7 @@ func (a *applierV3backend) AuthDisable() (*pb.AuthDisableResponse, error) {
 }
 func (a *applierV3backend) Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error) {
-	ctx := context.WithValue(context.WithValue(context.TODO(), "index", a.s.consistIndex.ConsistentIndex()), "simpleToken", r.SimpleToken)
+	ctx := context.WithValue(context.WithValue(a.s.ctx, "index", a.s.consistIndex.ConsistentIndex()), "simpleToken", r.SimpleToken)
 	return a.s.AuthStore().Authenticate(ctx, r.Name, r.Password)
 }

etcdserver/server.go

@@ -238,6 +238,11 @@ type EtcdServer struct {
 	// wg is used to wait for the go routines that depends on the server state
 	// to exit when stopping the server.
 	wg sync.WaitGroup
+	// ctx is used for etcd-initiated requests that may need to be canceled
+	// on etcd server shutdown.
+	ctx context.Context
+	cancel context.CancelFunc
 }
 // NewServer creates a new EtcdServer from the supplied configuration. The
@@ -536,6 +541,7 @@ func (s *EtcdServer) start() {
 	s.done = make(chan struct{})
 	s.stop = make(chan struct{})
 	s.stopping = make(chan struct{})
+	s.ctx, s.cancel = context.WithCancel(context.Background())
 	s.readwaitc = make(chan struct{}, 1)
 	s.readNotifier = newNotifier()
 	if s.ClusterVersion() != nil {
@@ -686,6 +692,7 @@ func (s *EtcdServer) run() {
 		s.wgMu.Lock() // block concurrent waitgroup adds in goAttach while stopping
 		close(s.stopping)
 		s.wgMu.Unlock()
+		s.cancel()
 		sched.Stop()
@@ -740,7 +747,7 @@
 					}
 					lid := lease.ID
 					s.goAttach(func() {
-						s.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: int64(lid)})
+						s.LeaseRevoke(s.ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
 						<-c
 					})
 				}
@@ -967,7 +974,7 @@ func (s *EtcdServer) TransferLeadership() error {
 	}
 	tm := s.Cfg.ReqTimeout()
-	ctx, cancel := context.WithTimeout(context.TODO(), tm)
+	ctx, cancel := context.WithTimeout(s.ctx, tm)
 	err := s.transferLeadership(ctx, s.Lead(), uint64(transferee))
 	cancel()
 	return err
@@ -1181,7 +1188,6 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error
 // This makes no guarantee that the request will be proposed or performed.
 // The request will be canceled after the given timeout.
 func (s *EtcdServer) sync(timeout time.Duration) {
-	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	req := pb.Request{
 		Method: "SYNC",
 		ID: s.reqIDGen.Next(),
@@ -1190,6 +1196,7 @@ func (s *EtcdServer) sync(timeout time.Duration) {
 	data := pbutil.MustMarshal(&req)
 	// There is no promise that node has leader when do SYNC request,
 	// so it uses goroutine to propose.
+	ctx, cancel := context.WithTimeout(s.ctx, timeout)
 	s.goAttach(func() {
 		s.r.Propose(ctx, data)
 		cancel()
@@ -1214,7 +1221,7 @@ func (s *EtcdServer) publish(timeout time.Duration) {
 	}
 	for {
-		ctx, cancel := context.WithTimeout(context.Background(), timeout)
+		ctx, cancel := context.WithTimeout(s.ctx, timeout)
 		_, err := s.Do(ctx, req)
 		cancel()
 		switch err {
@@ -1355,7 +1362,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 			Alarm: pb.AlarmType_NOSPACE,
 		}
 		r := pb.InternalRaftRequest{Alarm: a}
-		s.processInternalRaftRequest(context.TODO(), r)
+		s.processInternalRaftRequest(s.ctx, r)
 		s.w.Trigger(id, ar)
 	})
 }
@@ -1551,7 +1558,7 @@ func (s *EtcdServer) updateClusterVersion(ver string) {
 		Path: membership.StoreClusterVersionKey(),
 		Val: ver,
 	}
-	ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
+	ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
 	_, err := s.Do(ctx, req)
 	cancel()
 	switch err {
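
One property these call sites rely on, shown with a small illustrative snippet that is not part of the diff: a context.WithTimeout derived from an already-canceled parent is done immediately, so once s.cancel() has run, loops such as publish and updateClusterVersion fail fast with context.Canceled instead of waiting out their request timeouts.

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	parent, cancel := context.WithCancel(context.Background())
	cancel() // simulate the server's shutdown path calling s.cancel()

	// A timeout context derived from the canceled parent is already done.
	ctx, cancelReq := context.WithTimeout(parent, time.Hour)
	defer cancelReq()

	select {
	case <-ctx.Done():
		fmt.Println(ctx.Err()) // context canceled
	case <-time.After(time.Second):
		fmt.Println("unreachable: the derived context is canceled immediately")
	}
}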

etcdserver/server_test.go

@@ -721,9 +721,12 @@ func TestDoProposalStopped(t *testing.T) {
 // TestSync tests sync 1. is nonblocking 2. proposes SYNC request.
 func TestSync(t *testing.T) {
 	n := newNodeRecorder()
+	ctx, cancel := context.WithCancel(context.TODO())
 	srv := &EtcdServer{
 		r: raftNode{Node: n},
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
+		ctx: ctx,
+		cancel: cancel,
 	}
 	srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
@@ -761,9 +764,12 @@ func TestSync(t *testing.T) {
 // after timeout
 func TestSyncTimeout(t *testing.T) {
 	n := newProposalBlockerRecorder()
+	ctx, cancel := context.WithCancel(context.TODO())
 	srv := &EtcdServer{
 		r: raftNode{Node: n},
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
+		ctx: ctx,
+		cancel: cancel,
 	}
 	srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
@@ -1185,6 +1191,7 @@ func TestPublish(t *testing.T) {
 	// simulate that request has gone through consensus
 	ch <- Response{}
 	w := wait.NewWithResponse(ch)
+	ctx, cancel := context.WithCancel(context.TODO())
 	srv := &EtcdServer{
 		readych: make(chan struct{}),
 		Cfg: &ServerConfig{TickMs: 1},
@@ -1195,6 +1202,9 @@ func TestPublish(t *testing.T) {
 		w: w,
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 		SyncTicker: &time.Ticker{},
+		ctx: ctx,
+		cancel: cancel,
 	}
 	srv.publish(time.Hour)
@@ -1228,6 +1238,7 @@ func TestPublish(t *testing.T) {
 // TestPublishStopped tests that publish will be stopped if server is stopped.
 func TestPublishStopped(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.TODO())
 	srv := &EtcdServer{
 		Cfg: &ServerConfig{TickMs: 1},
 		r: raftNode{
@@ -1242,6 +1253,9 @@ func TestPublishStopped(t *testing.T) {
 		stop: make(chan struct{}),
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 		SyncTicker: &time.Ticker{},
+		ctx: ctx,
+		cancel: cancel,
 	}
 	close(srv.stopping)
 	srv.publish(time.Hour)
@@ -1249,6 +1263,7 @@ func TestPublishStopped(t *testing.T) {
 // TestPublishRetry tests that publish will keep retry until success.
 func TestPublishRetry(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.TODO())
 	n := newNodeRecorderStream()
 	srv := &EtcdServer{
 		Cfg: &ServerConfig{TickMs: 1},
@@ -1257,6 +1272,8 @@ func TestPublishRetry(t *testing.T) {
 		stopping: make(chan struct{}),
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 		SyncTicker: &time.Ticker{},
+		ctx: ctx,
+		cancel: cancel,
 	}
 	// expect multiple proposals from retrying
 	ch := make(chan struct{})
@@ -1287,6 +1304,7 @@ func TestUpdateVersion(t *testing.T) {
 	// simulate that request has gone through consensus
 	ch <- Response{}
 	w := wait.NewWithResponse(ch)
+	ctx, cancel := context.WithCancel(context.TODO())
 	srv := &EtcdServer{
 		id: 1,
 		Cfg: &ServerConfig{TickMs: 1},
@@ -1296,6 +1314,9 @@ func TestUpdateVersion(t *testing.T) {
 		w: w,
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 		SyncTicker: &time.Ticker{},
+		ctx: ctx,
+		cancel: cancel,
 	}
 	srv.updateClusterVersion("2.0.0")