From e1df2156c71f096733f63549e22c461b7c023059 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Mon, 14 May 2018 11:32:17 -0700 Subject: [PATCH] etcdserver/api/v3rpc: clean up godoc Signed-off-by: Gyuho Lee --- etcdserver/api/v3rpc/watch.go | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index 5a1f621cc..0561edec4 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -42,6 +42,7 @@ type watchServer struct { lg *zap.Logger } +// NewWatchServer returns a new watch server. func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer { return &watchServer{ clusterID: int64(s.Cluster().ID()), @@ -61,6 +62,7 @@ var ( progressReportIntervalMu sync.RWMutex ) +// GetProgressReportInterval returns the current progress report interval (for testing). func GetProgressReportInterval() time.Duration { progressReportIntervalMu.RLock() interval := progressReportInterval @@ -74,20 +76,19 @@ func GetProgressReportInterval() time.Duration { return interval + jitter } +// SetProgressReportInterval updates the current progress report interval (for testing). func SetProgressReportInterval(newTimeout time.Duration) { progressReportIntervalMu.Lock() defer progressReportIntervalMu.Unlock() progressReportInterval = newTimeout } -const ( - // We send ctrl response inside the read loop. We do not want - // send to block read, but we still want ctrl response we sent to - // be serialized. Thus we use a buffered chan to solve the problem. - // A small buffer should be OK for most cases, since we expect the - // ctrl requests are infrequent. - ctrlStreamBufLen = 16 -) +// We send ctrl response inside the read loop. We do not want +// send to block read, but we still want ctrl response we sent to +// be serialized. Thus we use a buffered chan to solve the problem. +// A small buffer should be OK for most cases, since we expect the +// ctrl requests are infrequent. +const ctrlStreamBufLen = 16 // serverWatchStream is an etcd server side stream. It receives requests // from client side gRPC stream. It receives watch events from mvcc.WatchStream, @@ -362,7 +363,7 @@ func (sws *serverWatchStream) sendLoop() { Canceled: canceled, } - if _, hasId := ids[wresp.WatchID]; !hasId { + if _, okID := ids[wresp.WatchID]; !okID { // buffer if id not yet announced wrs := append(pending[wresp.WatchID], wr) pending[wresp.WatchID] = wrs @@ -446,6 +447,7 @@ func (sws *serverWatchStream) sendLoop() { } delete(pending, wid) } + case <-progressTicker.C: sws.mu.Lock() for id, ok := range sws.progress { @@ -455,6 +457,7 @@ func (sws *serverWatchStream) sendLoop() { sws.progress[id] = true } sws.mu.Unlock() + case <-sws.closec: return } @@ -484,6 +487,7 @@ func filterNoPut(e mvccpb.Event) bool { return e.Type == mvccpb.PUT } +// FiltersFromRequest returns "mvcc.FilterFunc" from a given watch create request. func FiltersFromRequest(creq *pb.WatchCreateRequest) []mvcc.FilterFunc { filters := make([]mvcc.FilterFunc, 0, len(creq.Filters)) for _, ft := range creq.Filters {