Merge pull request #8347 from heyitsanthony/use-from-grpc-md

clientv3: use FromOutgoingContext to bucket watches
This commit is contained in:
Anthony Romano 2017-08-01 17:05:56 -07:00 committed by GitHub
commit d543870966
2 changed files with 24 additions and 14 deletions

View File

@ -30,6 +30,7 @@ import (
"github.com/coreos/etcd/pkg/testutil"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
type watcherTest func(*testing.T, *watchctx)
@ -875,8 +876,10 @@ func TestWatchCancelOnServer(t *testing.T) {
cancels := make([]context.CancelFunc, numWatches)
for i := 0; i < numWatches; i++ {
// use WithTimeout to force separate streams in client
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// force separate streams in client
md := metadata.Pairs("some-key", fmt.Sprintf("%d", i))
mctx := metadata.NewOutgoingContext(context.Background(), md)
ctx, cancel := context.WithCancel(mctx)
cancels[i] = cancel
w := client.Watch(ctx, fmt.Sprintf("%d", i), clientv3.WithCreatedNotify())
<-w
@ -933,12 +936,12 @@ func testWatchOverlapContextCancel(t *testing.T, f func(*integration.ClusterV3))
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
// each unique context "%v" has a unique grpc stream
n := 100
ctxs, ctxc := make([]context.Context, 5), make([]chan struct{}, 5)
for i := range ctxs {
// make "%v" unique
ctxs[i] = context.WithValue(context.TODO(), "key", i)
// make unique stream
md := metadata.Pairs("some-key", fmt.Sprintf("%d", i))
ctxs[i] = metadata.NewOutgoingContext(context.Background(), md)
// limits the maximum number of outstanding watchers per stream
ctxc[i] = make(chan struct{}, 2)
}

View File

@ -25,6 +25,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
)
const (
@ -213,16 +214,15 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
owner: w,
remote: w.remote,
ctx: ctx,
ctxKey: fmt.Sprintf("%v", inctx),
ctxKey: streamKeyFromCtx(inctx),
cancel: cancel,
substreams: make(map[int64]*watcherStream),
respc: make(chan *pb.WatchResponse),
reqc: make(chan *watchRequest),
donec: make(chan struct{}),
errc: make(chan error, 1),
closingc: make(chan *watcherStream),
resumec: make(chan struct{}),
respc: make(chan *pb.WatchResponse),
reqc: make(chan *watchRequest),
donec: make(chan struct{}),
errc: make(chan error, 1),
closingc: make(chan *watcherStream),
resumec: make(chan struct{}),
}
go wgs.run()
return wgs
@ -253,7 +253,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
}
ok := false
ctxKey := fmt.Sprintf("%v", ctx)
ctxKey := streamKeyFromCtx(ctx)
// find or allocate appropriate grpc watch stream
w.mu.Lock()
@ -794,3 +794,10 @@ func (wr *watchRequest) toPB() *pb.WatchRequest {
cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
return &pb.WatchRequest{RequestUnion: cr}
}
func streamKeyFromCtx(ctx context.Context) string {
if md, ok := metadata.FromOutgoingContext(ctx); ok {
return fmt.Sprintf("%+v", md)
}
return ""
}