mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
*: avoid closing a watch with ID 0 incorrectly
Signed-off-by: Kafuu Chino <KafuuChinoQ@gmail.com> add test 1 1 1
This commit is contained in:
parent
91365174b3
commit
ed10ca13f4
@ -38,6 +38,13 @@ const (
|
|||||||
EventTypePut = mvccpb.PUT
|
EventTypePut = mvccpb.PUT
|
||||||
|
|
||||||
closeSendErrTimeout = 250 * time.Millisecond
|
closeSendErrTimeout = 250 * time.Millisecond
|
||||||
|
|
||||||
|
// AutoWatchID is the watcher ID passed in WatchStream.Watch when no
|
||||||
|
// user-provided ID is available. If pass, an ID will automatically be assigned.
|
||||||
|
AutoWatchID = 0
|
||||||
|
|
||||||
|
// InvalidWatchID represents an invalid watch ID and prevents duplication with an existing watch.
|
||||||
|
InvalidWatchID = -1
|
||||||
)
|
)
|
||||||
|
|
||||||
type Event mvccpb.Event
|
type Event mvccpb.Event
|
||||||
@ -444,7 +451,7 @@ func (w *watcher) closeStream(wgs *watchGrpcStream) {
|
|||||||
|
|
||||||
func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
|
func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
|
||||||
// check watch ID for backward compatibility (<= v3.3)
|
// check watch ID for backward compatibility (<= v3.3)
|
||||||
if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
|
if resp.WatchId == InvalidWatchID || (resp.Canceled && resp.CancelReason != "") {
|
||||||
w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
|
w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
|
||||||
// failed; no channel
|
// failed; no channel
|
||||||
close(ws.recvc)
|
close(ws.recvc)
|
||||||
@ -475,7 +482,7 @@ func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
|
|||||||
} else if ws.outc != nil {
|
} else if ws.outc != nil {
|
||||||
close(ws.outc)
|
close(ws.outc)
|
||||||
}
|
}
|
||||||
if ws.id != -1 {
|
if ws.id != InvalidWatchID {
|
||||||
delete(w.substreams, ws.id)
|
delete(w.substreams, ws.id)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -537,7 +544,7 @@ func (w *watchGrpcStream) run() {
|
|||||||
// TODO: pass custom watch ID?
|
// TODO: pass custom watch ID?
|
||||||
ws := &watcherStream{
|
ws := &watcherStream{
|
||||||
initReq: *wreq,
|
initReq: *wreq,
|
||||||
id: -1,
|
id: InvalidWatchID,
|
||||||
outc: outc,
|
outc: outc,
|
||||||
// unbuffered so resumes won't cause repeat events
|
// unbuffered so resumes won't cause repeat events
|
||||||
recvc: make(chan *WatchResponse),
|
recvc: make(chan *WatchResponse),
|
||||||
@ -687,7 +694,7 @@ func (w *watchGrpcStream) run() {
|
|||||||
return
|
return
|
||||||
|
|
||||||
case ws := <-w.closingc:
|
case ws := <-w.closingc:
|
||||||
if ws.id != -1 {
|
if ws.id != InvalidWatchID {
|
||||||
// client is closing an established watch; close it on the server proactively instead of waiting
|
// client is closing an established watch; close it on the server proactively instead of waiting
|
||||||
// to close when the next message arrives
|
// to close when the next message arrives
|
||||||
cancelSet[ws.id] = struct{}{}
|
cancelSet[ws.id] = struct{}{}
|
||||||
@ -749,9 +756,9 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
|
|||||||
cancelReason: pbresp.CancelReason,
|
cancelReason: pbresp.CancelReason,
|
||||||
}
|
}
|
||||||
|
|
||||||
// watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of -1 to
|
// watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of InvalidWatchID to
|
||||||
// indicate they should be broadcast.
|
// indicate they should be broadcast.
|
||||||
if wr.IsProgressNotify() && pbresp.WatchId == -1 {
|
if wr.IsProgressNotify() && pbresp.WatchId == InvalidWatchID {
|
||||||
return w.broadcastResponse(wr)
|
return w.broadcastResponse(wr)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -906,7 +913,7 @@ func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
|
|||||||
w.resumec = make(chan struct{})
|
w.resumec = make(chan struct{})
|
||||||
w.joinSubstreams()
|
w.joinSubstreams()
|
||||||
for _, ws := range w.substreams {
|
for _, ws := range w.substreams {
|
||||||
ws.id = -1
|
ws.id = InvalidWatchID
|
||||||
w.resuming = append(w.resuming, ws)
|
w.resuming = append(w.resuming, ws)
|
||||||
}
|
}
|
||||||
// strip out nils, if any
|
// strip out nils, if any
|
||||||
|
@ -16,12 +16,14 @@ package v3rpc
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.etcd.io/etcd/auth"
|
"go.etcd.io/etcd/auth"
|
||||||
|
"go.etcd.io/etcd/clientv3"
|
||||||
"go.etcd.io/etcd/etcdserver"
|
"go.etcd.io/etcd/etcdserver"
|
||||||
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
|
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||||
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
|
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
|
||||||
@ -294,7 +296,7 @@ func (sws *serverWatchStream) recvLoop() error {
|
|||||||
|
|
||||||
wr := &pb.WatchResponse{
|
wr := &pb.WatchResponse{
|
||||||
Header: sws.newResponseHeader(sws.watchStream.Rev()),
|
Header: sws.newResponseHeader(sws.watchStream.Rev()),
|
||||||
WatchId: creq.WatchId,
|
WatchId: clientv3.InvalidWatchID,
|
||||||
Canceled: true,
|
Canceled: true,
|
||||||
Created: true,
|
Created: true,
|
||||||
CancelReason: cancelReason,
|
CancelReason: cancelReason,
|
||||||
@ -328,7 +330,10 @@ func (sws *serverWatchStream) recvLoop() error {
|
|||||||
sws.fragment[id] = true
|
sws.fragment[id] = true
|
||||||
}
|
}
|
||||||
sws.mu.Unlock()
|
sws.mu.Unlock()
|
||||||
|
} else {
|
||||||
|
id = clientv3.InvalidWatchID
|
||||||
}
|
}
|
||||||
|
|
||||||
wr := &pb.WatchResponse{
|
wr := &pb.WatchResponse{
|
||||||
Header: sws.newResponseHeader(wsrev),
|
Header: sws.newResponseHeader(wsrev),
|
||||||
WatchId: int64(id),
|
WatchId: int64(id),
|
||||||
@ -365,7 +370,7 @@ func (sws *serverWatchStream) recvLoop() error {
|
|||||||
if uv.ProgressRequest != nil {
|
if uv.ProgressRequest != nil {
|
||||||
sws.ctrlStream <- &pb.WatchResponse{
|
sws.ctrlStream <- &pb.WatchResponse{
|
||||||
Header: sws.newResponseHeader(sws.watchStream.Rev()),
|
Header: sws.newResponseHeader(sws.watchStream.Rev()),
|
||||||
WatchId: -1, // response is not associated with any WatchId and will be broadcast to all watch channels
|
WatchId: clientv3.InvalidWatchID, // response is not associated with any WatchId and will be broadcast to all watch channels
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
@ -504,7 +509,12 @@ func (sws *serverWatchStream) sendLoop() {
|
|||||||
|
|
||||||
// track id creation
|
// track id creation
|
||||||
wid := mvcc.WatchID(c.WatchId)
|
wid := mvcc.WatchID(c.WatchId)
|
||||||
if c.Canceled {
|
|
||||||
|
if !(!(c.Canceled && c.Created) || wid == clientv3.InvalidWatchID) {
|
||||||
|
panic(fmt.Sprintf("unexpected watchId: %d, wanted: %d, since both 'Canceled' and 'Created' are true", wid, clientv3.InvalidWatchID))
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.Canceled && wid != clientv3.InvalidWatchID {
|
||||||
delete(ids, wid)
|
delete(ids, wid)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -504,3 +504,58 @@ func TestV3AuthWatchAndTokenExpire(t *testing.T) {
|
|||||||
watchResponse = <-wChan
|
watchResponse = <-wChan
|
||||||
testutil.AssertNil(t, watchResponse.Err())
|
testutil.AssertNil(t, watchResponse.Err())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestV3AuthWatchErrorAndWatchId0(t *testing.T) {
|
||||||
|
defer testutil.AfterTest(t)
|
||||||
|
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||||
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
users := []user{
|
||||||
|
{
|
||||||
|
name: "user1",
|
||||||
|
password: "user1-123",
|
||||||
|
role: "role1",
|
||||||
|
key: "k1",
|
||||||
|
end: "k2",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
authSetupUsers(t, toGRPC(clus.Client(0)).Auth, users)
|
||||||
|
|
||||||
|
authSetupRoot(t, toGRPC(clus.Client(0)).Auth)
|
||||||
|
|
||||||
|
c, cerr := clientv3.New(clientv3.Config{Endpoints: clus.Client(0).Endpoints(), Username: "user1", Password: "user1-123"})
|
||||||
|
if cerr != nil {
|
||||||
|
t.Fatal(cerr)
|
||||||
|
}
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
watchStartCh, watchEndCh := make(chan interface{}), make(chan interface{})
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
wChan := c.Watch(ctx, "k1", clientv3.WithRev(1))
|
||||||
|
watchStartCh <- struct{}{}
|
||||||
|
watchResponse := <-wChan
|
||||||
|
t.Logf("watch response from k1: %v", watchResponse)
|
||||||
|
testutil.AssertTrue(t, len(watchResponse.Events) != 0)
|
||||||
|
watchEndCh <- struct{}{}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Chan for making sure that the above goroutine invokes Watch()
|
||||||
|
// So the above Watch() can get watch ID = 0
|
||||||
|
<-watchStartCh
|
||||||
|
|
||||||
|
wChan := c.Watch(ctx, "non-allowed-key", clientv3.WithRev(1))
|
||||||
|
watchResponse := <-wChan
|
||||||
|
testutil.AssertNotNil(t, watchResponse.Err()) // permission denied
|
||||||
|
|
||||||
|
_, err := c.Put(ctx, "k1", "val")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected error from Put: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
<-watchEndCh
|
||||||
|
}
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"go.etcd.io/etcd/clientv3"
|
||||||
"go.etcd.io/etcd/etcdserver/api/v3rpc"
|
"go.etcd.io/etcd/etcdserver/api/v3rpc"
|
||||||
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
|
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
|
||||||
"go.etcd.io/etcd/mvcc/mvccpb"
|
"go.etcd.io/etcd/mvcc/mvccpb"
|
||||||
@ -396,8 +397,8 @@ func TestV3WatchWrongRange(t *testing.T) {
|
|||||||
if cresp.Canceled != tt.canceled {
|
if cresp.Canceled != tt.canceled {
|
||||||
t.Fatalf("#%d: canceled %v, want %v", i, tt.canceled, cresp.Canceled)
|
t.Fatalf("#%d: canceled %v, want %v", i, tt.canceled, cresp.Canceled)
|
||||||
}
|
}
|
||||||
if tt.canceled && cresp.WatchId != -1 {
|
if tt.canceled && cresp.WatchId != clientv3.InvalidWatchID {
|
||||||
t.Fatalf("#%d: canceled watch ID %d, want -1", i, cresp.WatchId)
|
t.Fatalf("#%d: canceled watch ID %d, want %d", i, cresp.WatchId, clientv3.InvalidWatchID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -19,13 +19,10 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"go.etcd.io/etcd/clientv3"
|
||||||
"go.etcd.io/etcd/mvcc/mvccpb"
|
"go.etcd.io/etcd/mvcc/mvccpb"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AutoWatchID is the watcher ID passed in WatchStream.Watch when no
|
|
||||||
// user-provided ID is available. If pass, an ID will automatically be assigned.
|
|
||||||
const AutoWatchID WatchID = 0
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrWatcherNotExist = errors.New("mvcc: watcher does not exist")
|
ErrWatcherNotExist = errors.New("mvcc: watcher does not exist")
|
||||||
ErrEmptyWatcherRange = errors.New("mvcc: watcher range is empty")
|
ErrEmptyWatcherRange = errors.New("mvcc: watcher range is empty")
|
||||||
@ -118,7 +115,7 @@ func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ..
|
|||||||
return -1, ErrEmptyWatcherRange
|
return -1, ErrEmptyWatcherRange
|
||||||
}
|
}
|
||||||
|
|
||||||
if id == AutoWatchID {
|
if id == clientv3.AutoWatchID {
|
||||||
for ws.watchers[ws.nextID] != nil {
|
for ws.watchers[ws.nextID] != nil {
|
||||||
ws.nextID++
|
ws.nextID++
|
||||||
}
|
}
|
||||||
|
@ -233,7 +233,7 @@ func (wps *watchProxyStream) recvLoop() error {
|
|||||||
if err := wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil {
|
if err := wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil {
|
||||||
wps.watchCh <- &pb.WatchResponse{
|
wps.watchCh <- &pb.WatchResponse{
|
||||||
Header: &pb.ResponseHeader{},
|
Header: &pb.ResponseHeader{},
|
||||||
WatchId: -1,
|
WatchId: clientv3.InvalidWatchID,
|
||||||
Created: true,
|
Created: true,
|
||||||
Canceled: true,
|
Canceled: true,
|
||||||
CancelReason: err.Error(),
|
CancelReason: err.Error(),
|
||||||
@ -252,7 +252,7 @@ func (wps *watchProxyStream) recvLoop() error {
|
|||||||
filters: v3rpc.FiltersFromRequest(cr),
|
filters: v3rpc.FiltersFromRequest(cr),
|
||||||
}
|
}
|
||||||
if !w.wr.valid() {
|
if !w.wr.valid() {
|
||||||
w.post(&pb.WatchResponse{WatchId: -1, Created: true, Canceled: true})
|
w.post(&pb.WatchResponse{WatchId: clientv3.InvalidWatchID, Created: true, Canceled: true})
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
wps.nextWatcherID++
|
wps.nextWatcherID++
|
||||||
|
Loading…
x
Reference in New Issue
Block a user