From 5a967eb2a012473c6489afae8d005052b287c840 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Fri, 22 Jan 2016 01:40:19 -0800 Subject: [PATCH] storage: publish delete events on lease revocation --- integration/v3_grpc_test.go | 49 +++++++++++++++++++++++++++++++++++++ storage/watchable_store.go | 4 +++ 2 files changed, 53 insertions(+) diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index 2bdae66c9..03aae0ac6 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -1057,6 +1057,55 @@ func TestV3LeaseCreateByID(t *testing.T) { } +// TestV3LeaseExpire ensures a key is deleted once a key expires. +func TestV3LeaseExpire(t *testing.T) { + testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error { + // let lease lapse; wait for deleted key + + wAPI := pb.NewWatchClient(clus.RandConn()) + wStream, err := wAPI.Watch(context.TODO()) + if err != nil { + return err + } + + creq := &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 1} + wreq := &pb.WatchRequest{CreateRequest: creq} + if err := wStream.Send(wreq); err != nil { + return err + } + if _, err := wStream.Recv(); err != nil { + // the 'created' message + return err + } + if _, err := wStream.Recv(); err != nil { + // the 'put' message + return err + } + + errc := make(chan error, 1) + go func() { + resp, err := wStream.Recv() + switch { + case err != nil: + errc <- err + case len(resp.Events) != 1: + fallthrough + case resp.Events[0].Type != storagepb.DELETE: + errc <- fmt.Errorf("expected key delete, got %v", resp) + default: + errc <- nil + } + }() + + select { + case <-time.After(15 * time.Second): + return fmt.Errorf("lease expiration too slow") + case err := <-errc: + return err + } + }) +} + // TestV3LeaseKeepAlive ensures keepalive keeps the lease alive. func TestV3LeaseKeepAlive(t *testing.T) { testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error { diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 435d282b7..dde374652 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -68,6 +68,10 @@ func newWatchableStore(b backend.Backend, le lease.Lessor) *watchableStore { synced: make(map[string]map[*watcher]struct{}), stopc: make(chan struct{}), } + if s.le != nil { + // use this store as the deleter so revokes trigger watch events + s.le.SetRangeDeleter(s) + } s.wg.Add(1) go s.syncWatchersLoop() return s