From e6a789d54177807a9f90e4ec90b6c098c16cefc8 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Mon, 17 Apr 2017 13:37:12 -0700 Subject: [PATCH 1/2] integration: permit dropping intermediate leader values on observe Weaken TestV3ElectionObserve so it only checks that it observes a strictly monotonically ascending leader transition sequence following the first observed leader. First, the Observe will issue the leader channel before getting a response for its first get; the election revision is only bound after returning the channel. So, Observe can't be expected to always return the leader at the time it was started. Second, Observe fetches the current leader based on its create revision, but begins watching on its ModRevision; this is important so that elections still work in case the leader issues proclamations following a compaction that exceeds its creation revision. So, Observe can't be expected to return the entire proclamation sequence for a single leader. Fixes #7749 --- integration/v3election_grpc_test.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/integration/v3election_grpc_test.go b/integration/v3election_grpc_test.go index f92c41d30..b573b6e12 100644 --- a/integration/v3election_grpc_test.go +++ b/integration/v3election_grpc_test.go @@ -96,7 +96,7 @@ func TestV3ElectionObserve(t *testing.T) { lc := epb.NewElectionClient(clus.Client(0).ActiveConnection()) - // observe 10 leadership events + // observe leadership events observec := make(chan struct{}) go func() { defer close(observec) @@ -110,9 +110,13 @@ func TestV3ElectionObserve(t *testing.T) { if rerr != nil { t.Fatal(rerr) } - if string(resp.Kv.Value) != fmt.Sprintf("%d", i) { - t.Fatalf(`got observe value %q, expected "%d"`, string(resp.Kv.Value), i) + respV := 0 + fmt.Sscanf(string(resp.Kv.Value), "%d", &respV) + // leader transitions should not go backwards + if respV < i { + t.Fatalf(`got observe value %q, expected >= "%d"`, string(resp.Kv.Value), i) } + i = respV } }() From c1300c81b3272728c983839f7ef7da28e0fbc5be Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Fri, 21 Apr 2017 16:19:43 -0700 Subject: [PATCH 2/2] concurrency: clarify Observe semantics; only fetches subsequence --- clientv3/concurrency/election.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/clientv3/concurrency/election.go b/clientv3/concurrency/election.go index 90c7c667c..257cc78af 100644 --- a/clientv3/concurrency/election.go +++ b/clientv3/concurrency/election.go @@ -148,9 +148,13 @@ func (e *Election) Leader(ctx context.Context) (*v3.GetResponse, error) { return resp, nil } -// Observe returns a channel that observes all leader proposal values as -// GetResponse values on the current leader key. The channel closes when -// the context is cancelled or the underlying watcher is otherwise disrupted. +// Observe returns a channel that reliably observes ordered leader proposals +// as GetResponse values on every current elected leader key. It will not +// necessarily fetch all historical leader updates, but will always post the +// most recent leader value. +// +// The channel closes when the context is canceled or the underlying watcher +// is otherwise disrupted. func (e *Election) Observe(ctx context.Context) <-chan v3.GetResponse { retc := make(chan v3.GetResponse) go e.observe(ctx, retc)