mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #4682 from heyitsanthony/clientv3-clientctx
clientv3: include a context in Client
This commit is contained in:
commit
ead5d432a3
@ -45,6 +45,9 @@ type Client struct {
|
|||||||
creds *credentials.TransportAuthenticator
|
creds *credentials.TransportAuthenticator
|
||||||
mu sync.RWMutex // protects connection selection and error list
|
mu sync.RWMutex // protects connection selection and error list
|
||||||
errors []error // errors passed to retryConnection
|
errors []error // errors passed to retryConnection
|
||||||
|
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// EndpointDialer is a policy for choosing which endpoint to dial next
|
// EndpointDialer is a policy for choosing which endpoint to dial next
|
||||||
@ -83,11 +86,23 @@ func NewFromURL(url string) (*Client, error) {
|
|||||||
|
|
||||||
// Close shuts down the client's etcd connections.
|
// Close shuts down the client's etcd connections.
|
||||||
func (c *Client) Close() error {
|
func (c *Client) Close() error {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
if c.cancel == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
c.cancel()
|
||||||
|
c.cancel = nil
|
||||||
c.Watcher.Close()
|
c.Watcher.Close()
|
||||||
c.Lease.Close()
|
c.Lease.Close()
|
||||||
return c.conn.Close()
|
return c.conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ctx is a context for "out of band" messages (e.g., for sending
|
||||||
|
// "clean up" message when another context is canceled). It is
|
||||||
|
// canceled on client Close().
|
||||||
|
func (c *Client) Ctx() context.Context { return c.ctx }
|
||||||
|
|
||||||
// Endpoints lists the registered endpoints for the client.
|
// Endpoints lists the registered endpoints for the client.
|
||||||
func (c *Client) Endpoints() []string { return c.cfg.Endpoints }
|
func (c *Client) Endpoints() []string { return c.cfg.Endpoints }
|
||||||
|
|
||||||
@ -145,10 +160,13 @@ func newClient(cfg *Config) (*Client, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
ctx, cancel := context.WithCancel(context.TODO())
|
||||||
client := &Client{
|
client := &Client{
|
||||||
conn: conn,
|
conn: conn,
|
||||||
cfg: *cfg,
|
cfg: *cfg,
|
||||||
creds: creds,
|
creds: creds,
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
}
|
}
|
||||||
client.Cluster = NewCluster(client)
|
client.Cluster = NewCluster(client)
|
||||||
client.KV = NewKV(client)
|
client.KV = NewKV(client)
|
||||||
@ -173,6 +191,9 @@ func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) (*grpc.Cli
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
c.errors = append(c.errors, err)
|
c.errors = append(c.errors, err)
|
||||||
}
|
}
|
||||||
|
if c.cancel == nil {
|
||||||
|
return nil, c.ctx.Err()
|
||||||
|
}
|
||||||
if oldConn != c.conn {
|
if oldConn != c.conn {
|
||||||
// conn has already been updated
|
// conn has already been updated
|
||||||
return c.conn, nil
|
return c.conn, nil
|
||||||
|
@ -29,7 +29,6 @@ var (
|
|||||||
|
|
||||||
type Election struct {
|
type Election struct {
|
||||||
client *v3.Client
|
client *v3.Client
|
||||||
ctx context.Context
|
|
||||||
|
|
||||||
keyPrefix string
|
keyPrefix string
|
||||||
|
|
||||||
@ -39,8 +38,8 @@ type Election struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewElection returns a new election on a given key prefix.
|
// NewElection returns a new election on a given key prefix.
|
||||||
func NewElection(ctx context.Context, client *v3.Client, pfx string) *Election {
|
func NewElection(client *v3.Client, pfx string) *Election {
|
||||||
return &Election{client: client, ctx: ctx, keyPrefix: pfx}
|
return &Election{client: client, keyPrefix: pfx}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Campaign puts a value as eligible for the election. It blocks until
|
// Campaign puts a value as eligible for the election. It blocks until
|
||||||
@ -60,7 +59,7 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
|
|||||||
// clean up in case of context cancel
|
// clean up in case of context cancel
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
e.client.Delete(e.ctx, k)
|
e.client.Delete(e.client.Ctx(), k)
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
@ -94,7 +93,7 @@ func (e *Election) Resign() (err error) {
|
|||||||
if e.leaderSession == nil {
|
if e.leaderSession == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
_, err = e.client.Delete(e.ctx, e.leaderKey)
|
_, err = e.client.Delete(e.client.Ctx(), e.leaderKey)
|
||||||
e.leaderKey = ""
|
e.leaderKey = ""
|
||||||
e.leaderSession = nil
|
e.leaderSession = nil
|
||||||
return err
|
return err
|
||||||
@ -102,7 +101,7 @@ func (e *Election) Resign() (err error) {
|
|||||||
|
|
||||||
// Leader returns the leader value for the current election.
|
// Leader returns the leader value for the current election.
|
||||||
func (e *Election) Leader() (string, error) {
|
func (e *Election) Leader() (string, error) {
|
||||||
resp, err := e.client.Get(e.ctx, e.keyPrefix, v3.WithFirstCreate()...)
|
resp, err := e.client.Get(e.client.Ctx(), e.keyPrefix, v3.WithFirstCreate()...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
} else if len(resp.Kvs) == 0 {
|
} else if len(resp.Kvs) == 0 {
|
||||||
|
@ -24,15 +24,14 @@ import (
|
|||||||
// Mutex implements the sync Locker interface with etcd
|
// Mutex implements the sync Locker interface with etcd
|
||||||
type Mutex struct {
|
type Mutex struct {
|
||||||
client *v3.Client
|
client *v3.Client
|
||||||
ctx context.Context
|
|
||||||
|
|
||||||
pfx string
|
pfx string
|
||||||
myKey string
|
myKey string
|
||||||
myRev int64
|
myRev int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMutex(ctx context.Context, client *v3.Client, pfx string) *Mutex {
|
func NewMutex(client *v3.Client, pfx string) *Mutex {
|
||||||
return &Mutex{client, ctx, pfx, "", -1}
|
return &Mutex{client, pfx, "", -1}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lock locks the mutex with a cancellable context. If the context is cancelled
|
// Lock locks the mutex with a cancellable context. If the context is cancelled
|
||||||
@ -56,7 +55,7 @@ func (m *Mutex) Lock(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Mutex) Unlock() error {
|
func (m *Mutex) Unlock() error {
|
||||||
if _, err := m.client.Delete(m.ctx, m.myKey); err != nil {
|
if _, err := m.client.Delete(m.client.Ctx(), m.myKey); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
m.myKey = "\x00"
|
m.myKey = "\x00"
|
||||||
@ -73,7 +72,7 @@ func (m *Mutex) Key() string { return m.myKey }
|
|||||||
type lockerMutex struct{ *Mutex }
|
type lockerMutex struct{ *Mutex }
|
||||||
|
|
||||||
func (lm *lockerMutex) Lock() {
|
func (lm *lockerMutex) Lock() {
|
||||||
if err := lm.Mutex.Lock(lm.ctx); err != nil {
|
if err := lm.Mutex.Lock(lm.client.Ctx()); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -84,6 +83,6 @@ func (lm *lockerMutex) Unlock() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewLocker creates a sync.Locker backed by an etcd mutex.
|
// NewLocker creates a sync.Locker backed by an etcd mutex.
|
||||||
func NewLocker(ctx context.Context, client *v3.Client, pfx string) sync.Locker {
|
func NewLocker(client *v3.Client, pfx string) sync.Locker {
|
||||||
return &lockerMutex{NewMutex(ctx, client, pfx)}
|
return &lockerMutex{NewMutex(client, pfx)}
|
||||||
}
|
}
|
||||||
|
@ -49,13 +49,13 @@ func NewSession(client *v3.Client) (*Session, error) {
|
|||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := client.Create(context.TODO(), sessionTTL)
|
resp, err := client.Create(client.Ctx(), sessionTTL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
id := lease.LeaseID(resp.ID)
|
id := lease.LeaseID(resp.ID)
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(client.Ctx())
|
||||||
keepAlive, err := client.KeepAlive(ctx, id)
|
keepAlive, err := client.KeepAlive(ctx, id)
|
||||||
if err != nil || keepAlive == nil {
|
if err != nil || keepAlive == nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -99,6 +99,6 @@ func (s *Session) Orphan() {
|
|||||||
// Close orphans the session and revokes the session lease.
|
// Close orphans the session and revokes the session lease.
|
||||||
func (s *Session) Close() error {
|
func (s *Session) Close() error {
|
||||||
s.Orphan()
|
s.Orphan()
|
||||||
_, err := s.client.Revoke(context.TODO(), s.id)
|
_, err := s.client.Revoke(s.client.Ctx(), s.id)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -64,7 +64,7 @@ func electCommandFunc(cmd *cobra.Command, args []string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func observe(c *clientv3.Client, election string) error {
|
func observe(c *clientv3.Client, election string) error {
|
||||||
e := concurrency.NewElection(context.TODO(), c, election)
|
e := concurrency.NewElection(c, election)
|
||||||
ctx, cancel := context.WithCancel(context.TODO())
|
ctx, cancel := context.WithCancel(context.TODO())
|
||||||
|
|
||||||
donec := make(chan struct{})
|
donec := make(chan struct{})
|
||||||
@ -94,7 +94,7 @@ func observe(c *clientv3.Client, election string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func campaign(c *clientv3.Client, election string, prop string) error {
|
func campaign(c *clientv3.Client, election string, prop string) error {
|
||||||
e := concurrency.NewElection(context.TODO(), c, election)
|
e := concurrency.NewElection(c, election)
|
||||||
ctx, cancel := context.WithCancel(context.TODO())
|
ctx, cancel := context.WithCancel(context.TODO())
|
||||||
|
|
||||||
donec := make(chan struct{})
|
donec := make(chan struct{})
|
||||||
|
@ -46,7 +46,7 @@ func lockCommandFunc(cmd *cobra.Command, args []string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func lockUntilSignal(c *clientv3.Client, lockname string) error {
|
func lockUntilSignal(c *clientv3.Client, lockname string) error {
|
||||||
m := concurrency.NewMutex(context.TODO(), c, lockname)
|
m := concurrency.NewMutex(c, lockname)
|
||||||
ctx, cancel := context.WithCancel(context.TODO())
|
ctx, cancel := context.WithCancel(context.TODO())
|
||||||
|
|
||||||
// unlock in case of ordinary shutdown
|
// unlock in case of ordinary shutdown
|
||||||
|
@ -40,7 +40,7 @@ func TestElectionWait(t *testing.T) {
|
|||||||
nextc = append(nextc, make(chan struct{}))
|
nextc = append(nextc, make(chan struct{}))
|
||||||
go func(ch chan struct{}) {
|
go func(ch chan struct{}) {
|
||||||
for j := 0; j < leaders; j++ {
|
for j := 0; j < leaders; j++ {
|
||||||
b := concurrency.NewElection(context.TODO(), clus.RandClient(), "test-election")
|
b := concurrency.NewElection(clus.RandClient(), "test-election")
|
||||||
cctx, cancel := context.WithCancel(context.TODO())
|
cctx, cancel := context.WithCancel(context.TODO())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
s, ok := <-b.Observe(cctx)
|
s, ok := <-b.Observe(cctx)
|
||||||
@ -58,7 +58,7 @@ func TestElectionWait(t *testing.T) {
|
|||||||
// elect some leaders
|
// elect some leaders
|
||||||
for i := 0; i < leaders; i++ {
|
for i := 0; i < leaders; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
e := concurrency.NewElection(context.TODO(), clus.RandClient(), "test-election")
|
e := concurrency.NewElection(clus.RandClient(), "test-election")
|
||||||
ev := fmt.Sprintf("electval-%v", time.Now().UnixNano())
|
ev := fmt.Sprintf("electval-%v", time.Now().UnixNano())
|
||||||
if err := e.Campaign(context.TODO(), ev); err != nil {
|
if err := e.Campaign(context.TODO(), ev); err != nil {
|
||||||
t.Fatalf("failed volunteer (%v)", err)
|
t.Fatalf("failed volunteer (%v)", err)
|
||||||
@ -97,7 +97,7 @@ func TestElectionFailover(t *testing.T) {
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// first leader (elected)
|
// first leader (elected)
|
||||||
e := concurrency.NewElection(context.TODO(), clus.clients[0], "test-election")
|
e := concurrency.NewElection(clus.clients[0], "test-election")
|
||||||
if err := e.Campaign(context.TODO(), "foo"); err != nil {
|
if err := e.Campaign(context.TODO(), "foo"); err != nil {
|
||||||
t.Fatalf("failed volunteer (%v)", err)
|
t.Fatalf("failed volunteer (%v)", err)
|
||||||
}
|
}
|
||||||
@ -115,7 +115,7 @@ func TestElectionFailover(t *testing.T) {
|
|||||||
// next leader
|
// next leader
|
||||||
electedc := make(chan struct{})
|
electedc := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
ee := concurrency.NewElection(context.TODO(), clus.clients[1], "test-election")
|
ee := concurrency.NewElection(clus.clients[1], "test-election")
|
||||||
if eer := ee.Campaign(context.TODO(), "bar"); eer != nil {
|
if eer := ee.Campaign(context.TODO(), "bar"); eer != nil {
|
||||||
t.Fatal(eer)
|
t.Fatal(eer)
|
||||||
}
|
}
|
||||||
@ -132,7 +132,7 @@ func TestElectionFailover(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// check new leader
|
// check new leader
|
||||||
e = concurrency.NewElection(context.TODO(), clus.clients[2], "test-election")
|
e = concurrency.NewElection(clus.clients[2], "test-election")
|
||||||
resp, ok = <-e.Observe(cctx)
|
resp, ok = <-e.Observe(cctx)
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatalf("could not wait for second election; channel closed")
|
t.Fatalf("could not wait for second election; channel closed")
|
||||||
|
@ -41,7 +41,7 @@ func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client)
|
|||||||
lockedC := make(chan *concurrency.Mutex, 1)
|
lockedC := make(chan *concurrency.Mutex, 1)
|
||||||
for i := 0; i < waiters; i++ {
|
for i := 0; i < waiters; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
m := concurrency.NewMutex(context.TODO(), chooseClient(), "test-mutex")
|
m := concurrency.NewMutex(chooseClient(), "test-mutex")
|
||||||
if err := m.Lock(context.TODO()); err != nil {
|
if err := m.Lock(context.TODO()); err != nil {
|
||||||
t.Fatalf("could not wait on lock (%v)", err)
|
t.Fatalf("could not wait on lock (%v)", err)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user