diff --git a/clientv3/concurrency/election.go b/clientv3/concurrency/election.go index 90c7c667c..257cc78af 100644 --- a/clientv3/concurrency/election.go +++ b/clientv3/concurrency/election.go @@ -148,9 +148,13 @@ func (e *Election) Leader(ctx context.Context) (*v3.GetResponse, error) { return resp, nil } -// Observe returns a channel that observes all leader proposal values as -// GetResponse values on the current leader key. The channel closes when -// the context is cancelled or the underlying watcher is otherwise disrupted. +// Observe returns a channel that reliably observes ordered leader proposals +// as GetResponse values on every current elected leader key. It will not +// necessarily fetch all historical leader updates, but will always post the +// most recent leader value. +// +// The channel closes when the context is canceled or the underlying watcher +// is otherwise disrupted. func (e *Election) Observe(ctx context.Context) <-chan v3.GetResponse { retc := make(chan v3.GetResponse) go e.observe(ctx, retc) diff --git a/integration/v3election_grpc_test.go b/integration/v3election_grpc_test.go index f92c41d30..b573b6e12 100644 --- a/integration/v3election_grpc_test.go +++ b/integration/v3election_grpc_test.go @@ -96,7 +96,7 @@ func TestV3ElectionObserve(t *testing.T) { lc := epb.NewElectionClient(clus.Client(0).ActiveConnection()) - // observe 10 leadership events + // observe leadership events observec := make(chan struct{}) go func() { defer close(observec) @@ -110,9 +110,13 @@ func TestV3ElectionObserve(t *testing.T) { if rerr != nil { t.Fatal(rerr) } - if string(resp.Kv.Value) != fmt.Sprintf("%d", i) { - t.Fatalf(`got observe value %q, expected "%d"`, string(resp.Kv.Value), i) + respV := 0 + fmt.Sscanf(string(resp.Kv.Value), "%d", &respV) + // leader transitions should not go backwards + if respV < i { + t.Fatalf(`got observe value %q, expected >= "%d"`, string(resp.Kv.Value), i) } + i = respV } }()