diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index 52a73288a..e4517e9ef 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -30,6 +30,9 @@ const ( // accept large request which might block raft stream. User // specify a large value might end up with shooting in the foot. maxRequestBytes = 1.5 * 1024 * 1024 + + // max timeout for waiting a v3 request to go through raft. + maxV3RequestTimeout = 5 * time.Second ) type RaftKV interface { @@ -283,14 +286,17 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern ch := s.w.Register(r.ID) - s.r.Propose(ctx, data) + cctx, cancel := context.WithTimeout(ctx, maxV3RequestTimeout) + defer cancel() + + s.r.Propose(cctx, data) select { case x := <-ch: return x.(*applyResult), nil - case <-ctx.Done(): + case <-cctx.Done(): s.w.Trigger(r.ID, nil) // GC wait - return nil, ctx.Err() + return nil, cctx.Err() case <-s.done: return nil, ErrStopped } diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index c400d8b25..95d93c174 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -16,6 +16,7 @@ package integration import ( "fmt" + "math/rand" "reflect" "testing" "time" @@ -74,6 +75,41 @@ func TestV3PutOverwrite(t *testing.T) { } } +// TestPutRestart checks if a put after an unrelated member restart succeeds +func TestV3PutRestart(t *testing.T) { + // this test might block for 5 seconds, make it parallel to speed up the test. + t.Parallel() + + defer testutil.AfterTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + kvIdx := rand.Intn(3) + kvc := toGRPC(clus.Client(kvIdx)).KV + + stopIdx := kvIdx + for stopIdx == kvIdx { + stopIdx = rand.Intn(3) + } + + clus.clients[stopIdx].Close() + clus.Members[stopIdx].Stop(t) + clus.Members[stopIdx].Restart(t) + c, cerr := NewClientV3(clus.Members[stopIdx]) + if cerr != nil { + t.Fatal(cerr) + } + clus.clients[stopIdx] = c + + ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) + defer cancel() + reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")} + _, err := kvc.Put(ctx, reqput) + if err != nil && err == ctx.Err() { + t.Fatalf("expected grpc error, got local ctx error (%v)", err) + } +} + // TestV3CompactCurrentRev ensures keys are present when compacting on current revision. func TestV3CompactCurrentRev(t *testing.T) { defer testutil.AfterTest(t)