mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
clientv3: fix retry/streamer error message
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
This commit is contained in:
parent
a447d51f23
commit
4388404f56
@ -113,10 +113,9 @@ func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOp
|
|||||||
return nil, status.Errorf(codes.Unimplemented, "clientv3/retry_interceptor: cannot retry on ClientStreams, set Disable()")
|
return nil, status.Errorf(codes.Unimplemented, "clientv3/retry_interceptor: cannot retry on ClientStreams, set Disable()")
|
||||||
}
|
}
|
||||||
newStreamer, err := streamer(ctx, desc, cc, method, grpcOpts...)
|
newStreamer, err := streamer(ctx, desc, cc, method, grpcOpts...)
|
||||||
logger.Warn("retry stream intercept", zap.Error(err))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO(mwitkow): Maybe dial and transport errors should be retriable?
|
logger.Error("streamer failed to create ClientStream", zap.Error(err))
|
||||||
return nil, err
|
return nil, err // TODO(mwitkow): Maybe dial and transport errors should be retriable?
|
||||||
}
|
}
|
||||||
retryingStreamer := &serverStreamingRetryingStream{
|
retryingStreamer := &serverStreamingRetryingStream{
|
||||||
client: c,
|
client: c,
|
||||||
@ -185,6 +184,7 @@ func (s *serverStreamingRetryingStream) RecvMsg(m interface{}) error {
|
|||||||
if !attemptRetry {
|
if !attemptRetry {
|
||||||
return lastErr // success or hard failure
|
return lastErr // success or hard failure
|
||||||
}
|
}
|
||||||
|
|
||||||
// We start off from attempt 1, because zeroth was already made on normal SendMsg().
|
// We start off from attempt 1, because zeroth was already made on normal SendMsg().
|
||||||
for attempt := uint(1); attempt < s.callOpts.max; attempt++ {
|
for attempt := uint(1); attempt < s.callOpts.max; attempt++ {
|
||||||
if err := waitRetryBackoff(s.ctx, attempt, s.callOpts); err != nil {
|
if err := waitRetryBackoff(s.ctx, attempt, s.callOpts); err != nil {
|
||||||
@ -192,12 +192,13 @@ func (s *serverStreamingRetryingStream) RecvMsg(m interface{}) error {
|
|||||||
}
|
}
|
||||||
newStream, err := s.reestablishStreamAndResendBuffer(s.ctx)
|
newStream, err := s.reestablishStreamAndResendBuffer(s.ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO(mwitkow): Maybe dial and transport errors should be retriable?
|
s.client.lg.Error("failed reestablishStreamAndResendBuffer", zap.Error(err))
|
||||||
return err
|
return err // TODO(mwitkow): Maybe dial and transport errors should be retriable?
|
||||||
}
|
}
|
||||||
s.setStream(newStream)
|
s.setStream(newStream)
|
||||||
|
|
||||||
|
s.client.lg.Warn("retrying RecvMsg", zap.Error(lastErr))
|
||||||
attemptRetry, lastErr = s.receiveMsgAndIndicateRetry(m)
|
attemptRetry, lastErr = s.receiveMsgAndIndicateRetry(m)
|
||||||
//fmt.Printf("Received message and indicate: %v %v\n", attemptRetry, lastErr)
|
|
||||||
if !attemptRetry {
|
if !attemptRetry {
|
||||||
return lastErr
|
return lastErr
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user