diff --git a/clientv3/concurrency/election.go b/clientv3/concurrency/election.go index 257cc78af..1d75dde3d 100644 --- a/clientv3/concurrency/election.go +++ b/clientv3/concurrency/election.go @@ -165,15 +165,14 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) { client := e.session.Client() defer close(ch) - lastRev := int64(0) for { - opts := append(v3.WithFirstCreate(), v3.WithRev(lastRev)) - resp, err := client.Get(ctx, e.keyPrefix, opts...) + resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...) if err != nil { return } var kv *mvccpb.KeyValue + var hdr *pb.ResponseHeader if len(resp.Kvs) == 0 { cctx, cancel := context.WithCancel(ctx) @@ -189,18 +188,27 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) { // only accept PUTs; a DELETE will make observe() spin for _, ev := range wr.Events { if ev.Type == mvccpb.PUT { - kv = ev.Kv + hdr, kv = &wr.Header, ev.Kv + // may have multiple revs; hdr.rev = the last rev + // set to kv's rev in case batch has multiple PUTs + hdr.Revision = kv.ModRevision break } } } cancel() } else { - kv = resp.Kvs[0] + hdr, kv = resp.Header, resp.Kvs[0] + } + + select { + case ch <- v3.GetResponse{Header: hdr, Kvs: []*mvccpb.KeyValue{kv}}: + case <-ctx.Done(): + return } cctx, cancel := context.WithCancel(ctx) - wch := client.Watch(cctx, string(kv.Key), v3.WithRev(kv.ModRevision)) + wch := client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision+1)) keyDeleted := false for !keyDeleted { wr, ok := <-wch @@ -209,7 +217,6 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) { } for _, ev := range wr.Events { if ev.Type == mvccpb.DELETE { - lastRev = ev.Kv.ModRevision keyDeleted = true break }