mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #12803 from cwedgwood/metrics-3.4
etcdserver: fix incorrect metrics generated when clients cancel watches
This commit is contained in:
commit
82eae9227c
@ -217,8 +217,8 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
|
||||
return rpctypes.ErrGRPCNoLeader
|
||||
}
|
||||
|
||||
cctx, cancel := context.WithCancel(ss.Context())
|
||||
ss = serverStreamWithCtx{ctx: cctx, cancel: &cancel, ServerStream: ss}
|
||||
ctx := newCancellableContext(ss.Context())
|
||||
ss = serverStreamWithCtx{ctx: ctx, ServerStream: ss}
|
||||
|
||||
smap.mu.Lock()
|
||||
smap.streams[ss] = struct{}{}
|
||||
@ -228,7 +228,8 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
|
||||
smap.mu.Lock()
|
||||
delete(smap.streams, ss)
|
||||
smap.mu.Unlock()
|
||||
cancel()
|
||||
// TODO: investigate whether the reason for cancellation here is useful to know
|
||||
ctx.Cancel(nil)
|
||||
}()
|
||||
}
|
||||
}
|
||||
@ -237,10 +238,52 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
|
||||
}
|
||||
}
|
||||
|
||||
// cancellableContext wraps a context with new cancellable context that allows a
|
||||
// specific cancellation error to be preserved and later retrieved using the
|
||||
// Context.Err() function. This is so downstream context users can disambiguate
|
||||
// the reason for the cancellation which could be from the client (for example)
|
||||
// or from this interceptor code.
|
||||
type cancellableContext struct {
|
||||
context.Context
|
||||
|
||||
lock sync.RWMutex
|
||||
cancel context.CancelFunc
|
||||
cancelReason error
|
||||
}
|
||||
|
||||
func newCancellableContext(parent context.Context) *cancellableContext {
|
||||
ctx, cancel := context.WithCancel(parent)
|
||||
return &cancellableContext{
|
||||
Context: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
// Cancel stores the cancellation reason and then delegates to context.WithCancel
|
||||
// against the parent context.
|
||||
func (c *cancellableContext) Cancel(reason error) {
|
||||
c.lock.Lock()
|
||||
c.cancelReason = reason
|
||||
c.lock.Unlock()
|
||||
c.cancel()
|
||||
}
|
||||
|
||||
// Err will return the preserved cancel reason error if present, and will
|
||||
// otherwise return the underlying error from the parent context.
|
||||
func (c *cancellableContext) Err() error {
|
||||
c.lock.RLock()
|
||||
defer c.lock.RUnlock()
|
||||
if c.cancelReason != nil {
|
||||
return c.cancelReason
|
||||
}
|
||||
return c.Context.Err()
|
||||
}
|
||||
|
||||
type serverStreamWithCtx struct {
|
||||
grpc.ServerStream
|
||||
ctx context.Context
|
||||
cancel *context.CancelFunc
|
||||
|
||||
// ctx is used so that we can preserve a reason for cancellation.
|
||||
ctx *cancellableContext
|
||||
}
|
||||
|
||||
func (ssc serverStreamWithCtx) Context() context.Context { return ssc.ctx }
|
||||
@ -272,7 +315,7 @@ func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
|
||||
smap.mu.Lock()
|
||||
for ss := range smap.streams {
|
||||
if ssWithCtx, ok := ss.(serverStreamWithCtx); ok {
|
||||
(*ssWithCtx.cancel)()
|
||||
ssWithCtx.ctx.Cancel(rpctypes.ErrGRPCNoLeader)
|
||||
<-ss.Context().Done()
|
||||
}
|
||||
}
|
||||
|
@ -35,6 +35,8 @@ var (
|
||||
ErrGRPCLeaseExist = status.New(codes.FailedPrecondition, "etcdserver: lease already exists").Err()
|
||||
ErrGRPCLeaseTTLTooLarge = status.New(codes.OutOfRange, "etcdserver: too large lease TTL").Err()
|
||||
|
||||
ErrGRPCWatchCanceled = status.New(codes.Canceled, "etcdserver: watch canceled").Err()
|
||||
|
||||
ErrGRPCMemberExist = status.New(codes.FailedPrecondition, "etcdserver: member ID already exist").Err()
|
||||
ErrGRPCPeerURLExist = status.New(codes.FailedPrecondition, "etcdserver: Peer URLs already exists").Err()
|
||||
ErrGRPCMemberNotEnoughStarted = status.New(codes.FailedPrecondition, "etcdserver: re-configuration failed due to not enough started members").Err()
|
||||
|
@ -206,15 +206,25 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
|
||||
}
|
||||
}()
|
||||
|
||||
// TODO: There's a race here. When a stream is closed (e.g. due to a cancellation),
|
||||
// the underlying error (e.g. a gRPC stream error) may be returned and handled
|
||||
// through errc if the recv goroutine finishes before the send goroutine.
|
||||
// When the recv goroutine wins, the stream error is retained. When recv loses
|
||||
// the race, the underlying error is lost (unless the root error is propagated
|
||||
// through Context.Err() which is not always the case (as callers have to decide
|
||||
// to implement a custom context to do so). The stdlib context package builtins
|
||||
// may be insufficient to carry semantically useful errors around and should be
|
||||
// revisited.
|
||||
select {
|
||||
case err = <-errc:
|
||||
if err == context.Canceled {
|
||||
err = rpctypes.ErrGRPCWatchCanceled
|
||||
}
|
||||
close(sws.ctrlStream)
|
||||
|
||||
case <-stream.Context().Done():
|
||||
err = stream.Context().Err()
|
||||
// the only server-side cancellation is noleader for now.
|
||||
if err == context.Canceled {
|
||||
err = rpctypes.ErrGRPCNoLeader
|
||||
err = rpctypes.ErrGRPCWatchCanceled
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -49,6 +49,7 @@ func metricsTest(cx ctlCtx) {
|
||||
{"/metrics", fmt.Sprintf("etcd_mvcc_delete_total 3")},
|
||||
{"/metrics", fmt.Sprintf(`etcd_server_version{server_version="%s"} 1`, version.Version)},
|
||||
{"/metrics", fmt.Sprintf(`etcd_cluster_version{cluster_version="%s"} 1`, version.Cluster(version.Version))},
|
||||
{"/metrics", fmt.Sprintf(`grpc_server_handled_total{grpc_code="Canceled",grpc_method="Watch",grpc_service="etcdserverpb.Watch",grpc_type="bidi_stream"} 6`)},
|
||||
{"/health", `{"health":"true"}`},
|
||||
} {
|
||||
i++
|
||||
@ -58,7 +59,9 @@ func metricsTest(cx ctlCtx) {
|
||||
if err := ctlV3Del(cx, []string{fmt.Sprintf("%d", i)}, 1); err != nil {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := ctlV3Watch(cx, []string{"k", "--rev", "1"}, []kvExec{{key: "k", val: "v"}}...); err != nil {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
if err := cURLGet(cx.epc, cURLReq{endpoint: test.endpoint, expected: test.expected, metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil {
|
||||
cx.t.Fatalf("failed get with curl (%v)", err)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user