mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #6698 from heyitsanthony/session-close
concurrency: terminate session.Close if revoke takes longer than TTL
This commit is contained in:
commit
ae99c91903
@ -15,6 +15,8 @@
|
|||||||
package concurrency
|
package concurrency
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
v3 "github.com/coreos/etcd/clientv3"
|
v3 "github.com/coreos/etcd/clientv3"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
@ -25,6 +27,7 @@ const defaultSessionTTL = 60
|
|||||||
// Fault-tolerant applications may use sessions to reason about liveness.
|
// Fault-tolerant applications may use sessions to reason about liveness.
|
||||||
type Session struct {
|
type Session struct {
|
||||||
client *v3.Client
|
client *v3.Client
|
||||||
|
opts *sessionOptions
|
||||||
id v3.LeaseID
|
id v3.LeaseID
|
||||||
|
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
@ -33,25 +36,25 @@ type Session struct {
|
|||||||
|
|
||||||
// NewSession gets the leased session for a client.
|
// NewSession gets the leased session for a client.
|
||||||
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
|
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
|
||||||
ops := &sessionOptions{ttl: defaultSessionTTL}
|
ops := &sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()}
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
opt(ops)
|
opt(ops)
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := client.Grant(client.Ctx(), int64(ops.ttl))
|
resp, err := client.Grant(ops.ctx, int64(ops.ttl))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
id := v3.LeaseID(resp.ID)
|
id := v3.LeaseID(resp.ID)
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(client.Ctx())
|
ctx, cancel := context.WithCancel(ops.ctx)
|
||||||
keepAlive, err := client.KeepAlive(ctx, id)
|
keepAlive, err := client.KeepAlive(ctx, id)
|
||||||
if err != nil || keepAlive == nil {
|
if err != nil || keepAlive == nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
donec := make(chan struct{})
|
donec := make(chan struct{})
|
||||||
s := &Session{client: client, id: id, cancel: cancel, donec: donec}
|
s := &Session{client: client, opts: ops, id: id, cancel: cancel, donec: donec}
|
||||||
|
|
||||||
// keep the lease alive until client error or cancelled context
|
// keep the lease alive until client error or cancelled context
|
||||||
go func() {
|
go func() {
|
||||||
@ -87,12 +90,16 @@ func (s *Session) Orphan() {
|
|||||||
// Close orphans the session and revokes the session lease.
|
// Close orphans the session and revokes the session lease.
|
||||||
func (s *Session) Close() error {
|
func (s *Session) Close() error {
|
||||||
s.Orphan()
|
s.Orphan()
|
||||||
_, err := s.client.Revoke(s.client.Ctx(), s.id)
|
// if revoke takes longer than the ttl, lease is expired anyway
|
||||||
|
ctx, cancel := context.WithTimeout(s.opts.ctx, time.Duration(s.opts.ttl)*time.Second)
|
||||||
|
_, err := s.client.Revoke(ctx, s.id)
|
||||||
|
cancel()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
type sessionOptions struct {
|
type sessionOptions struct {
|
||||||
ttl int
|
ttl int
|
||||||
|
ctx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
// SessionOption configures Session.
|
// SessionOption configures Session.
|
||||||
@ -107,3 +114,14 @@ func WithTTL(ttl int) SessionOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithContext assigns a context to the session instead of defaulting to
|
||||||
|
// using the client context. This is useful for canceling NewSession and
|
||||||
|
// Close operations immediately without having to close the client. If the
|
||||||
|
// context is canceled before Close() completes, the session's lease will be
|
||||||
|
// abandoned and left to expire instead of being revoked.
|
||||||
|
func WithContext(ctx context.Context) SessionOption {
|
||||||
|
return func(so *sessionOptions) {
|
||||||
|
so.ctx = ctx
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user