From c586218ec6f3d53e6504b7e138c49c91b1610043 Mon Sep 17 00:00:00 2001 From: Ravi Gadde Date: Sat, 28 Jan 2017 11:34:26 -0800 Subject: [PATCH] clientv3: start a session with existing lease This change is needed to handle process restarts with elections. When the leader process is restarted, it should be able to hang on to the leadership by using the existing lease. Fixes #7166 --- clientv3/concurrency/session.go | 25 +++++++++++++----- integration/v3_election_test.go | 47 +++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 6 deletions(-) diff --git a/clientv3/concurrency/session.go b/clientv3/concurrency/session.go index 59267df50..0cb5ea7cf 100644 --- a/clientv3/concurrency/session.go +++ b/clientv3/concurrency/session.go @@ -41,11 +41,14 @@ func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) { opt(ops) } - resp, err := client.Grant(ops.ctx, int64(ops.ttl)) - if err != nil { - return nil, err + id := ops.leaseID + if id == v3.NoLease { + resp, err := client.Grant(ops.ctx, int64(ops.ttl)) + if err != nil { + return nil, err + } + id = v3.LeaseID(resp.ID) } - id := v3.LeaseID(resp.ID) ctx, cancel := context.WithCancel(ops.ctx) keepAlive, err := client.KeepAlive(ctx, id) @@ -98,8 +101,9 @@ func (s *Session) Close() error { } type sessionOptions struct { - ttl int - ctx context.Context + ttl int + leaseID v3.LeaseID + ctx context.Context } // SessionOption configures Session. @@ -115,6 +119,15 @@ func WithTTL(ttl int) SessionOption { } } +// WithLease specifies the existing leaseID to be used for the session. +// This is useful in process restart scenario, for example, to reclaim +// leadership from an election prior to restart. +func WithLease(leaseID v3.LeaseID) SessionOption { + return func(so *sessionOptions) { + so.leaseID = leaseID + } +} + // WithContext assigns a context to the session instead of defaulting to // using the client context. This is useful for canceling NewSession and // Close operations immediately without having to close the client. If the diff --git a/integration/v3_election_test.go b/integration/v3_election_test.go index 9c6e53010..0eb5eae93 100644 --- a/integration/v3_election_test.go +++ b/integration/v3_election_test.go @@ -225,3 +225,50 @@ func TestElectionOnPrefixOfExistingKey(t *testing.T) { t.Fatal(err) } } + +// TestElectionOnSessionRestart tests that a quick restart of leader (resulting +// in a new session with the same lease id) does not result in loss of +// leadership. +func TestElectionOnSessionRestart(t *testing.T) { + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) + cli := clus.RandClient() + + session, err := concurrency.NewSession(cli) + if err != nil { + t.Fatal(err) + } + + e := concurrency.NewElection(session, "test-elect") + if cerr := e.Campaign(context.TODO(), "abc"); cerr != nil { + t.Fatal(cerr) + } + + // ensure leader is not lost to waiter on fail-over + waitSession, werr := concurrency.NewSession(cli) + if werr != nil { + t.Fatal(werr) + } + defer waitSession.Orphan() + waitCtx, waitCancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer waitCancel() + go concurrency.NewElection(waitSession, "test-elect").Campaign(waitCtx, "123") + + // simulate restart by reusing the lease from the old session + newSession, nerr := concurrency.NewSession(cli, concurrency.WithLease(session.Lease())) + if nerr != nil { + t.Fatal(nerr) + } + defer newSession.Orphan() + + newElection := concurrency.NewElection(newSession, "test-elect") + if ncerr := newElection.Campaign(context.TODO(), "def"); ncerr != nil { + t.Fatal(ncerr) + } + + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + if resp := <-newElection.Observe(ctx); len(resp.Kvs) == 0 || string(resp.Kvs[0].Value) != "def" { + t.Errorf("expected value=%q, got response %v", "def", resp) + } +}