clientv3: start a session with existing lease

This change is needed to handle process restarts with elections. When the
leader process is restarted, it should be able to hang on to the leadership
by using the existing lease.

Fixes #7166
This commit is contained in:
Ravi Gadde 2017-01-28 11:34:26 -08:00
parent d2716fc5ae
commit c586218ec6
2 changed files with 66 additions and 6 deletions

View File

@ -41,11 +41,14 @@ func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
opt(ops)
}
resp, err := client.Grant(ops.ctx, int64(ops.ttl))
if err != nil {
return nil, err
id := ops.leaseID
if id == v3.NoLease {
resp, err := client.Grant(ops.ctx, int64(ops.ttl))
if err != nil {
return nil, err
}
id = v3.LeaseID(resp.ID)
}
id := v3.LeaseID(resp.ID)
ctx, cancel := context.WithCancel(ops.ctx)
keepAlive, err := client.KeepAlive(ctx, id)
@ -98,8 +101,9 @@ func (s *Session) Close() error {
}
type sessionOptions struct {
ttl int
ctx context.Context
ttl int
leaseID v3.LeaseID
ctx context.Context
}
// SessionOption configures Session.
@ -115,6 +119,15 @@ func WithTTL(ttl int) SessionOption {
}
}
// WithLease specifies the existing leaseID to be used for the session.
// This is useful in process restart scenario, for example, to reclaim
// leadership from an election prior to restart.
func WithLease(leaseID v3.LeaseID) SessionOption {
return func(so *sessionOptions) {
so.leaseID = leaseID
}
}
// WithContext assigns a context to the session instead of defaulting to
// using the client context. This is useful for canceling NewSession and
// Close operations immediately without having to close the client. If the

View File

@ -225,3 +225,50 @@ func TestElectionOnPrefixOfExistingKey(t *testing.T) {
t.Fatal(err)
}
}
// TestElectionOnSessionRestart tests that a quick restart of leader (resulting
// in a new session with the same lease id) does not result in loss of
// leadership.
func TestElectionOnSessionRestart(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.RandClient()
session, err := concurrency.NewSession(cli)
if err != nil {
t.Fatal(err)
}
e := concurrency.NewElection(session, "test-elect")
if cerr := e.Campaign(context.TODO(), "abc"); cerr != nil {
t.Fatal(cerr)
}
// ensure leader is not lost to waiter on fail-over
waitSession, werr := concurrency.NewSession(cli)
if werr != nil {
t.Fatal(werr)
}
defer waitSession.Orphan()
waitCtx, waitCancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer waitCancel()
go concurrency.NewElection(waitSession, "test-elect").Campaign(waitCtx, "123")
// simulate restart by reusing the lease from the old session
newSession, nerr := concurrency.NewSession(cli, concurrency.WithLease(session.Lease()))
if nerr != nil {
t.Fatal(nerr)
}
defer newSession.Orphan()
newElection := concurrency.NewElection(newSession, "test-elect")
if ncerr := newElection.Campaign(context.TODO(), "def"); ncerr != nil {
t.Fatal(ncerr)
}
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
if resp := <-newElection.Observe(ctx); len(resp.Kvs) == 0 || string(resp.Kvs[0].Value) != "def" {
t.Errorf("expected value=%q, got response %v", "def", resp)
}
}