Merge pull request #5332 from xiang90/sl

*: cancel required leader streams when member loses its leader
Xiang Li 2016-05-12 20:24:34 -07:00
commit a880e9c7cb
9 changed files with 217 additions and 44 deletions
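
How it works: a client tags a gRPC stream with the require-leader metadata key; a new server-side stream interceptor tracks such streams and cancels them once the member has gone several election timeouts without a leader. A minimal client-side sketch of opting in, mirroring the integration test at the bottom of this diff (the connection setup and function name are illustrative assumptions, not part of the change):

package clientexample

import (
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"

	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)

// requireLeaderWatch opens a watch stream that the server will cancel
// (surfacing ErrNoLeader) if the serving member loses its leader.
func requireLeaderWatch(conn *grpc.ClientConn) (pb.Watch_WatchClient, error) {
	md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
	ctx := metadata.NewContext(context.Background(), md)
	return pb.NewWatchClient(conn).Watch(ctx)
}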

View File

@ -29,7 +29,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
opts = append(opts, grpc.Creds(credentials.NewTLS(tls)))
}
opts = append(opts, grpc.UnaryInterceptor(newUnaryInterceptor(s)))
- opts = append(opts, grpc.StreamInterceptor(metricsStreamInterceptor))
+ opts = append(opts, grpc.StreamInterceptor(newStreamInterceptor(s)))
grpcServer := grpc.NewServer(opts...)
pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))

View File

@ -16,6 +16,7 @@ package v3rpc
import (
"strings"
"sync"
"time"
"github.com/coreos/etcd/etcdserver"
@ -28,6 +29,15 @@ import (
"google.golang.org/grpc/metadata"
)
const (
maxNoLeaderCnt = 3
)
type streamsMap struct {
mu sync.Mutex
streams map[grpc.ServerStream]struct{}
}
func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
md, ok := metadata.FromContext(ctx)
@ -42,6 +52,37 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
}
}
func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor {
smap := monitorLeader(s)
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
md, ok := metadata.FromContext(ss.Context())
if ok {
if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
if s.Leader() == types.ID(raft.None) {
return rpctypes.ErrGRPCNoLeader
}
cctx, cancel := context.WithCancel(ss.Context())
ss = serverStreamWithCtx{ctx: cctx, cancel: &cancel, ServerStream: ss}
smap.mu.Lock()
smap.streams[ss] = struct{}{}
smap.mu.Unlock()
defer func() {
smap.mu.Lock()
delete(smap.streams, ss)
smap.mu.Unlock()
cancel()
}()
}
}
return metricsStreamInterceptor(srv, ss, info, handler)
}
}
func metricsUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
service, method := splitMethodName(info.FullMethod)
receivedCounter.WithLabelValues(service, method).Inc()
@ -75,3 +116,52 @@ func splitMethodName(fullMethodName string) (string, string) {
}
return "unknown", "unknown"
}
type serverStreamWithCtx struct {
grpc.ServerStream
ctx context.Context
cancel *context.CancelFunc
}
func (ssc serverStreamWithCtx) Context() context.Context { return ssc.ctx }
func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
smap := &streamsMap{
streams: make(map[grpc.ServerStream]struct{}),
}
go func() {
election := time.Duration(s.Cfg.TickMs) * time.Duration(s.Cfg.ElectionTicks) * time.Millisecond
noLeaderCnt := 0
for {
select {
case <-s.StopNotify():
return
case <-time.After(election):
if s.Leader() == types.ID(raft.None) {
noLeaderCnt++
} else {
noLeaderCnt = 0
}
// We are more conservative on canceling existing streams. Reconnecting streams
// costs much more than rejecting new requests, so we wait until the member
// cannot find a leader for maxNoLeaderCnt election timeouts before canceling existing streams.
if noLeaderCnt >= maxNoLeaderCnt {
smap.mu.Lock()
for ss := range smap.streams {
if ssWithCtx, ok := ss.(serverStreamWithCtx); ok {
(*ssWithCtx.cancel)()
<-ss.Context().Done()
}
}
smap.streams = make(map[grpc.ServerStream]struct{})
smap.mu.Unlock()
}
}
}
}()
return smap
}
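
Concretely, the polling interval is one election timeout (TickMs × ElectionTicks milliseconds), and streams survive until maxNoLeaderCnt consecutive leaderless checks. A small arithmetic sketch with illustrative values (these numbers are assumptions for the example, not asserted etcd defaults):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative config values; the real ones come from ServerConfig.
	tickMs, electionTicks := 100, 10
	election := time.Duration(tickMs) * time.Duration(electionTicks) * time.Millisecond
	// monitorLeader checks once per election timeout and cancels required-leader
	// streams only after maxNoLeaderCnt (3) consecutive leaderless checks.
	fmt.Println(election, 3*election) // 1s 3s
}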

View File

@ -19,7 +19,10 @@ import (
"sync"
"time"
"golang.org/x/net/context"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/mvccpb"
@ -105,10 +108,24 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
progress: make(map[mvcc.WatchID]bool),
closec: make(chan struct{}),
}
- defer sws.close()
go sws.sendLoop()
- return sws.recvLoop()
+ errc := make(chan error, 1)
+ go func() {
+ errc <- sws.recvLoop()
+ sws.close()
+ }()
+ select {
+ case err := <-errc:
+ return err
+ case <-stream.Context().Done():
+ err := stream.Context().Err()
+ // the only server-side cancellation is noleader for now.
+ if err == context.Canceled {
+ return rpctypes.ErrGRPCNoLeader
+ }
+ return err
+ }
}
func (sws *serverWatchStream) recvLoop() error {

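On the client side, this server-initiated cancellation surfaces as an error from Recv on the watch stream. A hedged sketch of detecting it, using the same imports as the client sketch above (the reconnect policy is an assumption; only the error comparison mirrors this PR's integration test):

// drainWatch receives until the stream dies; when the serving member has
// lost its leader, the error matches rpctypes.ErrNoLeader.
func drainWatch(wStream pb.Watch_WatchClient) error {
	for {
		resp, err := wStream.Recv()
		if err != nil {
			if grpc.ErrorDesc(err) == rpctypes.ErrNoLeader.Error() {
				// the leaderless member canceled us; a real client would
				// re-dial another endpoint here (policy not part of this PR)
			}
			return err
		}
		_ = resp // hand off to the application's watch handler
	}
}
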
View File

@ -50,20 +50,20 @@ const (
)
func NewBackendQuota(s *EtcdServer) Quota {
- if s.cfg.QuotaBackendBytes < 0 {
+ if s.Cfg.QuotaBackendBytes < 0 {
// disable quotas if negative
plog.Warningf("disabling backend quota")
return &passthroughQuota{}
}
- if s.cfg.QuotaBackendBytes == 0 {
+ if s.Cfg.QuotaBackendBytes == 0 {
// use default size if no quota size given
return &backendQuota{s, backend.DefaultQuotaBytes}
}
- if s.cfg.QuotaBackendBytes > backend.MaxQuotaBytes {
- plog.Warningf("backend quota %v exceeds maximum quota %v; using maximum", s.cfg.QuotaBackendBytes, backend.MaxQuotaBytes)
+ if s.Cfg.QuotaBackendBytes > backend.MaxQuotaBytes {
+ plog.Warningf("backend quota %v exceeds maximum quota %v; using maximum", s.Cfg.QuotaBackendBytes, backend.MaxQuotaBytes)
return &backendQuota{s, backend.MaxQuotaBytes}
}
- return &backendQuota{s, s.cfg.QuotaBackendBytes}
+ return &backendQuota{s, s.Cfg.QuotaBackendBytes}
}
func (b *backendQuota) Available(v interface{}) bool {

View File

@ -134,8 +134,8 @@ func (r *raftNode) start(s *EtcdServer) {
r.done = make(chan struct{})
heartbeat := 200 * time.Millisecond
- if s.cfg != nil {
- heartbeat = time.Duration(s.cfg.TickMs) * time.Millisecond
+ if s.Cfg != nil {
+ heartbeat = time.Duration(s.Cfg.TickMs) * time.Millisecond
}
// set up contention detectors for raft heartbeat message.
// expect to send a heartbeat within 2 heartbeat intervals.
@ -173,7 +173,7 @@ func (r *raftNode) start(s *EtcdServer) {
// it promotes or demotes instead of modifying server directly.
syncC = r.s.SyncTicker
if r.s.lessor != nil {
- r.s.lessor.Promote(r.s.cfg.electionTimeout())
+ r.s.lessor.Promote(r.s.Cfg.electionTimeout())
}
// TODO: remove the nil checking
// current test utility does not provide the stats
@ -238,7 +238,7 @@ func (r *raftNode) start(s *EtcdServer) {
raftDone <- struct{}{}
r.Advance()
case <-syncC:
- r.s.sync(r.s.cfg.ReqTimeout())
+ r.s.sync(r.s.Cfg.ReqTimeout())
case <-r.stopped:
return
}

View File

@ -162,11 +162,11 @@ type EtcdServer struct {
// count the number of inflight snapshots.
// MUST use atomic operation to access this field.
inflightSnapshots int64
+ Cfg *ServerConfig
readych chan struct{}
r raftNode
- cfg *ServerConfig
snapCount uint64
w wait.Wait
@ -369,7 +369,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
srv = &EtcdServer{
readych: make(chan struct{}),
- cfg: cfg,
+ Cfg: cfg,
snapCount: cfg.SnapCount,
errorc: make(chan error, 1),
store: st,
@ -444,7 +444,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
// It also starts a goroutine to publish its server information.
func (s *EtcdServer) Start() {
s.start()
- go s.publish(s.cfg.ReqTimeout())
+ go s.publish(s.Cfg.ReqTimeout())
go s.purgeFile()
go monitorFileDescriptor(s.done)
go s.monitorVersions()
@ -473,11 +473,11 @@ func (s *EtcdServer) start() {
func (s *EtcdServer) purgeFile() {
var serrc, werrc <-chan error
- if s.cfg.MaxSnapFiles > 0 {
- serrc = fileutil.PurgeFile(s.cfg.SnapDir(), "snap", s.cfg.MaxSnapFiles, purgeFileInterval, s.done)
+ if s.Cfg.MaxSnapFiles > 0 {
+ serrc = fileutil.PurgeFile(s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.done)
}
- if s.cfg.MaxWALFiles > 0 {
- werrc = fileutil.PurgeFile(s.cfg.WALDir(), "wal", s.cfg.MaxWALFiles, purgeFileInterval, s.done)
+ if s.Cfg.MaxWALFiles > 0 {
+ werrc = fileutil.PurgeFile(s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.done)
}
select {
case e := <-werrc:
@ -623,7 +623,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
plog.Panicf("get database snapshot file path error: %v", err)
}
- fn := path.Join(s.cfg.SnapDir(), databaseFilename)
+ fn := path.Join(s.Cfg.SnapDir(), databaseFilename)
if err := os.Rename(snapfn, fn); err != nil {
plog.Panicf("rename snapshot file error: %v", err)
}
@ -764,7 +764,7 @@ func (s *EtcdServer) LeaderStats() []byte {
func (s *EtcdServer) StoreStats() []byte { return s.store.JsonStats() }
func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) error {
- if s.cfg.StrictReconfigCheck && !s.cluster.IsReadyToAddNewMember() {
+ if s.Cfg.StrictReconfigCheck && !s.cluster.IsReadyToAddNewMember() {
// If s.cfg.StrictReconfigCheck is false, it means the option --strict-reconfig-check isn't passed to etcd.
// In such a case adding a new member is allowed unconditionally
return ErrNotEnoughStartedMembers
@ -784,7 +784,7 @@ func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) erro
}
func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error {
- if s.cfg.StrictReconfigCheck && !s.cluster.IsReadyToRemoveMember(id) {
+ if s.Cfg.StrictReconfigCheck && !s.cluster.IsReadyToRemoveMember(id) {
// If s.cfg.StrictReconfigCheck is false, it means the option --strict-reconfig-check isn't passed to etcd.
// In such a case removing a member is allowed unconditionally
return ErrNotEnoughStartedMembers
@ -823,7 +823,7 @@ func (s *EtcdServer) Lead() uint64 { return atomic.LoadUint64(&s.r.lead) }
func (s *EtcdServer) Leader() types.ID { return types.ID(s.Lead()) }
- func (s *EtcdServer) IsPprofEnabled() bool { return s.cfg.EnablePprof }
+ func (s *EtcdServer) IsPprofEnabled() bool { return s.Cfg.EnablePprof }
// configure sends a configuration change through consensus and
// then waits for it to be applied to the server. It
@ -939,7 +939,7 @@ func (s *EtcdServer) send(ms []raftpb.Message) {
ok, exceed := s.r.td.Observe(ms[i].To)
if !ok {
// TODO: limit request rate.
plog.Warningf("failed to send out heartbeat on time (exceeded the %dms timeout for %v)", s.cfg.TickMs, exceed)
plog.Warningf("failed to send out heartbeat on time (exceeded the %dms timeout for %v)", s.Cfg.TickMs, exceed)
plog.Warningf("server is likely overloaded")
}
}
@ -1221,7 +1221,7 @@ func (s *EtcdServer) updateClusterVersion(ver string) {
Path: membership.StoreClusterVersionKey(),
Val: ver,
}
- ctx, cancel := context.WithTimeout(context.Background(), s.cfg.ReqTimeout())
+ ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
_, err := s.Do(ctx, req)
cancel()
switch err {
@ -1241,7 +1241,7 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
return ErrCanceled
case context.DeadlineExceeded:
curLeadElected := s.r.leadElectedTime()
- prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.cfg.ElectionTicks) * time.Duration(s.cfg.TickMs) * time.Millisecond)
+ prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond)
if start.After(prevLeadLost) && start.Before(curLeadElected) {
return ErrTimeoutDueToLeaderFail
}
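
Why the cfg → Cfg rename throughout this file: the field was unexported, but the new v3rpc stream interceptor runs outside the etcdserver package and needs the election timing (Cfg.TickMs, Cfg.ElectionTicks) to size its leader-monitoring interval, so the server configuration is exported.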

View File

@ -174,7 +174,7 @@ func TestApplyRepeat(t *testing.T) {
storage: mockstorage.NewStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
},
- cfg: &ServerConfig{},
+ Cfg: &ServerConfig{},
store: st,
cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -525,7 +525,7 @@ func TestApplyConfChangeError(t *testing.T) {
srv := &EtcdServer{
r: raftNode{Node: n},
cluster: cl,
- cfg: &ServerConfig{},
+ Cfg: &ServerConfig{},
}
_, err := srv.applyConfChange(tt.cc, nil)
if err != tt.werr {
@ -629,7 +629,7 @@ func TestDoProposal(t *testing.T) {
for i, tt := range tests {
st := mockstore.NewRecorder()
srv := &EtcdServer{
- cfg: &ServerConfig{TickMs: 1},
+ Cfg: &ServerConfig{TickMs: 1},
r: raftNode{
Node: newNodeCommitter(),
storage: mockstorage.NewStorageRecorder(""),
@ -661,7 +661,7 @@ func TestDoProposal(t *testing.T) {
func TestDoProposalCancelled(t *testing.T) {
wt := mockwait.NewRecorder()
srv := &EtcdServer{
- cfg: &ServerConfig{TickMs: 1},
+ Cfg: &ServerConfig{TickMs: 1},
r: raftNode{Node: newNodeNop()},
w: wt,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -683,7 +683,7 @@ func TestDoProposalCancelled(t *testing.T) {
func TestDoProposalTimeout(t *testing.T) {
srv := &EtcdServer{
- cfg: &ServerConfig{TickMs: 1},
+ Cfg: &ServerConfig{TickMs: 1},
r: raftNode{Node: newNodeNop()},
w: mockwait.NewNop(),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -699,7 +699,7 @@ func TestDoProposalTimeout(t *testing.T) {
func TestDoProposalStopped(t *testing.T) {
srv := &EtcdServer{
- cfg: &ServerConfig{TickMs: 1},
+ Cfg: &ServerConfig{TickMs: 1},
r: raftNode{Node: newNodeNop()},
w: mockwait.NewNop(),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -789,7 +789,7 @@ func TestSyncTrigger(t *testing.T) {
n := newReadyNode()
st := make(chan time.Time, 1)
srv := &EtcdServer{
- cfg: &ServerConfig{TickMs: 1},
+ Cfg: &ServerConfig{TickMs: 1},
r: raftNode{
Node: n,
raftStorage: raft.NewMemoryStorage(),
@ -847,7 +847,7 @@ func TestSnapshot(t *testing.T) {
st := mockstore.NewRecorder()
p := mockstorage.NewStorageRecorder("")
srv := &EtcdServer{
- cfg: &ServerConfig{},
+ Cfg: &ServerConfig{},
r: raftNode{
Node: newNodeNop(),
raftStorage: s,
@ -889,7 +889,7 @@ func TestTriggerSnap(t *testing.T) {
st := mockstore.NewRecorder()
p := mockstorage.NewStorageRecorderStream("")
srv := &EtcdServer{
- cfg: &ServerConfig{TickMs: 1},
+ Cfg: &ServerConfig{TickMs: 1},
snapCount: uint64(snapc),
r: raftNode{
Node: newNodeCommitter(),
@ -955,7 +955,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
rs := raft.NewMemoryStorage()
tr, snapDoneC := rafthttp.NewSnapTransporter(testdir)
s := &EtcdServer{
- cfg: &ServerConfig{
+ Cfg: &ServerConfig{
DataDir: testdir,
},
r: raftNode{
@ -1045,7 +1045,7 @@ func TestAddMember(t *testing.T) {
storage: mockstorage.NewStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
},
- cfg: &ServerConfig{},
+ Cfg: &ServerConfig{},
store: st,
cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -1085,7 +1085,7 @@ func TestRemoveMember(t *testing.T) {
storage: mockstorage.NewStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
},
- cfg: &ServerConfig{},
+ Cfg: &ServerConfig{},
store: st,
cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -1156,7 +1156,7 @@ func TestPublish(t *testing.T) {
w := wait.NewWithResponse(ch)
srv := &EtcdServer{
readych: make(chan struct{}),
- cfg: &ServerConfig{TickMs: 1},
+ Cfg: &ServerConfig{TickMs: 1},
id: 1,
r: raftNode{Node: n},
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
@ -1197,7 +1197,7 @@ func TestPublish(t *testing.T) {
// TestPublishStopped tests that publish will be stopped if server is stopped.
func TestPublishStopped(t *testing.T) {
srv := &EtcdServer{
- cfg: &ServerConfig{TickMs: 1},
+ Cfg: &ServerConfig{TickMs: 1},
r: raftNode{
Node: newNodeNop(),
transport: rafthttp.NewNopTransporter(),
@ -1216,7 +1216,7 @@ func TestPublishStopped(t *testing.T) {
func TestPublishRetry(t *testing.T) {
n := newNodeRecorder()
srv := &EtcdServer{
- cfg: &ServerConfig{TickMs: 1},
+ Cfg: &ServerConfig{TickMs: 1},
r: raftNode{Node: n},
w: mockwait.NewNop(),
done: make(chan struct{}),
@ -1241,7 +1241,7 @@ func TestUpdateVersion(t *testing.T) {
w := wait.NewWithResponse(ch)
srv := &EtcdServer{
id: 1,
- cfg: &ServerConfig{TickMs: 1},
+ Cfg: &ServerConfig{TickMs: 1},
r: raftNode{Node: n},
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
cluster: &membership.RaftCluster{},

View File

@ -179,7 +179,7 @@ func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
leader := s.cluster.Member(s.Leader())
for i := 0; i < 5 && leader == nil; i++ {
// wait an election
- dur := time.Duration(s.cfg.ElectionTicks) * time.Duration(s.cfg.TickMs) * time.Millisecond
+ dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond
select {
case <-time.After(dur):
leader = s.cluster.Member(s.Leader())
@ -193,7 +193,7 @@ func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
for _, url := range leader.PeerURLs {
lurl := url + "/leases"
- ttl, err = leasehttp.RenewHTTP(id, lurl, s.peerRt, s.cfg.peerDialTimeout())
+ ttl, err = leasehttp.RenewHTTP(id, lurl, s.peerRt, s.Cfg.peerDialTimeout())
if err == nil {
break
}

View File

@ -979,6 +979,8 @@ func TestTLSGRPCAcceptSecureAll(t *testing.T) {
}
func TestGRPCRequireLeader(t *testing.T) {
t.Parallel()
defer testutil.AfterTest(t)
cfg := ClusterConfig{Size: 3}
@ -1004,3 +1006,67 @@ func TestGRPCRequireLeader(t *testing.T) {
t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
}
}
func TestGRPCStreamRequireLeader(t *testing.T) {
t.Parallel()
defer testutil.AfterTest(t)
cfg := ClusterConfig{Size: 3}
clus := newClusterV3NoClients(t, &cfg)
defer clus.Terminate(t)
client, err := NewClientV3(clus.Members[0])
if err != nil {
t.Fatalf("failed to create client (%v)", err)
}
defer client.Close()
wAPI := toGRPC(client).Watch
md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
ctx := metadata.NewContext(context.Background(), md)
wStream, err := wAPI.Watch(ctx)
if err != nil {
t.Fatalf("wAPI.Watch error: %v", err)
}
clus.Members[1].Stop(t)
clus.Members[2].Stop(t)
// existing stream should be rejected
_, err = wStream.Recv()
if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
}
// new stream should also be rejected
wStream, err = wAPI.Watch(ctx)
if err != nil {
t.Fatalf("wAPI.Watch error: %v", err)
}
_, err = wStream.Recv()
if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
}
clus.Members[1].Restart(t)
clus.Members[2].Restart(t)
clus.waitLeader(t, clus.Members)
time.Sleep(time.Duration(2*electionTicks) * tickDuration)
// new stream should also be OK now after we restarted the other members
wStream, err = wAPI.Watch(ctx)
if err != nil {
t.Fatalf("wAPI.Watch error: %v", err)
}
wreq := &pb.WatchRequest{
RequestUnion: &pb.WatchRequest_CreateRequest{
CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")},
},
}
err = wStream.Send(wreq)
if err != nil {
t.Errorf("err = %v, want nil", err)
}
}
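
To exercise the new behavior in isolation, the stream test can be run on its own; assuming the usual repo layout, something like:

go test -v -run TestGRPCStreamRequireLeader ./integration

The test stops two of three members so the remaining member cannot elect a leader, verifies that both existing and newly opened require-leader watch streams are rejected with ErrNoLeader, then restarts the members and confirms watches work again.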