diff --git a/clientv3/concurrency/session.go b/clientv3/concurrency/session.go index 59267df50..0cb5ea7cf 100644 --- a/clientv3/concurrency/session.go +++ b/clientv3/concurrency/session.go @@ -41,11 +41,14 @@ func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) { opt(ops) } - resp, err := client.Grant(ops.ctx, int64(ops.ttl)) - if err != nil { - return nil, err + id := ops.leaseID + if id == v3.NoLease { + resp, err := client.Grant(ops.ctx, int64(ops.ttl)) + if err != nil { + return nil, err + } + id = v3.LeaseID(resp.ID) } - id := v3.LeaseID(resp.ID) ctx, cancel := context.WithCancel(ops.ctx) keepAlive, err := client.KeepAlive(ctx, id) @@ -98,8 +101,9 @@ func (s *Session) Close() error { } type sessionOptions struct { - ttl int - ctx context.Context + ttl int + leaseID v3.LeaseID + ctx context.Context } // SessionOption configures Session. @@ -115,6 +119,15 @@ func WithTTL(ttl int) SessionOption { } } +// WithLease specifies the existing leaseID to be used for the session. +// This is useful in process restart scenario, for example, to reclaim +// leadership from an election prior to restart. +func WithLease(leaseID v3.LeaseID) SessionOption { + return func(so *sessionOptions) { + so.leaseID = leaseID + } +} + // WithContext assigns a context to the session instead of defaulting to // using the client context. This is useful for canceling NewSession and // Close operations immediately without having to close the client. If the diff --git a/integration/v3_election_test.go b/integration/v3_election_test.go index 9c6e53010..0eb5eae93 100644 --- a/integration/v3_election_test.go +++ b/integration/v3_election_test.go @@ -225,3 +225,50 @@ func TestElectionOnPrefixOfExistingKey(t *testing.T) { t.Fatal(err) } } + +// TestElectionOnSessionRestart tests that a quick restart of leader (resulting +// in a new session with the same lease id) does not result in loss of +// leadership. +func TestElectionOnSessionRestart(t *testing.T) { + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) + cli := clus.RandClient() + + session, err := concurrency.NewSession(cli) + if err != nil { + t.Fatal(err) + } + + e := concurrency.NewElection(session, "test-elect") + if cerr := e.Campaign(context.TODO(), "abc"); cerr != nil { + t.Fatal(cerr) + } + + // ensure leader is not lost to waiter on fail-over + waitSession, werr := concurrency.NewSession(cli) + if werr != nil { + t.Fatal(werr) + } + defer waitSession.Orphan() + waitCtx, waitCancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer waitCancel() + go concurrency.NewElection(waitSession, "test-elect").Campaign(waitCtx, "123") + + // simulate restart by reusing the lease from the old session + newSession, nerr := concurrency.NewSession(cli, concurrency.WithLease(session.Lease())) + if nerr != nil { + t.Fatal(nerr) + } + defer newSession.Orphan() + + newElection := concurrency.NewElection(newSession, "test-elect") + if ncerr := newElection.Campaign(context.TODO(), "def"); ncerr != nil { + t.Fatal(ncerr) + } + + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + if resp := <-newElection.Observe(ctx); len(resp.Kvs) == 0 || string(resp.Kvs[0].Value) != "def" { + t.Errorf("expected value=%q, got response %v", "def", resp) + } +}