storage: use creation revision to compute txn event types

Fixes #4688
This commit is contained in:
Anthony Romano 2016-03-05 12:53:27 -08:00
parent e93e41cd9a
commit 713f7c056f
2 changed files with 48 additions and 2 deletions

View File

@ -411,6 +411,52 @@ func TestV3WatchCurrentPutOverlap(t *testing.T) {
}
}
// TestV3WatchEmptyKey ensures synced watchers see empty key PUTs as PUT events
func TestV3WatchEmptyKey(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
ws, werr := toGRPC(clus.RandClient()).Watch.Watch(ctx)
if werr != nil {
t.Fatal(werr)
}
req := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
CreateRequest: &pb.WatchCreateRequest{
Key: []byte("foo")}}}
if err := ws.Send(req); err != nil {
t.Fatal(err)
}
if _, err := ws.Recv(); err != nil {
t.Fatal(err)
}
// put a key with empty value
kvc := toGRPC(clus.RandClient()).KV
preq := &pb.PutRequest{Key: []byte("foo")}
if _, err := kvc.Put(context.TODO(), preq); err != nil {
t.Fatal(err)
}
// check received PUT
resp, rerr := ws.Recv()
if rerr != nil {
t.Fatal(rerr)
}
wevs := []*storagepb.Event{
{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{Key: []byte("foo"), CreateRevision: 2, ModRevision: 2, Version: 1},
},
}
if !reflect.DeepEqual(resp.Events, wevs) {
t.Fatalf("got %v, expected %v", resp.Events, wevs)
}
clus.Terminate(t)
}
func TestV3WatchMultipleWatchersSynced(t *testing.T) {
defer testutil.AfterTest(t)
testV3WatchMultipleWatchers(t, 0)

View File

@ -138,8 +138,8 @@ func (s *watchableStore) TxnEnd(txnID int64) error {
rev := s.store.Rev()
evs := make([]storagepb.Event, len(changes))
for i, change := range changes {
switch change.Value {
case nil:
switch change.CreateRevision {
case 0:
evs[i] = storagepb.Event{
Type: storagepb.DELETE,
Kv: &changes[i]}