mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #9840 from liggitt/client-hotloop
Backoff on reestablishing watches when Unavailable errors are encountered
This commit is contained in:
commit
410d28c976
@ -527,6 +527,20 @@ func isHaltErr(ctx context.Context, err error) bool {
|
|||||||
return ev.Code() != codes.Unavailable && ev.Code() != codes.Internal
|
return ev.Code() != codes.Unavailable && ev.Code() != codes.Internal
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// isUnavailableErr returns true if the given error is an unavailable error
|
||||||
|
func isUnavailableErr(ctx context.Context, err error) bool {
|
||||||
|
if ctx != nil && ctx.Err() != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
ev, _ := status.FromError(err)
|
||||||
|
// Unavailable codes mean the system will be right back.
|
||||||
|
// (e.g., can't connect, lost leader)
|
||||||
|
return ev.Code() == codes.Unavailable
|
||||||
|
}
|
||||||
|
|
||||||
func toErr(ctx context.Context, err error) error {
|
func toErr(ctx context.Context, err error) error {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
return nil
|
||||||
|
@ -830,10 +830,13 @@ func (w *watchGrpcStream) joinSubstreams() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var maxBackoff = 100 * time.Millisecond
|
||||||
|
|
||||||
// openWatchClient retries opening a watch client until success or halt.
|
// openWatchClient retries opening a watch client until success or halt.
|
||||||
// manually retry in case "ws==nil && err==nil"
|
// manually retry in case "ws==nil && err==nil"
|
||||||
// TODO: remove FailFast=false
|
// TODO: remove FailFast=false
|
||||||
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
|
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
|
||||||
|
backoff := time.Millisecond
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-w.ctx.Done():
|
case <-w.ctx.Done():
|
||||||
@ -849,6 +852,17 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error)
|
|||||||
if isHaltErr(w.ctx, err) {
|
if isHaltErr(w.ctx, err) {
|
||||||
return nil, v3rpc.Error(err)
|
return nil, v3rpc.Error(err)
|
||||||
}
|
}
|
||||||
|
if isUnavailableErr(w.ctx, err) {
|
||||||
|
// retry, but backoff
|
||||||
|
if backoff < maxBackoff {
|
||||||
|
// 25% backoff factor
|
||||||
|
backoff = backoff + backoff/4
|
||||||
|
if backoff > maxBackoff {
|
||||||
|
backoff = maxBackoff
|
||||||
|
}
|
||||||
|
}
|
||||||
|
time.Sleep(backoff)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return ws, nil
|
return ws, nil
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user